src/api2/admin/datastore/backup.rs: verify file size and chunk count on close
This commit is contained in:
parent
1179e15842
commit
8bea85b42e
@ -204,8 +204,9 @@ fn create_dynamic_index(
|
|||||||
|
|
||||||
let env: &BackupEnvironment = rpcenv.as_ref();
|
let env: &BackupEnvironment = rpcenv.as_ref();
|
||||||
|
|
||||||
let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned();
|
let name = tools::required_string_param(¶m, "archive-name")?.to_owned();
|
||||||
|
|
||||||
|
let mut archive_name = name.clone();
|
||||||
if !archive_name.ends_with(".pxar") {
|
if !archive_name.ends_with(".pxar") {
|
||||||
bail!("wrong archive extension");
|
bail!("wrong archive extension");
|
||||||
} else {
|
} else {
|
||||||
@ -218,7 +219,7 @@ fn create_dynamic_index(
|
|||||||
let chunk_size = 4096*1024; // todo: ??
|
let chunk_size = 4096*1024; // todo: ??
|
||||||
|
|
||||||
let index = env.datastore.create_dynamic_writer(&path, chunk_size)?;
|
let index = env.datastore.create_dynamic_writer(&path, chunk_size)?;
|
||||||
let wid = env.register_dynamic_writer(index)?;
|
let wid = env.register_dynamic_writer(index, name)?;
|
||||||
|
|
||||||
env.log(format!("created new dynamic index {} ({:?})", wid, path));
|
env.log(format!("created new dynamic index {} ({:?})", wid, path));
|
||||||
|
|
||||||
@ -266,6 +267,12 @@ pub fn api_method_close_dynamic_index() -> ApiMethod {
|
|||||||
.minimum(1)
|
.minimum(1)
|
||||||
.maximum(256)
|
.maximum(256)
|
||||||
)
|
)
|
||||||
|
.required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
|
||||||
|
.minimum(1)
|
||||||
|
)
|
||||||
|
.required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
|
||||||
|
.minimum(1)
|
||||||
|
)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -276,10 +283,12 @@ fn close_dynamic_index (
|
|||||||
) -> Result<Value, Error> {
|
) -> Result<Value, Error> {
|
||||||
|
|
||||||
let wid = tools::required_integer_param(¶m, "wid")? as usize;
|
let wid = tools::required_integer_param(¶m, "wid")? as usize;
|
||||||
|
let chunk_count = tools::required_integer_param(¶m, "chunk-count")? as u64;
|
||||||
|
let size = tools::required_integer_param(¶m, "size")? as u64;
|
||||||
|
|
||||||
let env: &BackupEnvironment = rpcenv.as_ref();
|
let env: &BackupEnvironment = rpcenv.as_ref();
|
||||||
|
|
||||||
env.dynamic_writer_close(wid)?;
|
env.dynamic_writer_close(wid, chunk_count, size)?;
|
||||||
|
|
||||||
env.log(format!("sucessfully closed dynamic index {}", wid));
|
env.log(format!("sucessfully closed dynamic index {}", wid));
|
||||||
|
|
||||||
|
@ -10,10 +10,18 @@ use crate::backup::*;
|
|||||||
use crate::server::formatter::*;
|
use crate::server::formatter::*;
|
||||||
use hyper::{Body, Response};
|
use hyper::{Body, Response};
|
||||||
|
|
||||||
|
|
||||||
|
struct DynamicWriterState {
|
||||||
|
name: String,
|
||||||
|
index: DynamicIndexWriter,
|
||||||
|
offset: u64,
|
||||||
|
chunk_count: u64,
|
||||||
|
}
|
||||||
|
|
||||||
struct SharedBackupState {
|
struct SharedBackupState {
|
||||||
finished: bool,
|
finished: bool,
|
||||||
uid_counter: usize,
|
uid_counter: usize,
|
||||||
dynamic_writers: HashMap<usize, (u64 /* offset */, DynamicIndexWriter)>,
|
dynamic_writers: HashMap<usize, DynamicWriterState>,
|
||||||
known_chunks: HashMap<[u8;32], u32>,
|
known_chunks: HashMap<[u8;32], u32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -100,14 +108,16 @@ impl BackupEnvironment {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Store the writer with an unique ID
|
/// Store the writer with an unique ID
|
||||||
pub fn register_dynamic_writer(&self, writer: DynamicIndexWriter) -> Result<usize, Error> {
|
pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
state.ensure_unfinished()?;
|
state.ensure_unfinished()?;
|
||||||
|
|
||||||
let uid = state.next_uid();
|
let uid = state.next_uid();
|
||||||
|
|
||||||
state.dynamic_writers.insert(uid, (0, writer));
|
state.dynamic_writers.insert(uid, DynamicWriterState {
|
||||||
|
index, name, offset: 0, chunk_count: 0,
|
||||||
|
});
|
||||||
|
|
||||||
Ok(uid)
|
Ok(uid)
|
||||||
}
|
}
|
||||||
@ -123,15 +133,16 @@ impl BackupEnvironment {
|
|||||||
None => bail!("dynamic writer '{}' not registered", wid),
|
None => bail!("dynamic writer '{}' not registered", wid),
|
||||||
};
|
};
|
||||||
|
|
||||||
data.0 += size as u64;
|
data.offset += size as u64;
|
||||||
|
data.chunk_count += 1;
|
||||||
|
|
||||||
data.1.add_chunk(data.0, digest)?;
|
data.index.add_chunk(data.offset, digest)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Close dynamic writer
|
/// Close dynamic writer
|
||||||
pub fn dynamic_writer_close(&self, wid: usize) -> Result<(), Error> {
|
pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
state.ensure_unfinished()?;
|
state.ensure_unfinished()?;
|
||||||
@ -141,7 +152,15 @@ impl BackupEnvironment {
|
|||||||
None => bail!("dynamic writer '{}' not registered", wid),
|
None => bail!("dynamic writer '{}' not registered", wid),
|
||||||
};
|
};
|
||||||
|
|
||||||
data.1.close()?;
|
if data.chunk_count != chunk_count {
|
||||||
|
bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
|
||||||
|
}
|
||||||
|
|
||||||
|
if data.offset != size {
|
||||||
|
bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
|
||||||
|
}
|
||||||
|
|
||||||
|
data.index.close()?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -449,8 +449,13 @@ impl BackupClient {
|
|||||||
.and_then(move |res| {
|
.and_then(move |res| {
|
||||||
let wid = res.as_u64().unwrap();
|
let wid = res.as_u64().unwrap();
|
||||||
Self::upload_stream(h2_3, wid, stream, known_chunks.clone())
|
Self::upload_stream(h2_3, wid, stream, known_chunks.clone())
|
||||||
.and_then(move |_size| {
|
.and_then(move |(chunk_count, size, _speed)| {
|
||||||
h2_4.post("dynamic_close", Some(json!({ "wid": wid })))
|
let param = json!({
|
||||||
|
"wid": wid ,
|
||||||
|
"chunk-count": chunk_count,
|
||||||
|
"size": size,
|
||||||
|
});
|
||||||
|
h2_4.post("dynamic_close", Some(param))
|
||||||
})
|
})
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
})
|
})
|
||||||
@ -529,7 +534,7 @@ impl BackupClient {
|
|||||||
wid: u64,
|
wid: u64,
|
||||||
stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
|
stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
|
||||||
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
) -> impl Future<Item=usize, Error=Error> {
|
) -> impl Future<Item=(usize, usize, usize), Error=Error> {
|
||||||
|
|
||||||
let repeat = std::sync::Arc::new(AtomicUsize::new(0));
|
let repeat = std::sync::Arc::new(AtomicUsize::new(0));
|
||||||
let repeat2 = repeat.clone();
|
let repeat2 = repeat.clone();
|
||||||
@ -593,7 +598,7 @@ impl BackupClient {
|
|||||||
println!("Average chunk size was {} bytes.", stream_len/repeat);
|
println!("Average chunk size was {} bytes.", stream_len/repeat);
|
||||||
println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
|
println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
|
||||||
}
|
}
|
||||||
Ok(speed)
|
Ok((repeat, stream_len, speed))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user