From 8bea85b42e5fd4eb8ab842261a9412e941a92dd0 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Thu, 23 May 2019 08:50:36 +0200 Subject: [PATCH] src/api2/admin/datastore/backup.rs: verify file size and chunk count on close --- src/api2/admin/datastore/backup.rs | 15 +++++++-- .../admin/datastore/backup/environment.rs | 33 +++++++++++++++---- src/client/http_client.rs | 13 +++++--- 3 files changed, 47 insertions(+), 14 deletions(-) diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs index 8ab6cb4e..a00d431b 100644 --- a/src/api2/admin/datastore/backup.rs +++ b/src/api2/admin/datastore/backup.rs @@ -204,8 +204,9 @@ fn create_dynamic_index( let env: &BackupEnvironment = rpcenv.as_ref(); - let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned(); + let name = tools::required_string_param(¶m, "archive-name")?.to_owned(); + let mut archive_name = name.clone(); if !archive_name.ends_with(".pxar") { bail!("wrong archive extension"); } else { @@ -218,7 +219,7 @@ fn create_dynamic_index( let chunk_size = 4096*1024; // todo: ?? let index = env.datastore.create_dynamic_writer(&path, chunk_size)?; - let wid = env.register_dynamic_writer(index)?; + let wid = env.register_dynamic_writer(index, name)?; env.log(format!("created new dynamic index {} ({:?})", wid, path)); @@ -266,6 +267,12 @@ pub fn api_method_close_dynamic_index() -> ApiMethod { .minimum(1) .maximum(256) ) + .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.") + .minimum(1) + ) + .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.") + .minimum(1) + ) ) } @@ -276,10 +283,12 @@ fn close_dynamic_index ( ) -> Result { let wid = tools::required_integer_param(¶m, "wid")? as usize; + let chunk_count = tools::required_integer_param(¶m, "chunk-count")? as u64; + let size = tools::required_integer_param(¶m, "size")? as u64; let env: &BackupEnvironment = rpcenv.as_ref(); - env.dynamic_writer_close(wid)?; + env.dynamic_writer_close(wid, chunk_count, size)?; env.log(format!("sucessfully closed dynamic index {}", wid)); diff --git a/src/api2/admin/datastore/backup/environment.rs b/src/api2/admin/datastore/backup/environment.rs index 744a27a0..33597bd2 100644 --- a/src/api2/admin/datastore/backup/environment.rs +++ b/src/api2/admin/datastore/backup/environment.rs @@ -10,10 +10,18 @@ use crate::backup::*; use crate::server::formatter::*; use hyper::{Body, Response}; + +struct DynamicWriterState { + name: String, + index: DynamicIndexWriter, + offset: u64, + chunk_count: u64, +} + struct SharedBackupState { finished: bool, uid_counter: usize, - dynamic_writers: HashMap, + dynamic_writers: HashMap, known_chunks: HashMap<[u8;32], u32>, } @@ -100,14 +108,16 @@ impl BackupEnvironment { } /// Store the writer with an unique ID - pub fn register_dynamic_writer(&self, writer: DynamicIndexWriter) -> Result { + pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let uid = state.next_uid(); - state.dynamic_writers.insert(uid, (0, writer)); + state.dynamic_writers.insert(uid, DynamicWriterState { + index, name, offset: 0, chunk_count: 0, + }); Ok(uid) } @@ -123,15 +133,16 @@ impl BackupEnvironment { None => bail!("dynamic writer '{}' not registered", wid), }; - data.0 += size as u64; + data.offset += size as u64; + data.chunk_count += 1; - data.1.add_chunk(data.0, digest)?; + data.index.add_chunk(data.offset, digest)?; Ok(()) } /// Close dynamic writer - pub fn dynamic_writer_close(&self, wid: usize) -> Result<(), Error> { + pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; @@ -141,7 +152,15 @@ impl BackupEnvironment { None => bail!("dynamic writer '{}' not registered", wid), }; - data.1.close()?; + if data.chunk_count != chunk_count { + bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count); + } + + if data.offset != size { + bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size); + } + + data.index.close()?; Ok(()) } diff --git a/src/client/http_client.rs b/src/client/http_client.rs index ce16ce76..7855db5e 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -449,8 +449,13 @@ impl BackupClient { .and_then(move |res| { let wid = res.as_u64().unwrap(); Self::upload_stream(h2_3, wid, stream, known_chunks.clone()) - .and_then(move |_size| { - h2_4.post("dynamic_close", Some(json!({ "wid": wid }))) + .and_then(move |(chunk_count, size, _speed)| { + let param = json!({ + "wid": wid , + "chunk-count": chunk_count, + "size": size, + }); + h2_4.post("dynamic_close", Some(param)) }) .map(|_| ()) }) @@ -529,7 +534,7 @@ impl BackupClient { wid: u64, stream: impl Stream, known_chunks: Arc>>, - ) -> impl Future { + ) -> impl Future { let repeat = std::sync::Arc::new(AtomicUsize::new(0)); let repeat2 = repeat.clone(); @@ -593,7 +598,7 @@ impl BackupClient { println!("Average chunk size was {} bytes.", stream_len/repeat); println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128)); } - Ok(speed) + Ok((repeat, stream_len, speed)) }) }