diff --git a/src/backup/chunk_stream.rs b/src/backup/chunk_stream.rs index e8180a75..31d6ea94 100644 --- a/src/backup/chunk_stream.rs +++ b/src/backup/chunk_stream.rs @@ -9,13 +9,13 @@ pub struct ChunkStream, Error=Error>> { input: S, chunker: Chunker, buffer: Option>, - rest: Option>, + scan: Option>, } impl , Error=Error>> ChunkStream { pub fn new(input: S) -> Self { - Self { input, chunker: Chunker::new(4 * 1024 * 1024), buffer: None, rest: None } + Self { input, chunker: Chunker::new(4 * 1024 * 1024), buffer: None, scan: None} } } @@ -26,6 +26,27 @@ impl , Error=Error>> Stream for ChunkStream { fn poll(&mut self) -> Poll>, Error> { loop { + + if let Some(data) = self.scan.take() { + let buffer = self.buffer.get_or_insert_with(|| Vec::with_capacity(1024*1024)); + let boundary = self.chunker.scan(&data); + + if boundary == 0 { + buffer.extend(data); + // continue poll + } else if boundary == data.len() { + buffer.extend(data); + return Ok(Async::Ready(self.buffer.take())); + } else if boundary < data.len() { + let (left, right) = data.split_at(boundary); + buffer.extend(left); + self.scan = Some(right.to_vec()); + return Ok(Async::Ready(self.buffer.take())); + } else { + panic!("got unexpected chunk boundary from chunker"); + } + } + match self.input.poll() { Err(err) => { return Err(err); @@ -35,7 +56,7 @@ impl , Error=Error>> Stream for ChunkStream { } Ok(Async::Ready(None)) => { let mut data = self.buffer.take().or_else(|| Some(vec![])).unwrap(); - if let Some(rest) = self.rest.take() { data.extend(rest); } + if let Some(rest) = self.scan.take() { data.extend(rest); } if data.len() > 0 { return Ok(Async::Ready(Some(data))); @@ -44,34 +65,8 @@ impl , Error=Error>> Stream for ChunkStream { } } Ok(Async::Ready(Some(data))) => { - - let data = if let Some(mut rest) = self.rest.take() { - rest.extend(data); - rest - } else { - data - }; - - let buffer = self.buffer.get_or_insert_with(|| Vec::with_capacity(1024*1024)); - let boundary = self.chunker.scan(&data); - - if boundary == 0 { - buffer.extend(data); - // continue poll - } else if boundary == data.len() { - buffer.extend(data); - return Ok(Async::Ready(self.buffer.take())); - } else if boundary < data.len() { - let (left, right) = data.split_at(boundary); - buffer.extend(left); - - let rest = self.rest.get_or_insert_with(|| Vec::with_capacity(right.len())); - rest.extend(right); - - return Ok(Async::Ready(self.buffer.take())); - } else { - panic!("got unexpected chunk boundary from chunker"); - } + let scan = self.scan.get_or_insert_with(|| Vec::with_capacity(1024*1024)); + scan.extend(data); } } }