src/server/rest.rs: simplify code

This commit is contained in:
Dietmar Maurer 2019-11-22 18:44:14 +01:00
parent 2bbd835b9b
commit 70fbac84da
2 changed files with 54 additions and 101 deletions

View File

@ -7,7 +7,7 @@ use std::task::{Context, Poll};
use futures::*;
use hyper::{Body, Request, Response, StatusCode};
use proxmox::api::{http_err, ApiFuture, ApiHandler, HttpError, Router, RpcEnvironment};
use proxmox::api::{http_err, ApiFuture, HttpError, Router, RpcEnvironment};
use crate::tools;
use crate::server::formatter::*;
@ -58,16 +58,8 @@ impl <E: RpcEnvironment + Clone> H2Service<E> {
future::ok((formatter.format_error)(err)).boxed()
}
Some(api_method) => {
match api_method.handler {
ApiHandler::Sync(_) => {
crate::server::rest::handle_sync_api_request(
self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed()
}
ApiHandler::Async(_) => {
crate::server::rest::handle_async_api_request(
self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed()
}
}
crate::server::rest::handle_api_request(
self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed()
}
}
}

View File

@ -149,6 +149,37 @@ impl tower_service::Service<Request<Body>> for ApiService {
}
}
fn parse_query_parameters<S: 'static + BuildHasher + Send>(
param_schema: &ObjectSchema,
form: &str, // x-www-form-urlencoded body data
parts: &Parts,
uri_param: &HashMap<String, String, S>,
) -> Result<Value, Error> {
let mut param_list: Vec<(String, String)> = vec![];
if !form.is_empty() {
for (k, v) in form_urlencoded::parse(form.as_bytes()).into_owned() {
param_list.push((k, v));
}
}
if let Some(query_str) = parts.uri.query() {
for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
if k == "_dc" { continue; } // skip extjs "disable cache" parameter
param_list.push((k, v));
}
}
for (k, v) in uri_param {
param_list.push((k.clone(), v.clone()));
}
let params = parse_parameter_strings(&param_list, param_schema, true)?;
Ok(params)
}
async fn get_request_parameters<S: 'static + BuildHasher + Send>(
param_schema: &ObjectSchema,
parts: Parts,
@ -181,11 +212,11 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
}
}).await?;
let utf8 = std::str::from_utf8(&body)
let utf8_data = std::str::from_utf8(&body)
.map_err(|err| format_err!("Request body not uft8: {}", err))?;
if is_json {
let mut params: Value = serde_json::from_str(utf8)?;
let mut params: Value = serde_json::from_str(utf8_data)?;
for (k, v) in uri_param {
if let Some((_optional, prop_schema)) = param_schema.lookup(&k) {
params[&k] = parse_simple_value(&v, prop_schema)?;
@ -193,30 +224,9 @@ async fn get_request_parameters<S: 'static + BuildHasher + Send>(
}
verify_json_object(&params, param_schema)?;
return Ok(params);
} else {
parse_query_parameters(param_schema, utf8_data, &parts, &uri_param)
}
let mut param_list: Vec<(String, String)> = vec![];
if !utf8.is_empty() {
for (k, v) in form_urlencoded::parse(utf8.as_bytes()).into_owned() {
param_list.push((k, v));
}
}
if let Some(query_str) = parts.uri.query() {
for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
if k == "_dc" { continue; } // skip extjs "disable cache" parameter
param_list.push((k, v));
}
}
for (k, v) in uri_param {
param_list.push((k.clone(), v.clone()));
}
let params = parse_parameter_strings(&param_list, param_schema, true)?;
Ok(params)
}
struct NoLogExtension();
@ -253,7 +263,7 @@ async fn proxy_protected_request(
Ok(resp)
}
pub async fn handle_sync_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
mut rpcenv: Env,
info: &'static ApiMethod,
formatter: &'static OutputFormatter,
@ -262,23 +272,26 @@ pub async fn handle_sync_api_request<Env: RpcEnvironment, S: 'static + BuildHash
uri_param: HashMap<String, String, S>,
) -> Result<Response<Body>, Error> {
let handler = match info.handler {
ApiHandler::Async(_) => bail!("handle_sync_api_request: internal error (called with Async handler)"),
ApiHandler::Sync(handler) => handler,
};
let params = get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
let mut delay = false;
let result = match info.handler {
ApiHandler::Async(handler) => {
let params = parse_query_parameters(info.parameters, "", &parts, &uri_param)?;
(handler)(parts, req_body, params, info, Box::new(rpcenv)).await
}
ApiHandler::Sync(handler) => {
let params = get_request_parameters(info.parameters, parts, req_body, uri_param).await?;
(handler)(params, info, &mut rpcenv)
.map(|data| (formatter.format_data)(data, &rpcenv))
}
};
let resp = match (handler)(params, info, &mut rpcenv) {
Ok(data) => (formatter.format_data)(data, &rpcenv),
let resp = match result {
Ok(resp) => resp,
Err(err) => {
if let Some(httperr) = err.downcast_ref::<HttpError>() {
if httperr.code == StatusCode::UNAUTHORIZED {
delay = true;
tokio::timer::delay(delay_unauth_time).await;
}
}
(formatter.format_error)(err)
@ -287,51 +300,6 @@ pub async fn handle_sync_api_request<Env: RpcEnvironment, S: 'static + BuildHash
if info.reload_timezone { unsafe { tzset(); } }
if delay {
tokio::timer::delay(delay_unauth_time).await;
}
Ok(resp)
}
pub async fn handle_async_api_request<Env: RpcEnvironment>(
rpcenv: Env,
info: &'static ApiMethod,
formatter: &'static OutputFormatter,
parts: Parts,
req_body: Body,
uri_param: HashMap<String, String>,
) -> Result<Response<Body>, Error> {
let handler = match info.handler {
ApiHandler::Sync(_) => bail!("handle_async_api_request: internal error (called with Sync handler)"),
ApiHandler::Async(handler) => handler,
};
// fixme: convert parameters to Json
let mut param_list: Vec<(String, String)> = vec![];
if let Some(query_str) = parts.uri.query() {
for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
if k == "_dc" { continue; } // skip extjs "disable cache" parameter
param_list.push((k, v));
}
}
for (k, v) in uri_param {
param_list.push((k.clone(), v.clone()));
}
let params = match parse_parameter_strings(&param_list, &info.parameters, true) {
Ok(v) => v,
Err(err) => {
return Ok((formatter.format_error)(Error::from(err)));
}
};
let resp = (handler)(parts, req_body, params, info, Box::new(rpcenv)).await?;
Ok(resp)
}
@ -578,14 +546,7 @@ pub async fn handle_request(api: Arc<ApiConfig>, req: Request<Body>) -> Result<R
if api_method.protected && env_type == RpcEnvironmentType::PUBLIC {
return proxy_protected_request(api_method, parts, body).await;
} else {
match api_method.handler {
ApiHandler::Sync(_) => {
return handle_sync_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await;
}
ApiHandler::Async(_) => {
return handle_async_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await;
}
}
return handle_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await;
}
}
}