From f9578f3c799295bd8f6ec170fa4c45fd3f29bd79 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Fri, 10 May 2019 10:25:40 +0200 Subject: [PATCH] backup-api: allow to create DynamicIndexWriter and add chunks to to --- src/api2/admin/datastore/backup.rs | 61 +++++++++++++++--- .../admin/datastore/backup/environment.rs | 62 ++++++++++++++++++- .../admin/datastore/backup/upload_chunk.rs | 47 +++++++------- 3 files changed, 139 insertions(+), 31 deletions(-) diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs index 8c1a8d49..365a9900 100644 --- a/src/api2/admin/datastore/backup.rs +++ b/src/api2/admin/datastore/backup.rs @@ -8,7 +8,7 @@ use hyper::{Body, Response, StatusCode}; use hyper::http::request::Parts; use chrono::{Local, TimeZone}; -use serde_json::Value; +use serde_json::{json, Value}; use crate::tools; use crate::api_schema::router::*; @@ -52,8 +52,7 @@ fn upgrade_to_backup_protocol( let backup_type = tools::required_string_param(¶m, "backup-type")?; let backup_id = tools::required_string_param(¶m, "backup-id")?; - - let _backup_time = Local.timestamp(Local::now().timestamp(), 0); + let backup_time = Local.timestamp(Local::now().timestamp(), 0); let protocols = parts .headers @@ -74,8 +73,14 @@ fn upgrade_to_backup_protocol( let username = rpcenv.get_user().unwrap(); let env_type = rpcenv.env_type(); + let backup_dir = BackupDir::new(backup_type, backup_id, backup_time.timestamp()); + + let (path, is_new) = datastore.create_backup_dir(&backup_dir)?; + if !is_new { bail!("backup directorty already exists."); } + WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| { - let backup_env = BackupEnvironment::new(env_type, username.clone(), worker.clone(), datastore); + let backup_env = BackupEnvironment::new( + env_type, username.clone(), worker.clone(), datastore, backup_dir, path); let service = BackupService::new(backup_env, worker.clone()); let abort_future = worker.abort_future(); @@ -123,11 +128,15 @@ fn backup_api() -> Router { ) ); - let chunks = Router::new() - .upload(api_method_upload_chunk()); - let router = Router::new() - .subdir("chunks", chunks) + .subdir( + "dynamic_chunk", Router::new() + .upload(api_method_upload_dynamic_chunk()) + ) + .subdir( + "dynamic_index", Router::new() + .post(api_method_create_dynamic_index()) + ) .subdir("test1", test1) .subdir("test2", test2) .list_subdirs(); @@ -135,6 +144,42 @@ fn backup_api() -> Router { router } +pub fn api_method_create_dynamic_index() -> ApiMethod { + ApiMethod::new( + create_dynamic_index, + ObjectSchema::new("Create dynamic chunk index file.") + ) +} + +fn create_dynamic_index( + param: Value, + _info: &ApiMethod, + rpcenv: &mut RpcEnvironment, +) -> Result { + + let env: &BackupEnvironment = rpcenv.as_ref(); + env.log("Inside create_dynamic_index"); + + let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned(); + + if !archive_name.ends_with(".pxar") { + bail!("wrong archive extension"); + } else { + archive_name.push_str(".didx"); + } + + let mut path = env.path.clone(); + path.push(archive_name); + + let chunk_size = 4096*1024; // todo: ?? + + let index = env.datastore.create_dynamic_writer(path, chunk_size)?; + let uid = env.register_dynamic_writer(index); + + + Ok(json!(uid)) +} + fn test1_get ( _param: Value, _info: &ApiMethod, diff --git a/src/api2/admin/datastore/backup/environment.rs b/src/api2/admin/datastore/backup/environment.rs index 470c44ba..8efa7abe 100644 --- a/src/api2/admin/datastore/backup/environment.rs +++ b/src/api2/admin/datastore/backup/environment.rs @@ -1,6 +1,7 @@ use failure::*; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::collections::HashMap; +use std::path::PathBuf; use serde_json::Value; @@ -10,6 +11,11 @@ use crate::backup::*; use crate::server::formatter::*; use hyper::{Body, Response}; +struct SharedBackupState { + uid_counter: usize, + dynamic_writers: HashMap, +} + /// `RpcEnvironmet` implementation for backup service #[derive(Clone)] pub struct BackupEnvironment { @@ -19,10 +25,26 @@ pub struct BackupEnvironment { pub formatter: &'static OutputFormatter, pub worker: Arc, pub datastore: Arc, + pub backup_dir: BackupDir, + pub path: PathBuf, + state: Arc> } impl BackupEnvironment { - pub fn new(env_type: RpcEnvironmentType, user: String, worker: Arc, datastore: Arc) -> Self { + pub fn new( + env_type: RpcEnvironmentType, + user: String, + worker: Arc, + datastore: Arc, + backup_dir: BackupDir, + path: PathBuf, + ) -> Self { + + let state = SharedBackupState { + uid_counter: 0, + dynamic_writers: HashMap::new(), + }; + Self { result_attributes: HashMap::new(), env_type, @@ -30,9 +52,45 @@ impl BackupEnvironment { worker, datastore, formatter: &JSON_FORMATTER, + backup_dir, + path, + state: Arc::new(Mutex::new(state)), } } + /// Get an unique integer ID + pub fn next_uid(&self) -> usize { + let mut state = self.state.lock().unwrap(); + state.uid_counter += 1; + state.uid_counter + } + + /// Store the writer with an unique ID + pub fn register_dynamic_writer(&self, writer: DynamicIndexWriter) -> usize { + let mut state = self.state.lock().unwrap(); + state.uid_counter += 1; + let uid = state.uid_counter; + + state.dynamic_writers.insert(uid, (0, writer)); + uid + } + + /// Append chunk to dynamic writer + pub fn dynamic_writer_append_chunk(&self, wid: usize, size: u64, digest: &[u8; 32]) -> Result<(), Error> { + let mut state = self.state.lock().unwrap(); + + let mut data = match state.dynamic_writers.get_mut(&wid) { + Some(data) => data, + None => bail!("dynamic writer '{}' not registered", wid), + }; + + data.0 += size; + + data.1.add_chunk(data.0, digest)?; + + Ok(()) + } + pub fn log>(&self, msg: S) { self.worker.log(msg); } diff --git a/src/api2/admin/datastore/backup/upload_chunk.rs b/src/api2/admin/datastore/backup/upload_chunk.rs index 4a6ee7ab..0507809b 100644 --- a/src/api2/admin/datastore/backup/upload_chunk.rs +++ b/src/api2/admin/datastore/backup/upload_chunk.rs @@ -28,10 +28,10 @@ impl UploadChunk { } impl Future for UploadChunk { - type Item = Value; + type Item = ([u8; 32], u64); type Error = failure::Error; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> Poll<([u8; 32], u64), failure::Error> { loop { match try_ready!(self.stream.poll()) { Some(chunk) => { @@ -41,24 +41,27 @@ impl Future for UploadChunk { self.chunk.extend_from_slice(&chunk); } None => { + if self.chunk.len() != (self.size as usize) { + bail!("uploaded chunk has unexpected size."); + } - let (is_duplicate, digest, _compressed_size) = self.store.insert_chunk(&self.chunk)?; + let (_is_duplicate, digest, _compressed_size) = self.store.insert_chunk(&self.chunk)?; - let result = json!({ - "digest": tools::digest_to_hex(&digest), - "duplicate": is_duplicate, - }); - return Ok(Async::Ready(result)) + return Ok(Async::Ready((digest, self.size))) } } } } } -pub fn api_method_upload_chunk() -> ApiAsyncMethod { +pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod { ApiAsyncMethod::new( - upload_chunk, - ObjectSchema::new("Upload chunk.") + upload_dynamic_chunk, + ObjectSchema::new("Upload chunk for dynamic index writer (variable sized chunks).") + .required("wid", IntegerSchema::new("Dynamic writer ID.") + .minimum(1) + .maximum(256) + ) .required("size", IntegerSchema::new("Chunk size.") .minimum(1) .maximum(1024*1024*16) @@ -66,7 +69,7 @@ pub fn api_method_upload_chunk() -> ApiAsyncMethod { ) } -fn upload_chunk( +fn upload_dynamic_chunk( _parts: Parts, req_body: Body, param: Value, @@ -75,22 +78,24 @@ fn upload_chunk( ) -> Result { let size = tools::required_integer_param(¶m, "size")?; + let wid = tools::required_integer_param(¶m, "wid")? as usize; + let env: &BackupEnvironment = rpcenv.as_ref(); let upload = UploadChunk::new(req_body, env.datastore.clone(), size as u64); - // fixme: do we really need abort here? We alread do that on level above. - let abort_future = env.worker.abort_future().then(|_| Ok(Value::Null)); - - let resp = upload.select(abort_future) - .and_then(|(result, _)| Ok(result)) - .map_err(|(err, _)| err) - .then(move |res| { + let resp = upload + .then(move |result| { let env: &BackupEnvironment = rpcenv.as_ref(); - Ok(env.format_response(res)) + + let result = result.and_then(|(digest, size)| { + env.dynamic_writer_append_chunk(wid, size, &digest)?; + Ok(json!(tools::digest_to_hex(&digest))) + }); + + Ok(env.format_response(result)) }); Ok(Box::new(resp)) - }