backup-api: allow to create DynamicIndexWriter and add chunks to it

Dietmar Maurer
2019-05-10 10:25:40 +02:00
parent 35a2d8a6a6
commit f9578f3c79
3 changed files with 139 additions and 31 deletions

View File

@@ -1,6 +1,7 @@
use failure::*;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::path::PathBuf;
use serde_json::Value;
@@ -10,6 +11,11 @@ use crate::backup::*;
use crate::server::formatter::*;
use hyper::{Body, Response};
struct SharedBackupState {
uid_counter: usize,
dynamic_writers: HashMap<usize, (u64 /* offset */, DynamicIndexWriter)>,
}
/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
pub struct BackupEnvironment {
@@ -19,10 +25,26 @@ pub struct BackupEnvironment {
pub formatter: &'static OutputFormatter,
pub worker: Arc<WorkerTask>,
pub datastore: Arc<DataStore>,
pub backup_dir: BackupDir,
pub path: PathBuf,
state: Arc<Mutex<SharedBackupState>>
}
impl BackupEnvironment {
pub fn new(env_type: RpcEnvironmentType, user: String, worker: Arc<WorkerTask>, datastore: Arc<DataStore>) -> Self {
pub fn new(
env_type: RpcEnvironmentType,
user: String,
worker: Arc<WorkerTask>,
datastore: Arc<DataStore>,
backup_dir: BackupDir,
path: PathBuf,
) -> Self {
let state = SharedBackupState {
uid_counter: 0,
dynamic_writers: HashMap::new(),
};
Self {
result_attributes: HashMap::new(),
env_type,
@@ -30,9 +52,45 @@ impl BackupEnvironment {
worker,
datastore,
formatter: &JSON_FORMATTER,
backup_dir,
path,
state: Arc::new(Mutex::new(state)),
}
}
/// Get a unique integer ID
pub fn next_uid(&self) -> usize {
let mut state = self.state.lock().unwrap();
state.uid_counter += 1;
state.uid_counter
}
/// Store the writer with a unique ID
pub fn register_dynamic_writer(&self, writer: DynamicIndexWriter) -> usize {
let mut state = self.state.lock().unwrap();
state.uid_counter += 1;
let uid = state.uid_counter;
state.dynamic_writers.insert(uid, (0, writer));
uid
}
/// Append chunk to dynamic writer
pub fn dynamic_writer_append_chunk(&self, wid: usize, size: u64, digest: &[u8; 32]) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
let mut data = match state.dynamic_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("dynamic writer '{}' not registered", wid),
};
data.0 += size;
data.1.add_chunk(data.0, digest)?;
Ok(())
}
pub fn log<S: AsRef<str>>(&self, msg: S) {
self.worker.log(msg);
}
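
The new environment state lives behind an Arc<Mutex<..>>: each registered writer gets a unique ID from the counter and carries a running byte offset next to it, and every appended chunk advances that offset before it is passed to the writer as the chunk's end offset. Below is a minimal, self-contained sketch of that pattern only, with a hypothetical MockWriter standing in for DynamicIndexWriter and plain String errors instead of failure::Error; it is not the project's code.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for DynamicIndexWriter: just records (end offset, digest) pairs.
struct MockWriter {
    entries: Vec<(u64, [u8; 32])>,
}

struct SharedState {
    uid_counter: usize,
    writers: HashMap<usize, (u64 /* running offset */, MockWriter)>,
}

#[derive(Clone)]
struct Env {
    state: Arc<Mutex<SharedState>>,
}

impl Env {
    // mirrors register_dynamic_writer above
    fn register_writer(&self, writer: MockWriter) -> usize {
        let mut state = self.state.lock().unwrap();
        state.uid_counter += 1;
        let uid = state.uid_counter;
        state.writers.insert(uid, (0, writer));
        uid
    }

    // mirrors dynamic_writer_append_chunk above
    fn append_chunk(&self, wid: usize, size: u64, digest: &[u8; 32]) -> Result<(), String> {
        let mut state = self.state.lock().unwrap();
        let data = state.writers.get_mut(&wid)
            .ok_or_else(|| format!("dynamic writer '{}' not registered", wid))?;
        data.0 += size;                           // advance the running offset
        data.1.entries.push((data.0, *digest));   // add_chunk(end_offset, digest)
        Ok(())
    }
}

fn main() {
    let env = Env {
        state: Arc::new(Mutex::new(SharedState {
            uid_counter: 0,
            writers: HashMap::new(),
        })),
    };

    let wid = env.register_writer(MockWriter { entries: Vec::new() });
    for size in &[1024u64, 2048, 512] {
        env.append_chunk(wid, *size, &[0u8; 32]).unwrap();
    }

    // end offsets recorded: 1024, 3072, 3584 (cumulative)
    let state = env.state.lock().unwrap();
    let offsets: Vec<u64> = state.writers[&wid].1.entries.iter().map(|e| e.0).collect();
    println!("{:?}", offsets);
}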

View File

@@ -28,10 +28,10 @@ impl UploadChunk {
}
impl Future for UploadChunk {
type Item = Value;
type Item = ([u8; 32], u64);
type Error = failure::Error;
fn poll(&mut self) -> Poll<Value, failure::Error> {
fn poll(&mut self) -> Poll<([u8; 32], u64), failure::Error> {
loop {
match try_ready!(self.stream.poll()) {
Some(chunk) => {
@@ -41,24 +41,27 @@ impl Future for UploadChunk {
self.chunk.extend_from_slice(&chunk);
}
None => {
if self.chunk.len() != (self.size as usize) {
bail!("uploaded chunk has unexpected size.");
}
let (is_duplicate, digest, _compressed_size) = self.store.insert_chunk(&self.chunk)?;
let (_is_duplicate, digest, _compressed_size) = self.store.insert_chunk(&self.chunk)?;
let result = json!({
"digest": tools::digest_to_hex(&digest),
"duplicate": is_duplicate,
});
return Ok(Async::Ready(result))
return Ok(Async::Ready((digest, self.size)))
}
}
}
}
}
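
UploadChunk is a hand-rolled futures 0.1 future: it keeps polling the body stream, buffers every part, and only when the stream ends does it check the announced size and resolve, now to a (digest, size) tuple rather than a JSON value. The following is a rough, self-contained sketch of that accumulate-then-finish shape, assuming a futures = "0.1" dependency; the digest below is a trivial stand-in for what DataStore::insert_chunk would return.

use futures::stream;
use futures::{Async, Future, Poll, Stream};

struct Upload<S> {
    stream: S,
    size: u64,
    buf: Vec<u8>,
}

impl<S: Stream<Item = Vec<u8>, Error = String>> Future for Upload<S> {
    type Item = ([u8; 32], u64);
    type Error = String;

    fn poll(&mut self) -> Poll<([u8; 32], u64), String> {
        loop {
            match self.stream.poll()? {
                Async::NotReady => return Ok(Async::NotReady),
                // another body part arrived: buffer it and poll again
                Async::Ready(Some(part)) => self.buf.extend_from_slice(&part),
                // stream finished: validate the announced size and resolve
                Async::Ready(None) => {
                    if self.buf.len() != self.size as usize {
                        return Err("uploaded chunk has unexpected size".to_string());
                    }
                    let mut digest = [0u8; 32]; // stand-in, not a real SHA-256
                    digest[0] = self.buf.iter().fold(0u8, |a, b| a.wrapping_add(*b));
                    return Ok(Async::Ready((digest, self.size)));
                }
            }
        }
    }
}

fn main() {
    let body = stream::iter_ok::<_, String>(vec![vec![1u8; 4], vec![2u8; 4]]);
    let upload = Upload { stream: body, size: 8, buf: Vec::new() };
    let (digest, size) = upload.wait().unwrap();
    println!("size={} digest[0]={}", size, digest[0]);
}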
pub fn api_method_upload_chunk() -> ApiAsyncMethod {
pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod {
ApiAsyncMethod::new(
upload_chunk,
ObjectSchema::new("Upload chunk.")
upload_dynamic_chunk,
ObjectSchema::new("Upload chunk for dynamic index writer (variable sized chunks).")
.required("wid", IntegerSchema::new("Dynamic writer ID.")
.minimum(1)
.maximum(256)
)
.required("size", IntegerSchema::new("Chunk size.")
.minimum(1)
.maximum(1024*1024*16)
@@ -66,7 +69,7 @@ pub fn api_method_upload_chunk() -> ApiAsyncMethod {
)
}
fn upload_chunk(
fn upload_dynamic_chunk(
_parts: Parts,
req_body: Body,
param: Value,
@@ -75,22 +78,24 @@ fn upload_chunk(
) -> Result<BoxFut, Error> {
let size = tools::required_integer_param(&param, "size")?;
let wid = tools::required_integer_param(&param, "wid")? as usize;
let env: &BackupEnvironment = rpcenv.as_ref();
let upload = UploadChunk::new(req_body, env.datastore.clone(), size as u64);
// fixme: do we really need abort here? We already do that on the level above.
let abort_future = env.worker.abort_future().then(|_| Ok(Value::Null));
let resp = upload.select(abort_future)
.and_then(|(result, _)| Ok(result))
.map_err(|(err, _)| err)
.then(move |res| {
let resp = upload
.then(move |result| {
let env: &BackupEnvironment = rpcenv.as_ref();
Ok(env.format_response(res))
let result = result.and_then(|(digest, size)| {
env.dynamic_writer_append_chunk(wid, size, &digest)?;
Ok(json!(tools::digest_to_hex(&digest)))
});
Ok(env.format_response(result))
});
Ok(Box::new(resp))
}
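
The handler no longer formats the raw upload result directly: the upload future resolves to (digest, size), and the final `.then()` closure, which runs on success and failure alike, appends the chunk to the registered writer and folds any error into the formatted response instead of tearing down the connection future. A minimal sketch of that composition under the same futures 0.1 assumption, with hypothetical stand-ins (append_chunk, format_response) for the environment methods:

use futures::future;
use futures::Future;

// Stand-in for env.dynamic_writer_append_chunk(wid, size, &digest).
fn append_chunk(wid: usize, size: u64, digest: &str) -> Result<(), String> {
    if wid == 0 {
        return Err(format!("dynamic writer '{}' not registered", wid));
    }
    println!("writer {}: appended {} bytes ({})", wid, size, digest);
    Ok(())
}

// Stand-in for env.format_response(result).
fn format_response(result: Result<String, String>) -> String {
    match result {
        Ok(data) => format!("{{\"data\":\"{}\"}}", data),
        Err(err) => format!("{{\"error\":\"{}\"}}", err),
    }
}

fn main() {
    // Pretend the upload future already resolved to (digest, size).
    let upload = future::ok::<(String, u64), String>(("0123abcd".to_string(), 4096));

    let resp = upload.then(move |result| -> Result<String, String> {
        // `then` sees the whole Result, so an append failure becomes an
        // error response rather than an aborted future chain.
        let result = result.and_then(|(digest, size)| {
            append_chunk(1, size, &digest)?;
            Ok(digest)
        });
        Ok(format_response(result))
    });

    println!("{}", resp.wait().unwrap());
}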