use failure::*; use lazy_static::lazy_static; use std::sync::Arc; use futures::*; use hyper::header::{self, HeaderValue, UPGRADE}; use hyper::{Body, Response, StatusCode}; use hyper::http::request::Parts; //use chrono::{Local, TimeZone}; use serde_json::Value; use crate::tools; use crate::api_schema::router::*; use crate::api_schema::*; use crate::server::{WorkerTask, H2Service}; use crate::backup::*; use crate::api2::types::*; mod environment; use environment::*; pub fn router() -> Router { Router::new() .upgrade(api_method_upgrade_backup()) } pub fn api_method_upgrade_backup() -> ApiAsyncMethod { ApiAsyncMethod::new( upgrade_to_backup_reader_protocol, ObjectSchema::new(concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "').")) .required("store", StringSchema::new("Datastore name.")) .required("backup-type", StringSchema::new("Backup type.") .format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"])))) .required("backup-id", StringSchema::new("Backup ID.")) .required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)") .minimum(1_547_797_308)) .optional("debug", BooleanSchema::new("Enable verbose debug logging.")) ) } fn upgrade_to_backup_reader_protocol( parts: Parts, req_body: Body, param: Value, _info: &ApiAsyncMethod, rpcenv: Box, ) -> Result { let debug = param["debug"].as_bool().unwrap_or(false); let store = tools::required_string_param(¶m, "store")?.to_owned(); let datastore = DataStore::lookup_datastore(&store)?; let backup_type = tools::required_string_param(¶m, "backup-type")?; let backup_id = tools::required_string_param(¶m, "backup-id")?; let backup_time = tools::required_integer_param(¶m, "backup-time")?; let protocols = parts .headers .get("UPGRADE") .ok_or_else(|| format_err!("missing Upgrade header"))? .to_str()?; if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() { bail!("invalid protocol name"); } if parts.version >= http::version::Version::HTTP_2 { bail!("unexpected http version '{:?}' (expected version < 2)", parts.version); } let username = rpcenv.get_user().unwrap(); let env_type = rpcenv.env_type(); let backup_dir = BackupDir::new(backup_type, backup_id, backup_time); let path = datastore.base_path(); //let files = BackupInfo::list_files(&path, &backup_dir)?; let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp()); WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| { let mut env = ReaderEnvironment::new( env_type, username.clone(), worker.clone(), datastore, backup_dir); env.debug = debug; env.log(format!("starting new backup reader datastore '{}': {:?}", store, path)); let service = H2Service::new(env.clone(), worker.clone(), &READER_ROUTER, debug); let abort_future = worker.abort_future(); let req_fut = req_body .on_upgrade() .map_err(Error::from) .and_then({ let env = env.clone(); move |conn| { env.debug("protocol upgrade done"); let mut http = hyper::server::conn::Http::new(); http.http2_only(true); // increase window size: todo - find optiomal size let window_size = 32*1024*1024; // max = (1 << 31) - 2 http.http2_initial_stream_window_size(window_size); http.http2_initial_connection_window_size(window_size); http.serve_connection(conn, service) .map_err(Error::from) } }); let abort_future = abort_future .map(|_| Err(format_err!("task aborted"))); use futures::future::Either; futures::future::select(req_fut, abort_future) .map(|res| match res { Either::Left((Ok(res), _)) => Ok(res), Either::Left((Err(err), _)) => Err(err), Either::Right((Ok(res), _)) => Ok(res), Either::Right((Err(err), _)) => Err(err), }) .map_ok(move |_| env.log("reader finished sucessfully")) })?; let response = Response::builder() .status(StatusCode::SWITCHING_PROTOCOLS) .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!())) .body(Body::empty())?; Ok(Box::new(futures::future::ok(response))) } lazy_static!{ static ref READER_ROUTER: Router = reader_api(); } pub fn reader_api() -> Router { Router::new() .subdir( "chunk", Router::new() .download(api_method_download_chunk()) ) .subdir( "download", Router::new() .download(api_method_download_file()) ) .subdir( "speedtest", Router::new() .download(api_method_speedtest()) ) } pub fn api_method_download_file() -> ApiAsyncMethod { ApiAsyncMethod::new( download_file, ObjectSchema::new("Download specified file.") .required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone()) ) } fn download_file( _parts: Parts, _req_body: Body, param: Value, _info: &ApiAsyncMethod, rpcenv: Box, ) -> Result { let env: &ReaderEnvironment = rpcenv.as_ref(); let env2 = env.clone(); let file_name = tools::required_string_param(¶m, "file-name")?.to_owned(); let mut path = env.datastore.base_path(); path.push(env.backup_dir.relative_path()); path.push(&file_name); let path2 = path.clone(); let path3 = path.clone(); let response_future = tokio::fs::File::open(path) .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) .and_then(move |file| { env2.log(format!("download {:?}", path3)); let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) .map_ok(|bytes| hyper::Chunk::from(bytes.freeze())); let body = Body::wrap_stream(payload); // fixme: set other headers ? futures::future::ok(Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/octet-stream") .body(body) .unwrap()) }); Ok(Box::new(response_future)) } pub fn api_method_download_chunk() -> ApiAsyncMethod { ApiAsyncMethod::new( download_chunk, ObjectSchema::new("Download specified chunk.") .required("digest", CHUNK_DIGEST_SCHEMA.clone()) ) } fn download_chunk( _parts: Parts, _req_body: Body, param: Value, _info: &ApiAsyncMethod, rpcenv: Box, ) -> Result { let env: &ReaderEnvironment = rpcenv.as_ref(); let digest_str = tools::required_string_param(¶m, "digest")?; let digest = proxmox::tools::hex_to_digest(digest_str)?; let (path, _) = env.datastore.chunk_path(&digest); let path2 = path.clone(); env.debug(format!("download chunk {:?}", path)); let response_future = tokio::fs::read(path) .map_err(move |err| http_err!(BAD_REQUEST, format!("reading file {:?} failed: {}", path2, err))) .and_then(move |data| { let body = Body::from(data); // fixme: set other headers ? futures::future::ok( Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/octet-stream") .body(body) .unwrap()) }); Ok(Box::new(response_future)) } /* this is too slow fn download_chunk_old( _parts: Parts, _req_body: Body, param: Value, _info: &ApiAsyncMethod, rpcenv: Box, ) -> Result { let env: &ReaderEnvironment = rpcenv.as_ref(); let env2 = env.clone(); let digest_str = tools::required_string_param(¶m, "digest")?; let digest = proxmox::tools::hex_to_digest(digest_str)?; let (path, _) = env.datastore.chunk_path(&digest); let path2 = path.clone(); let path3 = path.clone(); let response_future = tokio::fs::File::open(path) .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) .and_then(move |file| { env2.debug(format!("download chunk {:?}", path3)); let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) .map_ok(|bytes| hyper::Chunk::from(bytes.freeze())); let body = Body::wrap_stream(payload); // fixme: set other headers ? futures::future::ok(Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/octet-stream") .body(body) .unwrap()) }); Ok(Box::new(response_future)) } */ pub fn api_method_speedtest() -> ApiAsyncMethod { ApiAsyncMethod::new( speedtest, ObjectSchema::new("Test 4M block download speed.") ) } fn speedtest( _parts: Parts, _req_body: Body, _param: Value, _info: &ApiAsyncMethod, _rpcenv: Box, ) -> Result { let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...] let body = Body::from(buffer); let response = Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/octet-stream") .body(body) .unwrap(); Ok(Box::new(future::ok(response))) }