2019-05-09 16:01:24 +00:00
|
|
|
use failure::*;
|
2019-05-10 08:25:40 +00:00
|
|
|
use std::sync::{Arc, Mutex};
|
2019-05-08 10:41:58 +00:00
|
|
|
use std::collections::HashMap;
|
|
|
|
|
|
|
|
use serde_json::Value;
|
|
|
|
|
|
|
|
use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
|
|
|
|
use crate::server::WorkerTask;
|
2019-05-09 11:06:09 +00:00
|
|
|
use crate::backup::*;
|
2019-05-09 16:01:24 +00:00
|
|
|
use crate::server::formatter::*;
|
|
|
|
use hyper::{Body, Response};
|
2019-05-08 10:41:58 +00:00
|
|
|
|
2019-05-10 08:25:40 +00:00
|
|
|
struct SharedBackupState {
|
2019-05-15 10:58:55 +00:00
|
|
|
finished: bool,
|
2019-05-10 08:25:40 +00:00
|
|
|
uid_counter: usize,
|
|
|
|
dynamic_writers: HashMap<usize, (u64 /* offset */, DynamicIndexWriter)>,
|
2019-05-20 16:05:10 +00:00
|
|
|
known_chunks: HashMap<[u8;32], u32>,
|
2019-05-10 08:25:40 +00:00
|
|
|
}
|
|
|
|
|
2019-05-15 10:58:55 +00:00
|
|
|
impl SharedBackupState {
|
|
|
|
|
|
|
|
// Raise error if finished flag is set
|
|
|
|
fn ensure_unfinished(&self) -> Result<(), Error> {
|
|
|
|
if self.finished {
|
|
|
|
bail!("backup already marked as finished.");
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get an unique integer ID
|
|
|
|
pub fn next_uid(&mut self) -> usize {
|
|
|
|
self.uid_counter += 1;
|
|
|
|
self.uid_counter
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-05-08 10:41:58 +00:00
|
|
|
/// `RpcEnvironmet` implementation for backup service
|
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct BackupEnvironment {
|
|
|
|
env_type: RpcEnvironmentType,
|
|
|
|
result_attributes: HashMap<String, Value>,
|
|
|
|
user: String,
|
2019-05-09 16:01:24 +00:00
|
|
|
pub formatter: &'static OutputFormatter,
|
2019-05-09 11:06:09 +00:00
|
|
|
pub worker: Arc<WorkerTask>,
|
|
|
|
pub datastore: Arc<DataStore>,
|
2019-05-10 08:25:40 +00:00
|
|
|
pub backup_dir: BackupDir,
|
2019-05-11 09:21:13 +00:00
|
|
|
pub last_backup: Option<BackupInfo>,
|
2019-05-10 08:25:40 +00:00
|
|
|
state: Arc<Mutex<SharedBackupState>>
|
2019-05-08 10:41:58 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl BackupEnvironment {
|
2019-05-10 08:25:40 +00:00
|
|
|
pub fn new(
|
|
|
|
env_type: RpcEnvironmentType,
|
|
|
|
user: String,
|
|
|
|
worker: Arc<WorkerTask>,
|
|
|
|
datastore: Arc<DataStore>,
|
|
|
|
backup_dir: BackupDir,
|
|
|
|
) -> Self {
|
|
|
|
|
|
|
|
let state = SharedBackupState {
|
2019-05-15 10:58:55 +00:00
|
|
|
finished: false,
|
2019-05-10 08:25:40 +00:00
|
|
|
uid_counter: 0,
|
|
|
|
dynamic_writers: HashMap::new(),
|
2019-05-20 16:05:10 +00:00
|
|
|
known_chunks: HashMap::new(),
|
2019-05-10 08:25:40 +00:00
|
|
|
};
|
|
|
|
|
2019-05-08 10:41:58 +00:00
|
|
|
Self {
|
|
|
|
result_attributes: HashMap::new(),
|
|
|
|
env_type,
|
|
|
|
user,
|
|
|
|
worker,
|
2019-05-09 11:06:09 +00:00
|
|
|
datastore,
|
2019-05-09 16:01:24 +00:00
|
|
|
formatter: &JSON_FORMATTER,
|
2019-05-10 08:25:40 +00:00
|
|
|
backup_dir,
|
2019-05-11 09:21:13 +00:00
|
|
|
last_backup: None,
|
2019-05-10 08:25:40 +00:00
|
|
|
state: Arc::new(Mutex::new(state)),
|
2019-05-08 10:41:58 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-05-20 16:05:10 +00:00
|
|
|
// Register a Chunk with associated length. A client may only use registered
|
|
|
|
// chunks (we do not trust clients that far ...)
|
|
|
|
pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
|
|
|
|
state.ensure_unfinished()?;
|
|
|
|
|
|
|
|
state.known_chunks.insert(digest, length);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
|
|
|
|
let state = self.state.lock().unwrap();
|
|
|
|
|
|
|
|
match state.known_chunks.get(digest) {
|
|
|
|
Some(len) => Some(*len),
|
|
|
|
None => None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-05-15 10:58:55 +00:00
|
|
|
/// Store the writer with an unique ID
|
|
|
|
pub fn register_dynamic_writer(&self, writer: DynamicIndexWriter) -> Result<usize, Error> {
|
2019-05-10 08:25:40 +00:00
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
|
2019-05-15 10:58:55 +00:00
|
|
|
state.ensure_unfinished()?;
|
|
|
|
|
|
|
|
let uid = state.next_uid();
|
2019-05-10 08:25:40 +00:00
|
|
|
|
|
|
|
state.dynamic_writers.insert(uid, (0, writer));
|
2019-05-15 10:58:55 +00:00
|
|
|
|
|
|
|
Ok(uid)
|
2019-05-10 08:25:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Append chunk to dynamic writer
|
2019-05-20 16:05:10 +00:00
|
|
|
pub fn dynamic_writer_append_chunk(&self, wid: usize, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
|
2019-05-10 08:25:40 +00:00
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
|
2019-05-15 10:58:55 +00:00
|
|
|
state.ensure_unfinished()?;
|
|
|
|
|
2019-05-10 08:25:40 +00:00
|
|
|
let mut data = match state.dynamic_writers.get_mut(&wid) {
|
|
|
|
Some(data) => data,
|
|
|
|
None => bail!("dynamic writer '{}' not registered", wid),
|
|
|
|
};
|
|
|
|
|
2019-05-20 16:05:10 +00:00
|
|
|
data.0 += size as u64;
|
2019-05-10 08:25:40 +00:00
|
|
|
|
|
|
|
data.1.add_chunk(data.0, digest)?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2019-05-15 05:58:05 +00:00
|
|
|
/// Close dynamic writer
|
|
|
|
pub fn dynamic_writer_close(&self, wid: usize) -> Result<(), Error> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
|
2019-05-15 10:58:55 +00:00
|
|
|
state.ensure_unfinished()?;
|
|
|
|
|
2019-05-15 05:58:05 +00:00
|
|
|
let mut data = match state.dynamic_writers.remove(&wid) {
|
|
|
|
Some(data) => data,
|
|
|
|
None => bail!("dynamic writer '{}' not registered", wid),
|
|
|
|
};
|
|
|
|
|
|
|
|
data.1.close()?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2019-05-15 10:58:55 +00:00
|
|
|
/// Mark backup as finished
|
|
|
|
pub fn finish_backup(&self) -> Result<(), Error> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
// test if all writer are correctly closed
|
|
|
|
|
|
|
|
state.ensure_unfinished()?;
|
|
|
|
|
|
|
|
state.finished = true;
|
|
|
|
|
|
|
|
if state.dynamic_writers.len() != 0 {
|
|
|
|
bail!("found open index writer - unable to finish backup");
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2019-05-08 10:41:58 +00:00
|
|
|
pub fn log<S: AsRef<str>>(&self, msg: S) {
|
|
|
|
self.worker.log(msg);
|
|
|
|
}
|
2019-05-09 16:01:24 +00:00
|
|
|
|
|
|
|
pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
|
|
|
|
match result {
|
|
|
|
Ok(data) => (self.formatter.format_data)(data, self),
|
|
|
|
Err(err) => (self.formatter.format_error)(err),
|
|
|
|
}
|
|
|
|
}
|
2019-05-15 10:58:55 +00:00
|
|
|
|
|
|
|
/// Raise error if finished flag is not set
|
|
|
|
pub fn ensure_finished(&self) -> Result<(), Error> {
|
|
|
|
let state = self.state.lock().unwrap();
|
|
|
|
if !state.finished {
|
|
|
|
bail!("backup ended but finished flag is not set.");
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Remove complete backup
|
|
|
|
pub fn remove_backup(&self) -> Result<(), Error> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
state.finished = true;
|
|
|
|
|
|
|
|
self.datastore.remove_backup_dir(&self.backup_dir)?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
2019-05-08 10:41:58 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl RpcEnvironment for BackupEnvironment {
|
|
|
|
|
|
|
|
fn set_result_attrib(&mut self, name: &str, value: Value) {
|
|
|
|
self.result_attributes.insert(name.into(), value);
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_result_attrib(&self, name: &str) -> Option<&Value> {
|
|
|
|
self.result_attributes.get(name)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn env_type(&self) -> RpcEnvironmentType {
|
|
|
|
self.env_type
|
|
|
|
}
|
|
|
|
|
|
|
|
fn set_user(&mut self, _user: Option<String>) {
|
|
|
|
panic!("unable to change user");
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_user(&self) -> Option<String> {
|
|
|
|
Some(self.user.clone())
|
|
|
|
}
|
|
|
|
}
|
2019-05-09 11:06:09 +00:00
|
|
|
|
|
|
|
impl AsRef<BackupEnvironment> for RpcEnvironment {
|
|
|
|
fn as_ref(&self) -> &BackupEnvironment {
|
|
|
|
self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
|
|
|
|
}
|
|
|
|
}
|
2019-05-09 16:01:24 +00:00
|
|
|
impl AsRef<BackupEnvironment> for Box<RpcEnvironment> {
|
|
|
|
fn as_ref(&self) -> &BackupEnvironment {
|
|
|
|
self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
|
|
|
|
}
|
|
|
|
}
|