diff --git a/src/api2/reader.rs b/src/api2/reader.rs index 43d832ce..ae936184 100644 --- a/src/api2/reader.rs +++ b/src/api2/reader.rs @@ -115,7 +115,9 @@ fn upgrade_to_backup_reader_protocol( let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time()); - WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| { + WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move { + let _guard = _guard; + let mut env = ReaderEnvironment::new( env_type, auth_id, @@ -130,42 +132,34 @@ fn upgrade_to_backup_reader_protocol( let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug); - let abort_future = worker.abort_future(); + let mut abort_future = worker.abort_future() + .map(|_| Err(format_err!("task aborted"))); - let req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body)) - .map_err(Error::from) - .and_then({ - let env = env.clone(); - move |conn| { - env.debug("protocol upgrade done"); + let env2 = env.clone(); + let req_fut = async move { + let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?; + env2.debug("protocol upgrade done"); - let mut http = hyper::server::conn::Http::new(); - http.http2_only(true); - // increase window size: todo - find optiomal size - let window_size = 32*1024*1024; // max = (1 << 31) - 2 - http.http2_initial_stream_window_size(window_size); - http.http2_initial_connection_window_size(window_size); - http.http2_max_frame_size(4*1024*1024); + let mut http = hyper::server::conn::Http::new(); + http.http2_only(true); + // increase window size: todo - find optiomal size + let window_size = 32*1024*1024; // max = (1 << 31) - 2 + http.http2_initial_stream_window_size(window_size); + http.http2_initial_connection_window_size(window_size); + http.http2_max_frame_size(4*1024*1024); - http.serve_connection(conn, service) - .map_err(Error::from) - } - }); - let abort_future = abort_future - .map(|_| -> Result<(), anyhow::Error> { Err(format_err!("task aborted")) }); + http.serve_connection(conn, service) + .map_err(Error::from).await + }; - use futures::future::Either; - futures::future::select(req_fut, abort_future) - .map(move |res| { - let _guard = _guard; - match res { - Either::Left((Ok(_), _)) => Ok(()), - Either::Left((Err(err), _)) => Err(err), - Either::Right((Ok(_), _)) => Ok(()), - Either::Right((Err(err), _)) => Err(err), - } - }) - .map_ok(move |_| env.log("reader finished successfully")) + futures::select!{ + req = req_fut.fuse() => req?, + abort = abort_future => abort?, + }; + + env.log("reader finished successfully"); + + Ok(()) })?; let response = Response::builder()