src/backup/index.rs: switch to async
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
		@ -1,4 +1,6 @@
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use std::pin::Pin;
 | 
			
		||||
use std::task::{Context, Poll};
 | 
			
		||||
 | 
			
		||||
use bytes::{Bytes, BytesMut};
 | 
			
		||||
use failure::*;
 | 
			
		||||
@ -93,55 +95,61 @@ impl std::io::Read for DigestListEncoder {
 | 
			
		||||
///
 | 
			
		||||
/// The reader simply returns a birary stream of 32 byte digest values.
 | 
			
		||||
 | 
			
		||||
pub struct DigestListDecoder<S> {
 | 
			
		||||
pub struct DigestListDecoder<S: Unpin> {
 | 
			
		||||
    input: S,
 | 
			
		||||
    buffer: BytesMut,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl <S> DigestListDecoder<S> {
 | 
			
		||||
 | 
			
		||||
impl<S: Unpin> DigestListDecoder<S> {
 | 
			
		||||
    pub fn new(input: S) -> Self {
 | 
			
		||||
        Self { input, buffer: BytesMut::new() }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl <S> Stream for DigestListDecoder<S>
 | 
			
		||||
    where S: Stream<Item=Bytes>,
 | 
			
		||||
          S::Error: Into<Error>,
 | 
			
		||||
impl<S: Unpin> Unpin for DigestListDecoder<S> {}
 | 
			
		||||
 | 
			
		||||
impl<S: Unpin, E> Stream for DigestListDecoder<S>
 | 
			
		||||
where
 | 
			
		||||
    S: Stream<Item=Result<Bytes, E>>,
 | 
			
		||||
    E: Into<Error>,
 | 
			
		||||
{
 | 
			
		||||
    type Item = [u8; 32];
 | 
			
		||||
    type Error = Error;
 | 
			
		||||
    type Item = Result<[u8; 32], Error>;
 | 
			
		||||
 | 
			
		||||
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
 | 
			
		||||
        let this = self.get_mut();
 | 
			
		||||
 | 
			
		||||
    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
 | 
			
		||||
        loop {
 | 
			
		||||
 | 
			
		||||
            if self.buffer.len() >= 32 {
 | 
			
		||||
 | 
			
		||||
                let left = self.buffer.split_to(32);
 | 
			
		||||
            if this.buffer.len() >= 32 {
 | 
			
		||||
                let left = this.buffer.split_to(32);
 | 
			
		||||
 | 
			
		||||
                let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
 | 
			
		||||
                unsafe {
 | 
			
		||||
                    (*digest.as_mut_ptr()).copy_from_slice(&left[..]);
 | 
			
		||||
                    return Ok(Async::Ready(Some(digest.assume_init())));
 | 
			
		||||
                    return Poll::Ready(Some(Ok(digest.assume_init())));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            match self.input.poll() {
 | 
			
		||||
                Err(err) => {
 | 
			
		||||
                    return Err(err.into());
 | 
			
		||||
            match Pin::new(&mut this.input).poll_next(cx) {
 | 
			
		||||
                Poll::Pending => {
 | 
			
		||||
                    return Poll::Pending;
 | 
			
		||||
                }
 | 
			
		||||
                Ok(Async::NotReady) => {
 | 
			
		||||
                    return Ok(Async::NotReady);
 | 
			
		||||
                Poll::Ready(Some(Err(err))) => {
 | 
			
		||||
                    return Poll::Ready(Some(Err(err.into())));
 | 
			
		||||
                }
 | 
			
		||||
                Ok(Async::Ready(None)) => {
 | 
			
		||||
                    let rest = self.buffer.len();
 | 
			
		||||
                    if rest == 0 { return Ok(Async::Ready(None)); }
 | 
			
		||||
                    return Err(format_err!("got small digest ({} != 32).", rest));
 | 
			
		||||
                }
 | 
			
		||||
                Ok(Async::Ready(Some(data))) => {
 | 
			
		||||
                    self.buffer.extend_from_slice(&data);
 | 
			
		||||
                Poll::Ready(Some(Ok(data))) => {
 | 
			
		||||
                    this.buffer.extend_from_slice(&data);
 | 
			
		||||
                    // continue
 | 
			
		||||
                }
 | 
			
		||||
                Poll::Ready(None) => {
 | 
			
		||||
                    let rest = this.buffer.len();
 | 
			
		||||
                    if rest == 0 {
 | 
			
		||||
                        return Poll::Ready(None);
 | 
			
		||||
                    }
 | 
			
		||||
                    return Poll::Ready(Some(Err(format_err!(
 | 
			
		||||
                        "got small digest ({} != 32).",
 | 
			
		||||
                        rest,
 | 
			
		||||
                    ))));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user