src/client/merge_known_chunks.rs: merge known chunks
To decrease the number of API calls required...
parent 91320f0879
commit aa1b2e04fe
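In effect, instead of one dynamic_index update per known chunk, the client
now appends known chunks in batches via a new "digest-list" parameter. A
sketch of the payload shapes (the wid and digest values below are
placeholders, not taken from a real backup run):

    // before this commit: one PUT "dynamic_index" per known chunk
    json!({ "wid": 1, "digest": "02abc8..." })

    // after: one PUT "dynamic_index" per merged batch (up to 64 digests)
    json!({ "wid": 1, "digest-list": ["02abc8...", "f1e2d3..."] })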
@@ -230,7 +230,10 @@ pub fn api_method_dynamic_append() -> ApiMethod {
     ApiMethod::new(
         dynamic_append,
         ObjectSchema::new("Append chunk to dynamic index writer.")
-            .required("digest", StringSchema::new("Chunk digest."))
+            .required("digest-list", ArraySchema::new(
+                "Chunk digest list.",
+                StringSchema::new("Chunk digest.").into())
+            )
             .required("wid", IntegerSchema::new("Dynamic writer ID.")
                       .minimum(1)
                       .maximum(256)
@@ -245,16 +248,21 @@ fn dynamic_append (
 ) -> Result<Value, Error> {
 
     let wid = tools::required_integer_param(&param, "wid")? as usize;
-    let digest_str = tools::required_string_param(&param, "digest")?;
+    let digest_list = tools::required_array_param(&param, "digest-list")?;
 
+    println!("DIGEST LIST LEN {}", digest_list.len());
+
     let env: &BackupEnvironment = rpcenv.as_ref();
 
-    let digest = crate::tools::hex_to_digest(digest_str)?;
-    let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
+    for item in digest_list {
+        let digest_str = item.as_str().unwrap();
+        let digest = crate::tools::hex_to_digest(digest_str)?;
+        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
 
-    env.dynamic_writer_append_chunk(wid, size, &digest)?;
+        env.dynamic_writer_append_chunk(wid, size, &digest)?;
 
-    env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid));
+        env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid));
+    }
 
     Ok(Value::Null)
 }
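For context, the loop above leans on parameter helpers from the project's
tools module. A hypothetical stand-in over serde_json::Value (not the
project's implementation), showing the shapes the handler expects:

    use failure::{format_err, Error};
    use serde_json::Value;

    // Stand-in for tools::required_integer_param.
    fn required_integer_param(param: &Value, name: &str) -> Result<i64, Error> {
        param[name].as_i64()
            .ok_or_else(|| format_err!("missing integer parameter '{}'", name))
    }

    // Stand-in for tools::required_array_param.
    fn required_array_param<'a>(param: &'a Value, name: &str) -> Result<&'a [Value], Error> {
        param[name].as_array()
            .map(|list| &list[..])
            .ok_or_else(|| format_err!("missing array parameter '{}'", name))
    }

Each entry of the returned array is still a Value; item.as_str() yields None
for non-strings, which is why the unwrap() in the loop would panic on a
malformed digest-list.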
|
@@ -4,6 +4,7 @@
 //! server using https.
 
 mod pipe_to_stream;
+mod merge_known_chunks;
 
 mod http_client;
 pub use http_client::*;
@@ -21,6 +21,8 @@ use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
 
 use crate::tools::{self, BroadcastFuture, tty};
 use super::pipe_to_stream::*;
+use super::merge_known_chunks::*;
+
 
 #[derive(Clone)]
 struct AuthInfo {
@@ -405,11 +407,6 @@ pub struct BackupClient {
     h2: H2Client,
 }
 
-struct ChunkInfo {
-    digest: [u8; 32],
-    data: bytes::BytesMut,
-    offset: u64,
-}
 
 impl BackupClient {
 
@@ -562,32 +559,39 @@ impl BackupClient {
         let start_time = std::time::Instant::now();
 
         stream
-            .for_each(move |chunk_info| {
-                let h2 = h2.clone();
-
+            .map(move |chunk_info| {
                 repeat.fetch_add(1, Ordering::SeqCst);
                 stream_len.fetch_add(chunk_info.data.len(), Ordering::SeqCst);
+                chunk_info
+            })
+            .merge_known_chunks(known_chunks.clone())
+            .for_each(move |merged_chunk_info| {
+                let h2 = h2.clone();
 
                 let upload_queue = upload_queue.clone();
 
-                let mut known_chunks = known_chunks.lock().unwrap();
-                let chunk_is_known = known_chunks.contains(&chunk_info.digest);
-
                 let upload_data;
                 let request;
 
-                if chunk_is_known {
-                    println!("append existing chunk ({} bytes)", chunk_info.data.len());
-                    let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&chunk_info.digest) });
-                    request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
-                    upload_data = None;
-                } else {
-                    println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&chunk_info.digest), chunk_info.data.len());
-                    known_chunks.insert(chunk_info.digest);
-                    let param = json!({ "wid": wid, "size" : chunk_info.data.len() });
-                    request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
-                    upload_data = Some(chunk_info.data.freeze());
-                }
+                match merged_chunk_info {
+                    MergedChunkInfo::New(chunk_info) => {
+                        println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&chunk_info.digest), chunk_info.data.len());
+                        let param = json!({ "wid": wid, "size" : chunk_info.data.len() });
+                        request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
+                        upload_data = Some(chunk_info.data.freeze());
+                    }
+                    MergedChunkInfo::Known(chunk_list) => {
+                        let mut digest_list = vec![];
+                        for chunk_info in chunk_list {
+                            //println!("append existing chunk ({} bytes)", chunk_info.data.len());
+                            digest_list.push(tools::digest_to_hex(&chunk_info.digest));
+                        }
+                        println!("append existing chunks ({})", digest_list.len());
+                        let param = json!({ "wid": wid, "digest-list": digest_list });
+                        request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
+                        upload_data = None;
+                    }
+                }
 
                 h2.send_request(request, upload_data)
                     .and_then(move |response| {

src/client/merge_known_chunks.rs (new file, 97 lines)
@@ -0,0 +1,97 @@
+use failure::*;
+use futures::*;
+use std::collections::{VecDeque, HashSet};
+use std::sync::{Arc, Mutex};
+
+pub struct ChunkInfo {
+    pub digest: [u8; 32],
+    pub data: bytes::BytesMut,
+    pub offset: u64,
+}
+
+pub enum MergedChunkInfo {
+    Known(Vec<ChunkInfo>),
+    New(ChunkInfo),
+}
+
+pub trait MergeKnownChunks: Sized {
+    fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self>;
+}
+
+pub struct MergeKnownChunksQueue<S> {
+    input: S,
+    known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    queue: VecDeque<MergedChunkInfo>,
+}
+
+impl <S> MergeKnownChunks for S
+    where S: Stream<Item=ChunkInfo, Error=Error>,
+{
+    fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self> {
+        MergeKnownChunksQueue { input: self, known_chunks, queue: VecDeque::new() }
+    }
+}
+
+impl <S> Stream for MergeKnownChunksQueue<S>
+    where S: Stream<Item=ChunkInfo, Error=Error>,
+{
+    type Item = MergedChunkInfo;
+    type Error = Error;
+
+    fn poll(&mut self) -> Poll<Option<MergedChunkInfo>, Error> {
+        loop {
+
+            if let Some(first) = self.queue.front() {
+                if let MergedChunkInfo::New(_) = first {
+                    return Ok(Async::Ready(self.queue.pop_front()));
+                } else if self.queue.len() > 1 {
+                    return Ok(Async::Ready(self.queue.pop_front()));
+                } else if let MergedChunkInfo::Known(list) = first {
+                    if list.len() >= 64 {
+                        return Ok(Async::Ready(self.queue.pop_front()));
+                    }
+                }
+            }
+
+            match self.input.poll() {
+                Err(err) => {
+                    return Err(err);
+                }
+                Ok(Async::NotReady) => {
+                    return Ok(Async::NotReady);
+                }
+                Ok(Async::Ready(None)) => {
+                    if let Some(item) = self.queue.pop_front() {
+                        return Ok(Async::Ready(Some(item)));
+                    } else {
+                        return Ok(Async::Ready(None));
+                    }
+                }
+                Ok(Async::Ready(Some(chunk_info))) => {
+
+                    let mut known_chunks = self.known_chunks.lock().unwrap();
+                    let chunk_is_known = known_chunks.contains(&chunk_info.digest);
+
+                    if chunk_is_known {
+
+                        if let Some(last) = self.queue.back_mut() {
+                            if let MergedChunkInfo::Known(list) = last {
+                                list.push(chunk_info);
+                            } else {
+                                let result = MergedChunkInfo::Known(vec![chunk_info]);
+                                self.queue.push_back(result);
+                            }
+                        } else {
+                            let result = MergedChunkInfo::Known(vec![chunk_info]);
+                            self.queue.push_back(result);
+                        }
+                    } else {
+                        known_chunks.insert(chunk_info.digest);
+                        let result = MergedChunkInfo::New(chunk_info);
+                        self.queue.push_back(result);
+                    }
+                }
+            }
+        }
+    }
+}
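A minimal usage sketch (not part of the commit) of how the adapter behaves,
assuming futures 0.1, failure and bytes as above, with ChunkInfo,
MergedChunkInfo and the MergeKnownChunks trait in scope:

    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};
    use failure::Error;
    use futures::stream::{self, Stream};

    fn demo() -> Result<(), Error> {
        // Pretend the digest [1u8; 32] is already known to the server.
        let mut set = HashSet::new();
        set.insert([1u8; 32]);
        let known_chunks = Arc::new(Mutex::new(set));

        let chunks = vec![
            ChunkInfo { digest: [1u8; 32], data: bytes::BytesMut::from(&b"a"[..]), offset: 0 },
            ChunkInfo { digest: [1u8; 32], data: bytes::BytesMut::from(&b"b"[..]), offset: 1 },
            ChunkInfo { digest: [2u8; 32], data: bytes::BytesMut::from(&b"c"[..]), offset: 2 },
        ];

        // The two known chunks come out merged into a single Known(list);
        // the unknown chunk passes through alone as New(..) and is added
        // to the known set as a side effect.
        for item in stream::iter_ok::<_, Error>(chunks)
            .merge_known_chunks(known_chunks)
            .wait()
        {
            match item? {
                MergedChunkInfo::Known(list) => println!("known batch: {} chunks", list.len()),
                MergedChunkInfo::New(info) => println!("new chunk at offset {}", info.offset),
            }
        }
        Ok(())
    }

A Known batch is only flushed once it reaches 64 digests, once a New chunk
is queued behind it, or once the input stream ends, which bounds both the
size of each digest-list request and how long known chunks are held back.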