make h2 client connection cancellable

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-06-05 09:45:59 +02:00
parent e3975c2c47
commit cb4426b348
2 changed files with 22 additions and 5 deletions

View File

@ -476,6 +476,7 @@ fn create_backup(
} }
client.finish().wait()?; client.finish().wait()?;
client.force_close();
let end_time = Local.timestamp(Local::now().timestamp(), 0); let end_time = Local.timestamp(Local::now().timestamp(), 0);
let elapsed = end_time.signed_duration_since(backup_time); let elapsed = end_time.signed_duration_since(backup_time);

View File

@ -20,6 +20,7 @@ use serde_json::{json, Value};
use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET}; use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
use crate::tools::{self, BroadcastFuture, tty}; use crate::tools::{self, BroadcastFuture, tty};
use crate::tools::futures::{cancellable, Canceller};
use super::pipe_to_stream::*; use super::pipe_to_stream::*;
use super::merge_known_chunks::*; use super::merge_known_chunks::*;
@ -291,14 +292,21 @@ impl HttpClient {
let connection = connection let connection = connection
.map_err(|_| panic!("HTTP/2.0 connection failed")); .map_err(|_| panic!("HTTP/2.0 connection failed"));
let (connection, canceller) = cancellable(connection)?;
// A cancellable future returns an Option which is None when cancelled and
// Some when it finished instead, since we don't care about the return type we
// need to map it away:
let connection = connection.map(|_| ());
// Spawn a new task to drive the connection state // Spawn a new task to drive the connection state
hyper::rt::spawn(connection); hyper::rt::spawn(connection);
// Wait until the `SendRequest` handle has available capacity. // Wait until the `SendRequest` handle has available capacity.
h2.ready() Ok(h2.ready()
.map(BackupClient::new) .map(move |c| BackupClient::new(c, canceller))
.map_err(Error::from) .map_err(Error::from))
}) })
.flatten()
}) })
} }
@ -407,13 +415,21 @@ impl HttpClient {
//#[derive(Clone)] //#[derive(Clone)]
pub struct BackupClient { pub struct BackupClient {
h2: H2Client, h2: H2Client,
canceller: Option<Canceller>,
} }
impl BackupClient { impl BackupClient {
pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self { pub fn new(h2: h2::client::SendRequest<bytes::Bytes>, canceller: Canceller) -> Self {
Self { h2: H2Client::new(h2) } Self {
h2: H2Client::new(h2),
canceller: Some(canceller),
}
}
pub fn force_close(mut self) {
self.canceller.take().unwrap().cancel();
} }
pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> { pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {