From 6ab34afa881ec9910d032c7571a1ac3bde810f58 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Wed, 22 May 2019 13:05:51 +0200 Subject: [PATCH] src/client/http_client.rs: cleanup h2 backup client --- src/bin/upload-speed.rs | 3 +- src/client/http_client.rs | 109 ++++++++++++++++++++++++++++---------- 2 files changed, 82 insertions(+), 30 deletions(-) diff --git a/src/bin/upload-speed.rs b/src/bin/upload-speed.rs index d22c66a2..2724e805 100644 --- a/src/bin/upload-speed.rs +++ b/src/bin/upload-speed.rs @@ -13,8 +13,7 @@ fn upload_speed() -> Result { let mut client = HttpClient::new(host, username)?; - let param = json!({"backup-type": "host", "backup-id": "speedtest" }); - let upgrade = client.h2upgrade(&format!("/api2/json/admin/datastore/{}/backup", datastore), Some(param)); + let upgrade = client.start_backup(datastore, "host", "speedtest"); let res = upgrade.and_then(|h2| { println!("start upload speed test"); diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 46ddf758..1d2fab18 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -248,12 +248,16 @@ impl HttpClient { self.request(req) } - pub fn h2upgrade( - &mut self, path: - &str, param: Option - ) -> impl Future { + pub fn start_backup( + &self, + datastore: &str, + backup_type: &str, + backup_id: &str, + ) -> impl Future { - let mut req = Self::request_builder(&self.server, "GET", path, param).unwrap(); + let path = format!("/api2/json/admin/datastore/{}/backup", datastore); + let param = json!({"backup-type": backup_type, "backup-id": backup_id}); + let mut req = Self::request_builder(&self.server, "GET", &path, Some(param)).unwrap(); let login = self.auth.listen(); @@ -271,7 +275,7 @@ impl HttpClient { let status = resp.status(); if status != http::StatusCode::SWITCHING_PROTOCOLS { - bail!("h2upgrade failed with status {:?}", status); + bail!("got status {:?} instead of protocol switch", status); } Ok(resp.into_body().on_upgrade().map_err(Error::from)) @@ -289,7 +293,7 @@ impl HttpClient { // Wait until the `SendRequest` handle has available capacity. h2.ready() - .map(H2Client::new) + .map(BackupClient::new) .map_err(Error::from) }) }) @@ -395,12 +399,12 @@ impl HttpClient { } } -#[derive(Clone)] -pub struct H2Client { +//#[derive(Clone)] +pub struct BackupClient { h2: h2::client::SendRequest, } -impl H2Client { +impl BackupClient { pub fn new(h2: h2::client::SendRequest) -> Self { Self { h2 } @@ -408,17 +412,25 @@ impl H2Client { pub fn get(&self, path: &str, param: Option) -> impl Future { let req = Self::request_builder("localhost", "GET", path, param).unwrap(); - self.request(req) + Self::request(self.h2.clone(), req) } pub fn put(&self, path: &str, param: Option) -> impl Future { let req = Self::request_builder("localhost", "PUT", path, param).unwrap(); - self.request(req) + Self::request(self.h2.clone(), req) } pub fn post(&self, path: &str, param: Option) -> impl Future { + Self::h2post(self.h2.clone(), path, param) + } + + fn h2post( + h2: h2::client::SendRequest, + path: &str, + param: Option + ) -> impl Future { let req = Self::request_builder("localhost", "POST", path, param).unwrap(); - self.request(req) + Self::request(h2, req) } pub fn upload(&self, path: &str, param: Option, data: Vec) -> impl Future { @@ -438,7 +450,7 @@ impl H2Client { }) } - fn response_queue(self) -> ( + fn response_queue() -> ( mpsc::Sender, sync::oneshot::Receiver> ) { @@ -467,8 +479,8 @@ impl H2Client { (verify_queue_tx, verify_result_rx) } - pub fn download_chunk_list( - &self, + fn download_chunk_list( + h2: h2::client::SendRequest, path: &str, archive_name: &str, known_chunks: Arc>>, @@ -477,7 +489,7 @@ impl H2Client { let param = json!({ "archive-name": archive_name }); let request = Self::request_builder("localhost", "GET", path, Some(param)).unwrap(); - self.send_request(request, None) + Self::send_request(h2.clone(), request, None) .and_then(move |response| { response .map_err(Error::from) @@ -506,8 +518,47 @@ impl H2Client { }) } - pub fn upload_stream( + pub fn finish(&self) -> impl Future { + Self::h2post(self.h2.clone(), "finish", None) + .map(|_| ()) + } + + pub fn upload_dynamic_stream( &self, + archive_name: &str, + stream: impl Stream, + ) -> impl Future { + + let known_chunks = Arc::new(Mutex::new(HashSet::new())); + + let h2 = self.h2.clone(); + let h2_2 = self.h2.clone(); + let h2_3 = self.h2.clone(); + let h2_4 = self.h2.clone(); + + let param = json!({ "archive-name": archive_name }); + + Self::download_chunk_list(h2, "dynamic_index", archive_name, known_chunks.clone()) + .and_then(move |_| { + Self::h2post(h2_2, "dynamic_index", Some(param)) + }) + .and_then(move |res| { + println!("GOT1 {:?}", res); + let wid = res.as_u64().unwrap(); + //let dir_path = PathBuf::from("../casync-pve"); + //let dir_path = PathBuf::from("."); + + //upload_pxar(h2, known_chunks, &dir_path, wid).unwrap() + Self::upload_stream(h2_3, wid, stream, known_chunks.clone()) + .and_then(move |size| { + Self::h2post(h2_4, "dynamic_close", Some(json!({ "wid": wid }))) + }) + .map(|_| ()) + }) + } + + fn upload_stream( + h2: h2::client::SendRequest, wid: u64, stream: impl Stream, known_chunks: Arc>>, @@ -519,14 +570,15 @@ impl H2Client { let stream_len = std::sync::Arc::new(AtomicUsize::new(0)); let stream_len2 = stream_len.clone(); - let (upload_queue, upload_result) = self.clone().response_queue(); + let (upload_queue, upload_result) = Self::response_queue(); let start_time = std::time::Instant::now(); - let self2 = self.clone(); stream .for_each(move |data| { + let h2 = h2.clone(); + repeat.fetch_add(1, Ordering::SeqCst); stream_len.fetch_add(data.len(), Ordering::SeqCst); @@ -553,7 +605,7 @@ impl H2Client { upload_data = Some(bytes::Bytes::from(data)); } - self2.send_request(request, upload_data) + Self::send_request(h2, request, upload_data) .and_then(move |response| { upload_queue.send(response) .map(|_| ()).map_err(Error::from) @@ -595,11 +647,11 @@ impl H2Client { let repeat = std::sync::Arc::new(AtomicUsize::new(0)); let repeat2 = repeat.clone(); - let (upload_queue, upload_result) = self.clone().response_queue(); + let (upload_queue, upload_result) = Self::response_queue(); let start_time = std::time::Instant::now(); - let self2 = self.clone(); + let h2 = self.h2.clone(); futures::stream::repeat(data) .take_while(move |_| { @@ -607,12 +659,13 @@ impl H2Client { Ok(start_time.elapsed().as_secs() < 5) }) .for_each(move |data| { + let h2 = h2.clone(); let upload_queue = upload_queue.clone(); println!("send test data ({} bytes)", data.len()); let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap(); - self2.send_request(request, Some(bytes::Bytes::from(data))) + Self::send_request(h2, request, Some(bytes::Bytes::from(data))) .and_then(move |response| { upload_queue.send(response) .map(|_| ()).map_err(Error::from) @@ -637,11 +690,11 @@ impl H2Client { } fn request( - &self, + h2: h2::client::SendRequest, request: Request<()>, ) -> impl Future { - self.send_request(request, None) + Self::send_request(h2, request, None) .and_then(move |response| { response .map_err(Error::from) @@ -650,12 +703,12 @@ impl H2Client { } fn send_request( - &self, + h2: h2::client::SendRequest, request: Request<()>, data: Option, ) -> impl Future { - self.h2.clone() + h2 .ready() .map_err(Error::from) .and_then(move |mut send_request| {