diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 3abae83c..927b3ac1 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -153,7 +153,7 @@ impl HttpClient { builder.danger_accept_invalid_certs(true); let tlsconnector = builder.build().unwrap(); let mut httpc = hyper::client::HttpConnector::new(1); - httpc.set_nodelay(true); // important! + //httpc.set_nodelay(true); // not sure if this help? httpc.enforce_http(false); // we want https... let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector)); https.https_only(true); // force it! @@ -446,27 +446,63 @@ impl H2Client { } let item_len = data.len(); - let repeat = 100; - let start = std::time::SystemTime::now(); + use std::sync::atomic::{AtomicUsize, Ordering}; + + let repeat = std::sync::Arc::new(AtomicUsize::new(0)); + let repeat2 = repeat.clone(); + + use tokio::sync::mpsc; + use futures::*; + + let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100); + let (verify_result_tx, verify_result_rx) = sync::oneshot::channel(); + + hyper::rt::spawn( + verify_queue_rx + .map_err(Error::from) + .and_then(|response: h2::client::ResponseFuture| { + response + .map_err(Error::from) + .and_then(Self::h2api_response) + .map_err(|err| format_err!("speedtest chunk upload failed: {}", err)) + }) + .for_each(|result| { + //println!("response: {:?}", result); + Ok(()) + }) + .then(|result| verify_result_tx.send(result)) + .map_err(|_| { /* ignore closed channel */ }) + ); + + let start_time = std::time::Instant::now(); futures::stream::repeat(data) - .take(repeat) + .take_while(move |_| { + repeat.fetch_add(1, Ordering::SeqCst); + Ok(start_time.elapsed().as_secs() < 5) + }) .for_each(move |data| { let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap(); let (response, stream) = send_request.send_request(request, false).unwrap(); - println!("send test data ({} bytes)", data.len()); + //println!("send test data ({} bytes)", data.len()); + let verify_queue_tx = verify_queue_tx.clone(); PipeToSendStream::new(bytes::Bytes::from(data), stream) - .and_then(|_| { - response - .map_err(Error::from) - .and_then(Self::h2api_response) - .and_then(|_| Ok(())) + .and_then(move |_| { + verify_queue_tx.send(response).map_err(Error::from).map(|_| ()) }) }) + .then(move |result| { + verify_result_rx.map_err(Error::from).and_then(|verify_result| { + Ok(verify_result.and(result)) + }) + }) + .flatten() .and_then(move |_| { - let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start.elapsed()?.as_micros() as usize); - println!("time per request: {} microseconds", (start.elapsed()?.as_micros())/(repeat as u128)); + let repeat = repeat2.load(Ordering::SeqCst); + println!("Uploaded {} chunks in {} seconds", repeat, start_time.elapsed().as_secs()); + let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize); + println!("time per request: {} microseconds", (start_time.elapsed().as_micros())/(repeat as u128)); Ok(speed) }) })