From 417cb0731e1bfdabe6c4d3a172265d1f5c6a7345 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Fri, 24 May 2019 10:05:22 +0200 Subject: [PATCH] src/api2/admin/datastore/backup.rs: verify chunk offset This clearly shows that the current approach does not work - seems we get chunks out of order. --- src/api2/admin/datastore/backup.rs | 22 ++++++++++++++----- .../admin/datastore/backup/environment.rs | 7 +++++- .../admin/datastore/backup/upload_chunk.rs | 7 +++++- src/client/http_client.rs | 18 +++++++++------ 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs index fc3df43a..a7b57ebe 100644 --- a/src/api2/admin/datastore/backup.rs +++ b/src/api2/admin/datastore/backup.rs @@ -230,13 +230,19 @@ pub fn api_method_dynamic_append() -> ApiMethod { ApiMethod::new( dynamic_append, ObjectSchema::new("Append chunk to dynamic index writer.") + .required("wid", IntegerSchema::new("Dynamic writer ID.") + .minimum(1) + .maximum(256) + ) .required("digest-list", ArraySchema::new( "Chunk digest list.", StringSchema::new("Chunk digest.").into()) ) - .required("wid", IntegerSchema::new("Dynamic writer ID.") - .minimum(1) - .maximum(256) + .required("offset-list", ArraySchema::new( + "Chunk offset list.", + IntegerSchema::new("Corresponding chunk offsets.") + .minimum(0) + .into()) ) ) } @@ -249,17 +255,23 @@ fn dynamic_append ( let wid = tools::required_integer_param(¶m, "wid")? as usize; let digest_list = tools::required_array_param(¶m, "digest-list")?; + let offset_list = tools::required_array_param(¶m, "offset-list")?; println!("DIGEST LIST LEN {}", digest_list.len()); + if offset_list.len() != digest_list.len() { + bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len()); + } + let env: &BackupEnvironment = rpcenv.as_ref(); - for item in digest_list { + for (i, item) in digest_list.iter().enumerate() { let digest_str = item.as_str().unwrap(); let digest = crate::tools::hex_to_digest(digest_str)?; + let offset = offset_list[i].as_u64().unwrap(); let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?; - env.dynamic_writer_append_chunk(wid, size, &digest)?; + env.dynamic_writer_append_chunk(wid, offset, size, &digest)?; env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid)); } diff --git a/src/api2/admin/datastore/backup/environment.rs b/src/api2/admin/datastore/backup/environment.rs index 33597bd2..da5b5f98 100644 --- a/src/api2/admin/datastore/backup/environment.rs +++ b/src/api2/admin/datastore/backup/environment.rs @@ -123,7 +123,7 @@ impl BackupEnvironment { } /// Append chunk to dynamic writer - pub fn dynamic_writer_append_chunk(&self, wid: usize, size: u32, digest: &[u8; 32]) -> Result<(), Error> { + pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; @@ -136,6 +136,11 @@ impl BackupEnvironment { data.offset += size as u64; data.chunk_count += 1; + if data.offset != offset { + bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})", + data.name, data.offset, offset); + } + data.index.add_chunk(data.offset, digest)?; Ok(()) diff --git a/src/api2/admin/datastore/backup/upload_chunk.rs b/src/api2/admin/datastore/backup/upload_chunk.rs index 8e7577fb..9dc4a6c5 100644 --- a/src/api2/admin/datastore/backup/upload_chunk.rs +++ b/src/api2/admin/datastore/backup/upload_chunk.rs @@ -62,6 +62,9 @@ pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod { .minimum(1) .maximum(256) ) + .required("offset", IntegerSchema::new("Chunk offset (end of chunk).") + .minimum(0) + ) .required("size", IntegerSchema::new("Chunk size.") .minimum(1) .maximum(1024*1024*16) @@ -78,8 +81,10 @@ fn upload_dynamic_chunk( ) -> Result { let size = tools::required_integer_param(¶m, "size")? as u32; + let offset = tools::required_integer_param(¶m, "offset")? as u64; let wid = tools::required_integer_param(¶m, "wid")? as usize; + println!("upload_dynamic_chunk: {} bytes, offset {}", size, offset); let env: &BackupEnvironment = rpcenv.as_ref(); @@ -91,7 +96,7 @@ fn upload_dynamic_chunk( let result = result.and_then(|(digest, size)| { env.register_chunk(digest, size)?; - env.dynamic_writer_append_chunk(wid, size, &digest)?; + env.dynamic_writer_append_chunk(wid, offset, size, &digest)?; Ok(json!(tools::digest_to_hex(&digest))) }); diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 7d835944..9879a252 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -461,7 +461,7 @@ impl BackupClient { .and_then(move |res| { let wid = res.as_u64().unwrap(); Self::upload_stream(h2_3, wid, stream, known_chunks.clone()) - .and_then(move |(chunk_count, size, _speed)| { + .and_then(move |(chunk_count, size, _speed)| { let param = json!({ "wid": wid , "chunk-count": chunk_count, @@ -575,22 +575,26 @@ impl BackupClient { match merged_chunk_info { MergedChunkInfo::New(chunk_info) => { - println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&chunk_info.digest), chunk_info.data.len()); - let param = json!({ "wid": wid, "size" : chunk_info.data.len() }); + println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&chunk_info.digest), + chunk_info.data.len(), chunk_info.offset); + let param = json!({ "wid": wid, "offset": chunk_info.offset, "size" : chunk_info.data.len() }); request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); upload_data = Some(chunk_info.data.freeze()); } MergedChunkInfo::Known(chunk_list) => { let mut digest_list = vec![]; - for (_offset, digest) in chunk_list { - //println!("append existing chunk ({} bytes)", chunk_info.data.len()); + let mut offset_list = vec![]; + for (offset, digest) in chunk_list { + println!("append chunk {} (offset {})", tools::digest_to_hex(&digest), offset); digest_list.push(tools::digest_to_hex(&digest)); + offset_list.push(offset); } - println!("append existing chunks ({})", digest_list.len()); - let param = json!({ "wid": wid, "digest-list": digest_list }); + println!("append existing chunks list len ({})", digest_list.len()); + let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list }); request = H2Client::request_builder("localhost", "PUT", "dynamic_index", None).unwrap(); request.headers_mut().insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static("application/json")); let param_data = bytes::Bytes::from(param.to_string().as_bytes()); + println!("DATALEN {}", param_data.len()); upload_data = Some(param_data); } }