handle_async_api_request: put rpcenv into a Box

So that we can pass rpcenv into futures.
This commit is contained in:
Dietmar Maurer 2019-05-09 18:01:24 +02:00
parent 2c41096a4b
commit b4b63e520d
8 changed files with 37 additions and 16 deletions

View File

@ -42,7 +42,7 @@ fn upgrade_to_backup_protocol(
req_body: Body, req_body: Body,
param: Value, param: Value,
_info: &ApiAsyncMethod, _info: &ApiAsyncMethod,
rpcenv: &mut RpcEnvironment, rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> { ) -> Result<BoxFut, Error> {
static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2"; static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2";
@ -155,7 +155,7 @@ fn test2_get(
_req_body: Body, _req_body: Body,
_param: Value, _param: Value,
_info: &ApiAsyncMethod, _info: &ApiAsyncMethod,
_rpcenv: &mut RpcEnvironment, _rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> { ) -> Result<BoxFut, Error> {
let fut = tokio::timer::Interval::new_interval(std::time::Duration::from_millis(300)) let fut = tokio::timer::Interval::new_interval(std::time::Duration::from_millis(300))

View File

@ -1,3 +1,4 @@
use failure::*;
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashMap; use std::collections::HashMap;
@ -6,6 +7,8 @@ use serde_json::Value;
use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType}; use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
use crate::server::WorkerTask; use crate::server::WorkerTask;
use crate::backup::*; use crate::backup::*;
use crate::server::formatter::*;
use hyper::{Body, Response};
/// `RpcEnvironmet` implementation for backup service /// `RpcEnvironmet` implementation for backup service
#[derive(Clone)] #[derive(Clone)]
@ -13,6 +16,7 @@ pub struct BackupEnvironment {
env_type: RpcEnvironmentType, env_type: RpcEnvironmentType,
result_attributes: HashMap<String, Value>, result_attributes: HashMap<String, Value>,
user: String, user: String,
pub formatter: &'static OutputFormatter,
pub worker: Arc<WorkerTask>, pub worker: Arc<WorkerTask>,
pub datastore: Arc<DataStore>, pub datastore: Arc<DataStore>,
} }
@ -25,12 +29,20 @@ impl BackupEnvironment {
user, user,
worker, worker,
datastore, datastore,
formatter: &JSON_FORMATTER,
} }
} }
pub fn log<S: AsRef<str>>(&self, msg: S) { pub fn log<S: AsRef<str>>(&self, msg: S) {
self.worker.log(msg); self.worker.log(msg);
} }
pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
match result {
Ok(data) => (self.formatter.format_data)(data, self),
Err(err) => (self.formatter.format_error)(err),
}
}
} }
impl RpcEnvironment for BackupEnvironment { impl RpcEnvironment for BackupEnvironment {
@ -61,3 +73,8 @@ impl AsRef<BackupEnvironment> for RpcEnvironment {
self.as_any().downcast_ref::<BackupEnvironment>().unwrap() self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
} }
} }
impl AsRef<BackupEnvironment> for Box<RpcEnvironment> {
fn as_ref(&self) -> &BackupEnvironment {
self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
}
}

View File

@ -41,8 +41,6 @@ impl BackupService {
Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))), Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))),
}; };
let formatter = &JSON_FORMATTER;
self.worker.log(format!("H2 REQUEST {} {}", method, path)); self.worker.log(format!("H2 REQUEST {} {}", method, path));
self.worker.log(format!("H2 COMPO {:?}", components)); self.worker.log(format!("H2 COMPO {:?}", components));
@ -51,13 +49,15 @@ impl BackupService {
match BACKUP_ROUTER.find_method(&components, method, &mut uri_param) { match BACKUP_ROUTER.find_method(&components, method, &mut uri_param) {
MethodDefinition::None => { MethodDefinition::None => {
let err = http_err!(NOT_FOUND, "Path not found.".to_string()); let err = http_err!(NOT_FOUND, "Path not found.".to_string());
return Box::new(future::ok((formatter.format_error)(err))); return Box::new(future::ok((self.rpcenv.formatter.format_error)(err)));
} }
MethodDefinition::Simple(api_method) => { MethodDefinition::Simple(api_method) => {
return crate::server::rest::handle_sync_api_request(self.rpcenv.clone(), api_method, formatter, parts, body, uri_param); return crate::server::rest::handle_sync_api_request(
self.rpcenv.clone(), api_method, self.rpcenv.formatter, parts, body, uri_param);
} }
MethodDefinition::Async(async_method) => { MethodDefinition::Async(async_method) => {
return crate::server::rest::handle_async_api_request(self.rpcenv.clone(), async_method, formatter, parts, body, uri_param); return crate::server::rest::handle_async_api_request(
self.rpcenv.clone(), async_method, self.rpcenv.formatter, parts, body, uri_param);
} }
} }
} }

