From 5b3911995b65b8f6cb13bde1c108860f34577e3f Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Fri, 23 Aug 2019 14:31:23 +0200 Subject: [PATCH] src/client/merge_known_chunks.rs: switch to async Signed-off-by: Wolfgang Bumiller --- src/client/merge_known_chunks.rs | 53 ++++++++++++++++---------------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/src/client/merge_known_chunks.rs b/src/client/merge_known_chunks.rs index ff3b3661..9359bffc 100644 --- a/src/client/merge_known_chunks.rs +++ b/src/client/merge_known_chunks.rs @@ -1,3 +1,6 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + use failure::*; use futures::*; @@ -19,7 +22,7 @@ pub struct MergeKnownChunksQueue { impl MergeKnownChunks for S where - S: Stream, + S: Stream>, { fn merge_known_chunks(self) -> MergeKnownChunksQueue { MergeKnownChunksQueue { @@ -31,60 +34,56 @@ where impl Stream for MergeKnownChunksQueue where - S: Stream, + S: Stream>, { - type Item = MergedChunkInfo; - type Error = Error; + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = unsafe { self.get_unchecked_mut() }; - fn poll(&mut self) -> Poll, Error> { loop { - match self.input.poll() { - Err(err) => { - return Err(err); - } - Ok(Async::NotReady) => { - return Ok(Async::NotReady); - } - Ok(Async::Ready(None)) => { - if let Some(last) = self.buffer.take() { - return Ok(Async::Ready(Some(last))); + match ready!(unsafe { Pin::new_unchecked(&mut this.input) }.poll_next(cx)) { + Some(Err(err)) => return Poll::Ready(Some(Err(err))), + None => { + if let Some(last) = this.buffer.take() { + return Poll::Ready(Some(Ok(last))); } else { - return Ok(Async::Ready(None)); + return Poll::Ready(None); } } - Ok(Async::Ready(Some(mergerd_chunk_info))) => { + Some(Ok(mergerd_chunk_info)) => { match mergerd_chunk_info { MergedChunkInfo::Known(list) => { - let last = self.buffer.take(); + let last = this.buffer.take(); match last { None => { - self.buffer = Some(MergedChunkInfo::Known(list)); + this.buffer = Some(MergedChunkInfo::Known(list)); // continue } Some(MergedChunkInfo::Known(mut last_list)) => { last_list.extend_from_slice(&list); let len = last_list.len(); - self.buffer = Some(MergedChunkInfo::Known(last_list)); + this.buffer = Some(MergedChunkInfo::Known(last_list)); if len >= 64 { - return Ok(Async::Ready(self.buffer.take())); + return Poll::Ready(this.buffer.take().map(Ok)); } // continue } Some(MergedChunkInfo::New(_)) => { - self.buffer = Some(MergedChunkInfo::Known(list)); - return Ok(Async::Ready(last)); + this.buffer = Some(MergedChunkInfo::Known(list)); + return Poll::Ready(last.map(Ok)); } } } MergedChunkInfo::New(chunk_info) => { let new = MergedChunkInfo::New(chunk_info); - if let Some(last) = self.buffer.take() { - self.buffer = Some(new); - return Ok(Async::Ready(Some(last))); + if let Some(last) = this.buffer.take() { + this.buffer = Some(new); + return Poll::Ready(Some(Ok(last))); } else { - return Ok(Async::Ready(Some(new))); + return Poll::Ready(Some(Ok(new))); } } }