diff --git a/src/client/merge_known_chunks.rs b/src/client/merge_known_chunks.rs index ce852fa9..8010302d 100644 --- a/src/client/merge_known_chunks.rs +++ b/src/client/merge_known_chunks.rs @@ -1,6 +1,6 @@ use failure::*; use futures::*; -use std::collections::{VecDeque, HashSet}; +use std::collections::HashSet; use std::sync::{Arc, Mutex}; pub struct ChunkInfo { @@ -21,14 +21,14 @@ pub trait MergeKnownChunks: Sized { pub struct MergeKnownChunksQueue { input: S, known_chunks: Arc>>, - queue: VecDeque, + buffer: Option, } impl MergeKnownChunks for S where S: Stream, { fn merge_known_chunks(self, known_chunks: Arc>>) -> MergeKnownChunksQueue { - MergeKnownChunksQueue { input: self, known_chunks, queue: VecDeque::new() } + MergeKnownChunksQueue { input: self, known_chunks, buffer: None } } } @@ -40,19 +40,6 @@ impl Stream for MergeKnownChunksQueue fn poll(&mut self) -> Poll, Error> { loop { - - if let Some(first) = self.queue.front() { - if let MergedChunkInfo::New(_) = first { - return Ok(Async::Ready(self.queue.pop_front())); - } else if self.queue.len() > 1 { - return Ok(Async::Ready(self.queue.pop_front())); - } else if let MergedChunkInfo::Known(list) = first { - if list.len() >= 64 { - return Ok(Async::Ready(self.queue.pop_front())); - } - } - } - match self.input.poll() { Err(err) => { return Err(err); @@ -61,10 +48,10 @@ impl Stream for MergeKnownChunksQueue return Ok(Async::NotReady); } Ok(Async::Ready(None)) => { - if let Some(item) = self.queue.pop_front() { - return Ok(Async::Ready(Some(item))); + if let Some(last) = self.buffer.take() { + return Ok(Async::Ready(Some(last))); } else { - return Ok(Async::Ready(None)); + return Ok(Async::Ready(None)); } } Ok(Async::Ready(Some(chunk_info))) => { @@ -74,21 +61,39 @@ impl Stream for MergeKnownChunksQueue if chunk_is_known { - if let Some(last) = self.queue.back_mut() { - if let MergedChunkInfo::Known(list) = last { - list.push(chunk_info); - } else { - let result = MergedChunkInfo::Known(vec![chunk_info]); - self.queue.push_back(result); + let last = self.buffer.take(); + + match last { + None => { + self.buffer = Some(MergedChunkInfo::Known(vec![chunk_info])); + // continue + } + Some(MergedChunkInfo::Known(mut list)) => { + list.push(chunk_info); + let len = list.len(); + self.buffer = Some(MergedChunkInfo::Known(list)); + + if len >= 64 { + return Ok(Async::Ready(self.buffer.take())); + } + // continue + + } + Some(MergedChunkInfo::New(_)) => { + self.buffer = Some(MergedChunkInfo::Known(vec![chunk_info])); + return Ok(Async::Ready(last)); } - } else { - let result = MergedChunkInfo::Known(vec![chunk_info]); - self.queue.push_back(result); } + } else { known_chunks.insert(chunk_info.digest); - let result = MergedChunkInfo::New(chunk_info); - self.queue.push_back(result); + let new = MergedChunkInfo::New(chunk_info); + if let Some(last) = self.buffer.take() { + self.buffer = Some(new); + return Ok(Async::Ready(Some(last))); + } else { + return Ok(Async::Ready(Some(new))); + } } } }