diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs index 996a721f..67e726f8 100644 --- a/src/api2/admin/datastore/backup.rs +++ b/src/api2/admin/datastore/backup.rs @@ -89,11 +89,11 @@ fn upgrade_to_backup_protocol( env.log(format!("starting new backup on datastore '{}': {:?}", store, path)); - let service = BackupService::new(env, worker.clone()); + let service = BackupService::new(env.clone(), worker.clone()); let abort_future = worker.abort_future(); - let worker2 = worker.clone(); + let env2 = env.clone(); req_body .on_upgrade() @@ -108,12 +108,20 @@ fn upgrade_to_backup_protocol( .map_err(Error::from) }) .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); })) + .map_err(|(err, _)| err) .and_then(move |(_result, _)| { - worker2.log("backup finished sucessfully"); + env.ensure_finished()?; + env.log("backup finished sucessfully"); Ok(()) }) - .map_err(|(err, _)| { - err + .then(move |result| { + if let Err(err) = result { + env2.log(format!("backup failed: {}", err)); + env2.log("removing failed backup"); + env2.remove_backup()?; + return Err(err); + } + Ok(()) }) })?; @@ -157,6 +165,15 @@ fn backup_api() -> Router { "dynamic_close", Router::new() .post(api_method_close_dynamic_index()) ) + .subdir( + "finish", Router::new() + .get( + ApiMethod::new( + finish_backup, + ObjectSchema::new("Mark backup as finished.") + ) + ) + ) .subdir("test1", test1) .subdir("test2", test2) .list_subdirs(); @@ -191,7 +208,6 @@ fn create_dynamic_index( ) -> Result { let env: &BackupEnvironment = rpcenv.as_ref(); - env.log("Inside create_dynamic_index"); let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned(); @@ -207,11 +223,10 @@ fn create_dynamic_index( let chunk_size = 4096*1024; // todo: ?? let index = env.datastore.create_dynamic_writer(&path, chunk_size)?; - let wid = env.register_dynamic_writer(index); + let wid = env.register_dynamic_writer(index)?; env.log(format!("created new dynamic index {} ({:?})", wid, path)); - Ok(json!(wid)) } @@ -244,6 +259,18 @@ fn close_dynamic_index ( } +fn finish_backup ( + _param: Value, + _info: &ApiMethod, + rpcenv: &mut RpcEnvironment, +) -> Result { + + let env: &BackupEnvironment = rpcenv.as_ref(); + + env.finish_backup()?; + + Ok(Value::Null) +} fn test1_get ( _param: Value, @@ -268,7 +295,6 @@ fn dynamic_chunk_index( ) -> Result { let env: &BackupEnvironment = rpcenv.as_ref(); - env.log("Inside dynamic_chunk_index"); let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned(); diff --git a/src/api2/admin/datastore/backup/environment.rs b/src/api2/admin/datastore/backup/environment.rs index 66b954f0..652f23d3 100644 --- a/src/api2/admin/datastore/backup/environment.rs +++ b/src/api2/admin/datastore/backup/environment.rs @@ -11,10 +11,29 @@ use crate::server::formatter::*; use hyper::{Body, Response}; struct SharedBackupState { + finished: bool, uid_counter: usize, dynamic_writers: HashMap, } +impl SharedBackupState { + + // Raise error if finished flag is set + fn ensure_unfinished(&self) -> Result<(), Error> { + if self.finished { + bail!("backup already marked as finished."); + } + Ok(()) + } + + // Get an unique integer ID + pub fn next_uid(&mut self) -> usize { + self.uid_counter += 1; + self.uid_counter + } +} + + /// `RpcEnvironmet` implementation for backup service #[derive(Clone)] pub struct BackupEnvironment { @@ -39,6 +58,7 @@ impl BackupEnvironment { ) -> Self { let state = SharedBackupState { + finished: false, uid_counter: 0, dynamic_writers: HashMap::new(), }; @@ -56,27 +76,25 @@ impl BackupEnvironment { } } - /// Get an unique integer ID - pub fn next_uid(&self) -> usize { - let mut state = self.state.lock().unwrap(); - state.uid_counter += 1; - state.uid_counter - } - /// Store the writer with an unique ID - pub fn register_dynamic_writer(&self, writer: DynamicIndexWriter) -> usize { - let mut state = self.state.lock().unwrap(); - state.uid_counter += 1; - let uid = state.uid_counter; + pub fn register_dynamic_writer(&self, writer: DynamicIndexWriter) -> Result { + let mut state = self.state.lock().unwrap(); + + state.ensure_unfinished()?; + + let uid = state.next_uid(); state.dynamic_writers.insert(uid, (0, writer)); - uid + + Ok(uid) } /// Append chunk to dynamic writer pub fn dynamic_writer_append_chunk(&self, wid: usize, size: u64, digest: &[u8; 32]) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); + state.ensure_unfinished()?; + let mut data = match state.dynamic_writers.get_mut(&wid) { Some(data) => data, None => bail!("dynamic writer '{}' not registered", wid), @@ -93,6 +111,8 @@ impl BackupEnvironment { pub fn dynamic_writer_close(&self, wid: usize) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); + state.ensure_unfinished()?; + let mut data = match state.dynamic_writers.remove(&wid) { Some(data) => data, None => bail!("dynamic writer '{}' not registered", wid), @@ -103,6 +123,22 @@ impl BackupEnvironment { Ok(()) } + /// Mark backup as finished + pub fn finish_backup(&self) -> Result<(), Error> { + let mut state = self.state.lock().unwrap(); + // test if all writer are correctly closed + + state.ensure_unfinished()?; + + state.finished = true; + + if state.dynamic_writers.len() != 0 { + bail!("found open index writer - unable to finish backup"); + } + + Ok(()) + } + pub fn log>(&self, msg: S) { self.worker.log(msg); } @@ -113,6 +149,25 @@ impl BackupEnvironment { Err(err) => (self.formatter.format_error)(err), } } + + /// Raise error if finished flag is not set + pub fn ensure_finished(&self) -> Result<(), Error> { + let state = self.state.lock().unwrap(); + if !state.finished { + bail!("backup ended but finished flag is not set."); + } + Ok(()) + } + + /// Remove complete backup + pub fn remove_backup(&self) -> Result<(), Error> { + let mut state = self.state.lock().unwrap(); + state.finished = true; + + self.datastore.remove_backup_dir(&self.backup_dir)?; + + Ok(()) + } } impl RpcEnvironment for BackupEnvironment {