From 82ab72304efd651d2afa771c499d4bcec2787f64 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Mon, 20 May 2019 14:19:24 +0200 Subject: [PATCH] src/client/http_client.rs: implement upload_stream --- src/api2/admin/datastore/backup.rs | 35 +++ .../admin/datastore/backup/upload_chunk.rs | 2 +- src/client/http_client.rs | 250 +++++++++++++----- 3 files changed, 213 insertions(+), 74 deletions(-) diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs index 9f1ed329..05332600 100644 --- a/src/api2/admin/datastore/backup.rs +++ b/src/api2/admin/datastore/backup.rs @@ -163,6 +163,7 @@ fn backup_api() -> Router { "dynamic_index", Router::new() .download(api_method_dynamic_chunk_index()) .post(api_method_create_dynamic_index()) + .put(api_method_dynamic_append()) ) .subdir( "dynamic_close", Router::new() @@ -237,6 +238,40 @@ fn create_dynamic_index( Ok(json!(wid)) } +pub fn api_method_dynamic_append() -> ApiMethod { + ApiMethod::new( + dynamic_append, + ObjectSchema::new("Append chunk to dynamic index writer.") + .required("digest", StringSchema::new("Chunk digest.")) + .required("wid", IntegerSchema::new("Dynamic writer ID.") + .minimum(1) + .maximum(256) + ) + ) +} + +fn dynamic_append ( + param: Value, + _info: &ApiMethod, + rpcenv: &mut RpcEnvironment, +) -> Result { + + let wid = tools::required_integer_param(¶m, "wid")? as usize; + let digest_str = tools::required_string_param(¶m, "digest")?; + + let env: &BackupEnvironment = rpcenv.as_ref(); + + let _size = 0; + let _digest = crate::tools::hex_to_digest(digest_str)?; + + // fixme: lookup digest and chunk size, then add + //env.dynamic_writer_append_chunk(wid, size, &digest)?; + + env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid)); + + Ok(Value::Null) +} + pub fn api_method_close_dynamic_index() -> ApiMethod { ApiMethod::new( close_dynamic_index, diff --git a/src/api2/admin/datastore/backup/upload_chunk.rs b/src/api2/admin/datastore/backup/upload_chunk.rs index 3ac02ee9..075623e3 100644 --- a/src/api2/admin/datastore/backup/upload_chunk.rs +++ b/src/api2/admin/datastore/backup/upload_chunk.rs @@ -110,7 +110,7 @@ pub fn api_method_upload_speedtest() -> ApiAsyncMethod { fn upload_speedtest( _parts: Parts, req_body: Body, - param: Value, + _param: Value, _info: &ApiAsyncMethod, rpcenv: Box, ) -> Result { diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 927b3ac1..1053e45a 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -9,8 +9,10 @@ use chrono::Utc; use http::{Request, Response}; use http::header::HeaderValue; -use futures::Future; +use futures::*; use futures::stream::Stream; +use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio::sync::mpsc; use serde_json::{json, Value}; use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET}; @@ -407,6 +409,11 @@ impl H2Client { self.request(req) } + pub fn put(&self, path: &str, param: Option) -> impl Future { + let req = Self::request_builder("localhost", "PUT", path, param).unwrap(); + self.request(req) + } + pub fn post(&self, path: &str, param: Option) -> impl Future { let req = Self::request_builder("localhost", "POST", path, param).unwrap(); self.request(req) @@ -429,83 +436,160 @@ impl H2Client { }) } - pub fn upload_speedtest(&self) -> impl Future { + fn response_queue(self) -> ( + mpsc::Sender, + sync::oneshot::Receiver> + ) { + let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100); + let (verify_result_tx, verify_result_rx) = sync::oneshot::channel(); - self.h2.clone() - .ready() - .map_err(Error::from) - .and_then(move |mut send_request| { - - let mut data = vec![]; - // generate pseudo random byte sequence - for i in 0..1024*1024 { - for j in 0..4 { - let byte = ((i >> (j<<3))&0xff) as u8; - data.push(byte); - } - } - - let item_len = data.len(); - - use std::sync::atomic::{AtomicUsize, Ordering}; - - let repeat = std::sync::Arc::new(AtomicUsize::new(0)); - let repeat2 = repeat.clone(); - - use tokio::sync::mpsc; - use futures::*; - - let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100); - let (verify_result_tx, verify_result_rx) = sync::oneshot::channel(); - - hyper::rt::spawn( - verify_queue_rx + hyper::rt::spawn( + verify_queue_rx + .map_err(Error::from) + .for_each(|response: h2::client::ResponseFuture| { + response .map_err(Error::from) - .and_then(|response: h2::client::ResponseFuture| { - response - .map_err(Error::from) - .and_then(Self::h2api_response) - .map_err(|err| format_err!("speedtest chunk upload failed: {}", err)) - }) - .for_each(|result| { - //println!("response: {:?}", result); + .and_then(Self::h2api_response) + .and_then(|result| { + println!("RESPONSE: {:?}", result); Ok(()) }) - .then(|result| verify_result_tx.send(result)) - .map_err(|_| { /* ignore closed channel */ }) - ); + .map_err(|err| format_err!("pipelined request failed: {}", err)) + }) + .then(|result| + verify_result_tx.send(result) + ) + .map_err(|_| { /* ignore closed channel */ }) + ); - let start_time = std::time::Instant::now(); + (verify_queue_tx, verify_result_rx) + } - futures::stream::repeat(data) - .take_while(move |_| { - repeat.fetch_add(1, Ordering::SeqCst); - Ok(start_time.elapsed().as_secs() < 5) + pub fn upload_stream( + &self, + wid: u64, + stream: impl Stream, Error=Error>, + ) -> impl Future { + + let repeat = std::sync::Arc::new(AtomicUsize::new(0)); + let repeat2 = repeat.clone(); + + let stream_len = std::sync::Arc::new(AtomicUsize::new(0)); + let stream_len2 = stream_len.clone(); + + let (upload_queue, upload_result) = self.clone().response_queue(); + + let start_time = std::time::Instant::now(); + + let self2 = self.clone(); + + stream + .for_each(move |data| { + repeat.fetch_add(1, Ordering::SeqCst); + stream_len.fetch_add(data.len(), Ordering::SeqCst); + + let upload_queue = upload_queue.clone(); + + let digest = openssl::sha::sha256(&data); + + let chunk_is_known = false; + + let upload_data; + let request; + + println!("upload chunk ({} bytes)", data.len()); + + if chunk_is_known { + let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) }); + request = Self::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap(); + upload_data = None; + } else { + let param = json!({ "wid": wid, "size" : data.len() }); + request = Self::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); + upload_data = Some(bytes::Bytes::from(data)); + } + + self2.send_request(request, upload_data) + .and_then(move |response| { + upload_queue.send(response) + .map(|_| ()).map_err(Error::from) }) - .for_each(move |data| { - let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap(); - let (response, stream) = send_request.send_request(request, false).unwrap(); - //println!("send test data ({} bytes)", data.len()); - let verify_queue_tx = verify_queue_tx.clone(); - PipeToSendStream::new(bytes::Bytes::from(data), stream) - .and_then(move |_| { - verify_queue_tx.send(response).map_err(Error::from).map(|_| ()) - }) - }) - .then(move |result| { - verify_result_rx.map_err(Error::from).and_then(|verify_result| { - Ok(verify_result.and(result)) - }) - }) - .flatten() - .and_then(move |_| { - let repeat = repeat2.load(Ordering::SeqCst); - println!("Uploaded {} chunks in {} seconds", repeat, start_time.elapsed().as_secs()); - let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize); - println!("time per request: {} microseconds", (start_time.elapsed().as_micros())/(repeat as u128)); - Ok(speed) + }) + .then(move |result| { + println!("RESULT {:?}", result); + upload_result.map_err(Error::from).and_then(|upload1_result| { + Ok(upload1_result.and(result)) + }) + }) + .flatten() + .and_then(move |_| { + let repeat = repeat2.load(Ordering::SeqCst); + let stream_len = stream_len2.load(Ordering::SeqCst); + let speed = ((stream_len*1000000)/(1024*1024))/(start_time.elapsed().as_micros() as usize); + println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed); + if repeat > 0 { + println!("Average chunk size was {} bytes.", stream_len/repeat); + println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128)); + } + Ok(speed) + }) + } + + pub fn upload_speedtest(&self) -> impl Future { + + let mut data = vec![]; + // generate pseudo random byte sequence + for i in 0..1024*1024 { + for j in 0..4 { + let byte = ((i >> (j<<3))&0xff) as u8; + data.push(byte); + } + } + + let item_len = data.len(); + + let repeat = std::sync::Arc::new(AtomicUsize::new(0)); + let repeat2 = repeat.clone(); + + let (upload_queue, upload_result) = self.clone().response_queue(); + + let start_time = std::time::Instant::now(); + + let self2 = self.clone(); + + futures::stream::repeat(data) + .take_while(move |_| { + repeat.fetch_add(1, Ordering::SeqCst); + Ok(start_time.elapsed().as_secs() < 5) + }) + .for_each(move |data| { + + let upload_queue = upload_queue.clone(); + + println!("send test data ({} bytes)", data.len()); + let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap(); + self2.send_request(request, Some(bytes::Bytes::from(data))) + .and_then(move |response| { + upload_queue.send(response) + .map(|_| ()).map_err(Error::from) }) }) + .then(move |result| { + println!("RESULT {:?}", result); + upload_result.map_err(Error::from).and_then(|upload1_result| { + Ok(upload1_result.and(result)) + }) + }) + .flatten() + .and_then(move |_| { + let repeat = repeat2.load(Ordering::SeqCst); + println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs()); + let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize); + if repeat > 0 { + println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128)); + } + Ok(speed) + }) } fn request( @@ -513,14 +597,34 @@ impl H2Client { request: Request<()>, ) -> impl Future { + self.send_request(request, None) + .and_then(move |response| { + response + .map_err(Error::from) + .and_then(Self::h2api_response) + }) + } + + fn send_request( + &self, + request: Request<()>, + data: Option, + ) -> impl Future { + self.h2.clone() .ready() .map_err(Error::from) .and_then(move |mut send_request| { - let (response, _stream) = send_request.send_request(request, true).unwrap(); - response - .map_err(Error::from) - .and_then(Self::h2api_response) + if let Some(data) = data { + let (response, stream) = send_request.send_request(request, false).unwrap(); + future::Either::A(PipeToSendStream::new(bytes::Bytes::from(data), stream) + .and_then(move |_| { + future::ok(response) + })) + } else { + let (response, _stream) = send_request.send_request(request, true).unwrap(); + future::Either::B(future::ok(response)) + } }) }