src/client/http_client.rs: implement upload_stream

This commit is contained in:
Dietmar Maurer 2019-05-20 14:19:24 +02:00
parent 2698e8a514
commit 82ab72304e
3 changed files with 213 additions and 74 deletions

View File

@ -163,6 +163,7 @@ fn backup_api() -> Router {
"dynamic_index", Router::new()
.download(api_method_dynamic_chunk_index())
.post(api_method_create_dynamic_index())
.put(api_method_dynamic_append())
)
.subdir(
"dynamic_close", Router::new()
@ -237,6 +238,40 @@ fn create_dynamic_index(
Ok(json!(wid))
}
pub fn api_method_dynamic_append() -> ApiMethod {
ApiMethod::new(
dynamic_append,
ObjectSchema::new("Append chunk to dynamic index writer.")
.required("digest", StringSchema::new("Chunk digest."))
.required("wid", IntegerSchema::new("Dynamic writer ID.")
.minimum(1)
.maximum(256)
)
)
}
fn dynamic_append (
param: Value,
_info: &ApiMethod,
rpcenv: &mut RpcEnvironment,
) -> Result<Value, Error> {
let wid = tools::required_integer_param(&param, "wid")? as usize;
let digest_str = tools::required_string_param(&param, "digest")?;
let env: &BackupEnvironment = rpcenv.as_ref();
let _size = 0;
let _digest = crate::tools::hex_to_digest(digest_str)?;
// fixme: lookup digest and chunk size, then add
//env.dynamic_writer_append_chunk(wid, size, &digest)?;
env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid));
Ok(Value::Null)
}
pub fn api_method_close_dynamic_index() -> ApiMethod {
ApiMethod::new(
close_dynamic_index,

View File

@ -110,7 +110,7 @@ pub fn api_method_upload_speedtest() -> ApiAsyncMethod {
fn upload_speedtest(
_parts: Parts,
req_body: Body,
param: Value,
_param: Value,
_info: &ApiAsyncMethod,
rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> {

View File

@ -9,8 +9,10 @@ use chrono::Utc;
use http::{Request, Response};
use http::header::HeaderValue;
use futures::Future;
use futures::*;
use futures::stream::Stream;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;
use serde_json::{json, Value};
use url::percent_encoding::{percent_encode, DEFAULT_ENCODE_SET};
@ -407,6 +409,11 @@ impl H2Client {
self.request(req)
}
pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
let req = Self::request_builder("localhost", "PUT", path, param).unwrap();
self.request(req)
}
pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
let req = Self::request_builder("localhost", "POST", path, param).unwrap();
self.request(req)
@ -429,83 +436,160 @@ impl H2Client {
})
}
pub fn upload_speedtest(&self) -> impl Future<Item=usize, Error=Error> {
fn response_queue(self) -> (
mpsc::Sender<h2::client::ResponseFuture>,
sync::oneshot::Receiver<Result<(), Error>>
) {
let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
self.h2.clone()
.ready()
.map_err(Error::from)
.and_then(move |mut send_request| {
let mut data = vec![];
// generate pseudo random byte sequence
for i in 0..1024*1024 {
for j in 0..4 {
let byte = ((i >> (j<<3))&0xff) as u8;
data.push(byte);
}
}
let item_len = data.len();
use std::sync::atomic::{AtomicUsize, Ordering};
let repeat = std::sync::Arc::new(AtomicUsize::new(0));
let repeat2 = repeat.clone();
use tokio::sync::mpsc;
use futures::*;
let (verify_queue_tx, verify_queue_rx) = mpsc::channel(100);
let (verify_result_tx, verify_result_rx) = sync::oneshot::channel();
hyper::rt::spawn(
verify_queue_rx
hyper::rt::spawn(
verify_queue_rx
.map_err(Error::from)
.for_each(|response: h2::client::ResponseFuture| {
response
.map_err(Error::from)
.and_then(|response: h2::client::ResponseFuture| {
response
.map_err(Error::from)
.and_then(Self::h2api_response)
.map_err(|err| format_err!("speedtest chunk upload failed: {}", err))
})
.for_each(|result| {
//println!("response: {:?}", result);
.and_then(Self::h2api_response)
.and_then(|result| {
println!("RESPONSE: {:?}", result);
Ok(())
})
.then(|result| verify_result_tx.send(result))
.map_err(|_| { /* ignore closed channel */ })
);
.map_err(|err| format_err!("pipelined request failed: {}", err))
})
.then(|result|
verify_result_tx.send(result)
)
.map_err(|_| { /* ignore closed channel */ })
);
let start_time = std::time::Instant::now();
(verify_queue_tx, verify_result_rx)
}
futures::stream::repeat(data)
.take_while(move |_| {
repeat.fetch_add(1, Ordering::SeqCst);
Ok(start_time.elapsed().as_secs() < 5)
pub fn upload_stream(
&self,
wid: u64,
stream: impl Stream<Item=Vec<u8>, Error=Error>,
) -> impl Future<Item=usize, Error=Error> {
let repeat = std::sync::Arc::new(AtomicUsize::new(0));
let repeat2 = repeat.clone();
let stream_len = std::sync::Arc::new(AtomicUsize::new(0));
let stream_len2 = stream_len.clone();
let (upload_queue, upload_result) = self.clone().response_queue();
let start_time = std::time::Instant::now();
let self2 = self.clone();
stream
.for_each(move |data| {
repeat.fetch_add(1, Ordering::SeqCst);
stream_len.fetch_add(data.len(), Ordering::SeqCst);
let upload_queue = upload_queue.clone();
let digest = openssl::sha::sha256(&data);
let chunk_is_known = false;
let upload_data;
let request;
println!("upload chunk ({} bytes)", data.len());
if chunk_is_known {
let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) });
request = Self::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
upload_data = None;
} else {
let param = json!({ "wid": wid, "size" : data.len() });
request = Self::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
upload_data = Some(bytes::Bytes::from(data));
}
self2.send_request(request, upload_data)
.and_then(move |response| {
upload_queue.send(response)
.map(|_| ()).map_err(Error::from)
})
.for_each(move |data| {
let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
let (response, stream) = send_request.send_request(request, false).unwrap();
//println!("send test data ({} bytes)", data.len());
let verify_queue_tx = verify_queue_tx.clone();
PipeToSendStream::new(bytes::Bytes::from(data), stream)
.and_then(move |_| {
verify_queue_tx.send(response).map_err(Error::from).map(|_| ())
})
})
.then(move |result| {
verify_result_rx.map_err(Error::from).and_then(|verify_result| {
Ok(verify_result.and(result))
})
})
.flatten()
.and_then(move |_| {
let repeat = repeat2.load(Ordering::SeqCst);
println!("Uploaded {} chunks in {} seconds", repeat, start_time.elapsed().as_secs());
let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
println!("time per request: {} microseconds", (start_time.elapsed().as_micros())/(repeat as u128));
Ok(speed)
})
.then(move |result| {
println!("RESULT {:?}", result);
upload_result.map_err(Error::from).and_then(|upload1_result| {
Ok(upload1_result.and(result))
})
})
.flatten()
.and_then(move |_| {
let repeat = repeat2.load(Ordering::SeqCst);
let stream_len = stream_len2.load(Ordering::SeqCst);
let speed = ((stream_len*1000000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed);
if repeat > 0 {
println!("Average chunk size was {} bytes.", stream_len/repeat);
println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
}
Ok(speed)
})
}
pub fn upload_speedtest(&self) -> impl Future<Item=usize, Error=Error> {
let mut data = vec![];
// generate pseudo random byte sequence
for i in 0..1024*1024 {
for j in 0..4 {
let byte = ((i >> (j<<3))&0xff) as u8;
data.push(byte);
}
}
let item_len = data.len();
let repeat = std::sync::Arc::new(AtomicUsize::new(0));
let repeat2 = repeat.clone();
let (upload_queue, upload_result) = self.clone().response_queue();
let start_time = std::time::Instant::now();
let self2 = self.clone();
futures::stream::repeat(data)
.take_while(move |_| {
repeat.fetch_add(1, Ordering::SeqCst);
Ok(start_time.elapsed().as_secs() < 5)
})
.for_each(move |data| {
let upload_queue = upload_queue.clone();
println!("send test data ({} bytes)", data.len());
let request = Self::request_builder("localhost", "POST", "speedtest", None).unwrap();
self2.send_request(request, Some(bytes::Bytes::from(data)))
.and_then(move |response| {
upload_queue.send(response)
.map(|_| ()).map_err(Error::from)
})
})
.then(move |result| {
println!("RESULT {:?}", result);
upload_result.map_err(Error::from).and_then(|upload1_result| {
Ok(upload1_result.and(result))
})
})
.flatten()
.and_then(move |_| {
let repeat = repeat2.load(Ordering::SeqCst);
println!("Uploaded {} chunks in {} seconds.", repeat, start_time.elapsed().as_secs());
let speed = ((item_len*1000000*(repeat as usize))/(1024*1024))/(start_time.elapsed().as_micros() as usize);
if repeat > 0 {
println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
}
Ok(speed)
})
}
fn request(
@ -513,14 +597,34 @@ impl H2Client {
request: Request<()>,
) -> impl Future<Item=Value, Error=Error> {
self.send_request(request, None)
.and_then(move |response| {
response
.map_err(Error::from)
.and_then(Self::h2api_response)
})
}
fn send_request(
&self,
request: Request<()>,
data: Option<bytes::Bytes>,
) -> impl Future<Item=h2::client::ResponseFuture, Error=Error> {
self.h2.clone()
.ready()
.map_err(Error::from)
.and_then(move |mut send_request| {
let (response, _stream) = send_request.send_request(request, true).unwrap();
response
.map_err(Error::from)
.and_then(Self::h2api_response)
if let Some(data) = data {
let (response, stream) = send_request.send_request(request, false).unwrap();
future::Either::A(PipeToSendStream::new(bytes::Bytes::from(data), stream)
.and_then(move |_| {
future::ok(response)
}))
} else {
let (response, _stream) = send_request.send_request(request, true).unwrap();
future::Either::B(future::ok(response))
}
})
}