src/api2/reader.rs: switch to async

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-08-27 13:55:41 +02:00
parent 7622005574
commit ffb6434485

View File

@ -93,30 +93,37 @@ fn upgrade_to_backup_reader_protocol(
let abort_future = worker.abort_future(); let abort_future = worker.abort_future();
let env3 = env.clone(); let req_fut = req_body
req_body
.on_upgrade() .on_upgrade()
.map_err(Error::from) .map_err(Error::from)
.and_then(move |conn| { .and_then({
env3.debug("protocol upgrade done"); let env = env.clone();
move |conn| {
env.debug("protocol upgrade done");
let mut http = hyper::server::conn::Http::new(); let mut http = hyper::server::conn::Http::new();
http.http2_only(true); http.http2_only(true);
// increase window size: todo - find optiomal size // increase window size: todo - find optiomal size
let window_size = 32*1024*1024; // max = (1 << 31) - 2 let window_size = 32*1024*1024; // max = (1 << 31) - 2
http.http2_initial_stream_window_size(window_size); http.http2_initial_stream_window_size(window_size);
http.http2_initial_connection_window_size(window_size); http.http2_initial_connection_window_size(window_size);
http.serve_connection(conn, service) http.serve_connection(conn, service)
.map_err(Error::from) .map_err(Error::from)
}) }
.select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); })) });
.map_err(|(err, _)| err) let abort_future = abort_future
.and_then(move |(_result, _)| { .map(|_| Err(format_err!("task aborted")));
env.log("reader finished sucessfully");
Ok(()) use futures::future::Either;
futures::future::select(req_fut, abort_future)
.map(|res| match res {
Either::Left((Ok(res), _)) => Ok(res),
Either::Left((Err(err), _)) => Err(err),
Either::Right((Ok(res), _)) => Ok(res),
Either::Right((Err(err), _)) => Err(err),
}) })
.map_ok(move |_| env.log("reader finished sucessfully"))
})?; })?;
let response = Response::builder() let response = Response::builder()
@ -182,13 +189,13 @@ fn download_file(
.map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
.and_then(move |file| { .and_then(move |file| {
env2.log(format!("download {:?}", path3)); env2.log(format!("download {:?}", path3));
let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()). let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
map(|bytes| hyper::Chunk::from(bytes.freeze())); .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
let body = Body::wrap_stream(payload); let body = Body::wrap_stream(payload);
// fixme: set other headers ? // fixme: set other headers ?
Ok(Response::builder() futures::future::ok(Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream") .header(header::CONTENT_TYPE, "application/octet-stream")
.body(body) .body(body)
@ -229,13 +236,13 @@ fn download_chunk(
.map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
.and_then(move |file| { .and_then(move |file| {
env2.debug(format!("download chunk {:?}", path3)); env2.debug(format!("download chunk {:?}", path3));
let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()). let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
map(|bytes| hyper::Chunk::from(bytes.freeze())); .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
let body = Body::wrap_stream(payload); let body = Body::wrap_stream(payload);
// fixme: set other headers ? // fixme: set other headers ?
Ok(Response::builder() futures::future::ok(Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream") .header(header::CONTENT_TYPE, "application/octet-stream")
.body(body) .body(body)