src/tools/async_mutex.rs: switch to async

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-08-23 12:20:49 +02:00
parent 75fef4b463
commit 627bb7d114

View File

@ -1,50 +1,46 @@
use std::marker::PhantomData; use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use failure::{bail, Error}; use failure::Error;
use futures::{Async, Poll}; use futures::future::FutureExt;
use futures::future::Future; use tokio::sync::Lock as TokioLock;
use tokio::sync::lock::Lock as TokioLock;
pub use tokio::sync::lock::LockGuard as AsyncLockGuard;
pub struct AsyncMutex<T>(TokioLock<T>); pub use tokio::sync::LockGuard as AsyncLockGuard;
unsafe impl<T> Sync for AsyncMutex<T> {} pub struct AsyncMutex<T: Send>(TokioLock<T>);
impl<T> AsyncMutex<T> { unsafe impl<T: Send> Sync for AsyncMutex<T> {}
impl<T: Send + 'static> AsyncMutex<T> {
pub fn new(value: T) -> Self { pub fn new(value: T) -> Self {
Self(TokioLock::new(value)) Self(TokioLock::new(value))
} }
// <E> to allow any error type (we never error, so we have no error type of our own) pub fn lock(&self) -> LockFuture<T> {
pub fn lock<E>(&self) -> LockFuture<T, E> { let mut lock = self.0.clone();
LockFuture { LockFuture {
lock: self.0.clone(), lock: async move { lock.lock().await }.boxed(),
_error: PhantomData,
} }
} }
// FIXME: remove Result<> from this.
pub fn new_locked(value: T) -> Result<(Self, AsyncLockGuard<T>), Error> { pub fn new_locked(value: T) -> Result<(Self, AsyncLockGuard<T>), Error> {
let mut this = Self::new(value); let mut this = Self::new(value);
let guard = match this.0.poll_lock() { let guard = futures::executor::block_on(this.0.lock());
Async::Ready(guard) => guard,
_ => bail!("failed to create locked mutex"),
};
Ok((this, guard)) Ok((this, guard))
} }
} }
/// Represents a lock to be held in the future: /// Represents a lock to be held in the future:
pub struct LockFuture<T, E> { pub struct LockFuture<T: Send + 'static> {
lock: TokioLock<T>, lock: Pin<Box<dyn Future<Output = AsyncLockGuard<T>> + Send + 'static>>,
// We can't error and we don't want to enforce a specific error type either
_error: PhantomData<E>,
} }
impl<T, E> Future for LockFuture<T, E> { impl<T: Send + 'static> Future for LockFuture<T> {
type Item = AsyncLockGuard<T>; type Output = AsyncLockGuard<T>;
type Error = E;
fn poll(&mut self) -> Poll<AsyncLockGuard<T>, E> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<AsyncLockGuard<T>> {
Ok(self.lock.poll_lock()) self.lock.poll_unpin(cx)
} }
} }