From cf639a47de68e1f8c7748f621e870ee7733f822d Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Mon, 29 Apr 2019 11:57:58 +0200 Subject: [PATCH] rc/client/http_client.rs: add experimental h2 upgrade code --- Cargo.toml | 1 + src/api2/admin/datastore/h2upload.rs | 1 + src/client/http_client.rs | 44 ++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index cdc352f2..a71aab28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ serde_json = "1.0" serde_derive = "1.0" url = "1.7" futures = "0.1" +bytes = "0.4" tokio-threadpool = "0.1" tokio = "0.1" tokio-fs = "0.1" diff --git a/src/api2/admin/datastore/h2upload.rs b/src/api2/admin/datastore/h2upload.rs index dc7d9511..efdf0201 100644 --- a/src/api2/admin/datastore/h2upload.rs +++ b/src/api2/admin/datastore/h2upload.rs @@ -47,6 +47,7 @@ fn upgrade_h2upload( println!("upgrade done"); server::handshake(conn) .and_then(|h2| { + println!("Accept h2"); // Accept all inbound HTTP/2.0 streams sent over the // connection. h2.for_each(|(request, mut respond)| { diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 17ad0017..e24fd82b 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -232,6 +232,50 @@ impl HttpClient { self.request(req) } + pub fn h2upgrade(&mut self, path: &str) -> impl Future, Error=Error> { + + let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap(); + + let login = self.auth.listen(); + + let client = self.client.clone(); + + login.and_then(move |auth| { + + let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); + req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); + req.headers_mut().insert("UPGRADE", HeaderValue::from_str("proxmox-backup-protocol-h2").unwrap()); + + client.request(req) + .map_err(Error::from) + .and_then(|resp| { + + let status = resp.status(); + if status != http::StatusCode::SWITCHING_PROTOCOLS { + bail!("h2upgrade failed with status {:?}", status); + } + + Ok(resp.into_body().on_upgrade().map_err(Error::from)) + }) + .flatten() + .and_then(|upgraded| { + println!("upgraded"); + + h2::client::handshake(upgraded).map_err(Error::from) + }) + .and_then(|(h2, connection)| { + let connection = connection + .map_err(|_| panic!("HTTP/2.0 connection failed")); + + // Spawn a new task to drive the connection state + hyper::rt::spawn(connection); + + // Wait until the `SendRequest` handle has available capacity. + h2.ready().map_err(Error::from) + }) + }) + } + fn credentials( client: Client>, server: &str,