drop Cancellable future in favor of abortable
futures-0.3 has a futures::future::abortable() function which does the exact same, returns an Abortable future with an AbortHandle providing an abort() method. Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
parent
8554ac5ec3
commit
dc08934563
|
@ -5,11 +5,11 @@ use std::sync::Arc;
|
||||||
use std::os::unix::fs::OpenOptionsExt;
|
use std::os::unix::fs::OpenOptionsExt;
|
||||||
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
|
use futures::future::AbortHandle;
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
|
|
||||||
use proxmox::tools::digest_to_hex;
|
use proxmox::tools::digest_to_hex;
|
||||||
|
|
||||||
use crate::tools::futures::Canceller;
|
|
||||||
use crate::backup::*;
|
use crate::backup::*;
|
||||||
|
|
||||||
use super::{HttpClient, H2Client};
|
use super::{HttpClient, H2Client};
|
||||||
|
@ -17,21 +17,21 @@ use super::{HttpClient, H2Client};
|
||||||
/// Backup Reader
|
/// Backup Reader
|
||||||
pub struct BackupReader {
|
pub struct BackupReader {
|
||||||
h2: H2Client,
|
h2: H2Client,
|
||||||
canceller: Canceller,
|
abort: AbortHandle,
|
||||||
crypt_config: Option<Arc<CryptConfig>>,
|
crypt_config: Option<Arc<CryptConfig>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for BackupReader {
|
impl Drop for BackupReader {
|
||||||
|
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.canceller.cancel();
|
self.abort.abort();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BackupReader {
|
impl BackupReader {
|
||||||
|
|
||||||
fn new(h2: H2Client, canceller: Canceller, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
|
fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>) -> Arc<Self> {
|
||||||
Arc::new(Self { h2, canceller, crypt_config})
|
Arc::new(Self { h2, abort, crypt_config})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new instance by upgrading the connection at '/api2/json/reader'
|
/// Create a new instance by upgrading the connection at '/api2/json/reader'
|
||||||
|
@ -54,9 +54,9 @@ impl BackupReader {
|
||||||
});
|
});
|
||||||
let req = HttpClient::request_builder(client.server(), "GET", "/api2/json/reader", Some(param)).unwrap();
|
let req = HttpClient::request_builder(client.server(), "GET", "/api2/json/reader", Some(param)).unwrap();
|
||||||
|
|
||||||
let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
|
let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
|
||||||
|
|
||||||
Ok(BackupReader::new(h2, canceller, crypt_config))
|
Ok(BackupReader::new(h2, abort, crypt_config))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute a GET request
|
/// Execute a GET request
|
||||||
|
@ -119,7 +119,7 @@ impl BackupReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn force_close(self) {
|
pub fn force_close(self) {
|
||||||
self.canceller.cancel();
|
self.abort.abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Download backup manifest (index.json)
|
/// Download backup manifest (index.json)
|
||||||
|
|
|
@ -6,6 +6,7 @@ use failure::*;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use futures::*;
|
use futures::*;
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
|
use futures::future::AbortHandle;
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
@ -14,19 +15,18 @@ use proxmox::tools::digest_to_hex;
|
||||||
|
|
||||||
use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
|
use super::merge_known_chunks::{MergedChunkInfo, MergeKnownChunks};
|
||||||
use crate::backup::*;
|
use crate::backup::*;
|
||||||
use crate::tools::futures::Canceller;
|
|
||||||
|
|
||||||
use super::{HttpClient, H2Client};
|
use super::{HttpClient, H2Client};
|
||||||
|
|
||||||
pub struct BackupWriter {
|
pub struct BackupWriter {
|
||||||
h2: H2Client,
|
h2: H2Client,
|
||||||
canceller: Canceller,
|
abort: AbortHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for BackupWriter {
|
impl Drop for BackupWriter {
|
||||||
|
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.canceller.cancel();
|
self.abort.abort();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,8 +37,8 @@ pub struct BackupStats {
|
||||||
|
|
||||||
impl BackupWriter {
|
impl BackupWriter {
|
||||||
|
|
||||||
fn new(h2: H2Client, canceller: Canceller) -> Arc<Self> {
|
fn new(h2: H2Client, abort: AbortHandle) -> Arc<Self> {
|
||||||
Arc::new(Self { h2, canceller })
|
Arc::new(Self { h2, abort })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start(
|
pub async fn start(
|
||||||
|
@ -61,9 +61,9 @@ impl BackupWriter {
|
||||||
let req = HttpClient::request_builder(
|
let req = HttpClient::request_builder(
|
||||||
client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();
|
client.server(), "GET", "/api2/json/backup", Some(param)).unwrap();
|
||||||
|
|
||||||
let (h2, canceller) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
|
let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
|
||||||
|
|
||||||
Ok(BackupWriter::new(h2, canceller))
|
Ok(BackupWriter::new(h2, abort))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get(
|
pub async fn get(
|
||||||
|
@ -129,13 +129,13 @@ impl BackupWriter {
|
||||||
|
|
||||||
h2.post("finish", None)
|
h2.post("finish", None)
|
||||||
.map_ok(move |_| {
|
.map_ok(move |_| {
|
||||||
self.canceller.cancel();
|
self.abort.abort();
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cancel(&self) {
|
pub fn cancel(&self) {
|
||||||
self.canceller.cancel();
|
self.abort.abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn upload_blob<R: std::io::Read>(
|
pub async fn upload_blob<R: std::io::Read>(
|
||||||
|
|
|
@ -20,7 +20,6 @@ use proxmox::tools::{
|
||||||
|
|
||||||
use super::pipe_to_stream::PipeToSendStream;
|
use super::pipe_to_stream::PipeToSendStream;
|
||||||
use crate::tools::async_io::EitherStream;
|
use crate::tools::async_io::EitherStream;
|
||||||
use crate::tools::futures::{cancellable, Canceller};
|
|
||||||
use crate::tools::{self, tty, BroadcastFuture, DEFAULT_ENCODE_SET};
|
use crate::tools::{self, tty, BroadcastFuture, DEFAULT_ENCODE_SET};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -287,7 +286,7 @@ impl HttpClient {
|
||||||
&self,
|
&self,
|
||||||
mut req: Request<Body>,
|
mut req: Request<Body>,
|
||||||
protocol_name: String,
|
protocol_name: String,
|
||||||
) -> Result<(H2Client, Canceller), Error> {
|
) -> Result<(H2Client, futures::future::AbortHandle), Error> {
|
||||||
|
|
||||||
let auth = self.login().await?;
|
let auth = self.login().await?;
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
|
@ -323,7 +322,7 @@ impl HttpClient {
|
||||||
let connection = connection
|
let connection = connection
|
||||||
.map_err(|_| panic!("HTTP/2.0 connection failed"));
|
.map_err(|_| panic!("HTTP/2.0 connection failed"));
|
||||||
|
|
||||||
let (connection, canceller) = cancellable(connection)?;
|
let (connection, abort) = futures::future::abortable(connection);
|
||||||
// A cancellable future returns an Option which is None when cancelled and
|
// A cancellable future returns an Option which is None when cancelled and
|
||||||
// Some when it finished instead, since we don't care about the return type we
|
// Some when it finished instead, since we don't care about the return type we
|
||||||
// need to map it away:
|
// need to map it away:
|
||||||
|
@ -334,7 +333,7 @@ impl HttpClient {
|
||||||
|
|
||||||
// Wait until the `SendRequest` handle has available capacity.
|
// Wait until the `SendRequest` handle has available capacity.
|
||||||
let c = h2.ready().await?;
|
let c = h2.ready().await?;
|
||||||
Ok((H2Client::new(c), canceller))
|
Ok((H2Client::new(c), abort))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn credentials(
|
async fn credentials(
|
||||||
|
|
|
@ -23,7 +23,6 @@ pub mod async_io;
|
||||||
pub mod borrow;
|
pub mod borrow;
|
||||||
pub mod daemon;
|
pub mod daemon;
|
||||||
pub mod fs;
|
pub mod fs;
|
||||||
pub mod futures;
|
|
||||||
pub mod runtime;
|
pub mod runtime;
|
||||||
pub mod ticket;
|
pub mod ticket;
|
||||||
pub mod timer;
|
pub mod timer;
|
||||||
|
|
|
@ -1,109 +0,0 @@
|
||||||
//! Provides utilities to deal with futures, such as a `Cancellable` future.
|
|
||||||
|
|
||||||
use std::future::Future;
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::task::{Context, Poll};
|
|
||||||
|
|
||||||
use failure::Error;
|
|
||||||
use futures::future::FutureExt;
|
|
||||||
use tokio::sync::oneshot;
|
|
||||||
|
|
||||||
/// Make a future cancellable.
|
|
||||||
///
|
|
||||||
/// This simply performs a `select()` on the future and something waiting for a signal. If the
|
|
||||||
/// future finishes successfully, it yields `Some(T::Item)`. If it was cancelled, it'll yield
|
|
||||||
/// `None`.
|
|
||||||
///
|
|
||||||
/// In order to cancel the future, a `Canceller` is used.
|
|
||||||
///
|
|
||||||
/// ```no_run
|
|
||||||
/// # use std::future::Future;
|
|
||||||
/// # use failure::Error;
|
|
||||||
/// # use futures::future::FutureExt;
|
|
||||||
/// # use proxmox_backup::tools::futures::Cancellable;
|
|
||||||
/// # fn doc<T>(future: T) -> Result<(), Error>
|
|
||||||
/// # where
|
|
||||||
/// # T: Future<Output = i32> + Unpin + Send + Sync + 'static,
|
|
||||||
/// # {
|
|
||||||
/// let (future, canceller) = Cancellable::new(future)?;
|
|
||||||
/// tokio::spawn(future.map(|res| {
|
|
||||||
/// match res {
|
|
||||||
/// Some(value) => println!("Future finished with {}", value),
|
|
||||||
/// None => println!("Future was cancelled"),
|
|
||||||
/// }
|
|
||||||
/// }));
|
|
||||||
/// // Do something
|
|
||||||
/// canceller.cancel();
|
|
||||||
/// # Ok(())
|
|
||||||
/// # }
|
|
||||||
/// ```
|
|
||||||
pub struct Cancellable<T: Future + Unpin> {
|
|
||||||
/// Our core: we're waiting on a future, on on a lock. The cancel method just unlocks the
|
|
||||||
/// lock, so that our LockFuture finishes.
|
|
||||||
inner: futures::future::Select<T, oneshot::Receiver<()>>,
|
|
||||||
|
|
||||||
/// When this future is created, this holds a guard. When a `Canceller` wants to cancel the
|
|
||||||
/// future, it'll drop this guard, causing our inner future to resolve to `None`.
|
|
||||||
sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Reference to a cancellable future. Multiple instances may exist simultaneously.
|
|
||||||
///
|
|
||||||
/// This allows cancelling another future. If the future already finished, nothing happens.
|
|
||||||
///
|
|
||||||
/// This can be cloned to be used in multiple places.
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Canceller(Arc<Mutex<Option<oneshot::Sender<()>>>>);
|
|
||||||
|
|
||||||
impl Canceller {
|
|
||||||
/// Cancel the associated future.
|
|
||||||
///
|
|
||||||
/// This does nothing if the future already finished successfully.
|
|
||||||
pub fn cancel(&self) {
|
|
||||||
if let Some(sender) = self.0.lock().unwrap().take() {
|
|
||||||
let _ = sender.send(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Future + Unpin> Cancellable<T> {
|
|
||||||
/// Make a future cancellable.
|
|
||||||
///
|
|
||||||
/// Returns a future and a `Canceller` which can be cloned and used later to cancel the future.
|
|
||||||
pub fn new(inner: T) -> Result<(Self, Canceller), Error> {
|
|
||||||
// we don't even need to store the mutex...
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
let this = Self {
|
|
||||||
inner: futures::future::select(inner, rx),
|
|
||||||
sender: Arc::new(Mutex::new(Some(tx))),
|
|
||||||
};
|
|
||||||
|
|
||||||
let canceller = this.canceller();
|
|
||||||
Ok((this, canceller))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create another `Canceller` for this future.
|
|
||||||
pub fn canceller(&self) -> Canceller {
|
|
||||||
Canceller(Arc::clone(&self.sender))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Make a future cancellable.
|
|
||||||
///
|
|
||||||
/// This is a shortcut for `Cancellable::new`
|
|
||||||
pub fn cancellable<T: Future + Unpin>(future: T) -> Result<(Cancellable<T>, Canceller), Error> {
|
|
||||||
Cancellable::new(future)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Future + Unpin> Future for Cancellable<T> {
|
|
||||||
type Output = Option<<T as Future>::Output>;
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
|
||||||
use futures::future::Either;
|
|
||||||
match self.inner.poll_unpin(cx) {
|
|
||||||
Poll::Ready(Either::Left((output, _))) => Poll::Ready(Some(output)),
|
|
||||||
Poll::Ready(Either::Right(_)) => Poll::Ready(None),
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue