src/tools/futures.rs: switch to async

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-08-23 12:22:15 +02:00
parent 0f5856acca
commit e668912a99
1 changed files with 21 additions and 22 deletions

View File

@ -1,11 +1,14 @@
//! Provides utilities to deal with futures, such as a `Cancellable` future. //! Provides utilities to deal with futures, such as a `Cancellable` future.
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use failure::Error; use failure::Error;
use futures::{Async, Future, Poll}; use futures::future::FutureExt;
use crate::tools::async_mutex::{AsyncMutex, AsyncLockGuard, LockFuture}; use crate::tools::async_mutex::{AsyncLockGuard, AsyncMutex, LockFuture};
/// Make a future cancellable. /// Make a future cancellable.
/// ///
@ -16,30 +19,30 @@ use crate::tools::async_mutex::{AsyncMutex, AsyncLockGuard, LockFuture};
/// In order to cancel the future, a `Canceller` is used. /// In order to cancel the future, a `Canceller` is used.
/// ///
/// ```no_run /// ```no_run
/// # use std::future::Future;
/// # use failure::Error; /// # use failure::Error;
/// # use futures::Future; /// # use futures::future::FutureExt;
/// # use proxmox_backup::tools::futures::Cancellable; /// # use proxmox_backup::tools::futures::Cancellable;
/// # fn doc<T>(future: T) -> Result<(), Error> /// # fn doc<T>(future: T) -> Result<(), Error>
/// # where /// # where
/// # T: Future<Item = i32, Error = ()> + Send + Sync + 'static, /// # T: Future<Output = i32> + Unpin + Send + Sync + 'static,
/// # { /// # {
/// let (future, canceller) = Cancellable::new(future)?; /// let (future, canceller) = Cancellable::new(future)?;
/// tokio::spawn(future.and_then(|res| { /// tokio::spawn(future.map(|res| {
/// match res { /// match res {
/// Some(value) => println!("Future finished with {}", value), /// Some(value) => println!("Future finished with {}", value),
/// None => println!("Future was cancelled"), /// None => println!("Future was cancelled"),
/// } /// }
/// Ok(())
/// })); /// }));
/// // Do something /// // Do something
/// canceller.cancel(); /// canceller.cancel();
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub struct Cancellable<T: Future> { pub struct Cancellable<T: Future + Unpin> {
/// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the /// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
/// lock, so that our LockFuture finishes. /// lock, so that our LockFuture finishes.
inner: futures::future::Select2<T, LockFuture<(), <T as Future>::Error>>, inner: futures::future::Select<T, LockFuture<()>>,
/// When this future is created, this holds a guard. When a `Canceller` wants to cancel the /// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
/// future, it'll drop this guard, causing our inner future to resolve to `None`. /// future, it'll drop this guard, causing our inner future to resolve to `None`.
@ -63,7 +66,7 @@ impl Canceller {
} }
} }
impl<T: Future> Cancellable<T> { impl<T: Future + Unpin> Cancellable<T> {
/// Make a future cancellable. /// Make a future cancellable.
/// ///
/// Returns a future and a `Canceller` which can be cloned and used later to cancel the future. /// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
@ -71,7 +74,7 @@ impl<T: Future> Cancellable<T> {
// we don't even need to sture the mutex... // we don't even need to sture the mutex...
let (mutex, guard) = AsyncMutex::new_locked(())?; let (mutex, guard) = AsyncMutex::new_locked(())?;
let this = Self { let this = Self {
inner: inner.select2(mutex.lock()), inner: futures::future::select(inner, mutex.lock()),
guard: Arc::new(Mutex::new(Some(guard))), guard: Arc::new(Mutex::new(Some(guard))),
}; };
let canceller = this.canceller(); let canceller = this.canceller();
@ -87,22 +90,18 @@ impl<T: Future> Cancellable<T> {
/// Make a future cancellable. /// Make a future cancellable.
/// ///
/// This is a shortcut for `Cancellable::new` /// This is a shortcut for `Cancellable::new`
pub fn cancellable<T: Future>(future: T) -> Result<(Cancellable<T>, Canceller), Error> { pub fn cancellable<T: Future + Unpin>(future: T) -> Result<(Cancellable<T>, Canceller), Error> {
Cancellable::new(future) Cancellable::new(future)
} }
impl<T: Future> Future for Cancellable<T> { impl<T: Future + Unpin> Future for Cancellable<T> {
type Item = Option<<T as Future>::Item>; type Output = Option<<T as Future>::Output>;
type Error = <T as Future>::Error; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
use futures::future::Either; use futures::future::Either;
match self.inner.poll() { match self.inner.poll_unpin(cx) {
Ok(Async::Ready(Either::A((item, _)))) => Ok(Async::Ready(Some(item))), Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)),
Ok(Async::Ready(Either::B(_))) => Ok(Async::Ready(None)), Poll::Ready(Either::Right(_)) => Poll::Ready(None),
Ok(Async::NotReady) => Ok(Async::NotReady), Poll::Pending => Poll::Pending,
Err(Either::A((err, _))) => Err(err),
Err(Either::B((err, _))) => Err(err),
} }
} }
} }