From cb4426b3481cc7aa716badd66e8d05a4c576665c Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Wed, 5 Jun 2019 09:45:59 +0200 Subject: [PATCH] make h2 client connection cancellable Signed-off-by: Wolfgang Bumiller --- src/bin/proxmox-backup-client.rs | 1 + src/client/http_client.rs | 26 +++++++++++++++++++++----- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 7e590972..5d5b85e7 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -476,6 +476,7 @@ fn create_backup( } client.finish().wait()?; + client.force_close(); let end_time = Local.timestamp(Local::now().timestamp(), 0); let elapsed = end_time.signed_duration_since(backup_time); diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 7deb63d1..1cd8a091 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -20,6 +20,7 @@ use serde_json::{json, Value}; use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET}; use crate::tools::{self, BroadcastFuture, tty}; +use crate::tools::futures::{cancellable, Canceller}; use super::pipe_to_stream::*; use super::merge_known_chunks::*; @@ -291,14 +292,21 @@ impl HttpClient { let connection = connection .map_err(|_| panic!("HTTP/2.0 connection failed")); + let (connection, canceller) = cancellable(connection)?; + // A cancellable future returns an Option which is None when cancelled and + // Some when it finished instead, since we don't care about the return type we + // need to map it away: + let connection = connection.map(|_| ()); + // Spawn a new task to drive the connection state hyper::rt::spawn(connection); // Wait until the `SendRequest` handle has available capacity. - h2.ready() - .map(BackupClient::new) - .map_err(Error::from) + Ok(h2.ready() + .map(move |c| BackupClient::new(c, canceller)) + .map_err(Error::from)) }) + .flatten() }) } @@ -407,13 +415,21 @@ impl HttpClient { //#[derive(Clone)] pub struct BackupClient { h2: H2Client, + canceller: Option, } impl BackupClient { - pub fn new(h2: h2::client::SendRequest) -> Self { - Self { h2: H2Client::new(h2) } + pub fn new(h2: h2::client::SendRequest, canceller: Canceller) -> Self { + Self { + h2: H2Client::new(h2), + canceller: Some(canceller), + } + } + + pub fn force_close(mut self) { + self.canceller.take().unwrap().cancel(); } pub fn get(&self, path: &str, param: Option) -> impl Future {