src/api2/admin/datastore/backup.rs: add speedtest api, improve upload speed
We need to disable tcp Nagle algorythm (set_nodelay), and use larger window size for http2
This commit is contained in:
		@ -103,6 +103,9 @@ fn upgrade_to_backup_protocol(
 | 
			
		||||
 | 
			
		||||
                let mut http = hyper::server::conn::Http::new();
 | 
			
		||||
                http.http2_only(true);
 | 
			
		||||
                // increase window size: todo - find optiomal size
 | 
			
		||||
                http.http2_initial_stream_window_size( (1 << 31) - 2);
 | 
			
		||||
                http.http2_initial_connection_window_size( (1 << 31) - 2);
 | 
			
		||||
 | 
			
		||||
                http.serve_connection(conn, service)
 | 
			
		||||
                    .map_err(Error::from)
 | 
			
		||||
@ -174,6 +177,10 @@ fn backup_api() -> Router {
 | 
			
		||||
                    )
 | 
			
		||||
                )
 | 
			
		||||
        )
 | 
			
		||||
        .subdir(
 | 
			
		||||
            "speedtest", Router::new()
 | 
			
		||||
                .upload(api_method_upload_speedtest())
 | 
			
		||||
        )
 | 
			
		||||
        .subdir("test1", test1)
 | 
			
		||||
        .subdir("test2", test2)
 | 
			
		||||
        .list_subdirs();
 | 
			
		||||
 | 
			
		||||
