src/server/rest.rs: cleanup async code

Dietmar Maurer 2019-11-22 13:02:05 +01:00
parent be2bb37205
commit ad51d02aa9
6 changed files with 451 additions and 502 deletions
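Every hunk below applies the same transformation: handlers used to return Result<ApiFuture, Error> and build manually boxed future combinator chains, and now they return ApiFuture directly, writing the body as one async move block finished with .boxed(). A minimal sketch of the before/after shape, assuming ApiFuture is a boxed-future alias along the lines of the one in the proxmox API crate (String stands in for the real Response<Body>):

    // Sketch only: the real alias lives in the proxmox API crate and
    // yields Result<Response<Body>, Error>; String is a stand-in here.
    use failure::{format_err, Error};
    use futures::future::{Future, FutureExt};
    use std::pin::Pin;

    type ApiFuture = Pin<Box<dyn Future<Output = Result<String, Error>> + Send>>;

    // Before: the handler itself could fail, and the future had to be
    // boxed by hand.
    fn handler_old(param: Option<String>) -> Result<ApiFuture, Error> {
        let store = param.ok_or_else(|| format_err!("missing store"))?;
        let fut = futures::future::ready(Ok::<_, Error>(store));
        Ok(Box::pin(fut))
    }

    // After: one return type; `?` works anywhere inside the async
    // block, and `.boxed()` produces the pinned box.
    fn handler_new(param: Option<String>) -> ApiFuture {
        async move {
            let store = param.ok_or_else(|| format_err!("missing store"))?;
            Ok(store)
        }.boxed()
    }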

View File

@@ -480,8 +480,9 @@ fn download_file(
     param: Value,
     _info: &ApiMethod,
     _rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let store = tools::required_string_param(&param, "store")?;
         let datastore = DataStore::lookup_datastore(store)?;
@@ -501,22 +502,21 @@ fn download_file(
         path.push(backup_dir.relative_path());
         path.push(&file_name);

-    let response_future = tokio::fs::File::open(path)
+        let file = tokio::fs::File::open(path)
             .map_err(|err| http_err!(BAD_REQUEST, format!("File open failed: {}", err)))
-        .and_then(move |file| {
+            .await?;

         let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
             .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
         let body = Body::wrap_stream(payload);

         // fixme: set other headers ?
-            futures::future::ok(Response::builder()
+        Ok(Response::builder()
             .status(StatusCode::OK)
             .header(header::CONTENT_TYPE, "application/octet-stream")
             .body(body)
             .unwrap())
-        });
-
-    Ok(Box::new(response_future))
+    }.boxed()
 }

 #[sortable]
@@ -543,8 +543,9 @@ fn upload_backup_log(
     param: Value,
     _info: &ApiMethod,
     _rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let store = tools::required_string_param(&param, "store")?;
         let datastore = DataStore::lookup_datastore(store)?;
@@ -568,26 +569,23 @@ fn upload_backup_log(
         println!("Upload backup log to {}/{}/{}/{}/{}", store,
                  backup_type, backup_id, BackupDir::backup_time_to_string(backup_dir.backup_time()), file_name);

-    let resp = req_body
+        let data = req_body
             .map_err(Error::from)
             .try_fold(Vec::new(), |mut acc, chunk| {
                 acc.extend_from_slice(&*chunk);
                 future::ok::<_, Error>(acc)
             })
-        .and_then(move |data| async move {
+            .await?;

         let blob = DataBlob::from_raw(data)?;
         // always verify CRC at server side
         blob.verify_crc()?;
         let raw_data = blob.raw_data();
         file_set_contents(&path, raw_data, None)?;
-            Ok(())
-        })
-        .and_then(move |_| {
-            future::ok(crate::server::formatter::json_response(Ok(Value::Null)))
-        })
-        ;
-
-    Ok(Box::new(resp))
+
+        // fixme: use correct formatter
+        Ok(crate::server::formatter::json_response(Ok(Value::Null)))
+    }.boxed()
 }

 #[sortable]
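upload_backup_log above keeps its body-collection step but now awaits it: fold the request-body stream into one Vec<u8>, then .await the fold instead of tacking on .and_then. The same pattern, pulled out as a standalone sketch with futures 0.3 (the chunk type is simplified to Vec<u8>; hyper's Body yields its own chunk type):

    use failure::Error;
    use futures::stream::{TryStream, TryStreamExt};

    // Collect a streamed request body into memory; `.await` runs the
    // fold to completion and `?` at the call site propagates errors.
    async fn collect_body<S>(body: S) -> Result<Vec<u8>, Error>
    where
        S: TryStream<Ok = Vec<u8>, Error = Error>,
    {
        body.try_fold(Vec::new(), |mut acc, chunk| async move {
            acc.extend_from_slice(&chunk);
            Ok(acc)
        })
        .await
    }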

View File

@@ -47,8 +47,9 @@ fn upgrade_to_backup_protocol(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let debug = param["debug"].as_bool().unwrap_or(false);
         let store = tools::required_string_param(&param, "store")?.to_owned();
@@ -159,7 +160,8 @@ fn upgrade_to_backup_protocol(
             .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
             .body(Body::empty())?;

-    Ok(Box::new(futures::future::ok(response)))
+        Ok(response)
+    }.boxed()
 }

 pub const BACKUP_API_SUBDIRS: SubdirMap = &[
@@ -569,8 +571,9 @@ fn dynamic_chunk_index(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let env: &BackupEnvironment = rpcenv.as_ref();

         let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
@@ -587,7 +590,7 @@ fn dynamic_chunk_index(
         let last_backup = match &env.last_backup {
             Some(info) => info,
-        None => return Ok(Box::new(future::ok(empty_response))),
+            None => return Ok(empty_response),
         };

         let mut path = last_backup.backup_dir.relative_path();
@@ -597,7 +600,7 @@ fn dynamic_chunk_index(
             Ok(index) => index,
             Err(_) => {
                 env.log(format!("there is no last backup for archive '{}'", archive_name));
-            return Ok(Box::new(future::ok(empty_response)));
+                return Ok(empty_response);
             }
         };
@@ -619,7 +622,8 @@ fn dynamic_chunk_index(
             .status(200)
             .body(Body::wrap_stream(stream))?;

-    Ok(Box::new(future::ok(response)))
+        Ok(response)
+    }.boxed()
 }

 #[sortable]
@@ -642,8 +646,9 @@ fn fixed_chunk_index(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let env: &BackupEnvironment = rpcenv.as_ref();

         let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
@@ -660,7 +665,7 @@ fn fixed_chunk_index(
         let last_backup = match &env.last_backup {
             Some(info) => info,
-        None => return Ok(Box::new(future::ok(empty_response))),
+            None => return Ok(empty_response),
         };

         let mut path = last_backup.backup_dir.relative_path();
@@ -670,7 +675,7 @@ fn fixed_chunk_index(
             Ok(index) => index,
             Err(_) => {
                 env.log(format!("there is no last backup for archive '{}'", archive_name));
-            return Ok(Box::new(future::ok(empty_response)));
+                return Ok(empty_response);
             }
         };
@@ -697,5 +702,6 @@ fn fixed_chunk_index(
             .status(200)
             .body(Body::wrap_stream(stream))?;

-    Ok(Box::new(future::ok(response)))
+        Ok(response)
+    }.boxed()
 }
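The None => return Ok(empty_response) arms only work because the whole body is now a single async block: an early return yields a plain Result, where the combinator version had to wrap the response in future::ok and box it. A condensed sketch of that shape, with u32 standing in for the prepared HTTP response:

    use failure::Error;
    use futures::future::{BoxFuture, FutureExt};

    fn chunk_index(have_last_backup: bool) -> BoxFuture<'static, Result<u32, Error>> {
        async move {
            let empty_response = 0u32;

            // Early exit: plain `return Ok(...)` instead of
            // `return Ok(Box::new(future::ok(empty_response)))`.
            if !have_last_backup {
                return Ok(empty_response);
            }

            Ok(42)
        }.boxed()
    }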

View File

@@ -115,8 +115,9 @@ fn upload_fixed_chunk(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let wid = tools::required_integer_param(&param, "wid")? as usize;
         let size = tools::required_integer_param(&param, "size")? as u32;
         let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
@@ -126,23 +127,18 @@ fn upload_fixed_chunk(
         let env: &BackupEnvironment = rpcenv.as_ref();

-    let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);
-
-    let resp = upload
-        .then(move |result| {
-            let env: &BackupEnvironment = rpcenv.as_ref();
-            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
+        let (digest, size, compressed_size, is_duplicate) =
+            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;

         env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
         let digest_str = proxmox::tools::digest_to_hex(&digest);
         env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
-                Ok(json!(digest_str))
-            });
-            future::ok(env.format_response(result))
-        });
+        let result = Ok(json!(digest_str));

-    Ok(Box::new(resp))
+        Ok(env.format_response(result))
+    }
+    .boxed()
 }

 #[sortable]
@@ -177,8 +173,9 @@ fn upload_dynamic_chunk(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let wid = tools::required_integer_param(&param, "wid")? as usize;
         let size = tools::required_integer_param(&param, "size")? as u32;
         let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
@@ -188,23 +185,17 @@ fn upload_dynamic_chunk(
         let env: &BackupEnvironment = rpcenv.as_ref();

-    let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);
-
-    let resp = upload
-        .then(move |result| {
-            let env: &BackupEnvironment = rpcenv.as_ref();
-            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
+        let (digest, size, compressed_size, is_duplicate) =
+            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size)
+                .await?;

         env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
         let digest_str = proxmox::tools::digest_to_hex(&digest);
         env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
-                Ok(json!(digest_str))
-            });
-            future::ok(env.format_response(result))
-        });
-
-    Ok(Box::new(resp))
+        let result = Ok(json!(digest_str));
+        Ok(env.format_response(result))
+    }.boxed()
 }

 pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
@@ -218,16 +209,19 @@ fn upload_speedtest(
     _param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
-    let resp = req_body
+) -> ApiFuture {
+    async move {
+        let result = req_body
             .map_err(Error::from)
             .try_fold(0, |size: usize, chunk| {
                 let sum = size + chunk.len();
                 //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
                 future::ok::<usize, Error>(sum)
             })
-        .then(move |result| {
+            .await;

         match result {
             Ok(size) => {
                 println!("UPLOAD END {} bytes", size);
@@ -237,10 +231,8 @@ fn upload_speedtest(
             }
         }
         let env: &BackupEnvironment = rpcenv.as_ref();
-            future::ok(env.format_response(Ok(Value::Null)))
-        });
-
-    Ok(Box::new(resp))
+        Ok(env.format_response(Ok(Value::Null)))
+    }.boxed()
 }

 #[sortable]
@@ -265,40 +257,32 @@ fn upload_blob(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
         let encoded_size = tools::required_integer_param(&param, "encoded-size")? as usize;

         let env: &BackupEnvironment = rpcenv.as_ref();

         if !file_name.ends_with(".blob") {
             bail!("wrong blob file extension: '{}'", file_name);
         }

-    let env2 = env.clone();
-    let env3 = env.clone();
-    let resp = req_body
+        let data = req_body
             .map_err(Error::from)
             .try_fold(Vec::new(), |mut acc, chunk| {
                 acc.extend_from_slice(&*chunk);
                 future::ok::<_, Error>(acc)
             })
-        .and_then(move |data| async move {
+            .await?;

         if encoded_size != data.len() {
             bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
         }

-            env2.add_blob(&file_name, data)?;
-            Ok(())
-        })
-        .and_then(move |_| {
-            future::ok(env3.format_response(Ok(Value::Null)))
-        })
-        ;
-
-    Ok(Box::new(resp))
+        env.add_blob(&file_name, data)?;
+
+        Ok(env.format_response(Ok(Value::Null)))
+    }.boxed()
 }
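UploadChunk is a hand-written Future, so after the cleanup it is simply awaited inside the async block instead of chained with .then, and the shadowed inner rpcenv borrow disappears. A hypothetical stand-in showing why that works (anything implementing std::future::Future can be awaited):

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    // Stand-in for UploadChunk: resolves to (digest, size).
    struct UploadStub;

    impl Future for UploadStub {
        type Output = Result<([u8; 32], u32), failure::Error>;

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            // A real implementation would poll the request body here.
            Poll::Ready(Ok(([0u8; 32], 0)))
        }
    }

    async fn upload_and_register() -> Result<(), failure::Error> {
        // `.await?` replaces the whole `.then(move |result| ...)` chain.
        let (_digest, _size) = UploadStub.await?;
        // ... register the chunk with the backup environment ...
        Ok(())
    }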

View File

@@ -49,8 +49,9 @@ fn upgrade_to_backup_reader_protocol(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let debug = param["debug"].as_bool().unwrap_or(false);
         let store = tools::required_string_param(&param, "store")?.to_owned();
@@ -134,7 +135,8 @@ fn upgrade_to_backup_reader_protocol(
             .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
             .body(Body::empty())?;

-    Ok(Box::new(futures::future::ok(response)))
+        Ok(response)
+    }.boxed()
 }

 pub const READER_API_ROUTER: Router = Router::new()
@@ -170,10 +172,10 @@ fn download_file(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let env: &ReaderEnvironment = rpcenv.as_ref();
-    let env2 = env.clone();

         let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
@@ -184,24 +186,24 @@ fn download_file(
         let path2 = path.clone();
         let path3 = path.clone();

-    let response_future = tokio::fs::File::open(path)
+        let file = tokio::fs::File::open(path)
             .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
-        .and_then(move |file| {
-            env2.log(format!("download {:?}", path3));
+            .await?;
+
+        env.log(format!("download {:?}", path3));

         let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
             .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
         let body = Body::wrap_stream(payload);

         // fixme: set other headers ?
-            futures::future::ok(Response::builder()
+        Ok(Response::builder()
             .status(StatusCode::OK)
             .header(header::CONTENT_TYPE, "application/octet-stream")
             .body(body)
             .unwrap())
-        });
-
-    Ok(Box::new(response_future))
+    }.boxed()
 }

 #[sortable]
@@ -221,8 +223,9 @@ fn download_chunk(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
+    async move {
         let env: &ReaderEnvironment = rpcenv.as_ref();

         let digest_str = tools::required_string_param(&param, "digest")?;
@@ -233,21 +236,19 @@ fn download_chunk(
         env.debug(format!("download chunk {:?}", path));

-    let response_future = tokio::fs::read(path)
+        let data = tokio::fs::read(path)
             .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err)))
-        .and_then(move |data| {
+            .await?;

         let body = Body::from(data);

         // fixme: set other headers ?
-            futures::future::ok(
-                Response::builder()
+        Ok(Response::builder()
             .status(StatusCode::OK)
             .header(header::CONTENT_TYPE, "application/octet-stream")
             .body(body)
             .unwrap())
-        });
-
-    Ok(Box::new(response_future))
+    }.boxed()
 }

 /* this is too slow
@@ -302,7 +303,7 @@ fn speedtest(
     _param: Value,
     _info: &ApiMethod,
     _rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {

     let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
@@ -314,5 +315,5 @@ fn speedtest(
         .body(body)
         .unwrap();

-    Ok(Box::new(future::ok(response)))
+    future::ok(response).boxed()
 }
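The reader's download paths keep their streaming shape; only the file open becomes an .await, after which the env2 clone is unnecessary. Pulled out as a helper, the streaming part looks like this (a sketch against the tokio 0.2-alpha and hyper 0.13-alpha APIs this tree uses; FramedRead later moved to tokio-util):

    use failure::Error;
    use futures::stream::TryStreamExt;

    // Open a file and wrap it into a chunked hyper Body without
    // buffering the whole file in memory.
    async fn file_to_body(path: std::path::PathBuf) -> Result<hyper::Body, Error> {
        let file = tokio::fs::File::open(path).await?;
        let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
            .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
        Ok(hyper::Body::wrap_stream(payload))
    }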

View File

@@ -43,7 +43,7 @@ impl <E: RpcEnvironment + Clone> H2Service<E> {
         let (path, components) = match tools::normalize_uri_path(parts.uri.path()) {
             Ok((p,c)) => (p, c),
-            Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))),
+            Err(err) => return future::err(http_err!(BAD_REQUEST, err.to_string())).boxed(),
         };

         self.debug(format!("{} {}", method, path));
@@ -55,17 +55,17 @@ impl <E: RpcEnvironment + Clone> H2Service<E> {
         match self.router.find_method(&components, method, &mut uri_param) {
             None => {
                 let err = http_err!(NOT_FOUND, "Path not found.".to_string());
-                Box::new(future::ok((formatter.format_error)(err)))
+                future::ok((formatter.format_error)(err)).boxed()
             }
             Some(api_method) => {
                 match api_method.handler {
                     ApiHandler::Sync(_) => {
                         crate::server::rest::handle_sync_api_request(
-                            self.rpcenv.clone(), api_method, formatter, parts, body, uri_param)
+                            self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed()
                     }
                     ApiHandler::Async(_) => {
                         crate::server::rest::handle_async_api_request(
-                            self.rpcenv.clone(), api_method, formatter, parts, body, uri_param)
+                            self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed()
                     }
                 }
             }
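In the H2 service the dispatch arms produce different concrete future types (a ready error future and two different handler futures), and .boxed() erases them to one boxed type so every match arm agrees. A minimal sketch of that trick:

    use failure::{format_err, Error};
    use futures::future::{self, BoxFuture, FutureExt};

    async fn handle_found() -> Result<u32, Error> {
        Ok(42)
    }

    fn dispatch(found: bool) -> BoxFuture<'static, Result<u32, Error>> {
        if !found {
            // An immediately-ready error future, boxed to match.
            return future::err(format_err!("Path not found.")).boxed();
        }
        // An async fn's future, boxed to the same type.
        handle_found().boxed()
    }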

View File

@@ -6,7 +6,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};

 use failure::*;
-use futures::future::{self, Either, FutureExt, TryFutureExt};
+use futures::future::{self, FutureExt, TryFutureExt};
 use futures::stream::TryStreamExt;
 use hyper::header;
 use hyper::http::request::Parts;
@@ -17,7 +17,7 @@ use tokio::fs::File;
 use url::form_urlencoded;

 use proxmox::api::http_err;
-use proxmox::api::{ApiFuture, ApiHandler, ApiMethod, HttpError};
+use proxmox::api::{ApiHandler, ApiMethod, HttpError};
 use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
 use proxmox::api::schema::{parse_simple_value, verify_json_object, parse_parameter_strings};
@@ -125,7 +125,7 @@ impl tower_service::Service<Request<Body>> for ApiService {
         let method = req.method().clone();
         let peer = self.peer;
-        Pin::from(handle_request(self.api_config.clone(), req))
+        handle_request(self.api_config.clone(), req)
             .map(move |result| match result {
                 Ok(res) => {
                     log_response(&peer, method, &path, &res);
@@ -149,13 +149,13 @@ impl tower_service::Service<Request<Body>> for ApiService {
     }
 }

-fn get_request_parameters_async<S: 'static + BuildHasher + Send>(
+async fn get_request_parameters_async<S: 'static + BuildHasher + Send>(
     info: &'static ApiMethod,
     parts: Parts,
     req_body: Body,
     uri_param: HashMap<String, String, S>,
-) -> Box<dyn Future<Output = Result<Value, failure::Error>> + Send>
-{
+) -> Result<Value, Error> {

     let mut is_json = false;

     if let Some(value) = parts.headers.get(header::CONTENT_TYPE) {
@@ -166,13 +166,11 @@ fn get_request_parameters_async(
             Ok(Some("application/json")) => {
                 is_json = true;
             }
-            _ => {
-                return Box::new(future::err(http_err!(BAD_REQUEST, "unsupported content type".to_string())));
-            }
+            _ => bail!("unsupported content type {:?}", value.to_str()),
         }
     }

-    let resp = req_body
+    let body = req_body
         .map_err(|err| http_err!(BAD_REQUEST, format!("Promlems reading request body: {}", err)))
         .try_fold(Vec::new(), |mut acc, chunk| async move {
             if acc.len() + chunk.len() < 64*1024 { //fimxe: max request body size?
@@ -181,9 +179,10 @@ fn get_request_parameters_async(
             } else {
                 Err(http_err!(BAD_REQUEST, "Request body too large".to_string()))
             }
-        })
-        .and_then(move |body| async move {
-            let utf8 = std::str::from_utf8(&body)?;
+        }).await?;
+
+    let utf8 = std::str::from_utf8(&body)
+        .map_err(|err| format_err!("Request body not uft8: {}", err))?;

     let obj_schema = &info.parameters;
@@ -220,18 +219,15 @@ fn get_request_parameters_async(
     let params = parse_parameter_strings(&param_list, obj_schema, true)?;

     Ok(params)
-    }.boxed());
-
-    Box::new(resp)
 }
 struct NoLogExtension();

-fn proxy_protected_request(
+async fn proxy_protected_request(
     info: &'static ApiMethod,
     mut parts: Parts,
     req_body: Body,
-) -> ApiFuture {
+) -> Result<Response<Body>, Error> {

     let mut uri_parts = parts.uri.clone().into_parts();
@@ -243,49 +239,40 @@ fn proxy_protected_request(
     let request = Request::from_parts(parts, req_body);

+    let reload_timezone = info.reload_timezone;
+
     let resp = hyper::client::Client::new()
         .request(request)
         .map_err(Error::from)
         .map_ok(|mut resp| {
             resp.extensions_mut().insert(NoLogExtension());
             resp
-        });
-
-    let reload_timezone = info.reload_timezone;
-
-    Box::new(async move {
-        let result = resp.await;
-        if reload_timezone {
-            unsafe {
-                tzset();
-            }
-        }
-        result
-    })
+        })
+        .await?;
+
+    if reload_timezone { unsafe { tzset(); } }
+
+    Ok(resp)
 }

-pub fn handle_sync_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
+pub async fn handle_sync_api_request<Env: RpcEnvironment, S: 'static + BuildHasher + Send>(
     mut rpcenv: Env,
     info: &'static ApiMethod,
     formatter: &'static OutputFormatter,
     parts: Parts,
     req_body: Body,
     uri_param: HashMap<String, String, S>,
-) -> ApiFuture
-{
+) -> Result<Response<Body>, Error> {

     let handler = match info.handler {
-        ApiHandler::Async(_) => {
-            panic!("fixme");
-        }
+        ApiHandler::Async(_) => bail!("handle_sync_api_request: internal error (called with Async handler)"),
         ApiHandler::Sync(handler) => handler,
     };

-    let params = get_request_parameters_async(info, parts, req_body, uri_param);
+    let params = get_request_parameters_async(info, parts, req_body, uri_param).await?;

     let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);

-    let resp = Pin::from(params)
-        .and_then(move |params| {
     let mut delay = false;

     let resp = match (handler)(params, info, &mut rpcenv) {
@@ -300,36 +287,26 @@ pub fn handle_sync_api_request(
         }
     };

-            if info.reload_timezone {
-                unsafe { tzset() };
-            }
+    if info.reload_timezone { unsafe { tzset(); } }

     if delay {
-                Either::Left(delayed_response(resp, delay_unauth_time))
-            } else {
-                Either::Right(future::ok(resp))
+        tokio::timer::delay(delay_unauth_time).await;
     }
-        })
-        .or_else(move |err| {
-            future::ok((formatter.format_error)(err))
-        });

-    Box::new(resp)
+    Ok(resp)
 }
-pub fn handle_async_api_request<Env: RpcEnvironment>(
+pub async fn handle_async_api_request<Env: RpcEnvironment>(
     rpcenv: Env,
     info: &'static ApiMethod,
     formatter: &'static OutputFormatter,
     parts: Parts,
     req_body: Body,
     uri_param: HashMap<String, String>,
-) -> ApiFuture
-{
+) -> Result<Response<Body>, Error> {

     let handler = match info.handler {
-        ApiHandler::Sync(_) => {
-            panic!("fixme");
-        }
+        ApiHandler::Sync(_) => bail!("handle_async_api_request: internal error (called with Sync handler)"),
         ApiHandler::Async(handler) => handler,
     };
@@ -350,18 +327,14 @@ pub fn handle_async_api_request<Env: RpcEnvironment>(
     let params = match parse_parameter_strings(&param_list, &info.parameters, true) {
         Ok(v) => v,
         Err(err) => {
-            let resp = (formatter.format_error)(Error::from(err));
-            return Box::new(future::ok(resp));
+            return Ok((formatter.format_error)(Error::from(err)));
         }
     };

-    match (handler)(parts, req_body, params, info, Box::new(rpcenv)) {
-        Ok(future) => future,
-        Err(err) => {
-            let resp = (formatter.format_error)(err);
-            Box::new(future::ok(resp))
-        }
-    }
+    let resp = (handler)(parts, req_body, params, info, Box::new(rpcenv)).await?;
+
+    Ok(resp)
 }

 fn get_index(username: Option<String>, token: Option<String>) -> Response<Body> {
@@ -491,9 +464,9 @@ async fn chuncked_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
     )
 }

-fn handle_static_file_download(filename: PathBuf) -> ApiFuture {
-    let response = tokio::fs::metadata(filename.clone())
+async fn handle_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
+    tokio::fs::metadata(filename.clone())
         .map_err(|err| http_err!(BAD_REQUEST, format!("File access problems: {}", err)))
         .and_then(|metadata| async move {
             if metadata.len() < 1024*32 {
@@ -501,9 +474,8 @@ fn handle_static_file_download(filename: PathBuf) -> ApiFuture {
             } else {
                 chuncked_static_file_download(filename).await
             }
-        });
-
-    Box::new(response)
+        })
+        .await
 }
 fn extract_auth_data(headers: &http::HeaderMap) -> (Option<String>, Option<String>) {
@@ -548,24 +520,12 @@ fn check_auth(
     Ok(username)
 }

-async fn delayed_response(
-    resp: Response<Body>,
-    delay_unauth_time: std::time::Instant,
-) -> Result<Response<Body>, Error> {
-    tokio::timer::delay(delay_unauth_time).await;
-    Ok(resp)
-}
-
-pub fn handle_request(api: Arc<ApiConfig>, req: Request<Body>) -> ApiFuture {
+pub async fn handle_request(api: Arc<ApiConfig>, req: Request<Body>) -> Result<Response<Body>, Error> {

     let (parts, body) = req.into_parts();
     let method = parts.method.clone();

-    let (path, components) = match tools::normalize_uri_path(parts.uri.path()) {
-        Ok((p,c)) => (p, c),
-        Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))),
-    };
+    let (path, components) = tools::normalize_uri_path(parts.uri.path())?;

     let comp_len = components.len();
@@ -580,13 +540,13 @@ pub fn handle_request(
     if comp_len >= 1 && components[0] == "api2" {
         if comp_len >= 2 {
             let format = components[1];

             let formatter = match format {
                 "json" => &JSON_FORMATTER,
                 "extjs" => &EXTJS_FORMATTER,
-                _ => {
-                    return Box::new(future::err(http_err!(BAD_REQUEST, format!("Unsupported output format '{}'.", format))));
-                }
+                _ => bail!("Unsupported output format '{}'.", format),
             };

             let mut uri_param = HashMap::new();
@@ -605,9 +565,8 @@ pub fn handle_request(
                     Err(err) => {
                         // always delay unauthorized calls by 3 seconds (from start of request)
                         let err = http_err!(UNAUTHORIZED, format!("permission check failed - {}", err));
-                        return Box::new(
-                            delayed_response((formatter.format_error)(err), delay_unauth_time)
-                        );
+                        tokio::timer::delay(delay_unauth_time).await;
+                        return Ok((formatter.format_error)(err));
                     }
                 }
             }
@@ -615,18 +574,18 @@ pub fn handle_request(
             match api.find_method(&components[2..], method, &mut uri_param) {
                 None => {
                     let err = http_err!(NOT_FOUND, "Path not found.".to_string());
-                    return Box::new(future::ok((formatter.format_error)(err)));
+                    return Ok((formatter.format_error)(err));
                 }
                 Some(api_method) => {
                     if api_method.protected && env_type == RpcEnvironmentType::PUBLIC {
-                        return proxy_protected_request(api_method, parts, body);
+                        return proxy_protected_request(api_method, parts, body).await;
                     } else {
                         match api_method.handler {
                             ApiHandler::Sync(_) => {
-                                return handle_sync_api_request(rpcenv, api_method, formatter, parts, body, uri_param);
+                                return handle_sync_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await;
                             }
                             ApiHandler::Async(_) => {
-                                return handle_async_api_request(rpcenv, api_method, formatter, parts, body, uri_param);
+                                return handle_async_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await;
                             }
                         }
                     }
@@ -637,7 +596,7 @@ pub fn handle_request(
         // not Auth required for accessing files!

        if method != hyper::Method::GET {
-            return Box::new(future::err(http_err!(BAD_REQUEST, "Unsupported method".to_string())));
+            bail!("Unsupported HTTP method {}", method);
        }

        if comp_len == 0 {
@@ -646,20 +605,21 @@ pub fn handle_request(
                match check_auth(&method, &ticket, &token) {
                    Ok(username) => {
                        let new_token = assemble_csrf_prevention_token(csrf_secret(), &username);
-                        return Box::new(future::ok(get_index(Some(username), Some(new_token))));
+                        return Ok(get_index(Some(username), Some(new_token)));
                    }
                    _ => {
-                        return Box::new(delayed_response(get_index(None, None), delay_unauth_time));
+                        tokio::timer::delay(delay_unauth_time).await;
+                        return Ok(get_index(None, None));
                    }
                }
            } else {
-                return Box::new(future::ok(get_index(None, None)));
+                return Ok(get_index(None, None));
            }
        } else {
            let filename = api.find_alias(&components);
-            return handle_static_file_download(filename);
+            return handle_static_file_download(filename).await;
        }
    }

-    Box::new(future::err(http_err!(NOT_FOUND, "Path not found.".to_string())))
+    Err(http_err!(NOT_FOUND, "Path not found.".to_string()))
 }
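With handle_request itself async, the removed delayed_response helper is no longer needed: delaying is just an .await on a timer before returning. A hypothetical condensed sketch of the unauthorized path (tokio 0.2-alpha timer API, as used above; later tokio versions renamed this):

    use std::time::{Duration, Instant};

    async fn delayed_error_response() -> Result<&'static str, failure::Error> {
        // Deadline computed at the start of the request, so the total
        // delay is ~3s no matter how long the auth checks took.
        let delay_unauth_time = Instant::now() + Duration::from_millis(3000);
        tokio::timer::delay(delay_unauth_time).await;
        Ok("permission check failed")
    }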