diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index a7a37443..137d6b79 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -480,43 +480,43 @@ fn download_file(
     param: Value,
     _info: &ApiMethod,
     _rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let store = tools::required_string_param(&param, "store")?;
+    async move {
+        let store = tools::required_string_param(&param, "store")?;
 
-    let datastore = DataStore::lookup_datastore(store)?;
+        let datastore = DataStore::lookup_datastore(store)?;
 
-    let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
+        let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
 
-    let backup_type = tools::required_string_param(&param, "backup-type")?;
-    let backup_id = tools::required_string_param(&param, "backup-id")?;
-    let backup_time = tools::required_integer_param(&param, "backup-time")?;
+        let backup_type = tools::required_string_param(&param, "backup-type")?;
+        let backup_id = tools::required_string_param(&param, "backup-id")?;
+        let backup_time = tools::required_integer_param(&param, "backup-time")?;
 
-    println!("Download {} from {} ({}/{}/{}/{})", file_name, store,
-             backup_type, backup_id, Local.timestamp(backup_time, 0), file_name);
+        println!("Download {} from {} ({}/{}/{}/{})", file_name, store,
+                 backup_type, backup_id, Local.timestamp(backup_time, 0), file_name);
 
-    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
+        let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
 
-    let mut path = datastore.base_path();
-    path.push(backup_dir.relative_path());
-    path.push(&file_name);
+        let mut path = datastore.base_path();
+        path.push(backup_dir.relative_path());
+        path.push(&file_name);
 
-    let response_future = tokio::fs::File::open(path)
-        .map_err(|err| http_err!(BAD_REQUEST, format!("File open failed: {}", err)))
-        .and_then(move |file| {
-            let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
-                .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
-            let body = Body::wrap_stream(payload);
+        let file = tokio::fs::File::open(path)
+            .map_err(|err| http_err!(BAD_REQUEST, format!("File open failed: {}", err)))
+            .await?;
 
-            // fixme: set other headers ?
-            futures::future::ok(Response::builder()
-                .status(StatusCode::OK)
-                .header(header::CONTENT_TYPE, "application/octet-stream")
-                .body(body)
-                .unwrap())
-        });
+        let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
+            .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
+        let body = Body::wrap_stream(payload);
 
-    Ok(Box::new(response_future))
+        // fixme: set other headers ?
+        Ok(Response::builder()
+            .status(StatusCode::OK)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
+            .body(body)
+            .unwrap())
+    }.boxed()
 }
 
 #[sortable]
@@ -543,51 +543,49 @@ fn upload_backup_log(
     param: Value,
     _info: &ApiMethod,
     _rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let store = tools::required_string_param(&param, "store")?;
+    async move {
+        let store = tools::required_string_param(&param, "store")?;
 
-    let datastore = DataStore::lookup_datastore(store)?;
+        let datastore = DataStore::lookup_datastore(store)?;
 
-    let file_name = "client.log.blob";
+        let file_name = "client.log.blob";
 
-    let backup_type = tools::required_string_param(&param, "backup-type")?;
-    let backup_id = tools::required_string_param(&param, "backup-id")?;
-    let backup_time = tools::required_integer_param(&param, "backup-time")?;
+        let backup_type = tools::required_string_param(&param, "backup-type")?;
+        let backup_id = tools::required_string_param(&param, "backup-id")?;
+        let backup_time = tools::required_integer_param(&param, "backup-time")?;
 
-    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
+        let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
 
-    let mut path = datastore.base_path();
-    path.push(backup_dir.relative_path());
-    path.push(&file_name);
+        let mut path = datastore.base_path();
+        path.push(backup_dir.relative_path());
+        path.push(&file_name);
 
-    if path.exists() {
-        bail!("backup already contains a log.");
-    }
+        if path.exists() {
+            bail!("backup already contains a log.");
+        }
 
-    println!("Upload backup log to {}/{}/{}/{}/{}", store,
-             backup_type, backup_id, BackupDir::backup_time_to_string(backup_dir.backup_time()), file_name);
+        println!("Upload backup log to {}/{}/{}/{}/{}", store,
+                 backup_type, backup_id, BackupDir::backup_time_to_string(backup_dir.backup_time()), file_name);
 
-    let resp = req_body
-        .map_err(Error::from)
-        .try_fold(Vec::new(), |mut acc, chunk| {
-            acc.extend_from_slice(&*chunk);
-            future::ok::<_, Error>(acc)
-        })
-        .and_then(move |data| async move {
-            let blob = DataBlob::from_raw(data)?;
-            // always verify CRC at server side
-            blob.verify_crc()?;
-            let raw_data = blob.raw_data();
-            file_set_contents(&path, raw_data, None)?;
-            Ok(())
-        })
-        .and_then(move |_| {
-            future::ok(crate::server::formatter::json_response(Ok(Value::Null)))
-        })
-        ;
+        let data = req_body
+            .map_err(Error::from)
+            .try_fold(Vec::new(), |mut acc, chunk| {
+                acc.extend_from_slice(&*chunk);
+                future::ok::<_, Error>(acc)
+            })
+            .await?;
 
-    Ok(Box::new(resp))
+        let blob = DataBlob::from_raw(data)?;
+        // always verify CRC at server side
+        blob.verify_crc()?;
+        let raw_data = blob.raw_data();
+        file_set_contents(&path, raw_data, None)?;
+
+        // fixme: use correct formatter
+        Ok(crate::server::formatter::json_response(Ok(Value::Null)))
+    }.boxed()
 }
 
 #[sortable]
@@ -698,7 +696,7 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
     ),
 ];
 
-const DATASTORE_INFO_ROUTER: Router = Router::new()
+const DATASTORE_INFO_ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(DATASTORE_INFO_SUBDIRS))
     .subdirs(DATASTORE_INFO_SUBDIRS);
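NOTE (reviewer sketch, not part of the patch): the conversion above is the template for every handler in this series — a combinator chain returning a pre-boxed future becomes one `async move { ... }.boxed()` block. One subtle behavioral change: errors raised by `?` during parameter parsing used to escape before any future existed; now they resolve through the returned future. Reduced to plain `futures` types, with `BoxFuture` standing in for the ApiFuture alias and `lookup` as an invented helper:

    use failure::{bail, Error};
    use futures::future::{self, BoxFuture, FutureExt};

    fn lookup(name: &str) -> Result<u32, Error> {
        if name.is_empty() { bail!("empty name"); }
        Ok(name.len() as u32)
    }

    // Old shape: parameter errors return Err before a future is ever built.
    fn old_handler(name: String) -> Result<BoxFuture<'static, Result<u32, Error>>, Error> {
        let id = lookup(&name)?;
        Ok(future::ok(id).boxed())
    }

    // New shape: everything, including parameter errors, flows through the future.
    fn new_handler(name: String) -> BoxFuture<'static, Result<u32, Error>> {
        async move {
            let id = lookup(&name)?;
            Ok(id)
        }.boxed()
    }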
diff --git a/src/api2/backup.rs b/src/api2/backup.rs
index 4860f0be..bf731153 100644
--- a/src/api2/backup.rs
+++ b/src/api2/backup.rs
@@ -47,8 +47,9 @@ fn upgrade_to_backup_protocol(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
+    async move {
     let debug = param["debug"].as_bool().unwrap_or(false);
 
     let store = tools::required_string_param(&param, "store")?.to_owned();
@@ -159,7 +160,8 @@ fn upgrade_to_backup_protocol(
             .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
         .body(Body::empty())?;
 
-    Ok(Box::new(futures::future::ok(response)))
+    Ok(response)
+    }.boxed()
 }
 
 pub const BACKUP_API_SUBDIRS: SubdirMap = &[
@@ -569,57 +571,59 @@ fn dynamic_chunk_index(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    async move {
+        let env: &BackupEnvironment = rpcenv.as_ref();
 
-    let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
+        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
 
-    if !archive_name.ends_with(".didx") {
-        bail!("wrong archive extension: '{}'", archive_name);
-    }
-
-    let empty_response = {
-        Response::builder()
-            .status(StatusCode::OK)
-            .body(Body::empty())?
-    };
-
-    let last_backup = match &env.last_backup {
-        Some(info) => info,
-        None => return Ok(Box::new(future::ok(empty_response))),
-    };
-
-    let mut path = last_backup.backup_dir.relative_path();
-    path.push(&archive_name);
-
-    let index = match env.datastore.open_dynamic_reader(path) {
-        Ok(index) => index,
-        Err(_) => {
-            env.log(format!("there is no last backup for archive '{}'", archive_name));
-            return Ok(Box::new(future::ok(empty_response)));
+        if !archive_name.ends_with(".didx") {
+            bail!("wrong archive extension: '{}'", archive_name);
         }
-    };
-    env.log(format!("download last backup index for archive '{}'", archive_name));
 
-    let count = index.index_count();
-    for pos in 0..count {
-        let (start, end, digest) = index.chunk_info(pos)?;
-        let size = (end - start) as u32;
-        env.register_chunk(digest, size)?;
-    }
+        let empty_response = {
+            Response::builder()
+                .status(StatusCode::OK)
+                .body(Body::empty())?
+        };
 
-    let reader = DigestListEncoder::new(Box::new(index));
+        let last_backup = match &env.last_backup {
+            Some(info) => info,
+            None => return Ok(empty_response),
+        };
 
-    let stream = WrappedReaderStream::new(reader);
+        let mut path = last_backup.backup_dir.relative_path();
+        path.push(&archive_name);
 
-    // fixme: set size, content type?
-    let response = http::Response::builder()
-        .status(200)
-        .body(Body::wrap_stream(stream))?;
+        let index = match env.datastore.open_dynamic_reader(path) {
+            Ok(index) => index,
+            Err(_) => {
+                env.log(format!("there is no last backup for archive '{}'", archive_name));
+                return Ok(empty_response);
+            }
+        };
 
-    Ok(Box::new(future::ok(response)))
+        env.log(format!("download last backup index for archive '{}'", archive_name));
+
+        let count = index.index_count();
+        for pos in 0..count {
+            let (start, end, digest) = index.chunk_info(pos)?;
+            let size = (end - start) as u32;
+            env.register_chunk(digest, size)?;
+        }
+
+        let reader = DigestListEncoder::new(Box::new(index));
+
+        let stream = WrappedReaderStream::new(reader);
+
+        // fixme: set size, content type?
+        let response = http::Response::builder()
+            .status(200)
+            .body(Body::wrap_stream(stream))?;
+
+        Ok(response)
+    }.boxed()
 }
 
 #[sortable]
@@ -642,60 +646,62 @@ fn fixed_chunk_index(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    async move {
+        let env: &BackupEnvironment = rpcenv.as_ref();
 
-    let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
+        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
 
-    if !archive_name.ends_with(".fidx") {
-        bail!("wrong archive extension: '{}'", archive_name);
-    }
-
-    let empty_response = {
-        Response::builder()
-            .status(StatusCode::OK)
-            .body(Body::empty())?
-    };
-
-    let last_backup = match &env.last_backup {
-        Some(info) => info,
-        None => return Ok(Box::new(future::ok(empty_response))),
-    };
-
-    let mut path = last_backup.backup_dir.relative_path();
-    path.push(&archive_name);
-
-    let index = match env.datastore.open_fixed_reader(path) {
-        Ok(index) => index,
-        Err(_) => {
-            env.log(format!("there is no last backup for archive '{}'", archive_name));
-            return Ok(Box::new(future::ok(empty_response)));
+        if !archive_name.ends_with(".fidx") {
+            bail!("wrong archive extension: '{}'", archive_name);
         }
-    };
-    env.log(format!("download last backup index for archive '{}'", archive_name));
 
-    let count = index.index_count();
-    let image_size = index.index_bytes();
-    for pos in 0..count {
-        let digest = index.index_digest(pos).unwrap();
-        // Note: last chunk can be smaller
-        let start = (pos*index.chunk_size) as u64;
-        let mut end = start + index.chunk_size as u64;
-        if end > image_size { end = image_size; }
-        let size = (end - start) as u32;
-        env.register_chunk(*digest, size)?;
-    }
+        let empty_response = {
+            Response::builder()
+                .status(StatusCode::OK)
+                .body(Body::empty())?
+        };
 
-    let reader = DigestListEncoder::new(Box::new(index));
+        let last_backup = match &env.last_backup {
+            Some(info) => info,
+            None => return Ok(empty_response),
+        };
 
-    let stream = WrappedReaderStream::new(reader);
+        let mut path = last_backup.backup_dir.relative_path();
+        path.push(&archive_name);
 
-    // fixme: set size, content type?
-    let response = http::Response::builder()
-        .status(200)
-        .body(Body::wrap_stream(stream))?;
+        let index = match env.datastore.open_fixed_reader(path) {
+            Ok(index) => index,
+            Err(_) => {
+                env.log(format!("there is no last backup for archive '{}'", archive_name));
+                return Ok(empty_response);
+            }
+        };
 
-    Ok(Box::new(future::ok(response)))
+        env.log(format!("download last backup index for archive '{}'", archive_name));
+
+        let count = index.index_count();
+        let image_size = index.index_bytes();
+        for pos in 0..count {
+            let digest = index.index_digest(pos).unwrap();
+            // Note: last chunk can be smaller
+            let start = (pos*index.chunk_size) as u64;
+            let mut end = start + index.chunk_size as u64;
+            if end > image_size { end = image_size; }
+            let size = (end - start) as u32;
+            env.register_chunk(*digest, size)?;
+        }
+
+        let reader = DigestListEncoder::new(Box::new(index));
+
+        let stream = WrappedReaderStream::new(reader);
+
+        // fixme: set size, content type?
+        let response = http::Response::builder()
+            .status(200)
+            .body(Body::wrap_stream(stream))?;
+
+        Ok(response)
+    }.boxed()
 }
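NOTE (reviewer sketch, not part of the patch): `fixed_chunk_index()` pre-registers every chunk of the previous backup, and the size of the final chunk must be clamped to the image size. That arithmetic in isolation, with invented names:

    // The last chunk may be shorter than chunk_size, so `end` is clamped
    // to the image size before computing the per-chunk size.
    fn fixed_chunk_sizes(chunk_size: usize, image_size: u64, count: usize) -> Vec<u32> {
        (0..count)
            .map(|pos| {
                let start = (pos * chunk_size) as u64;
                let end = (start + chunk_size as u64).min(image_size);
                (end - start) as u32
            })
            .collect()
    }

    fn main() {
        // three 4 MiB chunks over a 10 MiB image: the last one is 2 MiB
        assert_eq!(
            fixed_chunk_sizes(4 * 1024 * 1024, 10 * 1024 * 1024, 3),
            vec![4 * 1024 * 1024, 4 * 1024 * 1024, 2 * 1024 * 1024]
        );
    }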
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 620dff5e..4990b7fd 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -115,34 +115,30 @@ fn upload_fixed_chunk(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let wid = tools::required_integer_param(&param, "wid")? as usize;
-    let size = tools::required_integer_param(&param, "size")? as u32;
-    let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
+    async move {
+        let wid = tools::required_integer_param(&param, "wid")? as usize;
+        let size = tools::required_integer_param(&param, "size")? as u32;
+        let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
 
-    let digest_str = tools::required_string_param(&param, "digest")?;
-    let digest = proxmox::tools::hex_to_digest(digest_str)?;
+        let digest_str = tools::required_string_param(&param, "digest")?;
+        let digest = proxmox::tools::hex_to_digest(digest_str)?;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment = rpcenv.as_ref();
 
-    let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);
+        let (digest, size, compressed_size, is_duplicate) =
+            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
 
-    let resp = upload
-        .then(move |result| {
-            let env: &BackupEnvironment = rpcenv.as_ref();
+        env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
+        let digest_str = proxmox::tools::digest_to_hex(&digest);
+        env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
 
-            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
-                env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
-                let digest_str = proxmox::tools::digest_to_hex(&digest);
-                env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
-                Ok(json!(digest_str))
-            });
+        let result = Ok(json!(digest_str));
 
-            future::ok(env.format_response(result))
-        });
-
-    Ok(Box::new(resp))
+        Ok(env.format_response(result))
+    }
+    .boxed()
 }
 
 #[sortable]
@@ -177,34 +173,29 @@ fn upload_dynamic_chunk(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let wid = tools::required_integer_param(&param, "wid")? as usize;
-    let size = tools::required_integer_param(&param, "size")? as u32;
-    let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
+    async move {
+        let wid = tools::required_integer_param(&param, "wid")? as usize;
+        let size = tools::required_integer_param(&param, "size")? as u32;
+        let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
 
-    let digest_str = tools::required_string_param(&param, "digest")?;
-    let digest = proxmox::tools::hex_to_digest(digest_str)?;
+        let digest_str = tools::required_string_param(&param, "digest")?;
+        let digest = proxmox::tools::hex_to_digest(digest_str)?;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment = rpcenv.as_ref();
 
-    let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);
+        let (digest, size, compressed_size, is_duplicate) =
+            UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size)
+                .await?;
 
-    let resp = upload
-        .then(move |result| {
-            let env: &BackupEnvironment = rpcenv.as_ref();
+        env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
+        let digest_str = proxmox::tools::digest_to_hex(&digest);
+        env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
 
-            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
-                env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
-                let digest_str = proxmox::tools::digest_to_hex(&digest);
-                env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
-                Ok(json!(digest_str))
-            });
-
-            future::ok(env.format_response(result))
-        });
-
-    Ok(Box::new(resp))
+        let result = Ok(json!(digest_str));
+        Ok(env.format_response(result))
+    }.boxed()
 }
 
 pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
@@ -218,29 +209,30 @@ fn upload_speedtest(
     _param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let resp = req_body
-        .map_err(Error::from)
-        .try_fold(0, |size: usize, chunk| {
-            let sum = size + chunk.len();
-            //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
-            future::ok::<usize, Error>(sum)
-        })
-        .then(move |result| {
-            match result {
-                Ok(size) => {
-                    println!("UPLOAD END {} bytes", size);
-                }
-                Err(err) => {
-                    println!("Upload error: {}", err);
-                }
+    async move {
+
+        let result = req_body
+            .map_err(Error::from)
+            .try_fold(0, |size: usize, chunk| {
+                let sum = size + chunk.len();
+                //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
+                future::ok::<usize, Error>(sum)
+            })
+            .await;
+
+        match result {
+            Ok(size) => {
+                println!("UPLOAD END {} bytes", size);
             }
-            let env: &BackupEnvironment = rpcenv.as_ref();
-            future::ok(env.format_response(Ok(Value::Null)))
-        });
-
-    Ok(Box::new(resp))
+            Err(err) => {
+                println!("Upload error: {}", err);
+            }
+        }
+        let env: &BackupEnvironment = rpcenv.as_ref();
+        Ok(env.format_response(Ok(Value::Null)))
+    }.boxed()
 }
 
 #[sortable]
@@ -265,40 +257,32 @@ fn upload_blob(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
-    let encoded_size = tools::required_integer_param(&param, "encoded-size")? as usize;
+    async move {
+        let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
+        let encoded_size = tools::required_integer_param(&param, "encoded-size")? as usize;
+        let env: &BackupEnvironment = rpcenv.as_ref();
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+        if !file_name.ends_with(".blob") {
+            bail!("wrong blob file extension: '{}'", file_name);
+        }
 
-    if !file_name.ends_with(".blob") {
-        bail!("wrong blob file extension: '{}'", file_name);
-    }
+        let data = req_body
+            .map_err(Error::from)
+            .try_fold(Vec::new(), |mut acc, chunk| {
+                acc.extend_from_slice(&*chunk);
+                future::ok::<_, Error>(acc)
+            })
+            .await?;
 
-    let env2 = env.clone();
-    let env3 = env.clone();
+        if encoded_size != data.len() {
+            bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
+        }
 
-    let resp = req_body
-        .map_err(Error::from)
-        .try_fold(Vec::new(), |mut acc, chunk| {
-            acc.extend_from_slice(&*chunk);
-            future::ok::<_, Error>(acc)
-        })
-        .and_then(move |data| async move {
-            if encoded_size != data.len() {
-                bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
-            }
+        env.add_blob(&file_name, data)?;
 
-            env2.add_blob(&file_name, data)?;
-
-            Ok(())
-        })
-        .and_then(move |_| {
-            future::ok(env3.format_response(Ok(Value::Null)))
-        })
-        ;
-
-    Ok(Box::new(resp))
+        Ok(env.format_response(Ok(Value::Null)))
+    }.boxed()
 }
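NOTE (reviewer sketch, not part of the patch): the `try_fold`-into-`Vec` idiom above is how these handlers buffer a request body before parsing it; with async/await the accumulated result is simply `.await?`ed instead of chained through `and_then`. Self-contained version with a plain stream standing in for the hyper `Body` (names invented):

    use failure::Error;
    use futures::stream::{self, TryStreamExt};

    async fn collect_body(
        body: impl futures::Stream<Item = Result<Vec<u8>, Error>>,
    ) -> Result<Vec<u8>, Error> {
        body.try_fold(Vec::new(), |mut acc, chunk| async move {
            acc.extend_from_slice(&chunk);
            Ok(acc)
        })
        .await
    }

    fn main() -> Result<(), Error> {
        let chunks: Vec<Result<Vec<u8>, Error>> =
            vec![Ok(b"hello ".to_vec()), Ok(b"world".to_vec())];
        let data = futures::executor::block_on(collect_body(stream::iter(chunks)))?;
        assert_eq!(data, b"hello world");
        Ok(())
    }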
diff --git a/src/api2/reader.rs b/src/api2/reader.rs
index 575803fa..efd04bdf 100644
--- a/src/api2/reader.rs
+++ b/src/api2/reader.rs
@@ -49,92 +49,94 @@ fn upgrade_to_backup_reader_protocol(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let debug = param["debug"].as_bool().unwrap_or(false);
+    async move {
+        let debug = param["debug"].as_bool().unwrap_or(false);
 
-    let store = tools::required_string_param(&param, "store")?.to_owned();
-    let datastore = DataStore::lookup_datastore(&store)?;
+        let store = tools::required_string_param(&param, "store")?.to_owned();
+        let datastore = DataStore::lookup_datastore(&store)?;
 
-    let backup_type = tools::required_string_param(&param, "backup-type")?;
-    let backup_id = tools::required_string_param(&param, "backup-id")?;
-    let backup_time = tools::required_integer_param(&param, "backup-time")?;
+        let backup_type = tools::required_string_param(&param, "backup-type")?;
+        let backup_id = tools::required_string_param(&param, "backup-id")?;
+        let backup_time = tools::required_integer_param(&param, "backup-time")?;
 
-    let protocols = parts
-        .headers
-        .get("UPGRADE")
-        .ok_or_else(|| format_err!("missing Upgrade header"))?
+        let protocols = parts
+            .headers
+            .get("UPGRADE")
+            .ok_or_else(|| format_err!("missing Upgrade header"))?
         .to_str()?;
 
-    if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
-        bail!("invalid protocol name");
-    }
+        if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
+            bail!("invalid protocol name");
+        }
 
-    if parts.version >= http::version::Version::HTTP_2 {
-        bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
-    }
+        if parts.version >= http::version::Version::HTTP_2 {
+            bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
+        }
 
-    let username = rpcenv.get_user().unwrap();
-    let env_type = rpcenv.env_type();
+        let username = rpcenv.get_user().unwrap();
+        let env_type = rpcenv.env_type();
 
-    let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
-    let path = datastore.base_path();
+        let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
+        let path = datastore.base_path();
 
-    //let files = BackupInfo::list_files(&path, &backup_dir)?;
+        //let files = BackupInfo::list_files(&path, &backup_dir)?;
 
-    let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
+        let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
 
-    WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
-        let mut env = ReaderEnvironment::new(
-            env_type, username.clone(), worker.clone(), datastore, backup_dir);
+        WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
+            let mut env = ReaderEnvironment::new(
+                env_type, username.clone(), worker.clone(), datastore, backup_dir);
 
-        env.debug = debug;
+            env.debug = debug;
 
-        env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
+            env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
 
-        let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
+            let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
 
-        let abort_future = worker.abort_future();
+            let abort_future = worker.abort_future();
 
-        let req_fut = req_body
-            .on_upgrade()
-            .map_err(Error::from)
-            .and_then({
-                let env = env.clone();
-                move |conn| {
-                    env.debug("protocol upgrade done");
+            let req_fut = req_body
+                .on_upgrade()
+                .map_err(Error::from)
+                .and_then({
+                    let env = env.clone();
+                    move |conn| {
+                        env.debug("protocol upgrade done");
 
-                    let mut http = hyper::server::conn::Http::new();
-                    http.http2_only(true);
-                    // increase window size: todo - find optiomal size
-                    let window_size = 32*1024*1024; // max = (1 << 31) - 2
-                    http.http2_initial_stream_window_size(window_size);
-                    http.http2_initial_connection_window_size(window_size);
+                        let mut http = hyper::server::conn::Http::new();
+                        http.http2_only(true);
+                        // increase window size: todo - find optimal size
+                        let window_size = 32*1024*1024; // max = (1 << 31) - 2
+                        http.http2_initial_stream_window_size(window_size);
+                        http.http2_initial_connection_window_size(window_size);
 
-                    http.serve_connection(conn, service)
-                        .map_err(Error::from)
-                }
-            });
-        let abort_future = abort_future
-            .map(|_| Err(format_err!("task aborted")));
+                        http.serve_connection(conn, service)
+                            .map_err(Error::from)
+                    }
+                });
+            let abort_future = abort_future
+                .map(|_| Err(format_err!("task aborted")));
 
-        use futures::future::Either;
-        futures::future::select(req_fut, abort_future)
-            .map(|res| match res {
-                Either::Left((Ok(res), _)) => Ok(res),
-                Either::Left((Err(err), _)) => Err(err),
-                Either::Right((Ok(res), _)) => Ok(res),
-                Either::Right((Err(err), _)) => Err(err),
-            })
-            .map_ok(move |_| env.log("reader finished sucessfully"))
-    })?;
+            use futures::future::Either;
+            futures::future::select(req_fut, abort_future)
+                .map(|res| match res {
+                    Either::Left((Ok(res), _)) => Ok(res),
+                    Either::Left((Err(err), _)) => Err(err),
+                    Either::Right((Ok(res), _)) => Ok(res),
+                    Either::Right((Err(err), _)) => Err(err),
+                })
+                .map_ok(move |_| env.log("reader finished successfully"))
+        })?;
 
-    let response = Response::builder()
-        .status(StatusCode::SWITCHING_PROTOCOLS)
-        .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
-        .body(Body::empty())?;
+        let response = Response::builder()
+            .status(StatusCode::SWITCHING_PROTOCOLS)
+            .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
+            .body(Body::empty())?;
 
-    Ok(Box::new(futures::future::ok(response)))
+        Ok(response)
+    }.boxed()
 }
 
 pub const READER_API_ROUTER: Router = Router::new()
@@ -170,38 +172,38 @@ fn download_file(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let env: &ReaderEnvironment = rpcenv.as_ref();
-    let env2 = env.clone();
+    async move {
+        let env: &ReaderEnvironment = rpcenv.as_ref();
 
-    let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
+        let file_name = tools::required_string_param(&param, "file-name")?.to_owned();
 
-    let mut path = env.datastore.base_path();
-    path.push(env.backup_dir.relative_path());
-    path.push(&file_name);
+        let mut path = env.datastore.base_path();
+        path.push(env.backup_dir.relative_path());
+        path.push(&file_name);
 
-    let path2 = path.clone();
-    let path3 = path.clone();
+        let path2 = path.clone();
+        let path3 = path.clone();
 
-    let response_future = tokio::fs::File::open(path)
-        .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
-        .and_then(move |file| {
-            env2.log(format!("download {:?}", path3));
-            let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
-                .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
+        let file = tokio::fs::File::open(path)
+            .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
+            .await?;
 
-            let body = Body::wrap_stream(payload);
+        env.log(format!("download {:?}", path3));
 
-            // fixme: set other headers ?
-            futures::future::ok(Response::builder()
-                .status(StatusCode::OK)
-                .header(header::CONTENT_TYPE, "application/octet-stream")
-                .body(body)
-                .unwrap())
-        });
+        let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
+            .map_ok(|bytes| hyper::Chunk::from(bytes.freeze()));
 
-    Ok(Box::new(response_future))
+        let body = Body::wrap_stream(payload);
+
+        // fixme: set other headers ?
+        Ok(Response::builder()
+            .status(StatusCode::OK)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
+            .body(body)
+            .unwrap())
+    }.boxed()
 }
 
 #[sortable]
@@ -221,33 +223,32 @@ fn download_chunk(
     param: Value,
     _info: &ApiMethod,
     rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
-    let env: &ReaderEnvironment = rpcenv.as_ref();
+    async move {
+        let env: &ReaderEnvironment = rpcenv.as_ref();
 
-    let digest_str = tools::required_string_param(&param, "digest")?;
-    let digest = proxmox::tools::hex_to_digest(digest_str)?;
+        let digest_str = tools::required_string_param(&param, "digest")?;
+        let digest = proxmox::tools::hex_to_digest(digest_str)?;
 
-    let (path, _) = env.datastore.chunk_path(&digest);
-    let path2 = path.clone();
+        let (path, _) = env.datastore.chunk_path(&digest);
+        let path2 = path.clone();
 
-    env.debug(format!("download chunk {:?}", path));
+        env.debug(format!("download chunk {:?}", path));
 
-    let response_future = tokio::fs::read(path)
-        .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err)))
-        .and_then(move |data| {
-            let body = Body::from(data);
+        let data = tokio::fs::read(path)
+            .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err)))
+            .await?;
 
-            // fixme: set other headers ?
-            futures::future::ok(
-                Response::builder()
-                    .status(StatusCode::OK)
-                    .header(header::CONTENT_TYPE, "application/octet-stream")
-                    .body(body)
-                    .unwrap())
-        });
+        let body = Body::from(data);
 
-    Ok(Box::new(response_future))
+        // fixme: set other headers ?
+        Ok(Response::builder()
+            .status(StatusCode::OK)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
+            .body(body)
+            .unwrap())
+    }.boxed()
 }
 
 /* this is too slow
@@ -302,7 +303,7 @@ fn speedtest(
     _param: Value,
     _info: &ApiMethod,
     _rpcenv: Box<dyn RpcEnvironment>,
-) -> Result<ApiFuture, Error> {
+) -> ApiFuture {
 
     let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
@@ -314,5 +315,5 @@ fn speedtest(
         .body(body)
         .unwrap();
 
-    Ok(Box::new(future::ok(response)))
+    future::ok(response).boxed()
 }
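NOTE (reviewer sketch, not part of the patch): the reader worker races the HTTP/2 connection future against the worker's abort future; whichever completes first decides the task's result. The four-arm match above just flattens `Either` back into a single `Result`. The same shape with plain `futures` types and invented names:

    use failure::{format_err, Error};
    use futures::future::{self, Either, FutureExt};

    async fn run_with_abort(
        work: impl std::future::Future<Output = Result<(), Error>> + Unpin,
        abort: impl std::future::Future<Output = ()> + Unpin,
    ) -> Result<(), Error> {
        let abort = abort.map(|()| Err(format_err!("task aborted")));
        match future::select(work, abort).await {
            Either::Left((res, _)) => res,   // connection future finished first
            Either::Right((res, _)) => res,  // abort fired first
        }
    }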
diff --git a/src/server/h2service.rs b/src/server/h2service.rs
index c8a1f839..5f89f8f6 100644
--- a/src/server/h2service.rs
+++ b/src/server/h2service.rs
@@ -43,7 +43,7 @@ impl H2Service {
 
         let (path, components) = match tools::normalize_uri_path(parts.uri.path()) {
             Ok((p,c)) => (p, c),
-            Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))),
+            Err(err) => return future::err(http_err!(BAD_REQUEST, err.to_string())).boxed(),
         };
 
         self.debug(format!("{} {}", method, path));
@@ -55,17 +55,17 @@ impl H2Service {
         match self.router.find_method(&components, method, &mut uri_param) {
             None => {
                 let err = http_err!(NOT_FOUND, "Path not found.".to_string());
-                Box::new(future::ok((formatter.format_error)(err)))
+                future::ok((formatter.format_error)(err)).boxed()
             }
             Some(api_method) => {
                 match api_method.handler {
                     ApiHandler::Sync(_) => {
                         crate::server::rest::handle_sync_api_request(
-                            self.rpcenv.clone(), api_method, formatter, parts, body, uri_param)
+                            self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed()
                     }
                     ApiHandler::Async(_) => {
                         crate::server::rest::handle_async_api_request(
-                            self.rpcenv.clone(), api_method, formatter, parts, body, uri_param)
+                            self.rpcenv.clone(), api_method, formatter, parts, body, uri_param).boxed()
                     }
                 }
             }
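NOTE (reviewer sketch, not part of the patch): once both REST entry points are plain `async fn`s, the H2 service can normalize every branch to the same concrete future type by appending `.boxed()`, instead of wrapping each arm in `Box::new(...)`. Minimal illustration with an invented enum and handlers:

    use futures::future::{BoxFuture, FutureExt};

    enum Handler { Sync, Async }

    async fn handle_sync() -> u32 { 1 }
    async fn handle_async() -> u32 { 2 }

    fn dispatch(h: &Handler) -> BoxFuture<'static, u32> {
        match h {
            Handler::Sync => handle_sync().boxed(),
            Handler::Async => handle_async().boxed(),
        }
    }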
diff --git a/src/server/rest.rs b/src/server/rest.rs
index 36a4117e..ca146556 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use failure::*;
-use futures::future::{self, Either, FutureExt, TryFutureExt};
+use futures::future::{self, FutureExt, TryFutureExt};
 use futures::stream::TryStreamExt;
 use hyper::header;
 use hyper::http::request::Parts;
@@ -17,7 +17,7 @@ use tokio::fs::File;
 use url::form_urlencoded;
 
 use proxmox::api::http_err;
-use proxmox::api::{ApiFuture, ApiHandler, ApiMethod, HttpError};
+use proxmox::api::{ApiHandler, ApiMethod, HttpError};
 use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
 use proxmox::api::schema::{parse_simple_value, verify_json_object, parse_parameter_strings};
 
@@ -125,7 +125,7 @@ impl tower_service::Service<Request<Body>> for ApiService {
         let method = req.method().clone();
         let peer = self.peer;
 
-        Pin::from(handle_request(self.api_config.clone(), req))
+        handle_request(self.api_config.clone(), req)
             .map(move |result| match result {
                 Ok(res) => {
                     log_response(&peer, method, &path, &res);
@@ -149,13 +149,13 @@ impl tower_service::Service<Request<Body>> for ApiService {
     }
 }
 
-fn get_request_parameters_async(
+async fn get_request_parameters_async(
     info: &'static ApiMethod,
     parts: Parts,
     req_body: Body,
     uri_param: HashMap<String, String>,
-) -> Box<dyn Future<Output = Result<Value, Error>> + Send>
-{
+) -> Result<Value, Error> {
+
     let mut is_json = false;
 
     if let Some(value) = parts.headers.get(header::CONTENT_TYPE) {
@@ -166,13 +166,11 @@ async fn get_request_parameters_async(
             Ok(Some("application/json")) => {
                 is_json = true;
             }
-            _ => {
-                return Box::new(future::err(http_err!(BAD_REQUEST, "unsupported content type".to_string())));
-            }
+            _ => bail!("unsupported content type {:?}", value.to_str()),
         }
     }
 
-    let resp = req_body
+    let body = req_body
        .map_err(|err| http_err!(BAD_REQUEST, format!("Promlems reading request body: {}", err)))
        .try_fold(Vec::new(), |mut acc, chunk| async move {
            if acc.len() + chunk.len() < 64*1024 { //fimxe: max request body size?
@@ -181,57 +179,55 @@ async fn get_request_parameters_async(
             } else {
                 Err(http_err!(BAD_REQUEST, "Request body too large".to_string()))
             }
-        })
-        .and_then(move |body| async move {
-            let utf8 = std::str::from_utf8(&body)?;
+        }).await?;
 
-            let obj_schema = &info.parameters;
+    let utf8 = std::str::from_utf8(&body)
+        .map_err(|err| format_err!("Request body not utf8: {}", err))?;
 
-            if is_json {
-                let mut params: Value = serde_json::from_str(utf8)?;
-                for (k, v) in uri_param {
-                    if let Some((_optional, prop_schema)) = obj_schema.lookup(&k) {
-                        params[&k] = parse_simple_value(&v, prop_schema)?;
-                    }
-                }
-                verify_json_object(&params, obj_schema)?;
-                return Ok(params);
+    let obj_schema = &info.parameters;
+
+    if is_json {
+        let mut params: Value = serde_json::from_str(utf8)?;
+        for (k, v) in uri_param {
+            if let Some((_optional, prop_schema)) = obj_schema.lookup(&k) {
+                params[&k] = parse_simple_value(&v, prop_schema)?;
             }
+        }
+        verify_json_object(&params, obj_schema)?;
+        return Ok(params);
+    }
 
-            let mut param_list: Vec<(String, String)> = vec![];
+    let mut param_list: Vec<(String, String)> = vec![];
 
-            if !utf8.is_empty() {
-                for (k, v) in form_urlencoded::parse(utf8.as_bytes()).into_owned() {
-                    param_list.push((k, v));
-                }
-            }
+    if !utf8.is_empty() {
+        for (k, v) in form_urlencoded::parse(utf8.as_bytes()).into_owned() {
+            param_list.push((k, v));
+        }
+    }
 
-            if let Some(query_str) = parts.uri.query() {
-                for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
-                    if k == "_dc" { continue; } // skip extjs "disable cache" parameter
-                    param_list.push((k, v));
-                }
-            }
+    if let Some(query_str) = parts.uri.query() {
+        for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
+            if k == "_dc" { continue; } // skip extjs "disable cache" parameter
+            param_list.push((k, v));
+        }
+    }
 
-            for (k, v) in uri_param {
-                param_list.push((k.clone(), v.clone()));
-            }
+    for (k, v) in uri_param {
+        param_list.push((k.clone(), v.clone()));
+    }
 
-            let params = parse_parameter_strings(&param_list, obj_schema, true)?;
+    let params = parse_parameter_strings(&param_list, obj_schema, true)?;
 
-            Ok(params)
-        }.boxed());
-
-    Box::new(resp)
+    Ok(params)
 }
 
 struct NoLogExtension();
 
-fn proxy_protected_request(
+async fn proxy_protected_request(
     info: &'static ApiMethod,
     mut parts: Parts,
     req_body: Body,
-) -> ApiFuture {
+) -> Result<Response<Body>, Error> {
 
     let mut uri_parts = parts.uri.clone().into_parts();
 
@@ -243,96 +239,77 @@ fn proxy_protected_request(
 
     let request = Request::from_parts(parts, req_body);
 
+    let reload_timezone = info.reload_timezone;
+
     let resp = hyper::client::Client::new()
         .request(request)
         .map_err(Error::from)
         .map_ok(|mut resp| {
             resp.extensions_mut().insert(NoLogExtension());
             resp
-        });
+        })
+        .await?;
+
+    if reload_timezone { unsafe { tzset(); } }
 
-    let reload_timezone = info.reload_timezone;
-    Box::new(async move {
-        let result = resp.await;
-        if reload_timezone {
-            unsafe {
-                tzset();
-            }
-        }
-        result
-    })
+    Ok(resp)
 }
 
-pub fn handle_sync_api_request<Env: RpcEnvironment>(
+pub async fn handle_sync_api_request<Env: RpcEnvironment>(
     mut rpcenv: Env,
     info: &'static ApiMethod,
     formatter: &'static OutputFormatter,
     parts: Parts,
     req_body: Body,
     uri_param: HashMap<String, String>,
-) -> ApiFuture
-{
+) -> Result<Response<Body>, Error> {
+
     let handler = match info.handler {
-        ApiHandler::Async(_) => {
-            panic!("fixme");
-        }
+        ApiHandler::Async(_) => bail!("handle_sync_api_request: internal error (called with Async handler)"),
         ApiHandler::Sync(handler) => handler,
     };
 
-    let params = get_request_parameters_async(info, parts, req_body, uri_param);
+    let params = get_request_parameters_async(info, parts, req_body, uri_param).await?;
 
     let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
 
-    let resp = Pin::from(params)
-        .and_then(move |params| {
-            let mut delay = false;
-
-            let resp = match (handler)(params, info, &mut rpcenv) {
-                Ok(data) => (formatter.format_data)(data, &rpcenv),
-                Err(err) => {
-                    if let Some(httperr) = err.downcast_ref::<HttpError>() {
-                        if httperr.code == StatusCode::UNAUTHORIZED {
-                            delay = true;
-                        }
-                    }
-                    (formatter.format_error)(err)
+    let mut delay = false;
+
+    let resp = match (handler)(params, info, &mut rpcenv) {
+        Ok(data) => (formatter.format_data)(data, &rpcenv),
+        Err(err) => {
+            if let Some(httperr) = err.downcast_ref::<HttpError>() {
+                if httperr.code == StatusCode::UNAUTHORIZED {
+                    delay = true;
                 }
-            };
-
-            if info.reload_timezone {
-                unsafe { tzset() };
             }
+            (formatter.format_error)(err)
+        }
+    };
 
-            if delay {
-                Either::Left(delayed_response(resp, delay_unauth_time))
-            } else {
-                Either::Right(future::ok(resp))
-            }
-        })
-        .or_else(move |err| {
-            future::ok((formatter.format_error)(err))
-        });
+    if info.reload_timezone { unsafe { tzset(); } }
 
-    Box::new(resp)
+    if delay {
+        tokio::timer::delay(delay_unauth_time).await;
+    }
+
+    Ok(resp)
 }
 
-pub fn handle_async_api_request<Env: RpcEnvironment>(
+pub async fn handle_async_api_request<Env: RpcEnvironment>(
     rpcenv: Env,
     info: &'static ApiMethod,
     formatter: &'static OutputFormatter,
     parts: Parts,
     req_body: Body,
     uri_param: HashMap<String, String>,
-) -> ApiFuture
-{
+) -> Result<Response<Body>, Error> {
+
     let handler = match info.handler {
-        ApiHandler::Sync(_) => {
-            panic!("fixme");
-        }
+        ApiHandler::Sync(_) => bail!("handle_async_api_request: internal error (called with Sync handler)"),
         ApiHandler::Async(handler) => handler,
     };
-
+
     // fixme: convert parameters to Json
     let mut param_list: Vec<(String, String)> = vec![];
 
@@ -350,18 +327,14 @@ pub fn handle_async_api_request(
     let params = match parse_parameter_strings(&param_list, &info.parameters, true) {
         Ok(v) => v,
         Err(err) => {
-            let resp = (formatter.format_error)(Error::from(err));
-            return Box::new(future::ok(resp));
+            return Ok((formatter.format_error)(Error::from(err)));
         }
     };
-
-    match (handler)(parts, req_body, params, info, Box::new(rpcenv)) {
-        Ok(future) => future,
-        Err(err) => {
-            let resp = (formatter.format_error)(err);
-            Box::new(future::ok(resp))
-        }
-    }
+
+    let resp = (handler)(parts, req_body, params, info, Box::new(rpcenv)).await?;
+
+    Ok(resp)
 }
 
 fn get_index(username: Option<String>, token: Option<String>) -> Response<Body> {
@@ -491,9 +464,9 @@ async fn chuncked_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
 }
 
-fn handle_static_file_download(filename: PathBuf) -> ApiFuture {
+async fn handle_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
 
-    let response = tokio::fs::metadata(filename.clone())
+    tokio::fs::metadata(filename.clone())
         .map_err(|err| http_err!(BAD_REQUEST, format!("File access problems: {}", err)))
         .and_then(|metadata| async move {
             if metadata.len() < 1024*32 {
@@ -501,9 +474,8 @@ fn handle_static_file_download(filename: PathBuf) -> ApiFuture {
             } else {
                 chuncked_static_file_download(filename).await
             }
-        });
-
-    Box::new(response)
+        })
+        .await
 }
 
 fn extract_auth_data(headers: &http::HeaderMap) -> (Option<String>, Option<String>) {
@@ -548,24 +520,12 @@ fn check_auth(method: &hyper::Method, ticket: &Option<String>, token: &Option<String>,
 
-async fn delayed_response(
-    resp: Response<Body>,
-    delay_unauth_time: std::time::Instant,
-) -> Result<Response<Body>, Error> {
-    tokio::timer::delay(delay_unauth_time).await;
-    Ok(resp)
-}
-
-pub fn handle_request(api: Arc<ApiConfig>, req: Request<Body>) -> ApiFuture {
+pub async fn handle_request(api: Arc<ApiConfig>, req: Request<Body>) -> Result<Response<Body>, Error> {
 
     let (parts, body) = req.into_parts();
 
     let method = parts.method.clone();
-
-    let (path, components) = match tools::normalize_uri_path(parts.uri.path()) {
-        Ok((p,c)) => (p, c),
-        Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))),
-    };
+    let (path, components) = tools::normalize_uri_path(parts.uri.path())?;
 
     let comp_len = components.len();
 
@@ -580,13 +540,13 @@
     if comp_len >= 1 && components[0] == "api2" {
 
         if comp_len >= 2 {
+
+            let format = components[1];
+
             let formatter = match format {
                 "json" => &JSON_FORMATTER,
                 "extjs" => &EXTJS_FORMATTER,
-                _ => {
-                    return Box::new(future::err(http_err!(BAD_REQUEST, format!("Unsupported output format '{}'.", format))));
-                }
+                _ => bail!("Unsupported output format '{}'.", format),
             };
 
             let mut uri_param = HashMap::new();
@@ -605,9 +565,8 @@
                 Err(err) => {
                     // always delay unauthorized calls by 3 seconds (from start of request)
                     let err = http_err!(UNAUTHORIZED, format!("permission check failed - {}", err));
-                    return Box::new(
-                        delayed_response((formatter.format_error)(err), delay_unauth_time)
-                    );
+                    tokio::timer::delay(delay_unauth_time).await;
+                    return Ok((formatter.format_error)(err));
                 }
             }
         }
 
@@ -615,29 +574,29 @@
         match api.find_method(&components[2..], method, &mut uri_param) {
             None => {
                 let err = http_err!(NOT_FOUND, "Path not found.".to_string());
-                return Box::new(future::ok((formatter.format_error)(err)));
+                return Ok((formatter.format_error)(err));
             }
             Some(api_method) => {
                 if api_method.protected && env_type == RpcEnvironmentType::PUBLIC {
-                    return proxy_protected_request(api_method, parts, body);
+                    return proxy_protected_request(api_method, parts, body).await;
                 } else {
                     match api_method.handler {
                         ApiHandler::Sync(_) => {
-                            return handle_sync_api_request(rpcenv, api_method, formatter, parts, body, uri_param);
+                            return handle_sync_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await;
                         }
                         ApiHandler::Async(_) => {
-                            return handle_async_api_request(rpcenv, api_method, formatter, parts, body, uri_param);
+                            return handle_async_api_request(rpcenv, api_method, formatter, parts, body, uri_param).await;
                         }
                     }
                 }
             }
        }
-    } else {
+    } else { // not Auth required for accessing files!
 
         if method != hyper::Method::GET {
-            return Box::new(future::err(http_err!(BAD_REQUEST, "Unsupported method".to_string())));
+            bail!("Unsupported HTTP method {}", method);
         }
 
         if comp_len == 0 {
@@ -646,20 +605,21 @@
             match check_auth(&method, &ticket, &token) {
                 Ok(username) => {
                     let new_token = assemble_csrf_prevention_token(csrf_secret(), &username);
-                    return Box::new(future::ok(get_index(Some(username), Some(new_token))));
+                    return Ok(get_index(Some(username), Some(new_token)));
                 }
                 _ => {
-                    return Box::new(delayed_response(get_index(None, None), delay_unauth_time));
+                    tokio::timer::delay(delay_unauth_time).await;
+                    return Ok(get_index(None, None));
                 }
             }
         } else {
-            return Box::new(future::ok(get_index(None, None)));
+            return Ok(get_index(None, None));
        }
        } else {
            let filename = api.find_alias(&components);
-            return handle_static_file_download(filename);
+            return handle_static_file_download(filename).await;
        }
    }
 
-    Box::new(future::err(http_err!(NOT_FOUND, "Path not found.".to_string())))
+    Err(http_err!(NOT_FOUND, "Path not found.".to_string()))
 }
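NOTE (reviewer sketch, not part of the patch): the `delayed_response()` helper disappears because the delay can now simply be awaited inline on the failure paths. The deadline is computed when the request starts, so a failed login always costs roughly the same total latency no matter where validation bails out. Sketch written against current tokio (`tokio::time`); the patch itself targets the older `tokio::timer::delay` API:

    use std::time::Duration;
    use tokio::time::{sleep_until, Instant};

    async fn login(password: &str) -> Result<&'static str, &'static str> {
        // compute the deadline at the start of the request ...
        let delay_unauth_time = Instant::now() + Duration::from_millis(3000);
        if password == "correct" {
            Ok("index page for an authenticated user")
        } else {
            // ... and only await it on the unauthorized path
            sleep_until(delay_unauth_time).await;
            Err("unauthorized")
        }
    }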