src/client/http_client.rs: improve speed test

pipeline responses to a separate async channel ...
Dietmar Maurer 2019-05-17 11:51:14 +02:00
parent 8bf7342c92
commit 9c9ad941c0
1 changed file with 48 additions and 12 deletions
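
The change in a nutshell: instead of waiting for each speedtest response before sending the next chunk, the client now pushes the pending responses into a bounded channel and lets a separately spawned task verify them, so uploads stay pipelined on the HTTP/2 connection. Below is a minimal, self-contained sketch of that pattern, written against today's tokio async/await API rather than the futures 0.1 combinators the commit itself uses; send_chunk, the 1 MiB chunk size and the String error type are invented stand-ins for the real h2 request plumbing.

// cargo dependency (assumed): tokio = { version = "1", features = ["full"] }
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

// Stand-in for one pipelined HTTP/2 upload; the real code POSTs a chunk to
// the "speedtest" endpoint and keeps the h2 response future for later.
async fn send_chunk(len: usize) -> Result<usize, String> {
    tokio::time::sleep(Duration::from_millis(1)).await; // pretend network latency
    Ok(len)
}

#[tokio::main]
async fn main() {
    let item_len = 1024 * 1024; // 1 MiB per simulated chunk (assumed)

    // Queue of in-flight responses plus a oneshot for the final verdict,
    // mirroring verify_queue_tx/rx and verify_result_tx/rx in the commit.
    let (verify_tx, mut verify_rx) = mpsc::channel::<JoinHandle<Result<usize, String>>>(100);
    let (result_tx, result_rx) = oneshot::channel::<Result<(), String>>();

    // Verifier task: drains the queue, awaits every response and reports
    // the overall outcome once the queue is closed or a chunk fails.
    tokio::spawn(async move {
        let mut outcome: Result<(), String> = Ok(());
        while let Some(handle) = verify_rx.recv().await {
            if let Err(err) = handle.await.unwrap() {
                outcome = Err(format!("speedtest chunk upload failed: {}", err));
                break;
            }
        }
        let _ = result_tx.send(outcome);
    });

    // Producer: keep sending for ~5 seconds without waiting for individual
    // responses; the bounded channel is the only source of backpressure.
    let start = Instant::now();
    let mut repeat = 0usize;
    while start.elapsed().as_secs() < 5 {
        repeat += 1;
        let handle = tokio::spawn(send_chunk(item_len));
        if verify_tx.send(handle).await.is_err() {
            break; // verifier bailed out early
        }
    }
    drop(verify_tx); // close the queue so the verifier can finish

    result_rx.await.unwrap().expect("speedtest failed");
    let elapsed = start.elapsed().as_secs_f64();
    println!("Uploaded {} chunks in {:.2} seconds", repeat, elapsed);
    println!("speed: {:.2} MiB/s", (repeat * item_len) as f64 / (1024.0 * 1024.0) / elapsed);
}

The bounded queue doubles as backpressure: once 100 responses are pending, the producer stalls until the verifier catches up, matching the channel capacity chosen in the commit.
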


@@ -153,7 +153,7 @@ impl HttpClient {
builder.danger_accept_invalid_certs(true);
let tlsconnector = builder.build().unwrap();
let mut httpc = hyper::client::HttpConnector::new(1);
-//httpc.set_nodelay(true); // not sure if this help?
+httpc.set_nodelay(true); // important!
httpc.enforce_http(false); // we want https...
let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
https.https_only(true); // force it!
@@ -446,27 +446,63 @@ impl H2Client {
}
let item_len = data.len();
-let repeat = 100;
-let start = std::time::SystemTime::now();
+use std::sync::atomic::{AtomicUsize, Ordering};
-futures::stream::repeat(data)
-.take(repeat)
-.for_each(move |data| {
-let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
-let (response, stream) = send_request.send_request(request, false).unwrap();
-println!("send test data ({} bytes)", data.len());
-PipeToSendStream::new(bytes::Bytes::from(data), stream)
-.and_then(|_| {
+let repeat = std::sync::Arc::new(AtomicUsize::new(0));
+let repeat2 = repeat.clone();
+use tokio::sync::mpsc;
+use futures::*;
+let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
+let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
+hyper::rt::spawn(
+verify_queue_rx
+.map_err(Error::from)
+.and_then(|response: h2::client::ResponseFuture| {
+response
+.map_err(Error::from)
+.and_then(Self::h2api_response)
+.and_then(|_| Ok(()))
+.map_err(|err| format_err!("speedtest chunk upload failed: {}", err))
+})
+.for_each(|result| {
+//println!("response: {:?}", result);
+Ok(())
+})
+.then(|result| verify_result_tx.send(result))
+.map_err(|_| { /* ignore closed channel */ })
+);
+let start_time = std::time::Instant::now();
+futures::stream::repeat(data)
+.take_while(move |_| {
+repeat.fetch_add(1, Ordering::SeqCst);
+Ok(start_time.elapsed().as_secs() < 5)
+})
+.for_each(move |data| {
+let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
+let (response, stream) = send_request.send_request(request, false).unwrap();
+//println!("send test data ({} bytes)", data.len());
+let verify_queue_tx = verify_queue_tx.clone();
+PipeToSendStream::new(bytes::Bytes::from(data), stream)
+.and_then(move |_| {
-let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start.elapsed()?.as_micros() as usize);
-println!("time per request: {} microseconds", (start.elapsed()?.as_micros())/(repeat as u128));
+verify_queue_tx.send(response).map_err(Error::from).map(|_| ())
})
})
+.then(move |result| {
+verify_result_rx.map_err(Error::from).and_then(|verify_result| {
+Ok(verify_result.and(result))
+})
+})
+.flatten()
+.and_then(move |_| {
+let repeat = repeat2.load(Ordering::SeqCst);
+println!("Uploaded {} chunks in {} seconds", repeat, start_time.elapsed().as_secs());
+let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
+println!("time per request: {} microseconds", (start_time.elapsed().as_micros())/(repeat as u128));
Ok(speed)
})
})
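
For reference, the speed expression in the new hunk works out to MiB per second: item_len * repeat is the total number of bytes uploaded, multiplying by 1,000,000 and dividing by the elapsed microseconds yields bytes per second, and the 1024*1024 divisor rescales that to MiB. A quick check with made-up numbers:

fn main() {
    // Hypothetical numbers, only to show the units of the speed formula.
    let item_len = 4 * 1024 * 1024usize; // 4 MiB per chunk (assumed)
    let repeat = 500usize;               // chunks uploaded in the test window (assumed)
    let elapsed_micros = 5_000_000usize; // 5 seconds

    // (bytes * microseconds-per-second / MiB) / elapsed microseconds == MiB/s
    let speed = ((item_len * 1_000_000 * repeat) / (1024 * 1024)) / elapsed_micros;
    println!("{} MiB/s", speed); // 4 MiB * 500 chunks / 5 s = 400 MiB/s
}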