From 2a1e6d7dea15b3c4bb40dbbe6185d26eb7140093 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Wed, 4 Sep 2019 14:52:19 +0200 Subject: [PATCH] src/client/http_client.rs - BackupClient: use async --- src/client/http_client.rs | 131 +++++++++++++++++++------------------- 1 file changed, 67 insertions(+), 64 deletions(-) diff --git a/src/client/http_client.rs b/src/client/http_client.rs index fd50ef01..b76939be 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -575,36 +575,38 @@ impl BackupClient { Arc::new(Self { h2, canceller }) } - pub fn get( + pub async fn get( &self, path: &str, param: Option, - ) -> impl Future> { - self.h2.get(path, param) + ) -> Result { + self.h2.get(path, param).await } - pub fn put( + pub async fn put( &self, path: &str, param: Option, - ) -> impl Future> { - self.h2.put(path, param) + ) -> Result { + self.h2.put(path, param).await } - pub fn post( + pub async fn post( &self, path: &str, param: Option, - ) -> impl Future> { - self.h2.post(path, param) + ) -> Result { + self.h2.post(path, param).await } - pub fn finish(self: Arc) -> impl Future> { - self.h2.clone() - .post("finish", None) + pub async fn finish(self: Arc) -> Result<(), Error> { + let h2 = self.h2.clone(); + + h2.post("finish", None) .map_ok(move |_| { self.canceller.cancel(); }) + .await } pub fn force_close(self) { @@ -1090,88 +1092,88 @@ impl H2Client { Self { h2 } } - pub fn get(&self, path: &str, param: Option) -> impl Future> { + pub async fn get( + &self, + path: &str, + param: Option + ) -> Result { let req = Self::request_builder("localhost", "GET", path, param).unwrap(); - self.request(req) + self.request(req).await } - pub fn put(&self, path: &str, param: Option) -> impl Future> { + pub async fn put( + &self, + path: &str, + param: Option + ) -> Result { let req = Self::request_builder("localhost", "PUT", path, param).unwrap(); - self.request(req) + self.request(req).await } - pub fn post(&self, path: &str, param: Option) -> impl Future> { + pub async fn post( + &self, + path: &str, + param: Option + ) -> Result { let req = Self::request_builder("localhost", "POST", path, param).unwrap(); - self.request(req) + self.request(req).await } pub async fn download( &self, path: &str, param: Option, - output: W, + mut output: W, ) -> Result { let request = Self::request_builder("localhost", "GET", path, param).unwrap(); - self.send_request(request, None) - .and_then(move |response| { - response - .map_err(Error::from) - .and_then(move |resp| { - let status = resp.status(); - if !status.is_success() { - future::Either::Left( - H2Client::h2api_response(resp) - .map(|_| Err(format_err!("unknown error"))) - ) - } else { - let mut body = resp.into_body(); - let release_capacity = body.release_capacity().clone(); + let response_future = self.send_request(request, None).await?; - future::Either::Right( - body - .map_err(Error::from) - .try_fold(output, move |mut acc, chunk| { - let mut release_capacity = release_capacity.clone(); - async move { - let _ = release_capacity.release_capacity(chunk.len()); - acc.write_all(&chunk)?; - Ok::<_, Error>(acc) - } - }) - ) - } - }) - }) - .await + let resp = response_future.await?; + + let status = resp.status(); + if !status.is_success() { + H2Client::h2api_response(resp) + .map(|_| Err(format_err!("unknown error"))) + .await?; + unreachable!(); + } + + let mut body = resp.into_body(); + let mut release_capacity = body.release_capacity().clone(); + + while let Some(chunk) = body.try_next().await? { + let _ = release_capacity.release_capacity(chunk.len()); + output.write_all(&chunk)?; + } + + Ok(output) } - pub fn upload( + pub async fn upload( &self, path: &str, param: Option, data: Vec, - ) -> impl Future> { + ) -> Result { let request = Self::request_builder("localhost", "POST", path, param).unwrap(); - self.h2.clone() - .ready() - .map_err(Error::from) - .and_then(move |mut send_request| { - let (response, stream) = send_request.send_request(request, false).unwrap(); - PipeToSendStream::new(bytes::Bytes::from(data), stream) - .and_then(|_| { - response - .map_err(Error::from) - .and_then(Self::h2api_response) - }) + let mut send_request = self.h2.clone().ready().await?; + + let (response, stream) = send_request.send_request(request, false).unwrap(); + PipeToSendStream::new(bytes::Bytes::from(data), stream) + .and_then(|_| { + response + .map_err(Error::from) + .and_then(Self::h2api_response) }) + .await } - fn request( + async fn request( &self, request: Request<()>, - ) -> impl Future> { + ) -> Result { self.send_request(request, None) .and_then(move |response| { @@ -1179,6 +1181,7 @@ impl H2Client { .map_err(Error::from) .and_then(Self::h2api_response) }) + .await } fn send_request(