From 174ad378d81521904ffc0a52ce2169904de65467 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Mon, 27 May 2019 07:24:32 +0200 Subject: [PATCH] src/client/http_client.rs: fix request pipelining --- src/client/http_client.rs | 64 ++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 1793e432..3f78b2e3 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -503,7 +503,7 @@ impl BackupClient { } fn upload_chunk_queue(h2: H2Client, wid: u64) -> ( - mpsc::Sender, + mpsc::Sender<(MergedChunkInfo, Option)>, sync::oneshot::Receiver> ) { let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100); @@ -514,33 +514,22 @@ impl BackupClient { hyper::rt::spawn( verify_queue_rx .map_err(Error::from) - .and_then(move |merged_chunk_info| { - match merged_chunk_info { - MergedChunkInfo::New(chunk_info) => { - let offset = chunk_info.offset; - let digest = chunk_info.digest; - println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest), - chunk_info.data.len(), offset); - let param = json!({ "wid": wid, "offset": offset, "size" : chunk_info.data.len() }); - let request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); - let upload_data = Some(chunk_info.data.freeze()); - + .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option)| { + match (response, merged_chunk_info) { + (Some(response), MergedChunkInfo::Known(list)) => { future::Either::A( - h2.send_request(request, upload_data) - .and_then(move |response| { - response - .map_err(Error::from) - .and_then(H2Client::h2api_response) - .and_then(move |result| { - Ok(MergedChunkInfo::Known(vec![(offset, digest)])) - }) + response + .map_err(Error::from) + .and_then(H2Client::h2api_response) + .and_then(move |result| { + Ok(MergedChunkInfo::Known(list)) }) - .map_err(|err| format_err!("pipelined request failed: {}", err)) ) } - MergedChunkInfo::Known(list) => { + (None, MergedChunkInfo::Known(list)) => { future::Either::B(future::ok(MergedChunkInfo::Known(list))) } + _ => unreachable!(), } }) .merge_known_chunks() @@ -654,8 +643,35 @@ impl BackupClient { }) .merge_known_chunks() .for_each(move |merged_chunk_info| { - upload_queue.clone().send(merged_chunk_info) - .map(|_| ()).map_err(Error::from) + + if let MergedChunkInfo::New(chunk_info) = merged_chunk_info { + let offset = chunk_info.offset; + let digest = chunk_info.digest; + let upload_queue = upload_queue.clone(); + + println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest), + chunk_info.data.len(), offset); + + let param = json!({ "wid": wid, "offset": offset, "size" : chunk_info.data.len() }); + let request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); + let upload_data = Some(chunk_info.data.freeze()); + + let new_info = MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)]); + + future::Either::A( + h2.send_request(request, upload_data) + .and_then(move |response| { + upload_queue.clone().send((new_info, Some(response))) + .map(|_| ()).map_err(Error::from) + }) + ) + } else { + + future::Either::B( + upload_queue.clone().send((merged_chunk_info, None)) + .map(|_| ()).map_err(Error::from) + ) + } }) .then(move |result| { println!("RESULT {:?}", result);