src/client/http_client.rs: use async for download_chunk_list

This commit is contained in:
Dietmar Maurer 2019-09-05 13:12:03 +02:00
parent 2f831baec0
commit c2a5a9f353

View File

@ -828,45 +828,37 @@ impl BackupClient {
(verify_queue_tx, verify_result_rx) (verify_queue_tx, verify_result_rx)
} }
fn download_chunk_list( async fn download_chunk_list(
h2: H2Client, h2: H2Client,
path: &str, path: &str,
archive_name: &str, archive_name: &str,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> impl Future<Output = Result<(), Error>> { ) -> Result<(), Error> {
let param = json!({ "archive-name": archive_name }); let param = json!({ "archive-name": archive_name });
let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap(); let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap();
h2.send_request(request, None) let h2request = h2.send_request(request, None).await?;
.and_then(move |response| { let resp = h2request.await?;
response
.map_err(Error::from)
.and_then(move |resp| {
let status = resp.status();
if !status.is_success() { let status = resp.status();
future::Either::Left(
H2Client::h2api_response(resp)
.map(|_| Err(format_err!("unknown error")))
)
} else {
future::Either::Right(future::ok(resp.into_body()))
}
})
.and_then(move |mut body| {
let mut release_capacity = body.release_capacity().clone(); if !status.is_success() {
H2Client::h2api_response(resp).await?; // raise error
unreachable!();
}
DigestListDecoder::new(body.map_err(Error::from)) let mut body = resp.into_body();
.try_for_each(move |chunk| { let mut release_capacity = body.release_capacity().clone();
let _ = release_capacity.release_capacity(chunk.len());
println!("GOT DOWNLOAD {}", digest_to_hex(&chunk)); DigestListDecoder::new(body.map_err(Error::from))
known_chunks.lock().unwrap().insert(chunk); .try_for_each(move |chunk| {
futures::future::ok(()) let _ = release_capacity.release_capacity(chunk.len());
}) println!("GOT DOWNLOAD {}", digest_to_hex(&chunk));
}) known_chunks.lock().unwrap().insert(chunk);
futures::future::ok(())
}) })
.await
} }
fn upload_chunk_info_stream( fn upload_chunk_info_stream(