View File

@ -71,7 +71,7 @@ fn upload_chunk(
req_body: Body, req_body: Body,
param: Value, param: Value,
_info: &ApiAsyncMethod, _info: &ApiAsyncMethod,
rpcenv: &mut RpcEnvironment, rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> { ) -> Result<BoxFut, Error> {
let size = tools::required_integer_param(&param, "size")?; let size = tools::required_integer_param(&param, "size")?;
@ -85,7 +85,11 @@ fn upload_chunk(
let resp = upload.select(abort_future) let resp = upload.select(abort_future)
.and_then(|(result, _)| Ok(result)) .and_then(|(result, _)| Ok(result))
.map_err(|(err, _)| err) .map_err(|(err, _)| err)
.then(|res| Ok(crate::server::formatter::json_response(res))); //.then(|res| Ok(crate::server::formatter::json_response(res)));
.then(move |res| {
let env: &BackupEnvironment = rpcenv.as_ref();
Ok(env.format_response(res))
});
Ok(Box::new(resp)) Ok(Box::new(resp))

View File

@ -51,7 +51,7 @@ fn upload_pxar(
req_body: Body, req_body: Body,
param: Value, param: Value,
_info: &ApiAsyncMethod, _info: &ApiAsyncMethod,
rpcenv: &mut RpcEnvironment, rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> { ) -> Result<BoxFut, Error> {
let store = tools::required_string_param(&param, "store")?; let store = tools::required_string_param(&param, "store")?;
@ -149,7 +149,7 @@ fn download_pxar(
_req_body: Body, _req_body: Body,
param: Value, param: Value,
_info: &ApiAsyncMethod, _info: &ApiAsyncMethod,
_rpcenv: &mut RpcEnvironment, _rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> { ) -> Result<BoxFut, Error> {
let store = tools::required_string_param(&param, "store")?; let store = tools::required_string_param(&param, "store")?;

View File

@ -34,7 +34,7 @@ fn upgrade_upload(
req_body: Body, req_body: Body,
param: Value, param: Value,
_info: &ApiAsyncMethod, _info: &ApiAsyncMethod,
_rpcenv: &mut RpcEnvironment, _rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut> { ) -> Result<BoxFut> {
let store = tools::required_string_param(&param, "store")?.to_string(); let store = tools::required_string_param(&param, "store")?.to_string();
let expected_protocol: &'static str = "proxmox-backup-protocol-1"; let expected_protocol: &'static str = "proxmox-backup-protocol-1";

View File

@ -75,7 +75,7 @@ macro_rules! http_err {
} }
type ApiAsyncHandlerFn = Box< type ApiAsyncHandlerFn = Box<
dyn Fn(Parts, Body, Value, &ApiAsyncMethod, &mut dyn RpcEnvironment) -> Result<BoxFut, Error> dyn Fn(Parts, Body, Value, &ApiAsyncMethod, Box<RpcEnvironment>) -> Result<BoxFut, Error>
+ Send + Sync + 'static + Send + Sync + 'static
>; >;
@ -152,7 +152,7 @@ impl ApiAsyncMethod {
pub fn new<F>(handler: F, parameters: ObjectSchema) -> Self pub fn new<F>(handler: F, parameters: ObjectSchema) -> Self
where where
F: Fn(Parts, Body, Value, &ApiAsyncMethod, &mut dyn RpcEnvironment) -> Result<BoxFut, Error> F: Fn(Parts, Body, Value, &ApiAsyncMethod, Box<RpcEnvironment>) -> Result<BoxFut, Error>
+ Send + Sync + 'static, + Send + Sync + 'static,
{ {
Self { Self {

View File

@ -264,7 +264,7 @@ pub fn handle_sync_api_request<Env: RpcEnvironment>(
} }
pub fn handle_async_api_request<Env: RpcEnvironment>( pub fn handle_async_api_request<Env: RpcEnvironment>(
mut rpcenv: Env, rpcenv: Env,
info: &'static ApiAsyncMethod, info: &'static ApiAsyncMethod,
formatter: &'static OutputFormatter, formatter: &'static OutputFormatter,
parts: Parts, parts: Parts,
@ -294,7 +294,7 @@ pub fn handle_async_api_request<Env: RpcEnvironment>(
} }
}; };
match (info.handler)(parts, req_body, params, info, &mut rpcenv) { match (info.handler)(parts, req_body, params, info, Box::new(rpcenv)) {
Ok(future) => future, Ok(future) => future,
Err(err) => { Err(err) => {
let resp = (formatter.format_error)(Error::from(err)); let resp = (formatter.format_error)(Error::from(err));