diff --git a/src/backup/index.rs b/src/backup/index.rs index 44dabee6..29a0d54a 100644 --- a/src/backup/index.rs +++ b/src/backup/index.rs @@ -1,3 +1,7 @@ +use failure::*; +use futures::*; +use bytes::{Bytes, BytesMut}; + pub trait IndexFile: Send { fn index_count(&self) -> usize; fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>; @@ -40,3 +44,60 @@ impl std::io::Read for ChunkListReader { } } } + +/// Decodes a Stream into Stream +/// +/// The reader simply returns a birary stream of 32 byte digest values. + +pub struct DigestListDecoder { + input: S, + buffer: BytesMut, +} + +impl DigestListDecoder { + + pub fn new(input: S) -> Self { + Self { input, buffer: BytesMut::new() } + } +} + +impl Stream for DigestListDecoder + where S: Stream, + S::Error: Into, +{ + type Item = [u8; 32]; + type Error = Error; + + fn poll(&mut self) -> Result>, Self::Error> { + loop { + + if self.buffer.len() >= 32 { + + let left = self.buffer.split_to(32); + + let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() }; + unsafe { std::ptr::copy_nonoverlapping(left.as_ptr(), digest.as_mut_ptr(), 32); } + + return Ok(Async::Ready(Some(digest))); + } + + match self.input.poll() { + Err(err) => { + return Err(err.into()); + } + Ok(Async::NotReady) => { + return Ok(Async::NotReady); + } + Ok(Async::Ready(None)) => { + let rest = self.buffer.len(); + if rest == 0 { return Ok(Async::Ready(None)); } + return Err(format_err!("got small digest ({} != 32).", rest)); + } + Ok(Async::Ready(Some(data))) => { + self.buffer.extend_from_slice(&data); + // continue + } + } + } + } +}