src/api2/admin/datastore/backup.rs: implement fixed sized chunk upload api

Dietmar Maurer 2019-05-28 06:18:55 +02:00
parent 29ae5c86a2
commit a42fa400ee
3 changed files with 317 additions and 25 deletions

View File

@ -159,6 +159,16 @@ fn backup_api() -> Router {
"dynamic_close", Router::new()
.post(api_method_close_dynamic_index())
)
.subdir(
"fixed_index", Router::new()
.download(api_method_fixed_chunk_index())
.post(api_method_create_fixed_index())
.put(api_method_fixed_append())
)
.subdir(
"fixed_close", Router::new()
.post(api_method_close_fixed_index())
)
.subdir(
"finish", Router::new()
.post(
@ -177,18 +187,6 @@ fn backup_api() -> Router {
router
}
pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
ApiAsyncMethod::new(
dynamic_chunk_index,
ObjectSchema::new(r###"
Download the dynamic chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"###
)
.required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
)
}
pub fn api_method_create_dynamic_index() -> ApiMethod {
ApiMethod::new(
create_dynamic_index,
@ -209,7 +207,7 @@ fn create_dynamic_index(
let mut archive_name = name.clone();
if !archive_name.ends_with(".pxar") {
bail!("wrong archive extension");
bail!("wrong archive extension: '{}'", archive_name);
} else {
archive_name.push_str(".didx");
}
@ -227,6 +225,50 @@ fn create_dynamic_index(
Ok(json!(wid))
}
pub fn api_method_create_fixed_index() -> ApiMethod {
ApiMethod::new(
create_fixed_index,
ObjectSchema::new("Create fixed chunk index file.")
.required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
.required("size", IntegerSchema::new("File size.")
.minimum(1)
)
)
}
fn create_fixed_index(
param: Value,
_info: &ApiMethod,
rpcenv: &mut RpcEnvironment,
) -> Result<Value, Error> {
let env: &BackupEnvironment = rpcenv.as_ref();
println!("PARAM: {:?}", param);
let name = tools::required_string_param(&param, "archive-name")?.to_owned();
let size = tools::required_integer_param(&param, "size")? as usize;
let mut archive_name = name.clone();
if !archive_name.ends_with(".img") {
bail!("wrong archive extension: '{}'", archive_name);
} else {
archive_name.push_str(".fidx");
}
let mut path = env.backup_dir.relative_path();
path.push(archive_name);
let chunk_size = 4096*1024; // todo: ??
let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;
env.log(format!("created new fixed index {} ({:?})", wid, path));
Ok(json!(wid))
}
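// Illustrative sketch (not part of this commit): with the 4 MiB chunk size
// hard-coded in create_fixed_index() above, a fixed index would need one slot
// per chunk-sized piece of the image, a partial tail piece still taking a slot.
fn expected_chunk_slots(size: usize, chunk_size: usize) -> usize {
    (size + chunk_size - 1) / chunk_size
}
#[test]
fn fixed_index_slot_count() {
    let chunk_size = 4096 * 1024; // matches the hard-coded value above
    assert_eq!(expected_chunk_slots(3 * chunk_size, chunk_size), 3);
    assert_eq!(expected_chunk_slots(3 * chunk_size + 1, chunk_size), 4);
}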
pub fn api_method_dynamic_append() -> ApiMethod {
ApiMethod::new(
dynamic_append,
@ -279,6 +321,59 @@ fn dynamic_append (
Ok(Value::Null)
}
pub fn api_method_fixed_append() -> ApiMethod {
ApiMethod::new(
fixed_append,
ObjectSchema::new("Append chunk to fixed index writer.")
.required("wid", IntegerSchema::new("Fixed writer ID.")
.minimum(1)
.maximum(256)
)
.required("digest-list", ArraySchema::new(
"Chunk digest list.",
StringSchema::new("Chunk digest.").into())
)
.required("offset-list", ArraySchema::new(
"Chunk offset list.",
IntegerSchema::new("Corresponding chunk offsets.")
.minimum(0)
.into())
)
)
}
fn fixed_append (
param: Value,
_info: &ApiMethod,
rpcenv: &mut RpcEnvironment,
) -> Result<Value, Error> {
let wid = tools::required_integer_param(&param, "wid")? as usize;
let digest_list = tools::required_array_param(&param, "digest-list")?;
let offset_list = tools::required_array_param(&param, "offset-list")?;
println!("DIGEST LIST LEN {}", digest_list.len());
if offset_list.len() != digest_list.len() {
bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
}
let env: &BackupEnvironment = rpcenv.as_ref();
for (i, item) in digest_list.iter().enumerate() {
let digest_str = item.as_str().unwrap();
let digest = crate::tools::hex_to_digest(digest_str)?;
let offset = offset_list[i].as_u64().unwrap();
let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
println!("DEBUG {} {}", offset, size);
env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
env.log(format!("sucessfully added chunk {} to fixed index {}", digest_str, wid));
}
Ok(Value::Null)
}
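// Illustrative sketch (not part of this commit): the body a client PUTs to
// "fixed_index" pairs each chunk digest with the byte offset of that chunk in
// the image, matching the "digest-list"/"offset-list" schema above. The wid,
// digest and offset below are made-up example values.
use serde_json::json;
fn example_fixed_append_body() -> serde_json::Value {
    json!({
        "wid": 1,
        "digest-list": [
            // a real digest is 64 hex characters; shortened here for readability
            "02ab...90ff",
        ],
        "offset-list": [ 0 ],
    })
}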
pub fn api_method_close_dynamic_index() -> ApiMethod {
ApiMethod::new(
close_dynamic_index,
@ -315,6 +410,41 @@ fn close_dynamic_index (
Ok(Value::Null)
}
pub fn api_method_close_fixed_index() -> ApiMethod {
ApiMethod::new(
close_fixed_index,
ObjectSchema::new("Close fixed index writer.")
.required("wid", IntegerSchema::new("Fixed writer ID.")
.minimum(1)
.maximum(256)
)
.required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
.minimum(1)
)
.required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
.minimum(1)
)
)
}
fn close_fixed_index (
param: Value,
_info: &ApiMethod,
rpcenv: &mut RpcEnvironment,
) -> Result<Value, Error> {
let wid = tools::required_integer_param(&param, "wid")? as usize;
let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
let size = tools::required_integer_param(&param, "size")? as u64;
let env: &BackupEnvironment = rpcenv.as_ref();
env.fixed_writer_close(wid, chunk_count, size)?;
env.log(format!("sucessfully closed fixed index {}", wid));
Ok(Value::Null)
}
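// Illustrative sketch (not part of this commit): the close request reports the
// totals the client believes it uploaded, which the server then checks against
// what it actually recorded. Field names follow the schema above; values are
// whatever the caller tracked during upload.
use serde_json::json;
fn example_fixed_close_body(wid: u64, chunk_count: u64, size: u64) -> serde_json::Value {
    json!({ "wid": wid, "chunk-count": chunk_count, "size": size })
}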
fn finish_backup (
_param: Value,
@ -329,6 +459,18 @@ fn finish_backup (
Ok(Value::Null)
}
pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
ApiAsyncMethod::new(
dynamic_chunk_index,
ObjectSchema::new(r###"
Download the dynamic chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"###
)
.required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
)
}
fn dynamic_chunk_index(
_parts: Parts,
_req_body: Body,
@ -344,7 +486,7 @@ fn dynamic_chunk_index(
let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
if !archive_name.ends_with(".pxar") {
bail!("wrong archive extension");
bail!("wrong archive extension: '{}'", archive_name);
} else {
archive_name.push_str(".didx");
}
@ -389,3 +531,74 @@ fn dynamic_chunk_index(
Ok(Box::new(future::ok(response)))
}
pub fn api_method_fixed_chunk_index() -> ApiAsyncMethod {
ApiAsyncMethod::new(
fixed_chunk_index,
ObjectSchema::new(r###"
Download the fixed chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"###
)
.required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
)
}
fn fixed_chunk_index(
_parts: Parts,
_req_body: Body,
param: Value,
_info: &ApiAsyncMethod,
rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> {
let env: &BackupEnvironment = rpcenv.as_ref();
let mut archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
if !archive_name.ends_with(".img") {
bail!("wrong archive extension: '{}'", archive_name);
} else {
archive_name.push_str(".fidx");
}
let empty_response = {
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?
};
let last_backup = match &env.last_backup {
Some(info) => info,
None => return Ok(Box::new(future::ok(empty_response))),
};
let mut path = last_backup.backup_dir.relative_path();
path.push(&archive_name);
let index = match env.datastore.open_fixed_reader(path) {
Ok(index) => index,
Err(_) => {
env.log(format!("there is no last backup for archive '{}'", archive_name));
return Ok(Box::new(future::ok(empty_response)));
}
};
let count = index.index_count();
for pos in 0..count {
let digest = index.index_digest(pos).unwrap();
let size = index.chunk_size as u32;
env.register_chunk(*digest, size)?;
}
let reader = DigestListEncoder::new(Box::new(index));
let stream = WrappedReaderStream::new(reader);
// fixme: set size, content type?
let response = http::Response::builder()
.status(200)
.body(Body::wrap_stream(stream))?;
Ok(Box::new(future::ok(response)))
}
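// Illustrative sketch (not part of this commit): fixed_chunk_index() above
// registers every digest of the previous backup's index with the full chunk
// size, so a later fixed_append can reference those chunks by digest alone,
// without re-uploading their data.
use std::collections::HashMap;
fn register_previous_chunks(
    known_chunks: &mut HashMap<[u8; 32], u32>,
    digests: &[[u8; 32]],
    chunk_size: u32,
) {
    for digest in digests {
        known_chunks.insert(*digest, chunk_size);
    }
}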

View File

@ -18,10 +18,19 @@ struct DynamicWriterState {
chunk_count: u64,
}
struct FixedWriterState {
name: String,
index: FixedIndexWriter,
size: usize,
chunk_size: u32,
chunk_count: u64,
}
struct SharedBackupState {
finished: bool,
uid_counter: usize,
dynamic_writers: HashMap<usize, DynamicWriterState>,
fixed_writers: HashMap<usize, FixedWriterState>,
known_chunks: HashMap<[u8;32], u32>,
}
@ -70,6 +79,7 @@ impl BackupEnvironment {
finished: false,
uid_counter: 0,
dynamic_writers: HashMap::new(),
fixed_writers: HashMap::new(),
known_chunks: HashMap::new(),
};
@ -122,6 +132,21 @@ impl BackupEnvironment {
Ok(uid)
}
/// Store the writer with a unique ID
pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let uid = state.next_uid();
state.fixed_writers.insert(uid, FixedWriterState {
index, name, chunk_count: 0, size, chunk_size,
});
Ok(uid)
}
/// Append chunk to dynamic writer
pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
@ -146,6 +171,29 @@ impl BackupEnvironment {
Ok(())
}
/// Append chunk to fixed writer
pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let mut data = match state.fixed_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("fixed writer '{}' not registered", wid),
};
data.chunk_count += 1;
if size != data.chunk_size {
bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size);
}
let pos = (offset as usize)/(data.chunk_size as usize);
data.index.add_digest(pos, digest)?;
Ok(())
}
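// Illustrative sketch (not part of this commit): fixed_writer_append_chunk()
// above derives the index slot from the chunk's byte offset, so offsets are
// expected to be multiples of the chunk size.
fn slot_for_offset(offset: u64, chunk_size: u32) -> usize {
    (offset as usize) / (chunk_size as usize)
}
#[test]
fn offset_maps_to_slot() {
    let chunk_size: u32 = 4096 * 1024;
    assert_eq!(slot_for_offset(0, chunk_size), 0);
    assert_eq!(slot_for_offset(2 * chunk_size as u64, chunk_size), 2);
}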
/// Close dynamic writer
pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
@ -170,6 +218,27 @@ impl BackupEnvironment {
Ok(())
}
/// Close fixed writer
pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let mut data = match state.fixed_writers.remove(&wid) {
Some(data) => data,
None => bail!("fixed writer '{}' not registered", wid),
};
if data.chunk_count != chunk_count {
bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
}
data.index.close()?;
Ok(())
}
/// Mark backup as finished
pub fn finish_backup(&self) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();

View File

@ -430,10 +430,12 @@ impl BackupClient {
self.h2.clone().post("finish", None).map(|_| ())
}
pub fn upload_dynamic_stream(
pub fn upload_stream(
&self,
archive_name: &str,
stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
prefix: &str,
fixed_size: Option<u64>,
) -> impl Future<Item=(), Error=Error> {
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
@ -452,23 +454,30 @@ impl BackupClient {
let h2_3 = self.h2.clone();
let h2_4 = self.h2.clone();
let param = json!({ "archive-name": archive_name });
let mut param = json!({ "archive-name": archive_name });
if let Some(size) = fixed_size {
param["size"] = size.into();
}
Self::download_chunk_list(h2, "dynamic_index", archive_name, known_chunks.clone())
let index_path = format!("{}_index", prefix);
let index_path2 = index_path.clone();
let close_path = format!("{}_close", prefix);
Self::download_chunk_list(h2, &index_path, archive_name, known_chunks.clone())
.and_then(move |_| {
h2_2.post("dynamic_index", Some(param))
h2_2.post(&index_path, Some(param))
})
.and_then(move |res| {
let wid = res.as_u64().unwrap();
Self::upload_stream(h2_3, wid, stream, known_chunks.clone())
Self::upload_chunk_info_stream(h2_3, wid, stream, &index_path2, known_chunks.clone())
.and_then(move |(chunk_count, size, _speed)| {
let param = json!({
"wid": wid ,
"chunk-count": chunk_count,
"size": size,
});
h2_4.post("dynamic_close", Some(param))
})
h2_4.post(&close_path, Some(param))
})
.map(|_| ())
})
}
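// Illustrative sketch (not part of this commit): in the unified upload_stream()
// above, the prefix alone selects the endpoint family, e.g. "fixed" maps to
// "fixed_index"/"fixed_close" and "dynamic" to "dynamic_index"/"dynamic_close".
fn endpoint_paths(prefix: &str) -> (String, String) {
    (format!("{}_index", prefix), format!("{}_close", prefix))
}
#[test]
fn prefix_selects_endpoints() {
    assert_eq!(
        endpoint_paths("fixed"),
        ("fixed_index".to_string(), "fixed_close".to_string())
    );
}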
@ -502,7 +511,7 @@ impl BackupClient {
(verify_queue_tx, verify_result_rx)
}
fn upload_chunk_queue(h2: H2Client, wid: u64) -> (
fn upload_chunk_queue(h2: H2Client, wid: u64, path: String) -> (
mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
sync::oneshot::Receiver<Result<(), Error>>
) {
@ -545,7 +554,7 @@ impl BackupClient {
}
println!("append chunks list len ({})", digest_list.len());
let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list });
let mut request = H2Client::request_builder("localhost", "PUT", "dynamic_index", None).unwrap();
let mut request = H2Client::request_builder("localhost", "PUT", &path, None).unwrap();
request.headers_mut().insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static("application/json"));
let param_data = bytes::Bytes::from(param.to_string().as_bytes());
let upload_data = Some(param_data);
@ -609,10 +618,11 @@ impl BackupClient {
})
}
fn upload_stream(
fn upload_chunk_info_stream(
h2: H2Client,
wid: u64,
stream: impl Stream<Item=ChunkInfo, Error=Error>,
path: &str,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> impl Future<Item=(usize, usize, usize), Error=Error> {
@ -622,7 +632,7 @@ impl BackupClient {
let stream_len = std::sync::Arc::new(AtomicUsize::new(0));
let stream_len2 = stream_len.clone();
let (upload_queue, upload_result) = Self::upload_chunk_queue(h2.clone(), wid);
let (upload_queue, upload_result) = Self::upload_chunk_queue(h2.clone(), wid, path.to_owned());
let start_time = std::time::Instant::now();