diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 1bdd0d7d..0c6b1e93 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -14,7 +14,8 @@ use hyper::Body; use hyper::client::Client; use openssl::ssl::{SslConnector, SslMethod}; use serde_json::{json, Value}; -use tokio::sync::mpsc; +use tokio::io::AsyncReadExt; +use tokio::sync::{mpsc, oneshot}; use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET}; use xdg::BaseDirectories; @@ -145,7 +146,7 @@ impl HttpClient { /// /// Login is done on demand, so this is onyl required if you need /// access to authentication data in 'AuthInfo'. - pub fn login(&self) -> impl Future { + pub fn login(&self) -> impl Future> { self.auth.listen() } @@ -186,7 +187,7 @@ impl HttpClient { .build::<_, Body>(https) } - pub fn request(&self, mut req: Request) -> impl Future { + pub fn request(&self, mut req: Request) -> impl Future> { let login = self.auth.listen(); @@ -204,26 +205,38 @@ impl HttpClient { }) } - pub fn get(&self, path: &str, data: Option) -> impl Future { - + pub fn get( + &self, + path: &str, + data: Option, + ) -> impl Future> { let req = Self::request_builder(&self.server, "GET", path, data).unwrap(); self.request(req) } - pub fn delete(&mut self, path: &str, data: Option) -> impl Future { - + pub fn delete( + &mut self, + path: &str, + data: Option, + ) -> impl Future> { let req = Self::request_builder(&self.server, "DELETE", path, data).unwrap(); self.request(req) } - pub fn post(&mut self, path: &str, data: Option) -> impl Future { - + pub fn post( + &mut self, + path: &str, + data: Option, + ) -> impl Future> { let req = Self::request_builder(&self.server, "POST", path, data).unwrap(); self.request(req) } - pub fn download(&mut self, path: &str, output: W) -> impl Future { - + pub fn download( + &mut self, + path: &str, + output: W, + ) -> impl Future> { let mut req = Self::request_builder(&self.server, "GET", path, None).unwrap(); let login = self.auth.listen(); @@ -240,15 +253,15 @@ impl HttpClient { .and_then(|resp| { let status = resp.status(); if !status.is_success() { - future::Either::A( + future::Either::Left( HttpClient::api_response(resp) - .and_then(|_| { bail!("unknown error"); }) + .map(|_| Err(format_err!("unknown error"))) ) } else { - future::Either::B( + future::Either::Right( resp.into_body() .map_err(Error::from) - .fold(output, move |mut acc, chunk| { + .try_fold(output, move |mut acc, chunk| async move { acc.write_all(&chunk)?; Ok::<_, Error>(acc) }) @@ -264,7 +277,7 @@ impl HttpClient { body: Body, path: &str, data: Option, - ) -> impl Future { + ) -> impl Future> { let path = path.trim_matches('/'); let mut url = format!("https://{}:8007/{}", &self.server, path); @@ -294,7 +307,7 @@ impl HttpClient { backup_id: &str, backup_time: DateTime, debug: bool, - ) -> impl Future, Error=Error> { + ) -> impl Future, Error>> { let param = json!({ "backup-type": backup_type, @@ -307,7 +320,7 @@ impl HttpClient { let req = Self::request_builder(&self.server, "GET", "/api2/json/backup", Some(param)).unwrap(); self.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())) - .map(|(h2, canceller)| BackupClient::new(h2, canceller)) + .map_ok(|(h2, canceller)| BackupClient::new(h2, canceller)) } pub fn start_backup_reader( @@ -317,7 +330,7 @@ impl HttpClient { backup_id: &str, backup_time: DateTime, debug: bool, - ) -> impl Future, Error=Error> { + ) -> impl Future, Error>> { let param = json!({ "backup-type": backup_type, @@ -329,14 +342,14 @@ impl HttpClient { let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap(); self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())) - .map(|(h2, canceller)| BackupReader::new(h2, canceller)) + .map_ok(|(h2, canceller)| BackupReader::new(h2, canceller)) } pub fn start_h2_connection( &self, mut req: Request, protocol_name: String, - ) -> impl Future { + ) -> impl Future> { let login = self.auth.listen(); let client = self.client.clone(); @@ -353,9 +366,17 @@ impl HttpClient { let status = resp.status(); if status != http::StatusCode::SWITCHING_PROTOCOLS { - future::Either::A(Self::api_response(resp).and_then(|_| { bail!("unknown error"); })) + future::Either::Left( + Self::api_response(resp) + .map(|_| Err(format_err!("unknown error"))) + ) } else { - future::Either::B(resp.into_body().on_upgrade().map_err(Error::from)) + future::Either::Right( + resp + .into_body() + .on_upgrade() + .map_err(Error::from) + ) } }) .and_then(|upgraded| { @@ -368,7 +389,7 @@ impl HttpClient { .handshake(upgraded) .map_err(Error::from) }) - .and_then(|(h2, connection)| { + .and_then(|(h2, connection)| async move { let connection = connection .map_err(|_| panic!("HTTP/2.0 connection failed")); @@ -382,11 +403,9 @@ impl HttpClient { hyper::rt::spawn(connection); // Wait until the `SendRequest` handle has available capacity. - Ok(h2.ready() - .map(move |c| (H2Client::new(c), canceller)) - .map_err(Error::from)) - }) - .flatten() + let c = h2.ready().await?; + Ok((H2Client::new(c), canceller)) + }.boxed()) }) } @@ -395,60 +414,47 @@ impl HttpClient { server: String, username: String, password: String, - ) -> Box + Send> { - - let server2 = server.clone(); - - let create_request = futures::future::lazy(move || { + ) -> Box> + Send> { + Box::new(async move { let data = json!({ "username": username, "password": password }); let req = Self::request_builder(&server, "POST", "/api2/json/access/ticket", Some(data)).unwrap(); - Self::api_request(client, req) - }); + let cred = Self::api_request(client, req).await?; + let auth = AuthInfo { + username: cred["data"]["username"].as_str().unwrap().to_owned(), + ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(), + token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(), + }; - let login_future = create_request - .and_then(move |cred| { - let auth = AuthInfo { - username: cred["data"]["username"].as_str().unwrap().to_owned(), - ticket: cred["data"]["ticket"].as_str().unwrap().to_owned(), - token: cred["data"]["CSRFPreventionToken"].as_str().unwrap().to_owned(), - }; + let _ = store_ticket_info(&server, &auth.username, &auth.ticket, &auth.token); - let _ = store_ticket_info(&server2, &auth.username, &auth.ticket, &auth.token); - - Ok(auth) - }); - - Box::new(login_future) + Ok(auth) + }) } - fn api_response(response: Response) -> impl Future { - + async fn api_response(response: Response) -> Result { let status = response.status(); - - response + let data = response .into_body() - .concat2() - .map_err(Error::from) - .and_then(move |data| { + .try_concat() + .await?; - let text = String::from_utf8(data.to_vec()).unwrap(); - if status.is_success() { - if text.len() > 0 { - let value: Value = serde_json::from_str(&text)?; - Ok(value) - } else { - Ok(Value::Null) - } - } else { - bail!("HTTP Error {}: {}", status, text); - } - }) + let text = String::from_utf8(data.to_vec()).unwrap(); + if status.is_success() { + if text.len() > 0 { + let value: Value = serde_json::from_str(&text)?; + Ok(value) + } else { + Ok(Value::Null) + } + } else { + bail!("HTTP Error {}: {}", status, text); + } } fn api_request( client: Client>, req: Request - ) -> impl Future { + ) -> impl Future> { client.request(req) .map_err(Error::from) @@ -511,40 +517,52 @@ impl BackupReader { Arc::new(Self { h2, canceller: canceller }) } - pub fn get(&self, path: &str, param: Option) -> impl Future { + pub fn get( + &self, + path: &str, + param: Option, + ) -> impl Future> { self.h2.get(path, param) } - pub fn put(&self, path: &str, param: Option) -> impl Future { + pub fn put( + &self, + path: &str, + param: Option, + ) -> impl Future> { self.h2.put(path, param) } - pub fn post(&self, path: &str, param: Option) -> impl Future { + pub fn post( + &self, + path: &str, + param: Option, + ) -> impl Future> { self.h2.post(path, param) } - pub fn download( + pub fn download( &self, file_name: &str, output: W, - ) -> impl Future { + ) -> impl Future> { let path = "download"; let param = json!({ "file-name": file_name }); self.h2.download(path, Some(param), output) } - pub fn speedtest( + pub fn speedtest( &self, output: W, - ) -> impl Future { + ) -> impl Future> { self.h2.download("speedtest", None, output) } - pub fn download_chunk( + pub fn download_chunk( &self, digest: &[u8; 32], output: W, - ) -> impl Future { + ) -> impl Future> { let path = "chunk"; let param = json!({ "digest": digest_to_hex(digest) }); self.h2.download(path, Some(param), output) @@ -573,27 +591,38 @@ pub struct BackupStats { } impl BackupClient { - pub fn new(h2: H2Client, canceller: Canceller) -> Arc { Arc::new(Self { h2, canceller }) } - pub fn get(&self, path: &str, param: Option) -> impl Future { + pub fn get( + &self, + path: &str, + param: Option, + ) -> impl Future> { self.h2.get(path, param) } - pub fn put(&self, path: &str, param: Option) -> impl Future { + pub fn put( + &self, + path: &str, + param: Option, + ) -> impl Future> { self.h2.put(path, param) } - pub fn post(&self, path: &str, param: Option) -> impl Future { + pub fn post( + &self, + path: &str, + param: Option, + ) -> impl Future> { self.h2.post(path, param) } - pub fn finish(self: Arc) -> impl Future { + pub fn finish(self: Arc) -> impl Future> { self.h2.clone() .post("finish", None) - .map(move |_| { + .map_ok(move |_| { self.canceller.cancel(); }) } @@ -606,27 +635,22 @@ impl BackupClient { &self, mut reader: R, file_name: &str, - ) -> impl Future { + ) -> impl Future> { let h2 = self.h2.clone(); let file_name = file_name.to_owned(); - futures::future::ok(()) - .and_then(move |_| { - let mut raw_data = Vec::new(); - // fixme: avoid loading into memory - reader.read_to_end(&mut raw_data)?; - Ok(raw_data) - }) - .and_then(move |raw_data| { - let csum = openssl::sha::sha256(&raw_data); - let param = json!({"encoded-size": raw_data.len(), "file-name": file_name }); - let size = raw_data.len() as u64; // fixme: should be decoded size instead?? - h2.upload("blob", Some(param), raw_data) - .map(move |_| { - BackupStats { size, csum } - }) - }) + async move { + let mut raw_data = Vec::new(); + // fixme: avoid loading into memory + reader.read_to_end(&mut raw_data)?; + + let csum = openssl::sha::sha256(&raw_data); + let param = json!({"encoded-size": raw_data.len(), "file-name": file_name }); + let size = raw_data.len() as u64; // fixme: should be decoded size instead?? + let _value = h2.upload("blob", Some(param), raw_data).await?; + Ok(BackupStats { size, csum }) + } } pub fn upload_blob_from_data( @@ -636,35 +660,30 @@ impl BackupClient { crypt_config: Option>, compress: bool, sign_only: bool, - ) -> impl Future { + ) -> impl Future> { let h2 = self.h2.clone(); let file_name = file_name.to_owned(); let size = data.len() as u64; - futures::future::ok(()) - .and_then(move |_| { - let blob = if let Some(crypt_config) = crypt_config { - if sign_only { - DataBlob::create_signed(&data, crypt_config, compress)? - } else { - DataBlob::encode(&data, Some(crypt_config.clone()), compress)? - } + async move { + let blob = if let Some(crypt_config) = crypt_config { + if sign_only { + DataBlob::create_signed(&data, crypt_config, compress)? } else { - DataBlob::encode(&data, None, compress)? - }; + DataBlob::encode(&data, Some(crypt_config.clone()), compress)? + } + } else { + DataBlob::encode(&data, None, compress)? + }; - let raw_data = blob.into_inner(); - Ok(raw_data) - }) - .and_then(move |raw_data| { - let csum = openssl::sha::sha256(&raw_data); - let param = json!({"encoded-size": raw_data.len(), "file-name": file_name }); - h2.upload("blob", Some(param), raw_data) - .map(move |_| { - BackupStats { size, csum } - }) - }) + let raw_data = blob.into_inner(); + + let csum = openssl::sha::sha256(&raw_data); + let param = json!({"encoded-size": raw_data.len(), "file-name": file_name }); + let _value = h2.upload("blob", Some(param), raw_data).await?; + Ok(BackupStats { size, csum }) + } } pub fn upload_blob_from_file>( @@ -673,52 +692,43 @@ impl BackupClient { file_name: &str, crypt_config: Option>, compress: bool, - ) -> impl Future { + ) -> impl Future> { let h2 = self.h2.clone(); let file_name = file_name.to_owned(); let src_path = src_path.as_ref().to_owned(); - let task = tokio::fs::File::open(src_path.clone()) - .map_err(move |err| format_err!("unable to open file {:?} - {}", src_path, err)) - .and_then(move |file| { - let contents = vec![]; - tokio::io::read_to_end(file, contents) - .map_err(Error::from) - .and_then(move |(_, contents)| { - let blob = DataBlob::encode(&contents, crypt_config, compress)?; - let raw_data = blob.into_inner(); - Ok((raw_data, contents.len() as u64)) - }) - .and_then(move |(raw_data, size)| { - let csum = openssl::sha::sha256(&raw_data); - let param = json!({"encoded-size": raw_data.len(), "file-name": file_name }); - h2.upload("blob", Some(param), raw_data) - .map(move |_| { - BackupStats { size, csum } - }) - }) - }); + async move { + let mut file = tokio::fs::File::open(src_path.clone()) + .await + .map_err(move |err| format_err!("unable to open file {:?} - {}", src_path, err))?; - task + let mut contents = Vec::new(); + file.read_to_end(&mut contents).await.map_err(Error::from)?; + + let size: u64 = contents.len() as u64; + let blob = DataBlob::encode(&contents, crypt_config, compress)?; + let raw_data = blob.into_inner(); + let csum = openssl::sha::sha256(&raw_data); + let param = json!({ + "encoded-size": raw_data.len(), + "file-name": file_name, + }); + h2.upload("blob", Some(param), raw_data).await?; + Ok(BackupStats { size, csum }) + } } pub fn upload_stream( &self, archive_name: &str, - stream: impl Stream, + stream: impl Stream>, prefix: &str, fixed_size: Option, crypt_config: Option>, - ) -> impl Future { - + ) -> impl Future> { let known_chunks = Arc::new(Mutex::new(HashSet::new())); - let h2 = self.h2.clone(); - let h2_2 = self.h2.clone(); - let h2_3 = self.h2.clone(); - let h2_4 = self.h2.clone(); - let mut param = json!({ "archive-name": archive_name }); if let Some(size) = fixed_size { param["size"] = size.into(); @@ -729,51 +739,59 @@ impl BackupClient { let prefix = prefix.to_owned(); - Self::download_chunk_list(h2, &index_path, archive_name, known_chunks.clone()) - .and_then(move |_| { - h2_2.post(&index_path, Some(param)) - }) - .and_then(move |res| { - let wid = res.as_u64().unwrap(); - Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone(), crypt_config) - .and_then(move |(chunk_count, size, _speed, csum)| { - let param = json!({ - "wid": wid , - "chunk-count": chunk_count, - "size": size, - }); - h2_4.post(&close_path, Some(param)) - .map(move |_| { - BackupStats { size: size as u64, csum } - }) - }) + let h2 = self.h2.clone(); + + let download_future = + Self::download_chunk_list(h2.clone(), &index_path, archive_name, known_chunks.clone()); + + async move { + download_future.await?; + + let wid = h2.post(&index_path, Some(param)).await?.as_u64().unwrap(); + + let (chunk_count, size, _speed, csum) = Self::upload_chunk_info_stream( + h2.clone(), + wid, + stream, + &prefix, + known_chunks.clone(), + crypt_config, + ) + .await?; + + let param = json!({ + "wid": wid , + "chunk-count": chunk_count, + "size": size, + }); + let _value = h2.post(&close_path, Some(param)).await?; + Ok(BackupStats { + size: size as u64, + csum, }) + } } fn response_queue() -> ( mpsc::Sender, - sync::oneshot::Receiver> + oneshot::Receiver> ) { let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100); - let (verify_result_tx, verify_result_rx) = sync::oneshot::channel(); + let (verify_result_tx, verify_result_rx) = oneshot::channel(); hyper::rt::spawn( verify_queue_rx - .map_err(Error::from) - .for_each(|response: h2::client::ResponseFuture| { + .map(Ok::<_, Error>) + .try_for_each(|response: h2::client::ResponseFuture| { response .map_err(Error::from) .and_then(H2Client::h2api_response) - .and_then(|result| { - println!("RESPONSE: {:?}", result); - Ok(()) - }) + .map_ok(|result| println!("RESPONSE: {:?}", result)) .map_err(|err| format_err!("pipelined request failed: {}", err)) }) - .then(|result| - verify_result_tx.send(result) - ) - .map_err(|_| { /* ignore closed channel */ }) + .map(|result| { + let _ignore_closed_channel = verify_result_tx.send(result); + }) ); (verify_queue_tx, verify_result_rx) @@ -781,30 +799,30 @@ impl BackupClient { fn append_chunk_queue(h2: H2Client, wid: u64, path: String) -> ( mpsc::Sender<(MergedChunkInfo, Option)>, - sync::oneshot::Receiver> + oneshot::Receiver> ) { let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64); - let (verify_result_tx, verify_result_rx) = sync::oneshot::channel(); + let (verify_result_tx, verify_result_rx) = oneshot::channel(); let h2_2 = h2.clone(); hyper::rt::spawn( verify_queue_rx - .map_err(Error::from) + .map(Ok::<_, Error>) .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option)| { match (response, merged_chunk_info) { (Some(response), MergedChunkInfo::Known(list)) => { - future::Either::A( + future::Either::Left( response .map_err(Error::from) .and_then(H2Client::h2api_response) .and_then(move |_result| { - Ok(MergedChunkInfo::Known(list)) + future::ok(MergedChunkInfo::Known(list)) }) ) } (None, MergedChunkInfo::Known(list)) => { - future::Either::B(future::ok(MergedChunkInfo::Known(list))) + future::Either::Right(future::ok(MergedChunkInfo::Known(list))) } _ => unreachable!(), } @@ -831,18 +849,17 @@ impl BackupClient { response .map_err(Error::from) .and_then(H2Client::h2api_response) - .and_then(|_| Ok(())) + .map_ok(|_| ()) }) .map_err(|err| format_err!("pipelined request failed: {}", err)) } _ => unreachable!(), } }) - .for_each(|_| Ok(())) - .then(|result| - verify_result_tx.send(result) - ) - .map_err(|_| { /* ignore closed channel */ }) + .try_for_each(|_| future::ok(())) + .map(|result| { + let _ignore_closed_channel = verify_result_tx.send(result); + }) ); (verify_queue_tx, verify_result_rx) @@ -853,7 +870,7 @@ impl BackupClient { path: &str, archive_name: &str, known_chunks: Arc>>, - ) -> impl Future { + ) -> impl Future> { let param = json!({ "archive-name": archive_name }); let request = H2Client::request_builder("localhost", "GET", path, Some(param)).unwrap(); @@ -866,9 +883,12 @@ impl BackupClient { let status = resp.status(); if !status.is_success() { - future::Either::A(H2Client::h2api_response(resp).and_then(|_| { bail!("unknown error"); })) + future::Either::Left( + H2Client::h2api_response(resp) + .map(|_| Err(format_err!("unknown error"))) + ) } else { - future::Either::B(future::ok(resp.into_body())) + future::Either::Right(future::ok(resp.into_body())) } }) .and_then(move |mut body| { @@ -876,11 +896,11 @@ impl BackupClient { let mut release_capacity = body.release_capacity().clone(); DigestListDecoder::new(body.map_err(Error::from)) - .for_each(move |chunk| { + .try_for_each(move |chunk| { let _ = release_capacity.release_capacity(chunk.len()); println!("GOT DOWNLOAD {}", digest_to_hex(&chunk)); known_chunks.lock().unwrap().insert(chunk); - Ok(()) + futures::future::ok(()) }) }) }) @@ -889,22 +909,23 @@ impl BackupClient { fn upload_chunk_info_stream( h2: H2Client, wid: u64, - stream: impl Stream, + stream: impl Stream>, prefix: &str, known_chunks: Arc>>, crypt_config: Option>, - ) -> impl Future { + ) -> impl Future> { - let repeat = std::sync::Arc::new(AtomicUsize::new(0)); + let repeat = Arc::new(AtomicUsize::new(0)); let repeat2 = repeat.clone(); - let stream_len = std::sync::Arc::new(AtomicUsize::new(0)); + let stream_len = Arc::new(AtomicUsize::new(0)); let stream_len2 = stream_len.clone(); let append_chunk_path = format!("{}_index", prefix); let upload_chunk_path = format!("{}_chunk", prefix); - let (upload_queue, upload_result) = Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned()); + let (upload_queue, upload_result) = + Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned()); let start_time = std::time::Instant::now(); @@ -936,21 +957,26 @@ impl BackupClient { let chunk_is_known = known_chunks.contains(digest); if chunk_is_known { - Ok(MergedChunkInfo::Known(vec![(offset, *digest)])) + future::ok(MergedChunkInfo::Known(vec![(offset, *digest)])) } else { known_chunks.insert(*digest); - let chunk = chunk_builder.build()?; - Ok(MergedChunkInfo::New(ChunkInfo { chunk, chunk_len: chunk_len as u64, offset })) + future::ready(chunk_builder + .build() + .map(move |chunk| MergedChunkInfo::New(ChunkInfo { + chunk, + chunk_len: chunk_len as u64, + offset, + })) + ) } }) .merge_known_chunks() - .for_each(move |merged_chunk_info| { + .try_for_each(move |merged_chunk_info| { if let MergedChunkInfo::New(chunk_info) = merged_chunk_info { let offset = chunk_info.offset; let digest = *chunk_info.chunk.digest(); let digest_str = digest_to_hex(&digest); - let upload_queue = upload_queue.clone(); println!("upload new chunk {} ({} bytes, offset {})", digest_str, chunk_info.chunk_len, offset); @@ -968,28 +994,29 @@ impl BackupClient { let new_info = MergedChunkInfo::Known(vec![(offset, digest)]); - future::Either::A( - h2.send_request(request, upload_data) - .and_then(move |response| { - upload_queue.clone().send((new_info, Some(response))) - .map(|_| ()).map_err(Error::from) - }) + let mut upload_queue = upload_queue.clone(); + future::Either::Left(h2 + .send_request(request, upload_data) + .and_then(move |response| async move { + upload_queue + .send((new_info, Some(response))) + .await + .map_err(Error::from) + }) ) } else { - - future::Either::B( - upload_queue.clone().send((merged_chunk_info, None)) - .map(|_| ()).map_err(Error::from) - ) + let mut upload_queue = upload_queue.clone(); + future::Either::Right(async move { + upload_queue + .send((merged_chunk_info, None)) + .await + .map_err(Error::from) + }) } }) - .then(move |result| { - //println!("RESULT {:?}", result); - upload_result.map_err(Error::from).and_then(|upload1_result| { - Ok(upload1_result.and(result)) - }) - }) - .flatten() + .then(move |result| async move { + upload_result.await?.and(result) + }.boxed()) .and_then(move |_| { let repeat = repeat2.load(Ordering::SeqCst); let stream_len = stream_len2.load(Ordering::SeqCst); @@ -1003,11 +1030,11 @@ impl BackupClient { let mut guard = index_csum_2.lock().unwrap(); let csum = guard.take().unwrap().finish(); - Ok((repeat, stream_len, speed, csum)) + futures::future::ok((repeat, stream_len, speed, csum)) }) } - pub fn upload_speedtest(&self) -> impl Future { + pub fn upload_speedtest(&self) -> impl Future> { let mut data = vec![]; // generate pseudo random byte sequence @@ -1020,7 +1047,7 @@ impl BackupClient { let item_len = data.len(); - let repeat = std::sync::Arc::new(AtomicUsize::new(0)); + let repeat = Arc::new(AtomicUsize::new(0)); let repeat2 = repeat.clone(); let (upload_queue, upload_result) = Self::response_queue(); @@ -1031,29 +1058,32 @@ impl BackupClient { futures::stream::repeat(data) .take_while(move |_| { - repeat.fetch_add(1, Ordering::SeqCst); - Ok(start_time.elapsed().as_secs() < 5) + let repeat = Arc::clone(&repeat); + async move { + repeat.fetch_add(1, Ordering::SeqCst); + start_time.elapsed().as_secs() < 5 + } }) - .for_each(move |data| { + .map(Ok) + .try_for_each(move |data| { let h2 = h2.clone(); - let upload_queue = upload_queue.clone(); + let mut upload_queue = upload_queue.clone(); println!("send test data ({} bytes)", data.len()); let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap(); h2.send_request(request, Some(bytes::Bytes::from(data))) - .and_then(move |response| { - upload_queue.send(response) - .map(|_| ()).map_err(Error::from) + .and_then(move |response| async move { + upload_queue + .send(response) + .await + .map_err(Error::from) }) }) - .then(move |result| { + .then(move |result| async move { println!("RESULT {:?}", result); - upload_result.map_err(Error::from).and_then(|upload1_result| { - Ok(upload1_result.and(result)) - }) + upload_result.await?.and(result) }) - .flatten() .and_then(move |_| { let repeat = repeat2.load(Ordering::SeqCst); println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs()); @@ -1061,7 +1091,7 @@ impl BackupClient { if repeat > 0 { println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128)); } - Ok(speed) + futures::future::ok(speed) }) } } @@ -1077,22 +1107,27 @@ impl H2Client { Self { h2 } } - pub fn get(&self, path: &str, param: Option) -> impl Future { + pub fn get(&self, path: &str, param: Option) -> impl Future> { let req = Self::request_builder("localhost", "GET", path, param).unwrap(); self.request(req) } - pub fn put(&self, path: &str, param: Option) -> impl Future { + pub fn put(&self, path: &str, param: Option) -> impl Future> { let req = Self::request_builder("localhost", "PUT", path, param).unwrap(); self.request(req) } - pub fn post(&self, path: &str, param: Option) -> impl Future { + pub fn post(&self, path: &str, param: Option) -> impl Future> { let req = Self::request_builder("localhost", "POST", path, param).unwrap(); self.request(req) } - pub fn download(&self, path: &str, param: Option, output: W) -> impl Future { + pub fn download( + &self, + path: &str, + param: Option, + output: W, + ) -> impl Future> { let request = Self::request_builder("localhost", "GET", path, param).unwrap(); self.send_request(request, None) @@ -1102,21 +1137,24 @@ impl H2Client { .and_then(move |resp| { let status = resp.status(); if !status.is_success() { - future::Either::A( + future::Either::Left( H2Client::h2api_response(resp) - .and_then(|_| { bail!("unknown error"); }) + .map(|_| Err(format_err!("unknown error"))) ) } else { let mut body = resp.into_body(); - let mut release_capacity = body.release_capacity().clone(); + let release_capacity = body.release_capacity().clone(); - future::Either::B( + future::Either::Right( body .map_err(Error::from) - .fold(output, move |mut acc, chunk| { - let _ = release_capacity.release_capacity(chunk.len()); - acc.write_all(&chunk)?; - Ok::<_, Error>(acc) + .try_fold(output, move |mut acc, chunk| { + let mut release_capacity = release_capacity.clone(); + async move { + let _ = release_capacity.release_capacity(chunk.len()); + acc.write_all(&chunk)?; + Ok::<_, Error>(acc) + } }) ) } @@ -1124,7 +1162,12 @@ impl H2Client { }) } - pub fn upload(&self, path: &str, param: Option, data: Vec) -> impl Future { + pub fn upload( + &self, + path: &str, + param: Option, + data: Vec, + ) -> impl Future> { let request = Self::request_builder("localhost", "POST", path, param).unwrap(); self.h2.clone() @@ -1144,7 +1187,7 @@ impl H2Client { fn request( &self, request: Request<()>, - ) -> impl Future { + ) -> impl Future> { self.send_request(request, None) .and_then(move |response| { @@ -1158,7 +1201,7 @@ impl H2Client { &self, request: Request<()>, data: Option, - ) -> impl Future { + ) -> impl Future> { self.h2.clone() .ready() @@ -1166,19 +1209,20 @@ impl H2Client { .and_then(move |mut send_request| { if let Some(data) = data { let (response, stream) = send_request.send_request(request, false).unwrap(); - future::Either::A(PipeToSendStream::new(data, stream) + future::Either::Left(PipeToSendStream::new(data, stream) .and_then(move |_| { future::ok(response) })) } else { let (response, _stream) = send_request.send_request(request, true).unwrap(); - future::Either::B(future::ok(response)) + future::Either::Right(future::ok(response)) } }) } - fn h2api_response(response: Response) -> impl Future { - + fn h2api_response( + response: Response, + ) -> impl Future> { let status = response.status(); let (_head, mut body) = response.into_parts(); @@ -1192,14 +1236,14 @@ impl H2Client { let mut release_capacity = body.release_capacity().clone(); body - .map(move |chunk| { + .map_ok(move |chunk| { // Let the server send more data. let _ = release_capacity.release_capacity(chunk.len()); chunk }) - .concat2() + .try_concat() .map_err(Error::from) - .and_then(move |data| { + .and_then(move |data| async move { let text = String::from_utf8(data.to_vec()).unwrap(); if status.is_success() { if text.len() > 0 { @@ -1216,7 +1260,7 @@ impl H2Client { } else { bail!("HTTP Error {}: {}", status, text); } - }) + }.boxed()) } // Note: We always encode parameters with the url