src/api2/admin/datastore/backup/environment.rs: register/lookup known chunks

This commit is contained in:
Dietmar Maurer 2019-05-20 18:05:10 +02:00
parent 97f03eff13
commit a09c0e38d8
3 changed files with 35 additions and 12 deletions

View File

@ -261,11 +261,10 @@ fn dynamic_append (
let env: &BackupEnvironment = rpcenv.as_ref();
let _size = 0;
let _digest = crate::tools::hex_to_digest(digest_str)?;
let digest = crate::tools::hex_to_digest(digest_str)?;
let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk"))?;
// fixme: lookup digest and chunk size, then add
//env.dynamic_writer_append_chunk(wid, size, &digest)?;
env.dynamic_writer_append_chunk(wid, size, &digest)?;
env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid));

View File

@ -14,6 +14,7 @@ struct SharedBackupState {
finished: bool,
uid_counter: usize,
dynamic_writers: HashMap<usize, (u64 /* offset */, DynamicIndexWriter)>,
known_chunks: HashMap<[u8;32], u32>,
}
impl SharedBackupState {
@ -61,6 +62,7 @@ impl BackupEnvironment {
finished: false,
uid_counter: 0,
dynamic_writers: HashMap::new(),
known_chunks: HashMap::new(),
};
Self {
@ -76,6 +78,27 @@ impl BackupEnvironment {
}
}
// Register a Chunk with associated length. A client may only use registered
// chunks (we do not trust clients that far ...)
pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
state.known_chunks.insert(digest, length);
Ok(())
}
pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
let state = self.state.lock().unwrap();
match state.known_chunks.get(digest) {
Some(len) => Some(*len),
None => None,
}
}
/// Store the writer with an unique ID
pub fn register_dynamic_writer(&self, writer: DynamicIndexWriter) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
@ -90,7 +113,7 @@ impl BackupEnvironment {
}
/// Append chunk to dynamic writer
pub fn dynamic_writer_append_chunk(&self, wid: usize, size: u64, digest: &[u8; 32]) -> Result<(), Error> {
pub fn dynamic_writer_append_chunk(&self, wid: usize, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
@ -100,7 +123,7 @@ impl BackupEnvironment {
None => bail!("dynamic writer '{}' not registered", wid),
};
data.0 += size;
data.0 += size as u64;
data.1.add_chunk(data.0, digest)?;

View File

@ -16,22 +16,22 @@ use super::environment::*;
pub struct UploadChunk {
stream: Body,
store: Arc<DataStore>,
size: u64,
size: u32,
chunk: Vec<u8>,
}
impl UploadChunk {
pub fn new(stream: Body, store: Arc<DataStore>, size: u64) -> Self {
pub fn new(stream: Body, store: Arc<DataStore>, size: u32) -> Self {
Self { stream, store, size, chunk: vec![] }
}
}
impl Future for UploadChunk {
type Item = ([u8; 32], u64);
type Item = ([u8; 32], u32);
type Error = failure::Error;
fn poll(&mut self) -> Poll<([u8; 32], u64), failure::Error> {
fn poll(&mut self) -> Poll<([u8; 32], u32), failure::Error> {
loop {
match try_ready!(self.stream.poll()) {
Some(chunk) => {
@ -77,19 +77,20 @@ fn upload_dynamic_chunk(
rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> {
let size = tools::required_integer_param(&param, "size")?;
let size = tools::required_integer_param(&param, "size")? as u32;
let wid = tools::required_integer_param(&param, "wid")? as usize;
let env: &BackupEnvironment = rpcenv.as_ref();
let upload = UploadChunk::new(req_body, env.datastore.clone(), size as u64);
let upload = UploadChunk::new(req_body, env.datastore.clone(), size);
let resp = upload
.then(move |result| {
let env: &BackupEnvironment = rpcenv.as_ref();
let result = result.and_then(|(digest, size)| {
env.register_chunk(digest, size)?;
env.dynamic_writer_append_chunk(wid, size, &digest)?;
Ok(json!(tools::digest_to_hex(&digest)))
});