diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 6b2be5fc..b3d5d779 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -993,7 +993,7 @@ impl BackupClient { }) } - pub fn upload_speedtest(&self) -> impl Future> { + pub async fn upload_speedtest(&self) -> Result { let mut data = vec![]; // generate pseudo random byte sequence @@ -1006,52 +1006,36 @@ impl BackupClient { let item_len = data.len(); - let repeat = Arc::new(AtomicUsize::new(0)); - let repeat2 = repeat.clone(); + let mut repeat = 0; let (upload_queue, upload_result) = Self::response_queue(); let start_time = std::time::Instant::now(); - let h2 = self.h2.clone(); + loop { + repeat += 1; + if start_time.elapsed().as_secs() >= 5 { + break; + } - futures::stream::repeat(data) - .take_while(move |_| { - let repeat = Arc::clone(&repeat); - async move { - repeat.fetch_add(1, Ordering::SeqCst); - start_time.elapsed().as_secs() < 5 - } - }) - .map(Ok) - .try_for_each(move |data| { - let h2 = h2.clone(); + let mut upload_queue = upload_queue.clone(); - let mut upload_queue = upload_queue.clone(); + println!("send test data ({} bytes)", data.len()); + let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap(); + let request_future = self.h2.send_request(request, Some(bytes::Bytes::from(data.clone()))).await?; - println!("send test data ({} bytes)", data.len()); - let request = H2Client::request_builder("localhost", "POST", "speedtest", None).unwrap(); - h2.send_request(request, Some(bytes::Bytes::from(data))) - .and_then(move |response| async move { - upload_queue - .send(response) - .await - .map_err(Error::from) - }) - }) - .then(move |result| async move { - println!("RESULT {:?}", result); - upload_result.await?.and(result) - }) - .and_then(move |_| { - let repeat = repeat2.load(Ordering::SeqCst); - println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs()); - let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize); - if repeat > 0 { - println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128)); - } - futures::future::ok(speed) - }) + upload_queue.send(request_future).await?; + } + + drop(upload_queue); // close queue + + let _ = upload_result.await?; + + println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs()); + let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize); + println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128)); + + Ok(speed) } }