use failure::*;

use std::sync::{Arc, Mutex};
use std::collections::HashMap;

use serde_json::Value;

use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
use crate::server::WorkerTask;
use crate::backup::*;
use crate::server::formatter::*;
use hyper::{Body, Response};

/// Per-writer state for a dynamic (variable chunk size) index upload.
struct DynamicWriterState {
    name: String,
    index: DynamicIndexWriter,
    offset: u64,      // bytes written so far (end offset of last chunk)
    chunk_count: u64, // number of chunks appended so far
}

/// Per-writer state for a fixed (constant chunk size) index upload.
struct FixedWriterState {
    name: String,
    index: FixedIndexWriter,
    size: usize,      // expected total file size
    chunk_size: u32,  // expected size of every uploaded chunk
    chunk_count: u64, // number of chunks appended so far
}

/// Mutable state shared by all request handlers of one backup session,
/// protected by a mutex (see `BackupEnvironment::state`).
struct SharedBackupState {
    finished: bool,
    uid_counter: usize,
    dynamic_writers: HashMap<usize, DynamicWriterState>,
    fixed_writers: HashMap<usize, FixedWriterState>,
    // digest -> chunk length; only registered chunks may be referenced
    known_chunks: HashMap<[u8; 32], u32>,
}

impl SharedBackupState {

    // Raise error if finished flag is set
    fn ensure_unfinished(&self) -> Result<(), Error> {
        if self.finished {
            bail!("backup already marked as finished.");
        }
        Ok(())
    }

    // Get an unique integer ID
    pub fn next_uid(&mut self) -> usize {
        self.uid_counter += 1;
        self.uid_counter
    }
}

/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
pub struct BackupEnvironment {
    env_type: RpcEnvironmentType,
    result_attributes: HashMap<String, Value>,
    user: String,
    pub formatter: &'static OutputFormatter,
    pub worker: Arc<WorkerTask>,
    pub datastore: Arc<DataStore>,
    pub backup_dir: BackupDir,
    pub last_backup: Option<BackupInfo>,
    state: Arc<Mutex<SharedBackupState>>,
}

impl BackupEnvironment {

    /// Create a new backup environment for one backup session.
    pub fn new(
        env_type: RpcEnvironmentType,
        user: String,
        worker: Arc<WorkerTask>,
        datastore: Arc<DataStore>,
        backup_dir: BackupDir,
    ) -> Self {

        let state = SharedBackupState {
            finished: false,
            uid_counter: 0,
            dynamic_writers: HashMap::new(),
            fixed_writers: HashMap::new(),
            known_chunks: HashMap::new(),
        };

        Self {
            result_attributes: HashMap::new(),
            env_type,
            user,
            worker,
            datastore,
            formatter: &JSON_FORMATTER,
            backup_dir,
            last_backup: None,
            state: Arc::new(Mutex::new(state)),
        }
    }

    // Register a Chunk with associated length. A client may only use registered
    // chunks (we do not trust clients that far ...)
pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; state.known_chunks.insert(digest, length); Ok(()) } pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option { let state = self.state.lock().unwrap(); match state.known_chunks.get(digest) { Some(len) => Some(*len), None => None, } } /// Store the writer with an unique ID pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let uid = state.next_uid(); state.dynamic_writers.insert(uid, DynamicWriterState { index, name, offset: 0, chunk_count: 0, }); Ok(uid) } /// Store the writer with an unique ID pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let uid = state.next_uid(); state.fixed_writers.insert(uid, FixedWriterState { index, name, chunk_count: 0, size, chunk_size, }); Ok(uid) } /// Append chunk to dynamic writer pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let mut data = match state.dynamic_writers.get_mut(&wid) { Some(data) => data, None => bail!("dynamic writer '{}' not registered", wid), }; if data.offset != offset { bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})", data.name, data.offset, offset); } data.offset += size as u64; data.chunk_count += 1; data.index.add_chunk(data.offset, digest)?; Ok(()) } /// Append chunk to fixed writer pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let mut data = match state.fixed_writers.get_mut(&wid) { 
Some(data) => data, None => bail!("fixed writer '{}' not registered", wid), }; data.chunk_count += 1; if size != data.chunk_size { bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size); } let pos = (offset as usize)/(data.chunk_size as usize); data.index.add_digest(pos, digest)?; Ok(()) } /// Close dynamic writer pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let mut data = match state.dynamic_writers.remove(&wid) { Some(data) => data, None => bail!("dynamic writer '{}' not registered", wid), }; if data.chunk_count != chunk_count { bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count); } if data.offset != size { bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size); } data.index.close()?; Ok(()) } /// Close fixed writer pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let mut data = match state.fixed_writers.remove(&wid) { Some(data) => data, None => bail!("fixed writer '{}' not registered", wid), }; if data.chunk_count != chunk_count { bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count); } data.index.close()?; Ok(()) } /// Mark backup as finished pub fn finish_backup(&self) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); // test if all writer are correctly closed state.ensure_unfinished()?; state.finished = true; if state.dynamic_writers.len() != 0 { bail!("found open index writer - unable to finish backup"); } Ok(()) } pub fn log>(&self, msg: S) { self.worker.log(msg); } pub fn format_response(&self, result: Result) -> Response { match result { Ok(data) => 
(self.formatter.format_data)(data, self), Err(err) => (self.formatter.format_error)(err), } } /// Raise error if finished flag is not set pub fn ensure_finished(&self) -> Result<(), Error> { let state = self.state.lock().unwrap(); if !state.finished { bail!("backup ended but finished flag is not set."); } Ok(()) } /// Remove complete backup pub fn remove_backup(&self) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.finished = true; self.datastore.remove_backup_dir(&self.backup_dir)?; Ok(()) } } impl RpcEnvironment for BackupEnvironment { fn set_result_attrib(&mut self, name: &str, value: Value) { self.result_attributes.insert(name.into(), value); } fn get_result_attrib(&self, name: &str) -> Option<&Value> { self.result_attributes.get(name) } fn env_type(&self) -> RpcEnvironmentType { self.env_type } fn set_user(&mut self, _user: Option) { panic!("unable to change user"); } fn get_user(&self) -> Option { Some(self.user.clone()) } } impl AsRef for RpcEnvironment { fn as_ref(&self) -> &BackupEnvironment { self.as_any().downcast_ref::().unwrap() } } impl AsRef for Box { fn as_ref(&self) -> &BackupEnvironment { self.as_any().downcast_ref::().unwrap() } }