api2/reader: asyncify the reader worker task
this way, the code is much more readable Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
parent
41dacd5d3d
commit
bdb6e6b83f
@ -115,7 +115,9 @@ fn upgrade_to_backup_reader_protocol(
|
||||
|
||||
let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
|
||||
|
||||
WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| {
|
||||
WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
|
||||
let _guard = _guard;
|
||||
|
||||
let mut env = ReaderEnvironment::new(
|
||||
env_type,
|
||||
auth_id,
|
||||
@ -130,42 +132,34 @@ fn upgrade_to_backup_reader_protocol(
|
||||
|
||||
let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
|
||||
|
||||
let abort_future = worker.abort_future();
|
||||
let mut abort_future = worker.abort_future()
|
||||
.map(|_| Err(format_err!("task aborted")));
|
||||
|
||||
let req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body))
|
||||
.map_err(Error::from)
|
||||
.and_then({
|
||||
let env = env.clone();
|
||||
move |conn| {
|
||||
env.debug("protocol upgrade done");
|
||||
let env2 = env.clone();
|
||||
let req_fut = async move {
|
||||
let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
|
||||
env2.debug("protocol upgrade done");
|
||||
|
||||
let mut http = hyper::server::conn::Http::new();
|
||||
http.http2_only(true);
|
||||
// increase window size: todo - find optiomal size
|
||||
let window_size = 32*1024*1024; // max = (1 << 31) - 2
|
||||
http.http2_initial_stream_window_size(window_size);
|
||||
http.http2_initial_connection_window_size(window_size);
|
||||
http.http2_max_frame_size(4*1024*1024);
|
||||
let mut http = hyper::server::conn::Http::new();
|
||||
http.http2_only(true);
|
||||
// increase window size: todo - find optiomal size
|
||||
let window_size = 32*1024*1024; // max = (1 << 31) - 2
|
||||
http.http2_initial_stream_window_size(window_size);
|
||||
http.http2_initial_connection_window_size(window_size);
|
||||
http.http2_max_frame_size(4*1024*1024);
|
||||
|
||||
http.serve_connection(conn, service)
|
||||
.map_err(Error::from)
|
||||
}
|
||||
});
|
||||
let abort_future = abort_future
|
||||
.map(|_| -> Result<(), anyhow::Error> { Err(format_err!("task aborted")) });
|
||||
http.serve_connection(conn, service)
|
||||
.map_err(Error::from).await
|
||||
};
|
||||
|
||||
use futures::future::Either;
|
||||
futures::future::select(req_fut, abort_future)
|
||||
.map(move |res| {
|
||||
let _guard = _guard;
|
||||
match res {
|
||||
Either::Left((Ok(_), _)) => Ok(()),
|
||||
Either::Left((Err(err), _)) => Err(err),
|
||||
Either::Right((Ok(_), _)) => Ok(()),
|
||||
Either::Right((Err(err), _)) => Err(err),
|
||||
}
|
||||
})
|
||||
.map_ok(move |_| env.log("reader finished successfully"))
|
||||
futures::select!{
|
||||
req = req_fut.fuse() => req?,
|
||||
abort = abort_future => abort?,
|
||||
};
|
||||
|
||||
env.log("reader finished successfully");
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
let response = Response::builder()
|
||||
|
Loading…
Reference in New Issue
Block a user