src/client/http_client.rs: use async for more functions

This commit is contained in:
Dietmar Maurer 2019-09-04 12:47:01 +02:00
parent 96f5e80abb
commit 1a7a0e74c0

View File

@ -143,7 +143,7 @@ impl HttpClient {
}) })
} }
/// Login future /// Login
/// ///
/// Login is done on demand, so this is onyl required if you need /// Login is done on demand, so this is onyl required if you need
/// access to authentication data in 'AuthInfo'. /// access to authentication data in 'AuthInfo'.
@ -188,97 +188,85 @@ impl HttpClient {
.build::<_, Body>(https) .build::<_, Body>(https)
} }
pub fn request(&self, mut req: Request<Body>) -> impl Future<Output = Result<Value, Error>> { pub async fn request(&self, mut req: Request<Body>) -> Result<Value, Error> {
let login = self.auth.listen();
let client = self.client.clone(); let client = self.client.clone();
login.and_then(move |auth| { let auth = self.login().await?;
let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap()); req.headers_mut().insert("CSRFPreventionToken", HeaderValue::from_str(&auth.token).unwrap());
let request = Self::api_request(client, req); Self::api_request(client, req).await
request
})
} }
pub fn get( pub async fn get(
&self, &self,
path: &str, path: &str,
data: Option<Value>, data: Option<Value>,
) -> impl Future<Output = Result<Value, Error>> { ) -> Result<Value, Error> {
let req = Self::request_builder(&self.server, "GET", path, data).unwrap(); let req = Self::request_builder(&self.server, "GET", path, data).unwrap();
self.request(req) self.request(req).await
} }
pub fn delete( pub async fn delete(
&mut self, &mut self,
path: &str, path: &str,
data: Option<Value>, data: Option<Value>,
) -> impl Future<Output = Result<Value, Error>> { ) -> Result<Value, Error> {
let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap(); let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap();
self.request(req) self.request(req).await
} }
pub fn post( pub async fn post(
&mut self, &mut self,
path: &str, path: &str,
data: Option<Value>, data: Option<Value>,
) -> impl Future<Output = Result<Value, Error>> { ) -> Result<Value, Error> {
let req = Self::request_builder(&self.server, "POST", path, data).unwrap(); let req = Self::request_builder(&self.server, "POST", path, data).unwrap();
self.request(req) self.request(req).await
} }
pub fn download<W: Write + Send + 'static>( pub async fn download(
&mut self, &mut self,
path: &str, path: &str,
output: W, output: &mut (dyn Write + Send),
) -> impl Future<Output = Result<W, Error>> { ) -> Result<(), Error> {
let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap(); let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap();
let login = self.auth.listen();
let client = self.client.clone(); let client = self.client.clone();
login.and_then(move |auth| { let auth = self.login().await?;
let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
client.request(req) let resp = client.request(req).await?;
let status = resp.status();
if !status.is_success() {
HttpClient::api_response(resp)
.map(|_| Err(format_err!("unknown error")))
.await?
} else {
resp.into_body()
.map_err(Error::from) .map_err(Error::from)
.and_then(|resp| { .try_fold(output, move |acc, chunk| async move {
let status = resp.status(); acc.write_all(&chunk)?;
if !status.is_success() { Ok::<_, Error>(acc)
future::Either::Left(
HttpClient::api_response(resp)
.map(|_| Err(format_err!("unknown error")))
)
} else {
future::Either::Right(
resp.into_body()
.map_err(Error::from)
.try_fold(output, move |mut acc, chunk| async move {
acc.write_all(&chunk)?;
Ok::<_, Error>(acc)
})
)
}
}) })
}) .await?;
}
Ok(())
} }
pub fn upload( pub async fn upload(
&mut self, &mut self,
content_type: &str, content_type: &str,
body: Body, body: Body,
path: &str, path: &str,
data: Option<Value>, data: Option<Value>,
) -> impl Future<Output = Result<Value, Error>> { ) -> Result<Value, Error> {
let path = path.trim_matches('/'); let path = path.trim_matches('/');
let mut url = format!("https://{}:8007/{}", &self.server, path); let mut url = format!("https://{}:8007/{}", &self.server, path);
@ -298,17 +286,17 @@ impl HttpClient {
.header("Content-Type", content_type) .header("Content-Type", content_type)
.body(body).unwrap(); .body(body).unwrap();
self.request(req) self.request(req).await
} }
pub fn start_backup( pub async fn start_backup(
&self, &self,
datastore: &str, datastore: &str,
backup_type: &str, backup_type: &str,
backup_id: &str, backup_id: &str,
backup_time: DateTime<Utc>, backup_time: DateTime<Utc>,
debug: bool, debug: bool,
) -> impl Future<Output = Result<Arc<BackupClient>, Error>> { ) -> Result<Arc<BackupClient>, Error> {
let param = json!({ let param = json!({
"backup-type": backup_type, "backup-type": backup_type,
@ -320,18 +308,19 @@ impl HttpClient {
let req = Self::request_builder(&self.server, "GET", "/api2/json/backup", Some(param)).unwrap(); let req = Self::request_builder(&self.server, "GET", "/api2/json/backup", Some(param)).unwrap();
self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())) let (h2, canceller) = self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
.map_ok(|(h2, canceller)| BackupClient::new(h2, canceller))
Ok(BackupClient::new(h2, canceller))
} }
pub fn start_backup_reader( pub async fn start_backup_reader(
&self, &self,
datastore: &str, datastore: &str,
backup_type: &str, backup_type: &str,
backup_id: &str, backup_id: &str,
backup_time: DateTime<Utc>, backup_time: DateTime<Utc>,
debug: bool, debug: bool,
) -> impl Future<Output = Result<Arc<BackupReader>, Error>> { ) -> Result<Arc<BackupReader>, Error> {
let param = json!({ let param = json!({
"backup-type": backup_type, "backup-type": backup_type,
@ -342,72 +331,63 @@ impl HttpClient {
}); });
let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap(); let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap();
self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())) let (h2, canceller) = self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())).await?;
.map_ok(|(h2, canceller)| BackupReader::new(h2, canceller))
Ok(BackupReader::new(h2, canceller))
} }
pub fn start_h2_connection( pub async fn start_h2_connection(
&self, &self,
mut req: Request<Body>, mut req: Request<Body>,
protocol_name: String, protocol_name: String,
) -> impl Future<Output = Result<(H2Client, Canceller), Error>> { ) -> Result<(H2Client, Canceller), Error> {
let login = self.auth.listen(); let auth = self.login().await?;
let client = self.client.clone(); let client = self.client.clone();
login.and_then(move |auth| { let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET));
req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap());
req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
let enc_ticket = format!("PBSAuthCookie={}", percent_encode(auth.ticket.as_bytes(), DEFAULT_ENCODE_SET)); let resp = client.request(req).await?;
req.headers_mut().insert("Cookie", HeaderValue::from_str(&enc_ticket).unwrap()); let status = resp.status();
req.headers_mut().insert("UPGRADE", HeaderValue::from_str(&protocol_name).unwrap());
client.request(req) if status != http::StatusCode::SWITCHING_PROTOCOLS {
.map_err(Error::from) Self::api_response(resp)
.and_then(|resp| { .map(|_| Err(format_err!("unknown error")))
.await?;
unreachable!();
}
let status = resp.status(); let upgraded = resp
if status != http::StatusCode::SWITCHING_PROTOCOLS { .into_body()
future::Either::Left( .on_upgrade()
Self::api_response(resp) .await?;
.map(|_| Err(format_err!("unknown error")))
)
} else {
future::Either::Right(
resp
.into_body()
.on_upgrade()
.map_err(Error::from)
)
}
})
.and_then(|upgraded| {
let max_window_size = (1 << 31) - 2;
h2::client::Builder::new() let max_window_size = (1 << 31) - 2;
.initial_connection_window_size(max_window_size)
.initial_window_size(max_window_size)
.max_frame_size(4*1024*1024)
.handshake(upgraded)
.map_err(Error::from)
})
.and_then(|(h2, connection)| async move {
let connection = connection
.map_err(|_| panic!("HTTP/2.0 connection failed"));
let (connection, canceller) = cancellable(connection)?; let (h2, connection) = h2::client::Builder::new()
// A cancellable future returns an Option which is None when cancelled and .initial_connection_window_size(max_window_size)
// Some when it finished instead, since we don't care about the return type we .initial_window_size(max_window_size)
// need to map it away: .max_frame_size(4*1024*1024)
let connection = connection.map(|_| ()); .handshake(upgraded)
.await?;
// Spawn a new task to drive the connection state let connection = connection
hyper::rt::spawn(connection); .map_err(|_| panic!("HTTP/2.0 connection failed"));
// Wait until the `SendRequest` handle has available capacity. let (connection, canceller) = cancellable(connection)?;
let c = h2.ready().await?; // A cancellable future returns an Option which is None when cancelled and
Ok((H2Client::new(c), canceller)) // Some when it finished instead, since we don't care about the return type we
}.boxed()) // need to map it away:
}) let connection = connection.map(|_| ());
// Spawn a new task to drive the connection state
hyper::rt::spawn(connection);
// Wait until the `SendRequest` handle has available capacity.
let c = h2.ready().await?;
Ok((H2Client::new(c), canceller))
} }
async fn credentials( async fn credentials(
@ -450,14 +430,15 @@ impl HttpClient {
} }
} }
fn api_request( async fn api_request(
client: Client<HttpsConnector>, client: Client<HttpsConnector>,
req: Request<Body> req: Request<Body>
) -> impl Future<Output = Result<Value, Error>> { ) -> Result<Value, Error> {
client.request(req) client.request(req)
.map_err(Error::from) .map_err(Error::from)
.and_then(Self::api_response) .and_then(Self::api_response)
.await
} }
pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> { pub fn request_builder(server: &str, method: &str, path: &str, data: Option<Value>) -> Result<Request<Body>, Error> {
@ -953,7 +934,7 @@ impl BackupClient {
let csum = guard.as_mut().unwrap(); let csum = guard.as_mut().unwrap();
let chunk_end = offset + chunk_len as u64; let chunk_end = offset + chunk_len as u64;
csum.update(&chunk_end.to_le_bytes()); csum.update(&chunk_end.to_le_bytes());
csum.update(digest); csum.update(digest);