src/api2/admin/datastore/backup.rs: verify chunk offset
This clearly shows that the current approach does not work - seems we get chunks out of order.
This commit is contained in:
parent
34114e2606
commit
417cb0731e
@ -230,13 +230,19 @@ pub fn api_method_dynamic_append() -> ApiMethod {
|
||||
ApiMethod::new(
|
||||
dynamic_append,
|
||||
ObjectSchema::new("Append chunk to dynamic index writer.")
|
||||
.required("wid", IntegerSchema::new("Dynamic writer ID.")
|
||||
.minimum(1)
|
||||
.maximum(256)
|
||||
)
|
||||
.required("digest-list", ArraySchema::new(
|
||||
"Chunk digest list.",
|
||||
StringSchema::new("Chunk digest.").into())
|
||||
)
|
||||
.required("wid", IntegerSchema::new("Dynamic writer ID.")
|
||||
.minimum(1)
|
||||
.maximum(256)
|
||||
.required("offset-list", ArraySchema::new(
|
||||
"Chunk offset list.",
|
||||
IntegerSchema::new("Corresponding chunk offsets.")
|
||||
.minimum(0)
|
||||
.into())
|
||||
)
|
||||
)
|
||||
}
|
||||
@ -249,17 +255,23 @@ fn dynamic_append (
|
||||
|
||||
let wid = tools::required_integer_param(¶m, "wid")? as usize;
|
||||
let digest_list = tools::required_array_param(¶m, "digest-list")?;
|
||||
let offset_list = tools::required_array_param(¶m, "offset-list")?;
|
||||
|
||||
println!("DIGEST LIST LEN {}", digest_list.len());
|
||||
|
||||
if offset_list.len() != digest_list.len() {
|
||||
bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
|
||||
}
|
||||
|
||||
let env: &BackupEnvironment = rpcenv.as_ref();
|
||||
|
||||
for item in digest_list {
|
||||
for (i, item) in digest_list.iter().enumerate() {
|
||||
let digest_str = item.as_str().unwrap();
|
||||
let digest = crate::tools::hex_to_digest(digest_str)?;
|
||||
let offset = offset_list[i].as_u64().unwrap();
|
||||
let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
|
||||
|
||||
env.dynamic_writer_append_chunk(wid, size, &digest)?;
|
||||
env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
|
||||
|
||||
env.log(format!("sucessfully added chunk {} to dynamic index {}", digest_str, wid));
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ impl BackupEnvironment {
|
||||
}
|
||||
|
||||
/// Append chunk to dynamic writer
|
||||
pub fn dynamic_writer_append_chunk(&self, wid: usize, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
|
||||
pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
state.ensure_unfinished()?;
|
||||
@ -136,6 +136,11 @@ impl BackupEnvironment {
|
||||
data.offset += size as u64;
|
||||
data.chunk_count += 1;
|
||||
|
||||
if data.offset != offset {
|
||||
bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
|
||||
data.name, data.offset, offset);
|
||||
}
|
||||
|
||||
data.index.add_chunk(data.offset, digest)?;
|
||||
|
||||
Ok(())
|
||||
|
@ -62,6 +62,9 @@ pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod {
|
||||
.minimum(1)
|
||||
.maximum(256)
|
||||
)
|
||||
.required("offset", IntegerSchema::new("Chunk offset (end of chunk).")
|
||||
.minimum(0)
|
||||
)
|
||||
.required("size", IntegerSchema::new("Chunk size.")
|
||||
.minimum(1)
|
||||
.maximum(1024*1024*16)
|
||||
@ -78,8 +81,10 @@ fn upload_dynamic_chunk(
|
||||
) -> Result<BoxFut, Error> {
|
||||
|
||||
let size = tools::required_integer_param(¶m, "size")? as u32;
|
||||
let offset = tools::required_integer_param(¶m, "offset")? as u64;
|
||||
let wid = tools::required_integer_param(¶m, "wid")? as usize;
|
||||
|
||||
println!("upload_dynamic_chunk: {} bytes, offset {}", size, offset);
|
||||
|
||||
let env: &BackupEnvironment = rpcenv.as_ref();
|
||||
|
||||
@ -91,7 +96,7 @@ fn upload_dynamic_chunk(
|
||||
|
||||
let result = result.and_then(|(digest, size)| {
|
||||
env.register_chunk(digest, size)?;
|
||||
env.dynamic_writer_append_chunk(wid, size, &digest)?;
|
||||
env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
|
||||
Ok(json!(tools::digest_to_hex(&digest)))
|
||||
});
|
||||
|
||||
|
@ -461,7 +461,7 @@ impl BackupClient {
|
||||
.and_then(move |res| {
|
||||
let wid = res.as_u64().unwrap();
|
||||
Self::upload_stream(h2_3, wid, stream, known_chunks.clone())
|
||||
.and_then(move |(chunk_count, size, _speed)| {
|
||||
.and_then(move |(chunk_count, size, _speed)| {
|
||||
let param = json!({
|
||||
"wid": wid ,
|
||||
"chunk-count": chunk_count,
|
||||
@ -575,22 +575,26 @@ impl BackupClient {
|
||||
|
||||
match merged_chunk_info {
|
||||
MergedChunkInfo::New(chunk_info) => {
|
||||
println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&chunk_info.digest), chunk_info.data.len());
|
||||
let param = json!({ "wid": wid, "size" : chunk_info.data.len() });
|
||||
println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&chunk_info.digest),
|
||||
chunk_info.data.len(), chunk_info.offset);
|
||||
let param = json!({ "wid": wid, "offset": chunk_info.offset, "size" : chunk_info.data.len() });
|
||||
request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
|
||||
upload_data = Some(chunk_info.data.freeze());
|
||||
}
|
||||
MergedChunkInfo::Known(chunk_list) => {
|
||||
let mut digest_list = vec![];
|
||||
for (_offset, digest) in chunk_list {
|
||||
//println!("append existing chunk ({} bytes)", chunk_info.data.len());
|
||||
let mut offset_list = vec![];
|
||||
for (offset, digest) in chunk_list {
|
||||
println!("append chunk {} (offset {})", tools::digest_to_hex(&digest), offset);
|
||||
digest_list.push(tools::digest_to_hex(&digest));
|
||||
offset_list.push(offset);
|
||||
}
|
||||
println!("append existing chunks ({})", digest_list.len());
|
||||
let param = json!({ "wid": wid, "digest-list": digest_list });
|
||||
println!("append existing chunks list len ({})", digest_list.len());
|
||||
let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
|
||||
request = H2Client::request_builder("localhost", "PUT", "dynamic_index", None).unwrap();
|
||||
request.headers_mut().insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static("application/json"));
|
||||
let param_data = bytes::Bytes::from(param.to_string().as_bytes());
|
||||
println!("DATALEN {}", param_data.len());
|
||||
upload_data = Some(param_data);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user