src/client/merge_known_chunks.rs: avoid VecDequeu - a single buffer is enough
This commit is contained in:
parent
2dbba78b98
commit
ecb21b4794
@ -1,6 +1,6 @@
|
|||||||
use failure::*;
|
use failure::*;
|
||||||
use futures::*;
|
use futures::*;
|
||||||
use std::collections::{VecDeque, HashSet};
|
use std::collections::HashSet;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
pub struct ChunkInfo {
|
pub struct ChunkInfo {
|
||||||
@ -21,14 +21,14 @@ pub trait MergeKnownChunks: Sized {
|
|||||||
pub struct MergeKnownChunksQueue<S> {
|
pub struct MergeKnownChunksQueue<S> {
|
||||||
input: S,
|
input: S,
|
||||||
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
queue: VecDeque<MergedChunkInfo>,
|
buffer: Option<MergedChunkInfo>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl <S> MergeKnownChunks for S
|
impl <S> MergeKnownChunks for S
|
||||||
where S: Stream<Item=ChunkInfo, Error=Error>,
|
where S: Stream<Item=ChunkInfo, Error=Error>,
|
||||||
{
|
{
|
||||||
fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self> {
|
fn merge_known_chunks(self, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>) -> MergeKnownChunksQueue<Self> {
|
||||||
MergeKnownChunksQueue { input: self, known_chunks, queue: VecDeque::new() }
|
MergeKnownChunksQueue { input: self, known_chunks, buffer: None }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -40,19 +40,6 @@ impl <S> Stream for MergeKnownChunksQueue<S>
|
|||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<MergedChunkInfo>, Error> {
|
fn poll(&mut self) -> Poll<Option<MergedChunkInfo>, Error> {
|
||||||
loop {
|
loop {
|
||||||
|
|
||||||
if let Some(first) = self.queue.front() {
|
|
||||||
if let MergedChunkInfo::New(_) = first {
|
|
||||||
return Ok(Async::Ready(self.queue.pop_front()));
|
|
||||||
} else if self.queue.len() > 1 {
|
|
||||||
return Ok(Async::Ready(self.queue.pop_front()));
|
|
||||||
} else if let MergedChunkInfo::Known(list) = first {
|
|
||||||
if list.len() >= 64 {
|
|
||||||
return Ok(Async::Ready(self.queue.pop_front()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match self.input.poll() {
|
match self.input.poll() {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
return Err(err);
|
return Err(err);
|
||||||
@ -61,10 +48,10 @@ impl <S> Stream for MergeKnownChunksQueue<S>
|
|||||||
return Ok(Async::NotReady);
|
return Ok(Async::NotReady);
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
if let Some(item) = self.queue.pop_front() {
|
if let Some(last) = self.buffer.take() {
|
||||||
return Ok(Async::Ready(Some(item)));
|
return Ok(Async::Ready(Some(last)));
|
||||||
} else {
|
} else {
|
||||||
return Ok(Async::Ready(None));
|
return Ok(Async::Ready(None));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some(chunk_info))) => {
|
Ok(Async::Ready(Some(chunk_info))) => {
|
||||||
@ -74,21 +61,39 @@ impl <S> Stream for MergeKnownChunksQueue<S>
|
|||||||
|
|
||||||
if chunk_is_known {
|
if chunk_is_known {
|
||||||
|
|
||||||
if let Some(last) = self.queue.back_mut() {
|
let last = self.buffer.take();
|
||||||
if let MergedChunkInfo::Known(list) = last {
|
|
||||||
list.push(chunk_info);
|
match last {
|
||||||
} else {
|
None => {
|
||||||
let result = MergedChunkInfo::Known(vec![chunk_info]);
|
self.buffer = Some(MergedChunkInfo::Known(vec![chunk_info]));
|
||||||
self.queue.push_back(result);
|
// continue
|
||||||
|
}
|
||||||
|
Some(MergedChunkInfo::Known(mut list)) => {
|
||||||
|
list.push(chunk_info);
|
||||||
|
let len = list.len();
|
||||||
|
self.buffer = Some(MergedChunkInfo::Known(list));
|
||||||
|
|
||||||
|
if len >= 64 {
|
||||||
|
return Ok(Async::Ready(self.buffer.take()));
|
||||||
|
}
|
||||||
|
// continue
|
||||||
|
|
||||||
|
}
|
||||||
|
Some(MergedChunkInfo::New(_)) => {
|
||||||
|
self.buffer = Some(MergedChunkInfo::Known(vec![chunk_info]));
|
||||||
|
return Ok(Async::Ready(last));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
let result = MergedChunkInfo::Known(vec![chunk_info]);
|
|
||||||
self.queue.push_back(result);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
known_chunks.insert(chunk_info.digest);
|
known_chunks.insert(chunk_info.digest);
|
||||||
let result = MergedChunkInfo::New(chunk_info);
|
let new = MergedChunkInfo::New(chunk_info);
|
||||||
self.queue.push_back(result);
|
if let Some(last) = self.buffer.take() {
|
||||||
|
self.buffer = Some(new);
|
||||||
|
return Ok(Async::Ready(Some(last)));
|
||||||
|
} else {
|
||||||
|
return Ok(Async::Ready(Some(new)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user