From 1a7a0e74c053c335735f7244ded30aed56c930c5 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Wed, 4 Sep 2019 12:47:01 +0200 Subject: [PATCH] src/client/http_client.rs: use async for more functions --- src/client/http_client.rs | 203 +++++++++++++++++--------------------- 1 file changed, 92 insertions(+), 111 deletions(-) diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 60faa69e..5478e69f 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -143,7 +143,7 @@ impl HttpClient { }) } - /// Login future + /// Login /// /// Login is done on demand, so this is onyl required if you need /// access to authentication data in 'AuthInfo'. @@ -188,97 +188,85 @@ impl HttpClient { .build::<_, Body>(https) } - pub fn request(&self, mut req: Request) -> impl Future> { - - let login = self.auth.listen(); + pub async fn request(&self, mut req: Request) -> Result { let client = self.client.clone(); - login.and_then(move |auth| { + let auth = self.login().await?; - let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); - req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); - req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap()); + let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); + req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); + req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap()); - let request = Self::api_request(client, req); - - request - }) + Self::api_request(client, req).await } - pub fn get( + pub async fn get( &self, path: &str, data: Option, - ) -> impl Future> { + ) -> Result { let req = Self::request_builder(&self.server, "GET", path, data).unwrap(); - self.request(req) + self.request(req).await } - pub fn delete( + pub async fn delete( &mut self, path: &str, data: Option, - ) -> impl Future> { + ) -> Result { let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap(); - self.request(req) + self.request(req).await } - pub fn post( + pub async fn post( &mut self, path: &str, data: Option, - ) -> impl Future> { + ) -> Result { let req = Self::request_builder(&self.server, "POST", path, data).unwrap(); - self.request(req) + self.request(req).await } - pub fn download( + pub async fn download( &mut self, path: &str, - output: W, - ) -> impl Future> { + output: &mut (dyn Write + Send), + ) -> Result<(), Error> { let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap(); - let login = self.auth.listen(); - let client = self.client.clone(); - login.and_then(move |auth| { + let auth = self.login().await?; - let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); - req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); + let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); + req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); - client.request(req) + let resp = client.request(req).await?; + let status = resp.status(); + if !status.is_success() { + HttpClient::api_response(resp) + .map(|_| Err(format_err!("unknown error"))) + .await? + } else { + resp.into_body() .map_err(Error::from) - .and_then(|resp| { - let status = resp.status(); - if !status.is_success() { - future::Either::Left( - HttpClient::api_response(resp) - .map(|_| Err(format_err!("unknown error"))) - ) - } else { - future::Either::Right( - resp.into_body() - .map_err(Error::from) - .try_fold(output, move |mut acc, chunk| async move { - acc.write_all(&chunk)?; - Ok::<_, Error>(acc) - }) - ) - } + .try_fold(output, move |acc, chunk| async move { + acc.write_all(&chunk)?; + Ok::<_, Error>(acc) }) - }) + .await?; + } + Ok(()) } - pub fn upload( + pub async fn upload( &mut self, content_type: &str, body: Body, path: &str, data: Option, - ) -> impl Future> { + ) -> Result { let path = path.trim_matches('/'); let mut url = format!("https://{}:8007/{}", &self.server, path); @@ -298,17 +286,17 @@ impl HttpClient { .header("Content-Type", content_type) .body(body).unwrap(); - self.request(req) + self.request(req).await } - pub fn start_backup( + pub async fn start_backup( &self, datastore: &str, backup_type: &str, backup_id: &str, backup_time: DateTime, debug: bool, - ) -> impl Future, Error>> { + ) -> Result, Error> { let param = json!({ "backup-type": backup_type, @@ -320,18 +308,19 @@ impl HttpClient { let req = Self::request_builder(&self.server, "GET", "/api2/json/backup", Some(param)).unwrap(); - self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())) - .map_ok(|(h2, canceller)| BackupClient::new(h2, canceller)) + let (h2, canceller) = self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?; + + Ok(BackupClient::new(h2, canceller)) } - pub fn start_backup_reader( + pub async fn start_backup_reader( &self, datastore: &str, backup_type: &str, backup_id: &str, backup_time: DateTime, debug: bool, - ) -> impl Future, Error>> { + ) -> Result, Error> { let param = json!({ "backup-type": backup_type, @@ -342,72 +331,63 @@ impl HttpClient { }); let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap(); - self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())) - .map_ok(|(h2, canceller)| BackupReader::new(h2, canceller)) + let (h2, canceller) = self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?; + + Ok(BackupReader::new(h2, canceller)) } - pub fn start_h2_connection( + pub async fn start_h2_connection( &self, mut req: Request, protocol_name: String, - ) -> impl Future> { + ) -> Result<(H2Client, Canceller), Error> { - let login = self.auth.listen(); + let auth = self.login().await?; let client = self.client.clone(); - login.and_then(move |auth| { + let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); + req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); + req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap()); - let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); - req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); - req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap()); + let resp = client.request(req).await?; + let status = resp.status(); - client.request(req) - .map_err(Error::from) - .and_then(|resp| { + if status != http::StatusCode::SWITCHING_PROTOCOLS { + Self::api_response(resp) + .map(|_| Err(format_err!("unknown error"))) + .await?; + unreachable!(); + } - let status = resp.status(); - if status != http::StatusCode::SWITCHING_PROTOCOLS { - future::Either::Left( - Self::api_response(resp) - .map(|_| Err(format_err!("unknown error"))) - ) - } else { - future::Either::Right( - resp - .into_body() - .on_upgrade() - .map_err(Error::from) - ) - } - }) - .and_then(|upgraded| { - let max_window_size = (1 << 31) - 2; + let upgraded = resp + .into_body() + .on_upgrade() + .await?; - h2::client::Builder::new() - .initial_connection_window_size(max_window_size) - .initial_window_size(max_window_size) - .max_frame_size(4*1024*1024) - .handshake(upgraded) - .map_err(Error::from) - }) - .and_then(|(h2, connection)| async move { - let connection = connection - .map_err(|_| panic!("HTTP/2.0 connection failed")); + let max_window_size = (1 << 31) - 2; - let (connection, canceller) = cancellable(connection)?; - // A cancellable future returns an Option which is None when cancelled and - // Some when it finished instead, since we don't care about the return type we - // need to map it away: - let connection = connection.map(|_| ()); + let (h2, connection) = h2::client::Builder::new() + .initial_connection_window_size(max_window_size) + .initial_window_size(max_window_size) + .max_frame_size(4*1024*1024) + .handshake(upgraded) + .await?; - // Spawn a new task to drive the connection state - hyper::rt::spawn(connection); + let connection = connection + .map_err(|_| panic!("HTTP/2.0 connection failed")); - // Wait until the `SendRequest` handle has available capacity. - let c = h2.ready().await?; - Ok((H2Client::new(c), canceller)) - }.boxed()) - }) + let (connection, canceller) = cancellable(connection)?; + // A cancellable future returns an Option which is None when cancelled and + // Some when it finished instead, since we don't care about the return type we + // need to map it away: + let connection = connection.map(|_| ()); + + // Spawn a new task to drive the connection state + hyper::rt::spawn(connection); + + // Wait until the `SendRequest` handle has available capacity. + let c = h2.ready().await?; + Ok((H2Client::new(c), canceller)) } async fn credentials( @@ -450,14 +430,15 @@ impl HttpClient { } } - fn api_request( + async fn api_request( client: Client, req: Request - ) -> impl Future> { + ) -> Result { client.request(req) .map_err(Error::from) .and_then(Self::api_response) + .await } pub fn request_builder(server: &str, method: &str, path: &str, data: Option) -> Result, Error> { @@ -953,7 +934,7 @@ impl BackupClient { let csum = guard.as_mut().unwrap(); let chunk_end = offset + chunk_len as u64; - + csum.update(&chunk_end.to_le_bytes()); csum.update(digest);