diff --git a/src/backup/chunk_stream.rs b/src/backup/chunk_stream.rs index 040c7c77..fb11ac8d 100644 --- a/src/backup/chunk_stream.rs +++ b/src/backup/chunk_stream.rs @@ -4,17 +4,19 @@ use proxmox_protocol::Chunker; use futures::{Async, Poll}; use futures::stream::Stream; +use bytes::BytesMut; + /// Split input stream into dynamic sized chunks pub struct ChunkStream { input: S, chunker: Chunker, - buffer: Option>, - scan: Option>, + buffer: BytesMut, + scan_pos: usize, } impl ChunkStream { pub fn new(input: S) -> Self { - Self { input, chunker: Chunker::new(4 * 1024 * 1024), buffer: None, scan: None} + Self { input, chunker: Chunker::new(4 * 1024 * 1024), buffer: BytesMut::new(), scan_pos: 0} } } @@ -24,27 +26,24 @@ impl Stream for ChunkStream S::Error: Into, { - type Item = Vec; + type Item = BytesMut; type Error = Error; - fn poll(&mut self) -> Poll>, Error> { + fn poll(&mut self) -> Poll, Error> { loop { - if let Some(data) = self.scan.take() { - let buffer = self.buffer.get_or_insert_with(|| Vec::with_capacity(1024*1024)); - let boundary = self.chunker.scan(&data); + if self.scan_pos < self.buffer.len() { + let boundary = self.chunker.scan(&self.buffer[self.scan_pos..]); + + let chunk_size = self.scan_pos + boundary; if boundary == 0 { - buffer.extend(data); + self.scan_pos = self.buffer.len(); // continue poll - } else if boundary == data.len() { - buffer.extend(data); - return Ok(Async::Ready(self.buffer.take())); - } else if boundary < data.len() { - let (left, right) = data.split_at(boundary); - buffer.extend(left); - self.scan = Some(right.to_vec()); - return Ok(Async::Ready(self.buffer.take())); + } else if chunk_size <= self.buffer.len() { + let result = self.buffer.split_to(chunk_size); + self.scan_pos = 0; + return Ok(Async::Ready(Some(result))); } else { panic!("got unexpected chunk boundary from chunker"); } @@ -58,26 +57,21 @@ impl Stream for ChunkStream return Ok(Async::NotReady); } Ok(Async::Ready(None)) => { - let mut data = self.buffer.take().or_else(|| Some(vec![])).unwrap(); - if let Some(rest) = self.scan.take() { data.extend(rest); } - - if data.len() > 0 { - return Ok(Async::Ready(Some(data))); + self.scan_pos = 0; + if self.buffer.len() > 0 { + return Ok(Async::Ready(Some(self.buffer.take()))); } else { return Ok(Async::Ready(None)); } } Ok(Async::Ready(Some(data))) => { - let scan = self.scan.get_or_insert_with(|| Vec::with_capacity(1024*1024)); - scan.extend(data.as_ref()); - } + self.buffer.extend_from_slice(data.as_ref()); + } } } } } -use bytes::BytesMut; - /// Split input stream into fixed sized chunks pub struct FixedChunkStream { input: S,