src/client/merge_known_chunks.rs: switch to async

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-08-23 14:31:23 +02:00
parent a6782ca10b
commit 5b3911995b
1 changed files with 26 additions and 27 deletions

View File

@ -1,3 +1,6 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use failure::*; use failure::*;
use futures::*; use futures::*;
@ -19,7 +22,7 @@ pub struct MergeKnownChunksQueue<S> {
impl<S> MergeKnownChunks for S impl<S> MergeKnownChunks for S
where where
S: Stream<Item = MergedChunkInfo, Error = Error>, S: Stream<Item = Result<MergedChunkInfo, Error>>,
{ {
fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self> { fn merge_known_chunks(self) -> MergeKnownChunksQueue<Self> {
MergeKnownChunksQueue { MergeKnownChunksQueue {
@ -31,60 +34,56 @@ where
impl<S> Stream for MergeKnownChunksQueue<S> impl<S> Stream for MergeKnownChunksQueue<S>
where where
S: Stream<Item = MergedChunkInfo, Error = Error>, S: Stream<Item = Result<MergedChunkInfo, Error>>,
{ {
type Item = MergedChunkInfo; type Item = Result<MergedChunkInfo, Error>;
type Error = Error;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = unsafe { self.get_unchecked_mut() };
fn poll(&mut self) -> Poll<Option<MergedChunkInfo>, Error> {
loop { loop {
match self.input.poll() { match ready!(unsafe { Pin::new_unchecked(&mut this.input) }.poll_next(cx)) {
Err(err) => { Some(Err(err)) => return Poll::Ready(Some(Err(err))),
return Err(err); None => {
} if let Some(last) = this.buffer.take() {
Ok(Async::NotReady) => { return Poll::Ready(Some(Ok(last)));
return Ok(Async::NotReady);
}
Ok(Async::Ready(None)) => {
if let Some(last) = self.buffer.take() {
return Ok(Async::Ready(Some(last)));
} else { } else {
return Ok(Async::Ready(None)); return Poll::Ready(None);
} }
} }
Ok(Async::Ready(Some(mergerd_chunk_info))) => { Some(Ok(mergerd_chunk_info)) => {
match mergerd_chunk_info { match mergerd_chunk_info {
MergedChunkInfo::Known(list) => { MergedChunkInfo::Known(list) => {
let last = self.buffer.take(); let last = this.buffer.take();
match last { match last {
None => { None => {
self.buffer = Some(MergedChunkInfo::Known(list)); this.buffer = Some(MergedChunkInfo::Known(list));
// continue // continue
} }
Some(MergedChunkInfo::Known(mut last_list)) => { Some(MergedChunkInfo::Known(mut last_list)) => {
last_list.extend_from_slice(&list); last_list.extend_from_slice(&list);
let len = last_list.len(); let len = last_list.len();
self.buffer = Some(MergedChunkInfo::Known(last_list)); this.buffer = Some(MergedChunkInfo::Known(last_list));
if len >= 64 { if len >= 64 {
return Ok(Async::Ready(self.buffer.take())); return Poll::Ready(this.buffer.take().map(Ok));
} }
// continue // continue
} }
Some(MergedChunkInfo::New(_)) => { Some(MergedChunkInfo::New(_)) => {
self.buffer = Some(MergedChunkInfo::Known(list)); this.buffer = Some(MergedChunkInfo::Known(list));
return Ok(Async::Ready(last)); return Poll::Ready(last.map(Ok));
} }
} }
} }
MergedChunkInfo::New(chunk_info) => { MergedChunkInfo::New(chunk_info) => {
let new = MergedChunkInfo::New(chunk_info); let new = MergedChunkInfo::New(chunk_info);
if let Some(last) = self.buffer.take() { if let Some(last) = this.buffer.take() {
self.buffer = Some(new); this.buffer = Some(new);
return Ok(Async::Ready(Some(last))); return Poll::Ready(Some(Ok(last)));
} else { } else {
return Ok(Async::Ready(Some(new))); return Poll::Ready(Some(Ok(new)));
} }
} }
} }