From 9af37c8f0e31f6d1402a5f02de9c24106e5efe1f Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Wed, 22 May 2019 17:28:25 +0200 Subject: [PATCH] src/client/http_client.rs: move low level H2 code into separate class --- src/client/http_client.rs | 131 ++++++++++++++++++++------------------ 1 file changed, 70 insertions(+), 61 deletions(-) diff --git a/src/client/http_client.rs b/src/client/http_client.rs index afeb9c93..dc94e8f6 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -275,12 +275,11 @@ impl HttpClient { let status = resp.status(); if status != http::StatusCode::SWITCHING_PROTOCOLS { - bail!("got status {:?} instead of protocol switch", status); + future::Either::A(Self::api_response(resp).and_then(|_| { bail!("unknown error"); })) + } else { + future::Either::B(resp.into_body().on_upgrade().map_err(Error::from)) } - - Ok(resp.into_body().on_upgrade().map_err(Error::from)) }) - .flatten() .and_then(|upgraded| { h2::client::handshake(upgraded).map_err(Error::from) }) @@ -403,53 +402,25 @@ impl HttpClient { //#[derive(Clone)] pub struct BackupClient { - h2: h2::client::SendRequest, + h2: H2Client, } impl BackupClient { pub fn new(h2: h2::client::SendRequest) -> Self { - Self { h2 } + Self { h2: H2Client::new(h2) } } pub fn get(&self, path: &str, param: Option) -> impl Future { - let req = Self::request_builder("localhost", "GET", path, param).unwrap(); - Self::request(self.h2.clone(), req) + self.h2.get(path, param) } pub fn put(&self, path: &str, param: Option) -> impl Future { - let req = Self::request_builder("localhost", "PUT", path, param).unwrap(); - Self::request(self.h2.clone(), req) + self.h2.put(path, param) } pub fn post(&self, path: &str, param: Option) -> impl Future { - Self::h2post(self.h2.clone(), path, param) - } - - fn h2post( - h2: h2::client::SendRequest, - path: &str, - param: Option - ) -> impl Future { - let req = Self::request_builder("localhost", "POST", path, param).unwrap(); - Self::request(h2, req) - } - - pub fn upload(&self, path: &str, param: Option, data: Vec) -> impl Future { - let request = Self::request_builder("localhost", "POST", path, param).unwrap(); - - self.h2.clone() - .ready() - .map_err(Error::from) - .and_then(move |mut send_request| { - let (response, stream) = send_request.send_request(request, false).unwrap(); - PipeToSendStream::new(bytes::Bytes::from(data), stream) - .and_then(|_| { - response - .map_err(Error::from) - .and_then(Self::h2api_response) - }) - }) + self.h2.post(path, param) } fn response_queue() -> ( @@ -465,7 +436,7 @@ impl BackupClient { .for_each(|response: h2::client::ResponseFuture| { response .map_err(Error::from) - .and_then(Self::h2api_response) + .and_then(H2Client::h2api_response) .and_then(|result| { println!("RESPONSE: {:?}", result); Ok(()) @@ -482,16 +453,16 @@ impl BackupClient { } fn download_chunk_list( - h2: h2::client::SendRequest, + h2: H2Client, path: &str, archive_name: &str, known_chunks: Arc>>, ) -> impl Future { let param = json!({ "archive-name": archive_name }); - let request = Self::request_builder("localhost", "GET", path, Some(param)).unwrap(); + let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap(); - Self::send_request(h2.clone(), request, None) + h2.send_request(request, None) .and_then(move |response| { response .map_err(Error::from) @@ -521,8 +492,7 @@ impl BackupClient { } pub fn finish(&self) -> impl Future { - Self::h2post(self.h2.clone(), "finish", None) - .map(|_| ()) + self.h2.clone().post("finish", None).map(|_| ()) } pub fn upload_dynamic_stream( @@ -542,25 +512,20 @@ impl BackupClient { Self::download_chunk_list(h2, "dynamic_index", archive_name, known_chunks.clone()) .and_then(move |_| { - Self::h2post(h2_2, "dynamic_index", Some(param)) + h2_2.post("dynamic_index", Some(param)) }) .and_then(move |res| { - println!("GOT1 {:?}", res); let wid = res.as_u64().unwrap(); - //let dir_path = PathBuf::from("../casync-pve"); - //let dir_path = PathBuf::from("."); - - //upload_pxar(h2, known_chunks, &dir_path, wid).unwrap() Self::upload_stream(h2_3, wid, stream, known_chunks.clone()) .and_then(move |_size| { - Self::h2post(h2_4, "dynamic_close", Some(json!({ "wid": wid }))) + h2_4.post("dynamic_close", Some(json!({ "wid": wid }))) }) .map(|_| ()) }) } fn upload_stream( - h2: h2::client::SendRequest, + h2: H2Client, wid: u64, stream: impl Stream, known_chunks: Arc>>, @@ -576,7 +541,6 @@ impl BackupClient { let start_time = std::time::Instant::now(); - stream .for_each(move |data| { let h2 = h2.clone(); @@ -597,17 +561,17 @@ impl BackupClient { if chunk_is_known { println!("append existing chunk ({} bytes)", data.len()); let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) }); - request = Self::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap(); + request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap(); upload_data = None; } else { println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&digest), data.len()); known_chunks.insert(digest); let param = json!({ "wid": wid, "size" : data.len() }); - request = Self::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); + request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); upload_data = Some(bytes::Bytes::from(data)); } - Self::send_request(h2, request, upload_data) + h2.send_request(request, upload_data) .and_then(move |response| { upload_queue.send(response) .map(|_| ()).map_err(Error::from) @@ -666,8 +630,8 @@ impl BackupClient { let upload_queue = upload_queue.clone(); println!("send test data ({} bytes)", data.len()); - let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap(); - Self::send_request(h2, request, Some(bytes::Bytes::from(data))) + let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap(); + h2.send_request(request, Some(bytes::Bytes::from(data))) .and_then(move |response| { upload_queue.send(response) .map(|_| ()).map_err(Error::from) @@ -690,13 +654,58 @@ impl BackupClient { Ok(speed) }) } +} + +#[derive(Clone)] +pub struct H2Client { + h2: h2::client::SendRequest, +} + +impl H2Client { + + pub fn new(h2: h2::client::SendRequest) -> Self { + Self { h2 } + } + + pub fn get(&self, path: &str, param: Option) -> impl Future { + let req = Self::request_builder("localhost", "GET", path, param).unwrap(); + self.request(req) + } + + pub fn put(&self, path: &str, param: Option) -> impl Future { + let req = Self::request_builder("localhost", "PUT", path, param).unwrap(); + self.request(req) + } + + pub fn post(&self, path: &str, param: Option) -> impl Future { + let req = Self::request_builder("localhost", "POST", path, param).unwrap(); + self.request(req) + } + + pub fn upload(&self, path: &str, param: Option, data: Vec) -> impl Future { + let request = Self::request_builder("localhost", "POST", path, param).unwrap(); + + + self.h2.clone() + .ready() + .map_err(Error::from) + .and_then(move |mut send_request| { + let (response, stream) = send_request.send_request(request, false).unwrap(); + PipeToSendStream::new(bytes::Bytes::from(data), stream) + .and_then(|_| { + response + .map_err(Error::from) + .and_then(Self::h2api_response) + }) + }) + } fn request( - h2: h2::client::SendRequest, + &self, request: Request<()>, ) -> impl Future { - Self::send_request(h2, request, None) + self.send_request(request, None) .and_then(move |response| { response .map_err(Error::from) @@ -705,12 +714,12 @@ impl BackupClient { } fn send_request( - h2: h2::client::SendRequest, + &self, request: Request<()>, data: Option, ) -> impl Future { - h2 + self.h2.clone() .ready() .map_err(Error::from) .and_then(move |mut send_request| {