src/client/http_client.rs: fix request pipelining

Dietmar Maurer 2019-05-27 07:24:32 +02:00
parent 624362226e
commit 174ad378d8


@@ -503,7 +503,7 @@ impl BackupClient {
     }

     fn upload_chunk_queue(h2: H2Client, wid: u64) -> (
-        mpsc::Sender<MergedChunkInfo>,
+        mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
         sync::oneshot::Receiver<Result<(), Error>>
     ) {
         let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
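
The signature change above is the heart of the fix: the upload queue no longer carries bare MergedChunkInfo values, but pairs each one with an optional h2::client::ResponseFuture, that is, the response handle of a request that has already been sent. A minimal sketch of that contract, using stand-in types instead of the real MergedChunkInfo and h2 types (illustration only, not code from the commit):

// Stand-ins for the real types; illustration only.
struct ChunkInfo;       // plays the role of MergedChunkInfo
struct ResponseFuture;  // plays the role of h2::client::ResponseFuture

// The queue item: `None` means the chunk was already known to the server,
// so there is no outstanding request whose response must still be awaited.
type QueueItem = (ChunkInfo, Option<ResponseFuture>);

fn main() {
    let known_chunk: QueueItem = (ChunkInfo, None);                // nothing in flight
    let new_chunk: QueueItem = (ChunkInfo, Some(ResponseFuture));  // request already sent
    let _ = (known_chunk, new_chunk);
}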
@@ -514,33 +514,22 @@ impl BackupClient {
         hyper::rt::spawn(
             verify_queue_rx
                 .map_err(Error::from)
-                .and_then(move |merged_chunk_info| {
-                    match merged_chunk_info {
-                        MergedChunkInfo::New(chunk_info) => {
-                            let offset = chunk_info.offset;
-                            let digest = chunk_info.digest;
-                            println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest),
-                                     chunk_info.data.len(), offset);
-                            let param = json!({ "wid": wid, "offset": offset, "size" : chunk_info.data.len() });
-                            let request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
-                            let upload_data = Some(chunk_info.data.freeze());
-
+                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
+                    match (response, merged_chunk_info) {
+                        (Some(response), MergedChunkInfo::Known(list)) => {
                             future::Either::A(
-                                h2.send_request(request, upload_data)
-                                    .and_then(move |response| {
-                                        response
-                                            .map_err(Error::from)
-                                            .and_then(H2Client::h2api_response)
-                                            .and_then(move |result| {
-                                                Ok(MergedChunkInfo::Known(vec![(offset, digest)]))
-                                            })
-                                    })
-                                    .map_err(|err| format_err!("pipelined request failed: {}", err))
+                                response
+                                    .map_err(Error::from)
+                                    .and_then(H2Client::h2api_response)
+                                    .and_then(move |result| {
+                                        Ok(MergedChunkInfo::Known(list))
+                                    })
                             )
                         }
-                        MergedChunkInfo::Known(list) => {
+                        (None, MergedChunkInfo::Known(list)) => {
                             future::Either::B(future::ok(MergedChunkInfo::Known(list)))
                         }
+                        _ => unreachable!(),
                     }
                 })
                 .merge_known_chunks()
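
Note what the queue consumer's and_then no longer does: before this change it called h2.send_request itself, and because and_then on a stream resolves one item's future before polling the next, every chunk paid a full request plus response round trip before the next request could even start, so nothing was actually pipelined. After the change the request is already on the wire by the time the item reaches the queue, and the consumer merely awaits its completion, in order. A tiny stand-alone illustration of "await already-started work", using std threads in place of the futures chain (names are illustrative):

use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let start = Instant::now();

    // Both "requests" are started up front, like h2.send_request in for_each...
    let first = thread::spawn(|| { thread::sleep(Duration::from_millis(100)); "first" });
    let second = thread::spawn(|| { thread::sleep(Duration::from_millis(100)); "second" });

    // ...so awaiting them in order costs roughly 100 ms total, not 200 ms.
    println!("{}", first.join().unwrap());
    println!("{}", second.join().unwrap());
    println!("elapsed: {:?}", start.elapsed());
}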
@@ -654,9 +643,36 @@ impl BackupClient {
             })
             .merge_known_chunks()
             .for_each(move |merged_chunk_info| {
-                upload_queue.clone().send(merged_chunk_info)
-                    .map(|_| ()).map_err(Error::from)
+                if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
+                    let offset = chunk_info.offset;
+                    let digest = chunk_info.digest;
+                    let upload_queue = upload_queue.clone();
+
+                    println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest),
+                             chunk_info.data.len(), offset);
+
+                    let param = json!({ "wid": wid, "offset": offset, "size" : chunk_info.data.len() });
+                    let request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
+                    let upload_data = Some(chunk_info.data.freeze());
+
+                    let new_info = MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)]);
+
+                    future::Either::A(
+                        h2.send_request(request, upload_data)
+                            .and_then(move |response| {
+                                upload_queue.clone().send((new_info, Some(response)))
+                                    .map(|_| ()).map_err(Error::from)
+                            })
+                    )
+                } else {
+                    future::Either::B(
+                        upload_queue.clone().send((merged_chunk_info, None))
+                            .map(|_| ()).map_err(Error::from)
+                    )
+                }
             })
             .then(move |result| {
                 println!("RESULT {:?}", result);
                 upload_result.map_err(Error::from).and_then(|upload1_result| {
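
Taken together, the two halves implement a classic pipelining pattern: the producer fires off each request as soon as the chunk is ready and pushes only the pending response handle into the queue, while the consumer drains the queue and awaits responses in submission order. Many requests can thus be in flight at once, yet completion is still checked sequentially. The sketch below reproduces that shape with std threads and channels in place of futures, hyper and h2; every name in it is illustrative, not from the commit:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Plays the role of h2::client::ResponseFuture: a handle to a reply that is
// already underway.
type PendingResponse = mpsc::Receiver<String>;

// Plays the role of h2.send_request: the request "goes out" immediately and
// the caller gets a handle instead of blocking for the reply.
fn send_request(chunk: u64) -> PendingResponse {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50)); // simulated round-trip time
        tx.send(format!("stored chunk {}", chunk)).unwrap();
    });
    rx
}

fn main() {
    // The queue carries (chunk id, optional pending response), mirroring
    // mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>.
    let (queue_tx, queue_rx) = mpsc::channel::<(u64, Option<PendingResponse>)>();

    // Consumer: checks completions in submission order (the verify queue).
    let verifier = thread::spawn(move || {
        for (chunk, pending) in queue_rx {
            match pending {
                Some(rx) => println!("chunk {}: {}", chunk, rx.recv().unwrap()),
                None => println!("chunk {}: already known, nothing to await", chunk),
            }
        }
    });

    // Producer: requests are issued back to back; only handles queue up, so
    // several requests are in flight at once. This is the pipelining the
    // commit restores.
    for chunk in 0..4 {
        let pending = if chunk % 2 == 0 {
            Some(send_request(chunk)) // "new" chunk: upload it
        } else {
            None                      // "known" chunk: nothing to send
        };
        queue_tx.send((chunk, pending)).unwrap();
    }
    drop(queue_tx); // close the queue so the verifier's loop ends
    verifier.join().unwrap();
}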