src/api2/admin/datastore/h2upload.rs: use WorkerTask

This commit is contained in:
Dietmar Maurer 2019-05-07 13:42:00 +02:00
parent 52cf506e48
commit d9bd06eae8
1 changed files with 23 additions and 13 deletions

View File

@ -7,7 +7,6 @@ use futures::*;
use hyper::header::{HeaderValue, UPGRADE}; use hyper::header::{HeaderValue, UPGRADE};
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
use hyper::http::request::Parts; use hyper::http::request::Parts;
use hyper::rt;
use serde_json::Value; use serde_json::Value;
@ -15,7 +14,7 @@ use crate::tools;
use crate::api_schema::router::*; use crate::api_schema::router::*;
use crate::api_schema::*; use crate::api_schema::*;
use crate::server::formatter::*; use crate::server::formatter::*;
use crate::server::RestEnvironment; use crate::server::{WorkerTask, RestEnvironment};
pub fn api_method_upgrade_h2upload() -> ApiAsyncMethod { pub fn api_method_upgrade_h2upload() -> ApiAsyncMethod {
ApiAsyncMethod::new( ApiAsyncMethod::new(
@ -74,6 +73,12 @@ impl BackupService {
} }
} }
impl Drop for BackupService {
fn drop(&mut self) {
println!("SERVER DROP");
}
}
impl hyper::service::Service for BackupService { impl hyper::service::Service for BackupService {
type ReqBody = Body; type ReqBody = Body;
type ResBody = Body; type ResBody = Body;
@ -131,22 +136,28 @@ fn upgrade_h2upload(
bail!("unexpected http version '{:?}' (expected version < 2)", parts.version); bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
} }
let worker_id = String::from("test2workerid");
let service = BackupService::new(rpcenv); let service = BackupService::new(rpcenv);
rt::spawn( WorkerTask::spawn("test2_download", Some(worker_id), &rpcenv.get_user().unwrap(), false, move |worker| {
req_body req_body
.on_upgrade() .on_upgrade()
.map_err(Error::from) .map_err(Error::from)
.and_then(move |conn| { .and_then(move |conn| {
println!("upgrade done"); worker.log("upgrade done");
let mut http = hyper::server::conn::Http::new(); let mut http = hyper::server::conn::Http::new();
http.http2_only(true); http.http2_only(true);
http.serve_connection(conn, service).map_err(Error::from) http.serve_connection(conn, service)
.map_err(Error::from)
.then(|x| {
println!("H2 END");
x
}) })
.map_err(|e| eprintln!("error during upgrade: {}", e)) })
); }).unwrap();
Ok(Box::new(futures::future::ok( Ok(Box::new(futures::future::ok(
Response::builder() Response::builder()
@ -194,13 +205,12 @@ fn test1_get (
} }
fn test2_get( fn test2_get(
parts: Parts, _parts: Parts,
req_body: Body, _req_body: Body,
param: Value, _param: Value,
_info: &ApiAsyncMethod, _info: &ApiAsyncMethod,
rpcenv: &mut RpcEnvironment, _rpcenv: &mut RpcEnvironment,
) -> Result<BoxFut, Error> { ) -> Result<BoxFut, Error> {
let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
let fut = tokio::timer::Interval::new_interval(std::time::Duration::from_millis(300)) let fut = tokio::timer::Interval::new_interval(std::time::Duration::from_millis(300))
.map_err(|err| http_err!(INTERNAL_SERVER_ERROR, format!("tokio timer interval error: {}", err))) .map_err(|err| http_err!(INTERNAL_SERVER_ERROR, format!("tokio timer interval error: {}", err)))