From e668912a9938c8fe42cb811eaf9e0501fe9385a5 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Fri, 23 Aug 2019 12:22:15 +0200 Subject: [PATCH] src/tools/futures.rs: switch to async Signed-off-by: Wolfgang Bumiller --- src/tools/futures.rs | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/src/tools/futures.rs b/src/tools/futures.rs index 8cea6f95..19cd2a16 100644 --- a/src/tools/futures.rs +++ b/src/tools/futures.rs @@ -1,11 +1,14 @@ //! Provides utilities to deal with futures, such as a `Cancellable` future. +use std::future::Future; +use std::pin::Pin; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use failure::Error; -use futures::{Async, Future, Poll}; +use futures::future::FutureExt; -use crate::tools::async_mutex::{AsyncMutex, AsyncLockGuard, LockFuture}; +use crate::tools::async_mutex::{AsyncLockGuard, AsyncMutex, LockFuture}; /// Make a future cancellable. /// @@ -16,30 +19,30 @@ use crate::tools::async_mutex::{AsyncMutex, AsyncLockGuard, LockFuture}; /// In order to cancel the future, a `Canceller` is used. /// /// ```no_run +/// # use std::future::Future; /// # use failure::Error; -/// # use futures::Future; +/// # use futures::future::FutureExt; /// # use proxmox_backup::tools::futures::Cancellable; /// # fn doc(future: T) -> Result<(), Error> /// # where -/// # T: Future + Send + Sync + 'static, +/// # T: Future + Unpin + Send + Sync + 'static, /// # { /// let (future, canceller) = Cancellable::new(future)?; -/// tokio::spawn(future.and_then(|res| { +/// tokio::spawn(future.map(|res| { /// match res { /// Some(value) => println!("Future finished with {}", value), /// None => println!("Future was cancelled"), /// } -/// Ok(()) /// })); /// // Do something /// canceller.cancel(); /// # Ok(()) /// # } /// ``` -pub struct Cancellable { +pub struct Cancellable { /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the /// lock, so that our LockFuture finishes. - inner: futures::future::Select2::Error>>, + inner: futures::future::Select>, /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the /// future, it'll drop this guard, causing our inner future to resolve to `None`. @@ -63,7 +66,7 @@ impl Canceller { } } -impl Cancellable { +impl Cancellable { /// Make a future cancellable. /// /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future. @@ -71,7 +74,7 @@ impl Cancellable { // we don't even need to sture the mutex... let (mutex, guard) = AsyncMutex::new_locked(())?; let this = Self { - inner: inner.select2(mutex.lock()), + inner: futures::future::select(inner, mutex.lock()), guard: Arc::new(Mutex::new(Some(guard))), }; let canceller = this.canceller(); @@ -87,22 +90,18 @@ impl Cancellable { /// Make a future cancellable. /// /// This is a shortcut for `Cancellable::new` -pub fn cancellable(future: T) -> Result<(Cancellable, Canceller), Error> { +pub fn cancellable(future: T) -> Result<(Cancellable, Canceller), Error> { Cancellable::new(future) } -impl Future for Cancellable { - type Item = Option<::Item>; - type Error = ::Error; - - fn poll(&mut self) -> Poll { +impl Future for Cancellable { + type Output = Option<::Output>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use futures::future::Either; - match self.inner.poll() { - Ok(Async::Ready(Either::A((item, _)))) => Ok(Async::Ready(Some(item))), - Ok(Async::Ready(Either::B(_))) => Ok(Async::Ready(None)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(Either::A((err, _))) => Err(err), - Err(Either::B((err, _))) => Err(err), + match self.inner.poll_unpin(cx) { + Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)), + Poll::Ready(Either::Right(_)) => Poll::Ready(None), + Poll::Pending => Poll::Pending, } } }