diff --git a/src/backup/index.rs b/src/backup/index.rs index e58636bf..ec455214 100644 --- a/src/backup/index.rs +++ b/src/backup/index.rs @@ -1,4 +1,6 @@ use std::collections::HashMap; +use std::pin::Pin; +use std::task::{Context, Poll}; use bytes::{Bytes, BytesMut}; use failure::*; @@ -93,55 +95,61 @@ impl std::io::Read for DigestListEncoder { /// /// The reader simply returns a birary stream of 32 byte digest values. -pub struct DigestListDecoder { +pub struct DigestListDecoder { input: S, buffer: BytesMut, } -impl DigestListDecoder { - +impl DigestListDecoder { pub fn new(input: S) -> Self { Self { input, buffer: BytesMut::new() } } } -impl Stream for DigestListDecoder - where S: Stream, - S::Error: Into, +impl Unpin for DigestListDecoder {} + +impl Stream for DigestListDecoder +where + S: Stream>, + E: Into, { - type Item = [u8; 32]; - type Error = Error; + type Item = Result<[u8; 32], Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); - fn poll(&mut self) -> Result>, Self::Error> { loop { - - if self.buffer.len() >= 32 { - - let left = self.buffer.split_to(32); + if this.buffer.len() >= 32 { + let left = this.buffer.split_to(32); let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit(); unsafe { (*digest.as_mut_ptr()).copy_from_slice(&left[..]); - return Ok(Async::Ready(Some(digest.assume_init()))); + return Poll::Ready(Some(Ok(digest.assume_init()))); } } - match self.input.poll() { - Err(err) => { - return Err(err.into()); + match Pin::new(&mut this.input).poll_next(cx) { + Poll::Pending => { + return Poll::Pending; } - Ok(Async::NotReady) => { - return Ok(Async::NotReady); + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(Some(Err(err.into()))); } - Ok(Async::Ready(None)) => { - let rest = self.buffer.len(); - if rest == 0 { return Ok(Async::Ready(None)); } - return Err(format_err!("got small digest ({} != 32).", rest)); - } - Ok(Async::Ready(Some(data))) => { - self.buffer.extend_from_slice(&data); + Poll::Ready(Some(Ok(data))) => { + this.buffer.extend_from_slice(&data); // continue } + Poll::Ready(None) => { + let rest = this.buffer.len(); + if rest == 0 { + return Poll::Ready(None); + } + return Poll::Ready(Some(Err(format_err!( + "got small digest ({} != 32).", + rest, + )))); + } } } }