@ -99,3 +99,41 @@ fn upload_dynamic_chunk(
 | 
			
		||||
 | 
			
		||||
    Ok(Box::new(resp))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn api_method_upload_speedtest() -> ApiAsyncMethod {
 | 
			
		||||
    ApiAsyncMethod::new(
 | 
			
		||||
        upload_speedtest,
 | 
			
		||||
        ObjectSchema::new("Test uploadf speed.")
 | 
			
		||||
    )
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn upload_speedtest(
 | 
			
		||||
    _parts: Parts,
 | 
			
		||||
    req_body: Body,
 | 
			
		||||
    param: Value,
 | 
			
		||||
    _info: &ApiAsyncMethod,
 | 
			
		||||
    rpcenv: Box<RpcEnvironment>,
 | 
			
		||||
) -> Result<BoxFut, Error> {
 | 
			
		||||
 | 
			
		||||
    let resp = req_body
 | 
			
		||||
        .map_err(Error::from)
 | 
			
		||||
        .fold(0, |size: usize, chunk| -> Result<usize, Error> {
 | 
			
		||||
            let sum = size + chunk.len();
 | 
			
		||||
            //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
 | 
			
		||||
            Ok(sum)
 | 
			
		||||
        })
 | 
			
		||||
        .then(move |result| {
 | 
			
		||||
            match result {
 | 
			
		||||
                Ok(size) => {
 | 
			
		||||
                    println!("UPLOAD END {} bytes", size);
 | 
			
		||||
                }
 | 
			
		||||
                Err(err) => {
 | 
			
		||||
                    println!("Upload error: {}", err);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            let env: &BackupEnvironment = rpcenv.as_ref();
 | 
			
		||||
            Ok(env.format_response(Ok(Value::Null)))
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
    Ok(Box::new(resp))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										43
									
								
								src/bin/upload-speed.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										43
									
								
								src/bin/upload-speed.rs
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,43 @@
 | 
			
		||||
use failure::*;
 | 
			
		||||
use futures::*;
 | 
			
		||||
use serde_json::json;
 | 
			
		||||
 | 
			
		||||
use proxmox_backup::client::*;
 | 
			
		||||
 | 
			
		||||
fn upload_speed() -> Result<usize, Error> {
 | 
			
		||||
 | 
			
		||||
    let host = "localhost";
 | 
			
		||||
    let datastore = "store2";
 | 
			
		||||
 | 
			
		||||
    let username = "root@pam";
 | 
			
		||||
 | 
			
		||||
    let mut client = HttpClient::new(host, username)?;
 | 
			
		||||
 | 
			
		||||
    let param = json!({"backup-type": "host", "backup-id": "speedtest" });
 | 
			
		||||
    let upgrade = client.h2upgrade(&format!("/api2/json/admin/datastore/{}/backup", datastore), Some(param));
 | 
			
		||||
 | 
			
		||||
    let res = upgrade.and_then(|h2| {
 | 
			
		||||
        println!("start upload speed test");
 | 
			
		||||
        h2.upload_speedtest()
 | 
			
		||||
    }).wait()?;
 | 
			
		||||
 | 
			
		||||
    Ok(res)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn main()  {
 | 
			
		||||
 | 
			
		||||
    let mut rt = tokio::runtime::Runtime::new().unwrap();
 | 
			
		||||
 | 
			
		||||
    // should be rt.block_on_all, but this block forever in release builds
 | 
			
		||||
    let _ = rt.block_on(futures::future::lazy(move || -> Result<(), ()> {
 | 
			
		||||
        match upload_speed() {
 | 
			
		||||
            Ok(mbs) => {
 | 
			
		||||
                println!("average upload speed: {} MB/s", mbs);
 | 
			
		||||
            }
 | 
			
		||||
            Err(err) => {
 | 
			
		||||
                eprintln!("ERROR: {}", err);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }));
 | 
			
		||||
}
 | 
			
		||||
@ -153,10 +153,14 @@ impl HttpClient {
 | 
			
		||||
        builder.danger_accept_invalid_certs(true);
 | 
			
		||||
        let tlsconnector = builder.build().unwrap();
 | 
			
		||||
        let mut httpc = hyper::client::HttpConnector::new(1);
 | 
			
		||||
        httpc.set_nodelay(true); // important!
 | 
			
		||||
        httpc.enforce_http(false); // we want https...
 | 
			
		||||
        let mut https = hyper_tls::HttpsConnector::from((httpc, tlsconnector));
 | 
			
		||||
        https.https_only(true); // force it!
 | 
			
		||||
        Client::builder().build::<_, Body>(https)
 | 
			
		||||
        Client::builder()
 | 
			
		||||
        //.http2_initial_stream_window_size( (1 << 31) - 2)
 | 
			
		||||
        //.http2_initial_connection_window_size( (1 << 31) - 2)
 | 
			
		||||
            .build::<_, Body>(https)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn request(&self, mut req: Request<Body>) -> impl Future<Item=Value, Error=Error>  {
 | 
			
		||||
@ -425,6 +429,49 @@ impl H2Client {
 | 
			
		||||
            })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn upload_speedtest(&self) -> impl Future<Item=usize, Error=Error> {
 | 
			
		||||
 | 
			
		||||
        self.h2.clone()
 | 
			
		||||
            .ready()
 | 
			
		||||
            .map_err(Error::from)
 | 
			
		||||
            .and_then(move |mut send_request| {
 | 
			
		||||
 | 
			
		||||
                let mut data = vec![];
 | 
			
		||||
                // generate pseudo random byte sequence
 | 
			
		||||
                for i in 0..1024*1024 {
 | 
			
		||||
                    for j in 0..4 {
 | 
			
		||||
                        let byte = ((i >> (j<<3))&0xff) as u8;
 | 
			
		||||
                        data.push(byte);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                let item_len = data.len();
 | 
			
		||||
                let repeat = 100;
 | 
			
		||||
 | 
			
		||||
                let start = std::time::SystemTime::now();
 | 
			
		||||
 | 
			
		||||
                futures::stream::repeat(data)
 | 
			
		||||
                    .take(repeat)
 | 
			
		||||
                    .for_each(move |data| {
 | 
			
		||||
                        let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
 | 
			
		||||
                        let (response, stream) = send_request.send_request(request, false).unwrap();
 | 
			
		||||
                        println!("send test data ({} bytes)", data.len());
 | 
			
		||||
                        PipeToSendStream::new(bytes::Bytes::from(data), stream)
 | 
			
		||||
                            .and_then(|_| {
 | 
			
		||||
                                response
 | 
			
		||||
                                    .map_err(Error::from)
 | 
			
		||||
                                    .and_then(Self::h2api_response)
 | 
			
		||||
                                    .and_then(|_| Ok(()))
 | 
			
		||||
                            })
 | 
			
		||||
                    })
 | 
			
		||||
                    .and_then(move |_| {
 | 
			
		||||
                        let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start.elapsed()?.as_micros() as usize);
 | 
			
		||||
                        println!("time per request: {} microseconds", (start.elapsed()?.as_micros())/(repeat as u128));
 | 
			
		||||
                        Ok(speed)
 | 
			
		||||
                    })
 | 
			
		||||
            })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn request(
 | 
			
		||||
        &self,
 | 
			
		||||
        request: Request<()>,
 | 
			
		||||
 | 
			
		||||
@ -29,9 +29,9 @@ impl Future for PipeToSendStream {
 | 
			
		||||
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 | 
			
		||||
        loop {
 | 
			
		||||
            if self.data != None {
 | 
			
		||||
                // we don't have the next chunk of data yet, so just reserve 1 byte to make
 | 
			
		||||
                // sure there's some capacity available. h2 will handle the capacity management
 | 
			
		||||
                // for the actual body chunk.
 | 
			
		||||
                // just reserve 1 byte to make sure there's some
 | 
			
		||||
                // capacity available. h2 will handle the capacity
 | 
			
		||||
                // management for the actual body chunk.
 | 
			
		||||
                self.body_tx.reserve_capacity(1);
 | 
			
		||||
 | 
			
		||||
                if self.body_tx.capacity() == 0 {
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user