From aa1b2e04fe9e7a5f853d7dfcda81dca6b5446d6e Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Thu, 23 May 2019 12:29:33 +0200
Subject: [PATCH] src/client/merge_known_chunks.rs: merge known chunks

To decrease the number of api calls required, batch successive known
chunks on the client and append them with a single dynamic_append
request (new "digest-list" parameter, up to 64 digests per call).
---
 src/api2/admin/datastore/backup.rs | 20 ++++--
 src/client.rs                      |  1 +
 src/client/http_client.rs          | 48 ++++++++-------
 src/client/merge_known_chunks.rs   | 97 ++++++++++++++++++++++++++++++
 4 files changed, 138 insertions(+), 28 deletions(-)
 create mode 100644 src/client/merge_known_chunks.rs

diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs
index a00d431b..e0f5055e 100644
--- a/src/api2/admin/datastore/backup.rs
+++ b/src/api2/admin/datastore/backup.rs
@@ -230,7 +230,10 @@ pub fn api_method_dynamic_append() -> ApiMethod {
     ApiMethod::new(
         dynamic_append,
         ObjectSchema::new("Append chunk to dynamic index writer.")
-            .required("digest", StringSchema::new("Chunk digest."))
+            .required("digest-list", ArraySchema::new(
+                "Chunk digest list.",
+                StringSchema::new("Chunk digest.").into())
+            )
             .required("wid", IntegerSchema::new("Dynamic writer ID.")
                 .minimum(1)
                 .maximum(256)
@@ -245,16 +248,21 @@ fn dynamic_append (
 ) -> Result<Value, Error> {
 
     let wid = tools::required_integer_param(&param, "wid")? as usize;
-    let digest_str = tools::required_string_param(&param, "digest")?;
+    let digest_list = tools::required_array_param(&param, "digest-list")?;
+
+    println!("DIGEST LIST LEN {}", digest_list.len());
 
     let env: &BackupEnvironment = rpcenv.as_ref();
 
-    let digest = crate::tools::hex_to_digest(digest_str)?;
-    let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
+    for item in digest_list {
+        let digest_str = item.as_str().unwrap();
+        let digest = crate::tools::hex_to_digest(digest_str)?;
+        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
 
-    env.dynamic_writer_append_chunk(wid, size, &digest)?;
+        env.dynamic_writer_append_chunk(wid, size, &digest)?;
 
-    env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid));
+        env.log(format!("successfully added chunk {} to dynamic index {}", digest_str, wid));
+    }
 
     Ok(Value::Null)
 }
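
With the schema change above, one dynamic_append call now carries a whole
list of digests. A minimal sketch of the parameters such a call takes; the
wid value and the digests are made-up placeholders, real digests are the
64-character hex strings produced by tools::digest_to_hex:

    use serde_json::json;

    fn main() {
        // Placeholder digests; each stands in for a real 64-char hex chunk digest.
        let digest_list: Vec<String> =
            vec!["aa".repeat(32), "bb".repeat(32), "cc".repeat(32)];

        // One PUT to the dynamic_index endpoint appends all three chunks
        // to dynamic writer 1 in a single round trip.
        let param = json!({ "wid": 1, "digest-list": digest_list });
        println!("{}", param);
    }
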
diff --git a/src/client.rs b/src/client.rs
index 4ba7a803..89b8e153 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -4,6 +4,7 @@
 //! server using https.
 
 mod pipe_to_stream;
+mod merge_known_chunks;
 mod http_client;
 
 pub use http_client::*;
diff --git a/src/client/http_client.rs b/src/client/http_client.rs
index e9a39582..80920701 100644
--- a/src/client/http_client.rs
+++ b/src/client/http_client.rs
@@ -21,6 +21,8 @@ use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
 
 use crate::tools::{self, BroadcastFuture, tty};
 use super::pipe_to_stream::*;
+use super::merge_known_chunks::*;
+
 
 #[derive(Clone)]
 struct AuthInfo {
@@ -405,11 +407,6 @@ pub struct BackupClient {
     h2: H2Client,
 }
 
-struct ChunkInfo {
-    digest: [u8; 32],
-    data: bytes::BytesMut,
-    offset: u64,
-}
 
 impl BackupClient {
 
@@ -562,31 +559,38 @@ impl BackupClient {
         let start_time = std::time::Instant::now();
 
         stream
-            .for_each(move |chunk_info| {
-                let h2 = h2.clone();
-
+            .map(move |chunk_info| {
                 repeat.fetch_add(1, Ordering::SeqCst);
                 stream_len.fetch_add(chunk_info.data.len(), Ordering::SeqCst);
+                chunk_info
+            })
+            .merge_known_chunks(known_chunks.clone())
+            .for_each(move |merged_chunk_info| {
+                let h2 = h2.clone();
                 let upload_queue = upload_queue.clone();
 
-                let mut known_chunks = known_chunks.lock().unwrap();
-                let chunk_is_known = known_chunks.contains(&chunk_info.digest);
-
                 let upload_data;
                 let request;
 
-                if chunk_is_known {
-                    println!("append existing chunk ({} bytes)", chunk_info.data.len());
-                    let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&chunk_info.digest) });
-                    request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
-                    upload_data = None;
-                } else {
-                    println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&chunk_info.digest), chunk_info.data.len());
-                    known_chunks.insert(chunk_info.digest);
-                    let param = json!({ "wid": wid, "size" : chunk_info.data.len() });
-                    request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
-                    upload_data = Some(chunk_info.data.freeze());
+                match merged_chunk_info {
+                    MergedChunkInfo::New(chunk_info) => {
+                        println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&chunk_info.digest), chunk_info.data.len());
+                        let param = json!({ "wid": wid, "size" : chunk_info.data.len() });
+                        request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
+                        upload_data = Some(chunk_info.data.freeze());
+                    }
+                    MergedChunkInfo::Known(chunk_list) => {
+                        let mut digest_list = vec![];
+                        for chunk_info in chunk_list {
+                            //println!("append existing chunk ({} bytes)", chunk_info.data.len());
+                            digest_list.push(tools::digest_to_hex(&chunk_info.digest));
+                        }
+                        println!("append existing chunks ({})", digest_list.len());
+                        let param = json!({ "wid": wid, "digest-list": digest_list });
+                        request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
+                        upload_data = None;
+                    }
                 }
 
                 h2.send_request(request, upload_data)
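
A note on the adaptor the next file introduces: for every incoming ChunkInfo
it checks the shared known_chunks set. A chunk seen for the first time is
recorded there and emitted immediately as MergedChunkInfo::New, while
already-known chunks are collected into a trailing MergedChunkInfo::Known
batch that is flushed once a newer item follows it, the input ends, or it
reaches 64 digests. For example, an input sequence K K N K K (K = known,
N = new) comes out as Known([k1, k2]), New(n), Known([k3, k4]), so the four
known chunks cost two dynamic_append calls instead of four.
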
diff --git a/src/client/merge_known_chunks.rs b/src/client/merge_known_chunks.rs
new file mode 100644
index 00000000..ce852fa9
--- /dev/null
+++ b/src/client/merge_known_chunks.rs
@@ -0,0 +1,97 @@
+use failure::*;
+use futures::*;
+use std::collections::{VecDeque, HashSet};
+use std::sync::{Arc, Mutex};
+
+pub struct ChunkInfo {
+    pub digest: [u8; 32],
+    pub data: bytes::BytesMut,
+    pub offset: u64,
+}
+
+pub enum MergedChunkInfo {
+    Known(Vec<ChunkInfo>),
+    New(ChunkInfo),
+}
+
+pub trait MergeKnownChunks: Sized {
+    fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self>;
+}
+
+pub struct MergeKnownChunksQueue<S> {
+    input: S,
+    known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    queue: VecDeque<MergedChunkInfo>,
+}
+
+impl <S> MergeKnownChunks for S
+    where S: Stream<Item=ChunkInfo, Error=Error>,
+{
+    fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self> {
+        MergeKnownChunksQueue { input: self, known_chunks, queue: VecDeque::new() }
+    }
+}
+
+impl <S> Stream for MergeKnownChunksQueue<S>
+    where S: Stream<Item=ChunkInfo, Error=Error>,
+{
+    type Item = MergedChunkInfo;
+    type Error = Error;
+
+    fn poll(&mut self) -> Poll<Option<MergedChunkInfo>, Error> {
+        loop {
+
+            if let Some(first) = self.queue.front() {
+                if let MergedChunkInfo::New(_) = first {
+                    return Ok(Async::Ready(self.queue.pop_front()));
+                } else if self.queue.len() > 1 {
+                    return Ok(Async::Ready(self.queue.pop_front()));
+                } else if let MergedChunkInfo::Known(list) = first {
+                    if list.len() >= 64 {
+                        return Ok(Async::Ready(self.queue.pop_front()));
+                    }
+                }
+            }
+
+            match self.input.poll() {
+                Err(err) => {
+                    return Err(err);
+                }
+                Ok(Async::NotReady) => {
+                    return Ok(Async::NotReady);
+                }
+                Ok(Async::Ready(None)) => {
+                    if let Some(item) = self.queue.pop_front() {
+                        return Ok(Async::Ready(Some(item)));
+                    } else {
+                        return Ok(Async::Ready(None));
+                    }
+                }
+                Ok(Async::Ready(Some(chunk_info))) => {
+
+                    let mut known_chunks = self.known_chunks.lock().unwrap();
+                    let chunk_is_known = known_chunks.contains(&chunk_info.digest);
+
+                    if chunk_is_known {
+
+                        if let Some(last) = self.queue.back_mut() {
+                            if let MergedChunkInfo::Known(list) = last {
+                                list.push(chunk_info);
+                            } else {
+                                let result = MergedChunkInfo::Known(vec![chunk_info]);
+                                self.queue.push_back(result);
+                            }
+                        } else {
+                            let result = MergedChunkInfo::Known(vec![chunk_info]);
+                            self.queue.push_back(result);
+                        }
+                    } else {
+                        known_chunks.insert(chunk_info.digest);
+                        let result = MergedChunkInfo::New(chunk_info);
+                        self.queue.push_back(result);
+                    }
+                }
+            }
+        }
+    }
+}
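
For context, a rough sketch of how the adaptor composes with a chunk stream,
in the futures 0.1 style used above. This is illustration only: it assumes
the snippet sits in a module next to merge_known_chunks.rs inside this crate
(so the use super path resolves), and the chunk data, offsets and the
pre-seeded "known" digest are invented:

    use failure::Error;
    use futures::{Future, Stream, stream};
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    use super::merge_known_chunks::*;

    fn demo() -> Result<(), Error> {
        // Pretend the server already has the chunk with digest [1u8; 32].
        let known = Arc::new(Mutex::new(HashSet::new()));
        known.lock().unwrap().insert([1u8; 32]);

        let chunks = stream::iter_ok::<_, Error>(vec![
            ChunkInfo { digest: [1u8; 32], data: bytes::BytesMut::from(&b"aaa"[..]), offset: 0 },
            ChunkInfo { digest: [2u8; 32], data: bytes::BytesMut::from(&b"bbb"[..]), offset: 3 },
        ]);

        // The known chunk is batched and flushed when the new chunk arrives,
        // so this prints "append 1 known chunks", then "upload new chunk (3 bytes)".
        chunks
            .merge_known_chunks(known)
            .for_each(|merged| {
                match merged {
                    MergedChunkInfo::Known(list) =>
                        println!("append {} known chunks", list.len()),
                    MergedChunkInfo::New(info) =>
                        println!("upload new chunk ({} bytes)", info.data.len()),
                }
                Ok(())
            })
            .wait()
    }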