diff --git a/src/backup/chunk_stream.rs b/src/backup/chunk_stream.rs index 39cadfe0..e21224dc 100644 --- a/src/backup/chunk_stream.rs +++ b/src/backup/chunk_stream.rs @@ -4,6 +4,7 @@ use proxmox_protocol::Chunker; use futures::{Async, Poll}; use futures::stream::Stream; +/// Split input stream into dynamic sized chunks pub struct ChunkStream, Error=Error>> { input: S, chunker: Chunker, @@ -45,7 +46,7 @@ impl , Error=Error>> Stream for ChunkStream { Ok(Async::Ready(Some(mut data))) => { if let Some(rest) = self.rest.take() { data.extend(rest); } - + let buffer = self.buffer.get_or_insert_with(|| Vec::with_capacity(1024*1024)); let boundary = self.chunker.scan(&data); @@ -71,3 +72,63 @@ impl , Error=Error>> Stream for ChunkStream { } } } + +/// Split input stream into fixed sized chunks +pub struct FixedChunkStream, Error=Error>> { + input: S, + chunk_size: usize, + buffer: Option>, +} + +impl , Error=Error>> FixedChunkStream { + + pub fn new(input: S, chunk_size: usize) -> Self { + Self { input, chunk_size, buffer: None } + } +} + +impl , Error=Error>> Stream for FixedChunkStream { + + type Item = Vec; + type Error = Error; + + fn poll(&mut self) -> Poll>, Error> { + loop { + match self.input.poll() { + Err(err) => { + return Err(err); + } + Ok(Async::NotReady) => { + return Ok(Async::NotReady); + } + Ok(Async::Ready(None)) => { + // last chunk can have any size + return Ok(Async::Ready(self.buffer.take())); + } + Ok(Async::Ready(Some(data))) => { + let buffer = self.buffer.get_or_insert_with(|| Vec::with_capacity(1024*1024)); + let need = self.chunk_size - buffer.len(); + + if need > data.len() { + buffer.extend(data); + // continue poll + } else if need == data.len() { + buffer.extend(data); + return Ok(Async::Ready(self.buffer.take())); + } else if need < data.len() { + let (left, right) = data.split_at(need); + buffer.extend(left); + + let result = self.buffer.take(); + + self.buffer = Some(Vec::from(right)); + + return Ok(Async::Ready(result)); + } else { + unreachable!(); + } + } + } + } + } +}