diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs index e2339571..2bf71604 100644 --- a/src/api2/admin/datastore/backup.rs +++ b/src/api2/admin/datastore/backup.rs @@ -14,6 +14,7 @@ use crate::tools; use crate::api_schema::router::*; use crate::api_schema::*; use crate::server::WorkerTask; +use crate::backup::*; mod environment; use environment::*; @@ -21,6 +22,10 @@ use environment::*; mod service; use service::*; +mod upload_chunk; +use upload_chunk::*; + + pub fn api_method_upgrade_backup() -> ApiAsyncMethod { ApiAsyncMethod::new( upgrade_to_backup_protocol, @@ -43,6 +48,8 @@ fn upgrade_to_backup_protocol( static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2"; let store = tools::required_string_param(¶m, "store")?; + let datastore = DataStore::lookup_datastore(store)?; + let backup_type = tools::required_string_param(¶m, "backup-type")?; let backup_id = tools::required_string_param(¶m, "backup-id")?; @@ -68,7 +75,7 @@ fn upgrade_to_backup_protocol( let env_type = rpcenv.env_type(); WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| { - let backup_env = BackupEnvironment::new(env_type, username.clone(), worker.clone()); + let backup_env = BackupEnvironment::new(env_type, username.clone(), worker.clone(), datastore); let service = BackupService::new(backup_env, worker.clone()); let abort_future = worker.abort_future(); @@ -116,7 +123,11 @@ fn backup_api() -> Router { ) ); + let chunks = Router::new() + .upload(api_method_upload_chunk()); + let router = Router::new() + .subdir("chunks", chunks) .subdir("test1", test1) .subdir("test2", test2) .list_subdirs(); @@ -124,10 +135,6 @@ fn backup_api() -> Router { router } -fn get_backup_environment(rpcenv: &mut RpcEnvironment) -> &BackupEnvironment { - rpcenv.as_any().downcast_ref::().unwrap() -} - fn test1_get ( _param: Value, _info: &ApiMethod, @@ -136,7 +143,7 @@ fn test1_get ( println!("TYPEID {:?}", (*rpcenv).type_id()); - let env = get_backup_environment(rpcenv); + let env: &BackupEnvironment = rpcenv.as_ref(); env.log("Inside test1_get()"); diff --git a/src/api2/admin/datastore/backup/environment.rs b/src/api2/admin/datastore/backup/environment.rs index 0c22e3d3..d7f735d0 100644 --- a/src/api2/admin/datastore/backup/environment.rs +++ b/src/api2/admin/datastore/backup/environment.rs @@ -5,6 +5,7 @@ use serde_json::Value; use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType}; use crate::server::WorkerTask; +use crate::backup::*; /// `RpcEnvironmet` implementation for backup service #[derive(Clone)] @@ -12,17 +13,18 @@ pub struct BackupEnvironment { env_type: RpcEnvironmentType, result_attributes: HashMap, user: String, - worker: Arc, - + pub worker: Arc, + pub datastore: Arc, } impl BackupEnvironment { - pub fn new(env_type: RpcEnvironmentType, user: String, worker: Arc) -> Self { + pub fn new(env_type: RpcEnvironmentType, user: String, worker: Arc, datastore: Arc) -> Self { Self { result_attributes: HashMap::new(), env_type, user, worker, + datastore, } } @@ -53,3 +55,9 @@ impl RpcEnvironment for BackupEnvironment { Some(self.user.clone()) } } + +impl AsRef for RpcEnvironment { + fn as_ref(&self) -> &BackupEnvironment { + self.as_any().downcast_ref::().unwrap() + } +} diff --git a/src/api2/admin/datastore/backup/upload_chunk.rs b/src/api2/admin/datastore/backup/upload_chunk.rs new file mode 100644 index 00000000..d7bdf048 --- /dev/null +++ b/src/api2/admin/datastore/backup/upload_chunk.rs @@ -0,0 +1,97 @@ +use failure::*; +use futures::*; +use std::sync::Arc; + +use hyper::http::request::Parts; +use hyper::Body; +use serde_json::{json, Value}; + +use crate::tools; +use crate::backup::*; +use crate::api_schema::*; +use crate::api_schema::router::*; + +use super::environment::*; + +pub struct UploadChunk { + stream: Body, + store: Arc, + size: u64, + chunk: Vec, +} + +impl UploadChunk { + + pub fn new(stream: Body, store: Arc, size: u64) -> Self { + Self { stream, store, size, chunk: vec![] } + } +} + +impl Future for UploadChunk { + type Item = Value; + type Error = failure::Error; + + fn poll(&mut self) -> Poll { + loop { + match try_ready!(self.stream.poll()) { + Some(chunk) => { + if (self.chunk.len() + chunk.len()) > (self.size as usize) { + bail!("uploaded chunk is larger than announced."); + } + self.chunk.extend_from_slice(&chunk); + } + None => { + + let (is_duplicate, digest, _compressed_size) = self.store.insert_chunk(&self.chunk)?; + + let result = json!({ + "digest": tools::digest_to_hex(&digest), + "duplicate": is_duplicate, + }); + return Ok(Async::Ready(result)) + } + } + } + } +} + +pub fn api_method_upload_chunk() -> ApiAsyncMethod { + ApiAsyncMethod::new( + upload_chunk, + ObjectSchema::new("Upload chunk.") + .required("size", IntegerSchema::new("Chunk size.") + .minimum(1) + .maximum(1024*16) + ) + ) +} + +fn upload_chunk( + _parts: Parts, + req_body: Body, + param: Value, + _info: &ApiAsyncMethod, + rpcenv: &mut RpcEnvironment, +) -> Result { + + let size = tools::required_integer_param(¶m, "size")?; + + let env: &BackupEnvironment = rpcenv.as_ref(); + + let upload = UploadChunk::new(req_body, env.datastore.clone(), size as u64); + + let abort_future = env.worker.abort_future().then(|_| Ok(Value::Null)); + + let resp = upload.select(abort_future) + .then(move |result| { + use crate::server::formatter::*; + match result { + Ok((result,_)) => Ok(json_response(result)), + Err((err, _)) => Ok(json_format_error(err)), + } + }); + + + Ok(Box::new(resp)) + +} diff --git a/src/server/formatter.rs b/src/server/formatter.rs index b7ad5a65..92e6c45f 100644 --- a/src/server/formatter.rs +++ b/src/server/formatter.rs @@ -19,7 +19,7 @@ pub struct OutputFormatter { static JSON_CONTENT_TYPE: &str = "application/json;charset=UTF-8"; -fn json_response(result: Value) -> Response { +pub fn json_response(result: Value) -> Response { let json_str = result.to_string(); @@ -50,7 +50,7 @@ fn json_format_result(data: Value, rpcenv: &RpcEnvironment) -> Response { json_response(result) } -fn json_format_error(err: Error) -> Response { +pub fn json_format_error(err: Error) -> Response { let mut response = if let Some(apierr) = err.downcast_ref::() { let mut resp = Response::new(Body::from(apierr.message.clone()));