diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs index 67e726f8..9f1ed329 100644 --- a/src/api2/admin/datastore/backup.rs +++ b/src/api2/admin/datastore/backup.rs @@ -103,6 +103,9 @@ fn upgrade_to_backup_protocol( let mut http = hyper::server::conn::Http::new(); http.http2_only(true); + // increase window size: todo - find optiomal size + http.http2_initial_stream_window_size( (1 << 31) - 2); + http.http2_initial_connection_window_size( (1 << 31) - 2); http.serve_connection(conn, service) .map_err(Error::from) @@ -174,6 +177,10 @@ fn backup_api() -> Router { ) ) ) + .subdir( + "speedtest", Router::new() + .upload(api_method_upload_speedtest()) + ) .subdir("test1", test1) .subdir("test2", test2) .list_subdirs(); diff --git a/src/api2/admin/datastore/backup/upload_chunk.rs b/src/api2/admin/datastore/backup/upload_chunk.rs index 0507809b..3ac02ee9 100644 --- a/src/api2/admin/datastore/backup/upload_chunk.rs +++ b/src/api2/admin/datastore/backup/upload_chunk.rs @@ -99,3 +99,41 @@ fn upload_dynamic_chunk( Ok(Box::new(resp)) } + +pub fn api_method_upload_speedtest() -> ApiAsyncMethod { + ApiAsyncMethod::new( + upload_speedtest, + ObjectSchema::new("Test uploadf speed.") + ) +} + +fn upload_speedtest( + _parts: Parts, + req_body: Body, + param: Value, + _info: &ApiAsyncMethod, + rpcenv: Box, +) -> Result { + + let resp = req_body + .map_err(Error::from) + .fold(0, |size: usize, chunk| -> Result { + let sum = size + chunk.len(); + //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum); + Ok(sum) + }) + .then(move |result| { + match result { + Ok(size) => { + println!("UPLOAD END {} bytes", size); + } + Err(err) => { + println!("Upload error: {}", err); + } + } + let env: &BackupEnvironment = rpcenv.as_ref(); + Ok(env.format_response(Ok(Value::Null))) + }); + + Ok(Box::new(resp)) +} diff --git a/src/bin/upload-speed.rs b/src/bin/upload-speed.rs new file mode 100644 index 00000000..d22c66a2 --- /dev/null +++ b/src/bin/upload-speed.rs @@ -0,0 +1,43 @@ +use failure::*; +use futures::*; +use serde_json::json; + +use proxmox_backup::client::*; + +fn upload_speed() -> Result { + + let host = "localhost"; + let datastore = "store2"; + + let username = "root@pam"; + + let mut client = HttpClient::new(host, username)?; + + let param = json!({"backup-type": "host", "backup-id": "speedtest" }); + let upgrade = client.h2upgrade(&format!("/api2/json/admin/datastore/{}/backup", datastore), Some(param)); + + let res = upgrade.and_then(|h2| { + println!("start upload speed test"); + h2.upload_speedtest() + }).wait()?; + + Ok(res) +} + +fn main() { + + let mut rt = tokio::runtime::Runtime::new().unwrap(); + + // should be rt.block_on_all, but this block forever in release builds + let _ = rt.block_on(futures::future::lazy(move || -> Result<(), ()> { + match upload_speed() { + Ok(mbs) => { + println!("average upload speed: {} MB/s", mbs); + } + Err(err) => { + eprintln!("ERROR: {}", err); + } + } + Ok(()) + })); +} diff --git a/src/client/http_client.rs b/src/client/http_client.rs index c27d5932..3abae83c 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -153,10 +153,14 @@ impl HttpClient { builder.danger_accept_invalid_certs(true); let tlsconnector = builder.build().unwrap(); let mut httpc = hyper::client::HttpConnector::new(1); + httpc.set_nodelay(true); // important! httpc.enforce_http(false); // we want https... let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector)); https.https_only(true); // force it! - Client::builder().build::<_, Body>(https) + Client::builder() + //.http2_initial_stream_window_size( (1 << 31) - 2) + //.http2_initial_connection_window_size( (1 << 31) - 2) + .build::<_, Body>(https) } pub fn request(&self, mut req: Request) -> impl Future { @@ -425,6 +429,49 @@ impl H2Client { }) } + pub fn upload_speedtest(&self) -> impl Future { + + self.h2.clone() + .ready() + .map_err(Error::from) + .and_then(move |mut send_request| { + + let mut data = vec![]; + // generate pseudo random byte sequence + for i in 0..1024*1024 { + for j in 0..4 { + let byte = ((i >> (j<<3))&0xff) as u8; + data.push(byte); + } + } + + let item_len = data.len(); + let repeat = 100; + + let start = std::time::SystemTime::now(); + + futures::stream::repeat(data) + .take(repeat) + .for_each(move |data| { + let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap(); + let (response, stream) = send_request.send_request(request, false).unwrap(); + println!("send test data ({} bytes)", data.len()); + PipeToSendStream::new(bytes::Bytes::from(data), stream) + .and_then(|_| { + response + .map_err(Error::from) + .and_then(Self::h2api_response) + .and_then(|_| Ok(())) + }) + }) + .and_then(move |_| { + let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start.elapsed()?.as_micros() as usize); + println!("time per request: {} microseconds", (start.elapsed()?.as_micros())/(repeat as u128)); + Ok(speed) + }) + }) + } + fn request( &self, request: Request<()>, diff --git a/src/client/pipe_to_stream.rs b/src/client/pipe_to_stream.rs index 14c34be7..4a972f29 100644 --- a/src/client/pipe_to_stream.rs +++ b/src/client/pipe_to_stream.rs @@ -29,9 +29,9 @@ impl Future for PipeToSendStream { fn poll(&mut self) -> Poll { loop { if self.data != None { - // we don't have the next chunk of data yet, so just reserve 1 byte to make - // sure there's some capacity available. h2 will handle the capacity management - // for the actual body chunk. + // just reserve 1 byte to make sure there's some + // capacity available. h2 will handle the capacity + // management for the actual body chunk. self.body_tx.reserve_capacity(1); if self.body_tx.capacity() == 0 {