src/client/merge_known_chunks.rs: use MergedChunkInfo as input

This commit is contained in:
Dietmar Maurer 2019-05-26 10:52:56 +02:00
parent 05cba08c9c
commit 624362226e
2 changed files with 45 additions and 43 deletions

View File

@ -514,7 +514,6 @@ impl BackupClient {
hyper::rt::spawn( hyper::rt::spawn(
verify_queue_rx verify_queue_rx
.map_err(Error::from) .map_err(Error::from)
//.for_each(|response: h2::client::ResponseFuture| {
.and_then(move |merged_chunk_info| { .and_then(move |merged_chunk_info| {
match merged_chunk_info { match merged_chunk_info {
MergedChunkInfo::New(chunk_info) => { MergedChunkInfo::New(chunk_info) => {
@ -544,6 +543,7 @@ impl BackupClient {
} }
} }
}) })
.merge_known_chunks()
.and_then(move |merged_chunk_info| { .and_then(move |merged_chunk_info| {
match merged_chunk_info { match merged_chunk_info {
MergedChunkInfo::Known(chunk_list) => { MergedChunkInfo::Known(chunk_list) => {
@ -642,9 +642,17 @@ impl BackupClient {
.map(move |chunk_info| { .map(move |chunk_info| {
repeat.fetch_add(1, Ordering::SeqCst); repeat.fetch_add(1, Ordering::SeqCst);
stream_len.fetch_add(chunk_info.data.len(), Ordering::SeqCst); stream_len.fetch_add(chunk_info.data.len(), Ordering::SeqCst);
chunk_info
let mut known_chunks = known_chunks.lock().unwrap();
let chunk_is_known = known_chunks.contains(&chunk_info.digest);
if chunk_is_known {
MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)])
} else {
known_chunks.insert(chunk_info.digest);
MergedChunkInfo::New(chunk_info)
}
}) })
.merge_known_chunks(known_chunks.clone()) .merge_known_chunks()
.for_each(move |merged_chunk_info| { .for_each(move |merged_chunk_info| {
upload_queue.clone().send(merged_chunk_info) upload_queue.clone().send(merged_chunk_info)
.map(|_| ()).map_err(Error::from) .map(|_| ()).map_err(Error::from)

View File

@ -1,7 +1,5 @@
use failure::*; use failure::*;
use futures::*; use futures::*;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
pub struct ChunkInfo { pub struct ChunkInfo {
pub digest: [u8; 32], pub digest: [u8; 32],
@ -15,25 +13,24 @@ pub enum MergedChunkInfo {
} }
/// Extension trait providing `merge_known_chunks()`, which consumes `self`
/// and returns a `MergeKnownChunksQueue<Self>`.
///
/// Two variants of the method signature appear on the same line below: the
/// first one additionally takes a shared `known_chunks` digest set, the
/// second one takes only `self`.
pub trait MergeKnownChunks: Sized { pub trait MergeKnownChunks: Sized {
fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self>; fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self>;
} }
/// Wrapper type returned by `merge_known_chunks()`.
///
/// It stores the wrapped input plus at most one buffered `MergedChunkInfo`.
pub struct MergeKnownChunksQueue<S> { pub struct MergeKnownChunksQueue<S> {
// The wrapped input value (filled from `self` by `merge_known_chunks()`).
input: S, input: S,
// Shared, mutex-protected set of 32-byte digests. This field appears in only
// one of the two variants of the struct shown here.
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
// Item currently held back; starts out as `None`.
buffer: Option<MergedChunkInfo>, buffer: Option<MergedChunkInfo>,
} }
/// Blanket implementation of `MergeKnownChunks` for every stream type `S`
/// that satisfies the `where` bound.
///
/// The two variants shown on each line differ in the required stream item
/// type (`ChunkInfo` versus `MergedChunkInfo`) and in whether a `known_chunks`
/// set is passed in and stored.
impl <S> MergeKnownChunks for S impl <S> MergeKnownChunks for S
where S: Stream<Item=ChunkInfo, Error=Error>, where S: Stream<Item=MergedChunkInfo, Error=Error>,
{ {
// Wrap `self` in a `MergeKnownChunksQueue` whose buffer is initially empty.
fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self> { fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self> {
MergeKnownChunksQueue { input: self, known_chunks, buffer: None } MergeKnownChunksQueue { input: self, buffer: None }
} }
} }
impl <S> Stream for MergeKnownChunksQueue<S> impl <S> Stream for MergeKnownChunksQueue<S>
where S: Stream<Item=ChunkInfo, Error=Error>, where S: Stream<Item=MergedChunkInfo, Error=Error>,
{ {
type Item = MergedChunkInfo; type Item = MergedChunkInfo;
type Error = Error; type Error = Error;
@ -54,45 +51,42 @@ impl <S> Stream for MergeKnownChunksQueue<S>
return Ok(Async::Ready(None)); return Ok(Async::Ready(None));
} }
} }
Ok(Async::Ready(Some(chunk_info))) => { Ok(Async::Ready(Some(mergerd_chunk_info))) => {
let mut known_chunks = self.known_chunks.lock().unwrap(); match mergerd_chunk_info {
let chunk_is_known = known_chunks.contains(&chunk_info.digest); MergedChunkInfo::Known(list) => {
if chunk_is_known { let last = self.buffer.take();
let last = self.buffer.take(); match last {
None => {
match last { self.buffer = Some(MergedChunkInfo::Known(list));
None => { // continue
self.buffer = Some(MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)]));
// continue
}
Some(MergedChunkInfo::Known(mut list)) => {
list.push((chunk_info.offset, chunk_info.digest));
let len = list.len();
self.buffer = Some(MergedChunkInfo::Known(list));
if len >= 64 {
return Ok(Async::Ready(self.buffer.take()));
} }
// continue Some(MergedChunkInfo::Known(mut last_list)) => {
last_list.extend_from_slice(&list);
let len = last_list.len();
self.buffer = Some(MergedChunkInfo::Known(last_list));
} if len >= 64 {
Some(MergedChunkInfo::New(_)) => { return Ok(Async::Ready(self.buffer.take()));
self.buffer = Some(MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)])); }
return Ok(Async::Ready(last)); // continue
}
Some(MergedChunkInfo::New(_)) => {
self.buffer = Some(MergedChunkInfo::Known(list));
return Ok(Async::Ready(last));
}
} }
} }
MergedChunkInfo::New(chunk_info) => {
} else { let new = MergedChunkInfo::New(chunk_info);
known_chunks.insert(chunk_info.digest); if let Some(last) = self.buffer.take() {
let new = MergedChunkInfo::New(chunk_info); self.buffer = Some(new);
if let Some(last) = self.buffer.take() { return Ok(Async::Ready(Some(last)));
self.buffer = Some(new); } else {
return Ok(Async::Ready(Some(last))); return Ok(Async::Ready(Some(new)));
} else { }
return Ok(Async::Ready(Some(new)));
} }
} }
} }