diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs index a7b57ebe..2471eeb3 100644 --- a/src/api2/admin/datastore/backup.rs +++ b/src/api2/admin/datastore/backup.rs @@ -270,7 +270,6 @@ fn dynamic_append ( let digest = crate::tools::hex_to_digest(digest_str)?; let offset = offset_list[i].as_u64().unwrap(); let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?; - env.dynamic_writer_append_chunk(wid, offset, size, &digest)?; env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid)); diff --git a/src/api2/admin/datastore/backup/upload_chunk.rs b/src/api2/admin/datastore/backup/upload_chunk.rs index 9dc4a6c5..c20559cd 100644 --- a/src/api2/admin/datastore/backup/upload_chunk.rs +++ b/src/api2/admin/datastore/backup/upload_chunk.rs @@ -96,7 +96,6 @@ fn upload_dynamic_chunk( let result = result.and_then(|(digest, size)| { env.register_chunk(digest, size)?; - env.dynamic_writer_append_chunk(wid, offset, size, &digest)?; Ok(json!(tools::digest_to_hex(&digest))) }); diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 9879a252..9b327324 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -502,6 +502,86 @@ impl BackupClient { (verify_queue_tx, verify_result_rx) } + fn upload_chunk_queue(h2: H2Client, wid: u64) -> ( + mpsc::Sender, + sync::oneshot::Receiver> + ) { + let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100); + let (verify_result_tx, verify_result_rx) = sync::oneshot::channel(); + + let h2_2 = h2.clone(); + + hyper::rt::spawn( + verify_queue_rx + .map_err(Error::from) + //.for_each(|response: h2::client::ResponseFuture| { + .and_then(move |merged_chunk_info| { + match merged_chunk_info { + MergedChunkInfo::New(chunk_info) => { + let offset = chunk_info.offset; + let digest = chunk_info.digest; + println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest), + chunk_info.data.len(), offset); + let param = json!({ "wid": wid, "offset": offset, "size" : chunk_info.data.len() }); + let request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); + let upload_data = Some(chunk_info.data.freeze()); + + future::Either::A( + h2.send_request(request, upload_data) + .and_then(move |response| { + response + .map_err(Error::from) + .and_then(H2Client::h2api_response) + .and_then(move |result| { + Ok(MergedChunkInfo::Known(vec![(offset, digest)])) + }) + }) + .map_err(|err| format_err!("pipelined request failed: {}", err)) + ) + } + MergedChunkInfo::Known(list) => { + future::Either::B(future::ok(MergedChunkInfo::Known(list))) + } + } + }) + .and_then(move |merged_chunk_info| { + match merged_chunk_info { + MergedChunkInfo::Known(chunk_list) => { + let mut digest_list = vec![]; + let mut offset_list = vec![]; + for (offset, digest) in chunk_list { + //println!("append chunk {} (offset {})", tools::digest_to_hex(&digest), offset); + digest_list.push(tools::digest_to_hex(&digest)); + offset_list.push(offset); + } + println!("append chunks list len ({})", digest_list.len()); + let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list }); + let mut request = H2Client::request_builder("localhost", "PUT", "dynamic_index", None).unwrap(); + request.headers_mut().insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static("application/json")); + let param_data = bytes::Bytes::from(param.to_string().as_bytes()); + let upload_data = Some(param_data); + h2_2.send_request(request, upload_data) + .and_then(move |response| { + response + .map_err(Error::from) + .and_then(H2Client::h2api_response) + .and_then(|_| Ok(())) + }) + .map_err(|err| format_err!("pipelined request failed: {}", err)) + } + _ => unreachable!(), + } + }) + .for_each(|_| Ok(())) + .then(|result| + verify_result_tx.send(result) + ) + .map_err(|_| { /* ignore closed channel */ }) + ); + + (verify_queue_tx, verify_result_rx) + } + fn download_chunk_list( h2: H2Client, path: &str, @@ -554,7 +634,7 @@ impl BackupClient { let stream_len = std::sync::Arc::new(AtomicUsize::new(0)); let stream_len2 = stream_len.clone(); - let (upload_queue, upload_result) = Self::response_queue(); + let (upload_queue, upload_result) = Self::upload_chunk_queue(h2.clone(), wid); let start_time = std::time::Instant::now(); @@ -566,44 +646,8 @@ impl BackupClient { }) .merge_known_chunks(known_chunks.clone()) .for_each(move |merged_chunk_info| { - let h2 = h2.clone(); - - let upload_queue = upload_queue.clone(); - - let upload_data; - let mut request; - - match merged_chunk_info { - MergedChunkInfo::New(chunk_info) => { - println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&chunk_info.digest), - chunk_info.data.len(), chunk_info.offset); - let param = json!({ "wid": wid, "offset": chunk_info.offset, "size" : chunk_info.data.len() }); - request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); - upload_data = Some(chunk_info.data.freeze()); - } - MergedChunkInfo::Known(chunk_list) => { - let mut digest_list = vec![]; - let mut offset_list = vec![]; - for (offset, digest) in chunk_list { - println!("append chunk {} (offset {})", tools::digest_to_hex(&digest), offset); - digest_list.push(tools::digest_to_hex(&digest)); - offset_list.push(offset); - } - println!("append existing chunks list len ({})", digest_list.len()); - let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list }); - request = H2Client::request_builder("localhost", "PUT", "dynamic_index", None).unwrap(); - request.headers_mut().insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static("application/json")); - let param_data = bytes::Bytes::from(param.to_string().as_bytes()); - println!("DATALEN {}", param_data.len()); - upload_data = Some(param_data); - } - } - - h2.send_request(request, upload_data) - .and_then(move |response| { - upload_queue.send(response) - .map(|_| ()).map_err(Error::from) - }) + upload_queue.clone().send(merged_chunk_info) + .map(|_| ()).map_err(Error::from) }) .then(move |result| { println!("RESULT {:?}", result);