src/backup/index.rs: implement DigestListDecoder
This commit is contained in:
		@ -1,3 +1,7 @@
 | 
			
		||||
use failure::*;
 | 
			
		||||
use futures::*;
 | 
			
		||||
use bytes::{Bytes, BytesMut};
 | 
			
		||||
 | 
			
		||||
pub trait IndexFile: Send {
 | 
			
		||||
    fn index_count(&self) -> usize;
 | 
			
		||||
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;
 | 
			
		||||
@ -40,3 +44,60 @@ impl std::io::Read for ChunkListReader {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]>
 | 
			
		||||
///
 | 
			
		||||
/// The reader simply returns a birary stream of 32 byte digest values.
 | 
			
		||||
 | 
			
		||||
pub struct DigestListDecoder<S> {
 | 
			
		||||
    input: S,
 | 
			
		||||
    buffer: BytesMut,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl <S> DigestListDecoder<S> {
 | 
			
		||||
 | 
			
		||||
    pub fn new(input: S) -> Self {
 | 
			
		||||
        Self { input, buffer: BytesMut::new() }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl <S> Stream for DigestListDecoder<S>
 | 
			
		||||
    where S: Stream<Item=Bytes>,
 | 
			
		||||
          S::Error: Into<Error>,
 | 
			
		||||
{
 | 
			
		||||
    type Item = [u8; 32];
 | 
			
		||||
    type Error = Error;
 | 
			
		||||
 | 
			
		||||
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
 | 
			
		||||
        loop {
 | 
			
		||||
 | 
			
		||||
            if self.buffer.len() >= 32 {
 | 
			
		||||
 | 
			
		||||
                let left = self.buffer.split_to(32);
 | 
			
		||||
 | 
			
		||||
                let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
 | 
			
		||||
                unsafe { std::ptr::copy_nonoverlapping(left.as_ptr(), digest.as_mut_ptr(), 32); }
 | 
			
		||||
 | 
			
		||||
                return Ok(Async::Ready(Some(digest)));
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            match self.input.poll() {
 | 
			
		||||
                Err(err) => {
 | 
			
		||||
                    return Err(err.into());
 | 
			
		||||
                }
 | 
			
		||||
                Ok(Async::NotReady) => {
 | 
			
		||||
                    return Ok(Async::NotReady);
 | 
			
		||||
                }
 | 
			
		||||
                Ok(Async::Ready(None)) => {
 | 
			
		||||
                    let rest = self.buffer.len();
 | 
			
		||||
                    if rest == 0 { return Ok(Async::Ready(None)); }
 | 
			
		||||
                    return Err(format_err!("got small digest ({} != 32).", rest));
 | 
			
		||||
                }
 | 
			
		||||
                Ok(Async::Ready(Some(data))) => {
 | 
			
		||||
                    self.buffer.extend_from_slice(&data);
 | 
			
		||||
                    // continue
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user