src/client/http_client.rs - BackupClient: use async

This commit is contained in:
Dietmar Maurer 2019-09-04 14:52:19 +02:00
parent d4a085e564
commit 2a1e6d7dea

View File

@ -575,36 +575,38 @@ impl BackupClient {
Arc::new(Self { h2, canceller }) Arc::new(Self { h2, canceller })
} }
pub fn get( pub async fn get(
&self, &self,
path: &str, path: &str,
param: Option<Value>, param: Option<Value>,
) -> impl Future<Output = Result<Value, Error>> { ) -> Result<Value, Error> {
self.h2.get(path, param) self.h2.get(path, param).await
} }
pub fn put( pub async fn put(
&self, &self,
path: &str, path: &str,
param: Option<Value>, param: Option<Value>,
) -> impl Future<Output = Result<Value, Error>> { ) -> Result<Value, Error> {
self.h2.put(path, param) self.h2.put(path, param).await
} }
pub fn post( pub async fn post(
&self, &self,
path: &str, path: &str,
param: Option<Value>, param: Option<Value>,
) -> impl Future<Output = Result<Value, Error>> { ) -> Result<Value, Error> {
self.h2.post(path, param) self.h2.post(path, param).await
} }
pub fn finish(self: Arc<Self>) -> impl Future<Output = Result<(), Error>> { pub async fn finish(self: Arc<Self>) -> Result<(), Error> {
self.h2.clone() let h2 = self.h2.clone();
.post("finish", None)
h2.post("finish", None)
.map_ok(move |_| { .map_ok(move |_| {
self.canceller.cancel(); self.canceller.cancel();
}) })
.await
} }
pub fn force_close(self) { pub fn force_close(self) {
@ -1090,74 +1092,74 @@ impl H2Client {
Self { h2 } Self { h2 }
} }
pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Output = Result<Value, Error>> { pub async fn get(
&self,
path: &str,
param: Option<Value>
) -> Result<Value, Error> {
let req = Self::request_builder("localhost", "GET", path, param).unwrap(); let req = Self::request_builder("localhost", "GET", path, param).unwrap();
self.request(req) self.request(req).await
} }
pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Output = Result<Value, Error>> { pub async fn put(
&self,
path: &str,
param: Option<Value>
) -> Result<Value, Error> {
let req = Self::request_builder("localhost", "PUT", path, param).unwrap(); let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
self.request(req) self.request(req).await
} }
pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Output = Result<Value, Error>> { pub async fn post(
&self,
path: &str,
param: Option<Value>
) -> Result<Value, Error> {
let req = Self::request_builder("localhost", "POST", path, param).unwrap(); let req = Self::request_builder("localhost", "POST", path, param).unwrap();
self.request(req) self.request(req).await
} }
pub async fn download<W: Write + Send>( pub async fn download<W: Write + Send>(
&self, &self,
path: &str, path: &str,
param: Option<Value>, param: Option<Value>,
output: W, mut output: W,
) -> Result<W, Error> { ) -> Result<W, Error> {
let request = Self::request_builder("localhost", "GET", path, param).unwrap(); let request = Self::request_builder("localhost", "GET", path, param).unwrap();
self.send_request(request, None) let response_future = self.send_request(request, None).await?;
.and_then(move |response| {
response let resp = response_future.await?;
.map_err(Error::from)
.and_then(move |resp| {
let status = resp.status(); let status = resp.status();
if !status.is_success() { if !status.is_success() {
future::Either::Left(
H2Client::h2api_response(resp) H2Client::h2api_response(resp)
.map(|_| Err(format_err!("unknown error"))) .map(|_| Err(format_err!("unknown error")))
) .await?;
} else { unreachable!();
}
let mut body = resp.into_body(); let mut body = resp.into_body();
let release_capacity = body.release_capacity().clone(); let mut release_capacity = body.release_capacity().clone();
future::Either::Right( while let Some(chunk) = body.try_next().await? {
body
.map_err(Error::from)
.try_fold(output, move |mut acc, chunk| {
let mut release_capacity = release_capacity.clone();
async move {
let _ = release_capacity.release_capacity(chunk.len()); let _ = release_capacity.release_capacity(chunk.len());
acc.write_all(&chunk)?; output.write_all(&chunk)?;
Ok::<_, Error>(acc)
}
})
)
}
})
})
.await
} }
pub fn upload( Ok(output)
}
pub async fn upload(
&self, &self,
path: &str, path: &str,
param: Option<Value>, param: Option<Value>,
data: Vec<u8>, data: Vec<u8>,
) -> impl Future<Output = Result<Value, Error>> { ) -> Result<Value, Error> {
let request = Self::request_builder("localhost", "POST", path, param).unwrap(); let request = Self::request_builder("localhost", "POST", path, param).unwrap();
self.h2.clone() let mut send_request = self.h2.clone().ready().await?;
.ready()
.map_err(Error::from)
.and_then(move |mut send_request| {
let (response, stream) = send_request.send_request(request, false).unwrap(); let (response, stream) = send_request.send_request(request, false).unwrap();
PipeToSendStream::new(bytes::Bytes::from(data), stream) PipeToSendStream::new(bytes::Bytes::from(data), stream)
.and_then(|_| { .and_then(|_| {
@ -1165,13 +1167,13 @@ impl H2Client {
.map_err(Error::from) .map_err(Error::from)
.and_then(Self::h2api_response) .and_then(Self::h2api_response)
}) })
}) .await
} }
fn request( async fn request(
&self, &self,
request: Request<()>, request: Request<()>,
) -> impl Future<Output = Result<Value, Error>> { ) -> Result<Value, Error> {
self.send_request(request, None) self.send_request(request, None)
.and_then(move |response| { .and_then(move |response| {
@ -1179,6 +1181,7 @@ impl H2Client {
.map_err(Error::from) .map_err(Error::from)
.and_then(Self::h2api_response) .and_then(Self::h2api_response)
}) })
.await
} }
fn send_request( fn send_request(