src/api2.rs: move backup api to /backup

Dietmar Maurer
2019-06-05 07:23:21 +02:00
parent 97eeea3b4a
commit 7773ccc11f
8 changed files with 9 additions and 9 deletions


@@ -0,0 +1,452 @@
use failure::*;
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use serde_json::Value;
use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
use crate::server::WorkerTask;
use crate::backup::*;
use crate::server::formatter::*;
use hyper::{Body, Response};
struct UploadStatistic {
count: u64,
size: u64,
compressed_size: u64,
duplicates: u64,
}
impl UploadStatistic {
fn new() -> Self {
Self {
count: 0,
size: 0,
compressed_size: 0,
duplicates: 0,
}
}
}
struct DynamicWriterState {
name: String,
index: DynamicIndexWriter,
offset: u64,
chunk_count: u64,
upload_stat: UploadStatistic,
}
struct FixedWriterState {
name: String,
index: FixedIndexWriter,
size: usize,
chunk_size: u32,
chunk_count: u64,
upload_stat: UploadStatistic,
}
struct SharedBackupState {
finished: bool,
uid_counter: usize,
file_counter: usize, // successfully uploaded files
dynamic_writers: HashMap<usize, DynamicWriterState>,
fixed_writers: HashMap<usize, FixedWriterState>,
known_chunks: HashMap<[u8;32], u32>,
}
impl SharedBackupState {
// Raise error if finished flag is set
fn ensure_unfinished(&self) -> Result<(), Error> {
if self.finished {
bail!("backup already marked as finished.");
}
Ok(())
}
// Get a unique integer ID
pub fn next_uid(&mut self) -> usize {
self.uid_counter += 1;
self.uid_counter
}
}
/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
pub struct BackupEnvironment {
env_type: RpcEnvironmentType,
result_attributes: HashMap<String, Value>,
user: String,
pub debug: bool,
pub formatter: &'static OutputFormatter,
pub worker: Arc<WorkerTask>,
pub datastore: Arc<DataStore>,
pub backup_dir: BackupDir,
pub last_backup: Option<BackupInfo>,
state: Arc<Mutex<SharedBackupState>>
}
impl BackupEnvironment {
pub fn new(
env_type: RpcEnvironmentType,
user: String,
worker: Arc<WorkerTask>,
datastore: Arc<DataStore>,
backup_dir: BackupDir,
) -> Self {
let state = SharedBackupState {
finished: false,
uid_counter: 0,
file_counter: 0,
dynamic_writers: HashMap::new(),
fixed_writers: HashMap::new(),
known_chunks: HashMap::new(),
};
Self {
result_attributes: HashMap::new(),
env_type,
user,
worker,
datastore,
debug: false,
formatter: &JSON_FORMATTER,
backup_dir,
last_backup: None,
state: Arc::new(Mutex::new(state)),
}
}
/// Register a Chunk with associated length.
///
/// We do not fully trust clients, so a client may only use registered
/// chunks. Please use this method to register chunks from previous backups.
pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
state.known_chunks.insert(digest, length);
Ok(())
}
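// Illustrative sketch, not part of this commit: a session that wants to let the
// client reuse chunks from the last snapshot would seed the known-chunk table first.
// `env` and `chunks_of_previous_backup` (digest/length pairs read from the previous
// index) are placeholders:
//
//     for (digest, length) in chunks_of_previous_backup {
//         env.register_chunk(digest, length)?;
//     }
//
// Digests that were neither seeded this way nor uploaded in the current session can
// then be rejected, since lookup_chunk() returns None for them.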
/// Register fixed length chunks after upload.
///
/// Like `register_chunk()`, but additionally record statistics for
/// the fixed index writer.
pub fn register_fixed_chunk(
&self,
wid: usize,
digest: [u8; 32],
size: u32,
compressed_size: u32,
is_duplicate: bool,
) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let mut data = match state.fixed_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("fixed writer '{}' not registered", wid),
};
if size != data.chunk_size {
bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size);
}
// record statistics
data.upload_stat.count += 1;
data.upload_stat.size += size as u64;
data.upload_stat.compressed_size += compressed_size as u64;
if is_duplicate { data.upload_stat.duplicates += 1; }
// register chunk
state.known_chunks.insert(digest, size);
Ok(())
}
/// Register dynamic length chunks after upload.
///
/// Like `register_chunk()`, but additionally record statistics for
/// the dynamic index writer.
pub fn register_dynamic_chunk(
&self,
wid: usize,
digest: [u8; 32],
size: u32,
compressed_size: u32,
is_duplicate: bool,
) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let mut data = match state.dynamic_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("dynamic writer '{}' not registered", wid),
};
// record statistics
data.upload_stat.count += 1;
data.upload_stat.size += size as u64;
data.upload_stat.compressed_size += compressed_size as u64;
if is_duplicate { data.upload_stat.duplicates += 1; }
// register chunk
state.known_chunks.insert(digest, size);
Ok(())
}
pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
let state = self.state.lock().unwrap();
match state.known_chunks.get(digest) {
Some(len) => Some(*len),
None => None,
}
}
/// Store the writer with a unique ID
pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let uid = state.next_uid();
state.dynamic_writers.insert(uid, DynamicWriterState {
index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
});
Ok(uid)
}
/// Store the writer with a unique ID
pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let uid = state.next_uid();
state.fixed_writers.insert(uid, FixedWriterState {
index, name, chunk_count: 0, size, chunk_size, upload_stat: UploadStatistic::new(),
});
Ok(uid)
}
/// Append chunk to dynamic writer
pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let mut data = match state.dynamic_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("dynamic writer '{}' not registered", wid),
};
if data.offset != offset {
bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
data.name, data.offset, offset);
}
data.offset += size as u64;
data.chunk_count += 1;
data.index.add_chunk(data.offset, digest)?;
Ok(())
}
/// Append chunk to fixed writer
pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let mut data = match state.fixed_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("fixed writer '{}' not registered", wid),
};
data.chunk_count += 1;
if size != data.chunk_size {
bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size);
}
let pos = (offset as usize)/(data.chunk_size as usize);
data.index.add_digest(pos, digest)?;
Ok(())
}
fn log_upload_stat(&self, archive_name: &str, size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
self.log(format!("Upload statistics for '{}'", archive_name));
self.log(format!("Size: {}", size));
self.log(format!("Chunk count: {}", chunk_count));
self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));
if upload_stat.size > 0 {
self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
}
}
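// Worked example for the percentages above (illustrative numbers): an archive closed
// with size = 100 MiB, of which upload_stat.size = 40 MiB were actually uploaded and
// upload_stat.compressed_size = 20 MiB were stored, logs
// "Upload size: 41943040 (40%)" and "Compression: 50%"; the remaining 60% of the
// archive was covered by chunks the client did not have to upload again.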
/// Close dynamic writer
pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let mut data = match state.dynamic_writers.remove(&wid) {
Some(data) => data,
None => bail!("dynamic writer '{}' not registered", wid),
};
if data.chunk_count != chunk_count {
bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
}
if data.offset != size {
bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
}
data.index.close()?;
self.log_upload_stat(&data.name, size, chunk_count, &data.upload_stat);
state.file_counter += 1;
Ok(())
}
/// Close fixed writer
pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let mut data = match state.fixed_writers.remove(&wid) {
Some(data) => data,
None => bail!("fixed writer '{}' not registered", wid),
};
if data.chunk_count != chunk_count {
bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
}
let expected_count = data.index.index_length();
if chunk_count != (expected_count as u64) {
bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
}
if size != (data.size as u64) {
bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
}
data.index.close()?;
self.log_upload_stat(&data.name, size, chunk_count, &data.upload_stat);
state.file_counter += 1;
Ok(())
}
/// Mark backup as finished
pub fn finish_backup(&self) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
// test if all writers are correctly closed
state.ensure_unfinished()?;
state.finished = true;
if state.dynamic_writers.len() != 0 {
bail!("found open index writer - unable to finish backup");
}
if state.file_counter == 0 {
bail!("backup does not contain valid files (file count == 0)");
}
Ok(())
}
pub fn log<S: AsRef<str>>(&self, msg: S) {
self.worker.log(msg);
}
pub fn debug<S: AsRef<str>>(&self, msg: S) {
if self.debug { self.worker.log(msg); }
}
pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
match result {
Ok(data) => (self.formatter.format_data)(data, self),
Err(err) => (self.formatter.format_error)(err),
}
}
/// Raise error if finished flag is not set
pub fn ensure_finished(&self) -> Result<(), Error> {
let state = self.state.lock().unwrap();
if !state.finished {
bail!("backup ended but finished flag is not set.");
}
Ok(())
}
/// Remove complete backup
pub fn remove_backup(&self) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.finished = true;
self.datastore.remove_backup_dir(&self.backup_dir)?;
Ok(())
}
}
impl RpcEnvironment for BackupEnvironment {
fn set_result_attrib(&mut self, name: &str, value: Value) {
self.result_attributes.insert(name.into(), value);
}
fn get_result_attrib(&self, name: &str) -> Option<&Value> {
self.result_attributes.get(name)
}
fn env_type(&self) -> RpcEnvironmentType {
self.env_type
}
fn set_user(&mut self, _user: Option<String>) {
panic!("unable to change user");
}
fn get_user(&self) -> Option<String> {
Some(self.user.clone())
}
}
impl AsRef<BackupEnvironment> for RpcEnvironment {
fn as_ref(&self) -> &BackupEnvironment {
self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
}
}
impl AsRef<BackupEnvironment> for Box<RpcEnvironment> {
fn as_ref(&self) -> &BackupEnvironment {
self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
}
}
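
To put the environment above in context, the following is a rough sketch of how the async upload handlers (shown further below) drive a single dynamic writer through its lifecycle. It is an illustrative outline, not code from this commit; `env`, `index`, `digest`, the offsets and the sizes are placeholders.

// illustrative lifecycle sketch; all values are placeholders
let wid = env.register_dynamic_writer(index, "data.didx".to_string())?;

// every digest referenced later must be known: either seeded from a previous backup
// via register_chunk(), or uploaded now and recorded together with its statistics
env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;

// append at the current end of the archive; the server verifies the offset
env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;

// close verifies chunk count and total size, then writes out the index
env.dynamic_writer_close(wid, chunk_count, total_size)?;

// finish_backup() refuses to succeed while index writers are still open
env.finish_backup()?;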

src/api2/backup/service.rs Normal file

@@ -0,0 +1,120 @@
use failure::*;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Arc;
use futures::*;
use hyper::{Body, Request, Response, StatusCode};
use crate::tools;
use crate::api_schema::router::*;
use crate::server::formatter::*;
use crate::server::WorkerTask;
use super::environment::*;
lazy_static!{
static ref BACKUP_ROUTER: Router = super::backup_api();
}
pub struct BackupService {
rpcenv: BackupEnvironment,
worker: Arc<WorkerTask>,
debug: bool,
}
impl BackupService {
pub fn new(rpcenv: BackupEnvironment, worker: Arc<WorkerTask>, debug: bool) -> Self {
Self { rpcenv, worker, debug }
}
pub fn debug<S: AsRef<str>>(&self, msg: S) {
if self.debug { self.worker.log(msg); }
}
fn handle_request(&self, req: Request<Body>) -> BoxFut {
let (parts, body) = req.into_parts();
let method = parts.method.clone();
let (path, components) = match tools::normalize_uri_path(parts.uri.path()) {
Ok((p,c)) => (p, c),
Err(err) => return Box::new(future::err(http_err!(BAD_REQUEST, err.to_string()))),
};
self.debug(format!("{} {}", method, path));
let mut uri_param = HashMap::new();
match BACKUP_ROUTER.find_method(&components, method, &mut uri_param) {
MethodDefinition::None => {
let err = http_err!(NOT_FOUND, "Path not found.".to_string());
return Box::new(future::ok((self.rpcenv.formatter.format_error)(err)));
}
MethodDefinition::Simple(api_method) => {
return crate::server::rest::handle_sync_api_request(
self.rpcenv.clone(), api_method, self.rpcenv.formatter, parts, body, uri_param);
}
MethodDefinition::Async(async_method) => {
return crate::server::rest::handle_async_api_request(
self.rpcenv.clone(), async_method, self.rpcenv.formatter, parts, body, uri_param);
}
}
}
fn log_response(worker: Arc<WorkerTask>, method: hyper::Method, path: &str, resp: &Response<Body>) {
let status = resp.status();
if !status.is_success() {
let reason = status.canonical_reason().unwrap_or("unknown reason");
let mut message = "request failed";
if let Some(data) = resp.extensions().get::<ErrorMessageExtension>() {
message = &data.0;
}
worker.log(format!("{} {}: {} {}: {}", method.as_str(), path, status.as_str(), reason, message));
}
}
}
impl hyper::service::Service for BackupService {
type ReqBody = Body;
type ResBody = Body;
type Error = Error;
type Future = Box<Future<Item = Response<Body>, Error = Self::Error> + Send>;
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
let path = req.uri().path().to_owned();
let method = req.method().clone();
let worker = self.worker.clone();
Box::new(self.handle_request(req).then(move |result| {
match result {
Ok(res) => {
Self::log_response(worker, method, &path, &res);
Ok::<_, Error>(res)
}
Err(err) => {
if let Some(apierr) = err.downcast_ref::<HttpError>() {
let mut resp = Response::new(Body::from(apierr.message.clone()));
resp.extensions_mut().insert(ErrorMessageExtension(apierr.message.clone()));
*resp.status_mut() = apierr.code;
Self::log_response(worker, method, &path, &resp);
Ok(resp)
} else {
let mut resp = Response::new(Body::from(err.to_string()));
resp.extensions_mut().insert(ErrorMessageExtension(err.to_string()));
*resp.status_mut() = StatusCode::BAD_REQUEST;
Self::log_response(worker, method, &path, &resp);
Ok(resp)
}
}
}
}))
}
}
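
The service above is meant to run on a connection that has been upgraded out of the regular REST server. A minimal wiring sketch, assuming hyper 0.12, is shown below; the code that actually constructs the service is not part of this excerpt, so `env`, `worker`, `debug`, `upgraded` and the use of http2_only() are assumptions.

// hypothetical wiring sketch (hyper 0.12); all bindings are placeholders
let service = BackupService::new(env, worker.clone(), debug);

let mut http = hyper::server::conn::Http::new();
http.http2_only(true); // assumption: the backup protocol runs HTTP/2 over the upgraded socket

// `upgraded` would be the hyper::upgrade::Upgraded stream obtained from Body::on_upgrade()
let connection = http.serve_connection(upgraded, service)
    .map_err(Error::from);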


@@ -0,0 +1,249 @@
use failure::*;
use futures::*;
use std::sync::Arc;
use hyper::http::request::Parts;
use hyper::Body;
use serde_json::{json, Value};
use crate::tools;
use crate::backup::*;
use crate::api_schema::*;
use crate::api_schema::router::*;
use super::environment::*;
pub struct UploadChunk {
stream: Body,
store: Arc<DataStore>,
size: u32,
chunk: Vec<u8>,
}
impl UploadChunk {
pub fn new(stream: Body, store: Arc<DataStore>, size: u32) -> Self {
Self { stream, store, size, chunk: vec![] }
}
}
impl Future for UploadChunk {
type Item = ([u8; 32], u32, u32, bool);
type Error = failure::Error;
fn poll(&mut self) -> Poll<([u8; 32], u32, u32, bool), failure::Error> {
loop {
match try_ready!(self.stream.poll()) {
Some(chunk) => {
if (self.chunk.len() + chunk.len()) > (self.size as usize) {
bail!("uploaded chunk is larger than announced.");
}
self.chunk.extend_from_slice(&chunk);
}
None => {
if self.chunk.len() != (self.size as usize) {
bail!("uploaded chunk has unexpected size.");
}
let (is_duplicate, digest, compressed_size) = self.store.insert_chunk(&self.chunk)?;
return Ok(Async::Ready((digest, self.size, compressed_size as u32, is_duplicate)))
}
}
}
}
}
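// Contract of the future above, with hypothetical numbers: for an announced size of
// 4096 bytes it keeps polling the body; receiving more than 4096 bytes fails with
// "uploaded chunk is larger than announced.", ending with fewer fails with
// "uploaded chunk has unexpected size.", and an exact match inserts the chunk into
// the datastore and resolves to (digest, 4096, compressed_size, is_duplicate).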
pub fn api_method_upload_fixed_chunk() -> ApiAsyncMethod {
ApiAsyncMethod::new(
upload_fixed_chunk,
ObjectSchema::new("Upload a new chunk.")
.required("wid", IntegerSchema::new("Fixed writer ID.")
.minimum(1)
.maximum(256)
)
.required("size", IntegerSchema::new("Chunk size.")
.minimum(1)
.maximum(1024*1024*16)
)
)
}
fn upload_fixed_chunk(
_parts: Parts,
req_body: Body,
param: Value,
_info: &ApiAsyncMethod,
rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> {
let wid = tools::required_integer_param(&param, "wid")? as usize;
let size = tools::required_integer_param(&param, "size")? as u32;
let env: &BackupEnvironment = rpcenv.as_ref();
let upload = UploadChunk::new(req_body, env.datastore.clone(), size);
let resp = upload
.then(move |result| {
let env: &BackupEnvironment = rpcenv.as_ref();
let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
let digest_str = tools::digest_to_hex(&digest);
env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
Ok(json!(digest_str))
});
Ok(env.format_response(result))
});
Ok(Box::new(resp))
}
pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod {
ApiAsyncMethod::new(
upload_dynamic_chunk,
ObjectSchema::new("Upload a new chunk.")
.required("wid", IntegerSchema::new("Dynamic writer ID.")
.minimum(1)
.maximum(256)
)
.required("size", IntegerSchema::new("Chunk size.")
.minimum(1)
.maximum(1024*1024*16)
)
)
}
fn upload_dynamic_chunk(
_parts: Parts,
req_body: Body,
param: Value,
_info: &ApiAsyncMethod,
rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> {
let wid = tools::required_integer_param(&param, "wid")? as usize;
let size = tools::required_integer_param(&param, "size")? as u32;
let env: &BackupEnvironment = rpcenv.as_ref();
let upload = UploadChunk::new(req_body, env.datastore.clone(), size);
let resp = upload
.then(move |result| {
let env: &BackupEnvironment = rpcenv.as_ref();
let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
let digest_str = tools::digest_to_hex(&digest);
env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
Ok(json!(digest_str))
});
Ok(env.format_response(result))
});
Ok(Box::new(resp))
}
pub fn api_method_upload_speedtest() -> ApiAsyncMethod {
ApiAsyncMethod::new(
upload_speedtest,
ObjectSchema::new("Test uploadf speed.")
)
}
fn upload_speedtest(
_parts: Parts,
req_body: Body,
_param: Value,
_info: &ApiAsyncMethod,
rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> {
let resp = req_body
.map_err(Error::from)
.fold(0, |size: usize, chunk| -> Result<usize, Error> {
let sum = size + chunk.len();
//println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
Ok(sum)
})
.then(move |result| {
match result {
Ok(size) => {
println!("UPLOAD END {} bytes", size);
}
Err(err) => {
println!("Upload error: {}", err);
}
}
let env: &BackupEnvironment = rpcenv.as_ref();
Ok(env.format_response(Ok(Value::Null)))
});
Ok(Box::new(resp))
}
pub fn api_method_upload_config() -> ApiAsyncMethod {
ApiAsyncMethod::new(
upload_config,
ObjectSchema::new("Upload configuration file.")
.required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
.required("size", IntegerSchema::new("File size.")
.minimum(1)
.maximum(1024*1024*16)
)
)
}
fn upload_config(
_parts: Parts,
req_body: Body,
param: Value,
_info: &ApiAsyncMethod,
rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> {
let mut file_name = tools::required_string_param(&param, "file-name")?.to_owned();
let size = tools::required_integer_param(&param, "size")? as usize;
if !file_name.ends_with(".conf") {
bail!("wrong config file extension: '{}'", file_name);
} else {
file_name.push_str(".zstd");
}
let env: &BackupEnvironment = rpcenv.as_ref();
let mut path = env.datastore.base_path();
path.push(env.backup_dir.relative_path());
path.push(&file_name);
let env2 = env.clone();
let env3 = env.clone();
let resp = req_body
.map_err(Error::from)
.concat2()
.and_then(move |data| {
if size != data.len() {
bail!("got configuration file with unexpected length ({} != {})", size, data.len());
}
let data = zstd::block::compress(&data, 0)?;
tools::file_set_contents(&path, &data, None)?;
env2.debug(format!("upload config {:?} ({} bytes, comp: {})", path, size, data.len()));
Ok(())
})
.and_then(move |_| {
Ok(env3.format_response(Ok(Value::Null)))
})
;
Ok(Box::new(resp))
}
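
A short walk-through of upload_config() with placeholder values (illustrative, not taken from this diff): a request carrying file-name=qemu-server.conf and size=1234, whose body is exactly 1234 bytes long, is compressed with zstd::block::compress(&data, 0) and written below the datastore base path at <backup_dir relative path>/qemu-server.conf.zstd. A file name without the .conf extension is rejected up front, and a body whose length differs from the announced size fails with "got configuration file with unexpected length".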