use std::pin::Pin; use std::task::{Context, Poll}; use anyhow::Error; use bytes::BytesMut; use futures::ready; use futures::stream::{Stream, TryStream}; use pbs_datastore::Chunker; /// Split input stream into dynamic sized chunks pub struct ChunkStream { input: S, chunker: Chunker, buffer: BytesMut, scan_pos: usize, } impl ChunkStream { pub fn new(input: S, chunk_size: Option) -> Self { Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)), buffer: BytesMut::new(), scan_pos: 0, } } } impl Unpin for ChunkStream {} impl Stream for ChunkStream where S: TryStream, S::Ok: AsRef<[u8]>, S::Error: Into, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.get_mut(); loop { if this.scan_pos < this.buffer.len() { let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]); let chunk_size = this.scan_pos + boundary; if boundary == 0 { this.scan_pos = this.buffer.len(); // continue poll } else if chunk_size <= this.buffer.len() { let result = this.buffer.split_to(chunk_size); this.scan_pos = 0; return Poll::Ready(Some(Ok(result))); } else { panic!("got unexpected chunk boundary from chunker"); } } match ready!(Pin::new(&mut this.input).try_poll_next(cx)) { Some(Err(err)) => { return Poll::Ready(Some(Err(err.into()))); } None => { this.scan_pos = 0; if !this.buffer.is_empty() { return Poll::Ready(Some(Ok(this.buffer.split()))); } else { return Poll::Ready(None); } } Some(Ok(data)) => { this.buffer.extend_from_slice(data.as_ref()); } } } } } /// Split input stream into fixed sized chunks pub struct FixedChunkStream { input: S, chunk_size: usize, buffer: BytesMut, } impl FixedChunkStream { pub fn new(input: S, chunk_size: usize) -> Self { Self { input, chunk_size, buffer: BytesMut::new(), } } } impl Unpin for FixedChunkStream {} impl Stream for FixedChunkStream where S: TryStream, S::Ok: AsRef<[u8]>, { type Item = Result; fn poll_next( self: Pin<&mut Self>, cx: &mut Context, ) -> Poll>> { let this = self.get_mut(); loop { if this.buffer.len() >= this.chunk_size { return Poll::Ready(Some(Ok(this.buffer.split_to(this.chunk_size)))); } match ready!(Pin::new(&mut this.input).try_poll_next(cx)) { Some(Err(err)) => { return Poll::Ready(Some(Err(err))); } None => { // last chunk can have any size if !this.buffer.is_empty() { return Poll::Ready(Some(Ok(this.buffer.split()))); } else { return Poll::Ready(None); } } Some(Ok(data)) => { this.buffer.extend_from_slice(data.as_ref()); } } } } }