From a42fa400eec76fb29e0ae666885239c54bdf30eb Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Tue, 28 May 2019 06:18:55 +0200
Subject: [PATCH] src/api2/admin/datastore/backup.rs: implement fixed-size
 chunk upload api

---
 src/api2/admin/datastore/backup.rs             | 241 +++++++++++++++++-
 .../admin/datastore/backup/environment.rs      |  72 ++++++
 src/client/http_client.rs                      |  32 ++-
 3 files changed, 320 insertions(+), 25 deletions(-)

diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs
index 5c610245..9f2ea707 100644
--- a/src/api2/admin/datastore/backup.rs
+++ b/src/api2/admin/datastore/backup.rs
@@ -159,6 +159,16 @@ fn backup_api() -> Router {
             "dynamic_close", Router::new()
                 .post(api_method_close_dynamic_index())
         )
+        .subdir(
+            "fixed_index", Router::new()
+                .download(api_method_fixed_chunk_index())
+                .post(api_method_create_fixed_index())
+                .put(api_method_fixed_append())
+        )
+        .subdir(
+            "fixed_close", Router::new()
+                .post(api_method_close_fixed_index())
+        )
         .subdir(
             "finish", Router::new()
                 .post(
@@ -177,18 +187,6 @@ fn backup_api() -> Router {
     router
 }
 
-pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
-    ApiAsyncMethod::new(
-        dynamic_chunk_index,
-        ObjectSchema::new(r###"
-Download the dynamic chunk index from the previous backup.
-Simply returns an empty list if this is the first backup.
-"###
-        )
-        .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
-    )
-}
-
 pub fn api_method_create_dynamic_index() -> ApiMethod {
     ApiMethod::new(
         create_dynamic_index,
@@ -209,7 +207,7 @@ fn create_dynamic_index(
 
     let mut archive_name = name.clone();
     if !archive_name.ends_with(".pxar") {
-        bail!("wrong archive extension");
+        bail!("wrong archive extension: '{}'", archive_name);
     } else {
         archive_name.push_str(".didx");
     }
@@ -227,6 +225,50 @@ fn create_dynamic_index(
     Ok(json!(wid))
 }
 
+pub fn api_method_create_fixed_index() -> ApiMethod {
+    ApiMethod::new(
+        create_fixed_index,
+        ObjectSchema::new("Create fixed chunk index file.")
+            .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
+            .required("size", IntegerSchema::new("File size.")
+                      .minimum(1)
+            )
+    )
+}
+
+fn create_fixed_index(
+    param: Value,
+    _info: &ApiMethod,
+    rpcenv: &mut RpcEnvironment,
+) -> Result<Value, Error> {
+
+    let env: &BackupEnvironment = rpcenv.as_ref();
+
+    println!("PARAM: {:?}", param);
+
+    let name = tools::required_string_param(&param, "archive-name")?.to_owned();
+    let size = tools::required_integer_param(&param, "size")? as usize;
+
+    let mut archive_name = name.clone();
+    if !archive_name.ends_with(".img") {
+        bail!("wrong archive extension: '{}'", archive_name);
+    } else {
+        archive_name.push_str(".fidx");
+    }
+
+    let mut path = env.backup_dir.relative_path();
+    path.push(archive_name);
+
+    let chunk_size = 4096*1024; // todo: ??
+
+    let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
+    let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;
+
+    env.log(format!("created new fixed index {} ({:?})", wid, path));
+
+    Ok(json!(wid))
+}
+
 pub fn api_method_dynamic_append() -> ApiMethod {
     ApiMethod::new(
         dynamic_append,
@@ -279,6 +321,59 @@ fn dynamic_append (
     Ok(Value::Null)
 }
 
+pub fn api_method_fixed_append() -> ApiMethod {
+    ApiMethod::new(
+        fixed_append,
+        ObjectSchema::new("Append chunk to fixed index writer.")
+            .required("wid", IntegerSchema::new("Fixed writer ID.")
+                      .minimum(1)
+                      .maximum(256)
+            )
+            .required("digest-list", ArraySchema::new(
+                "Chunk digest list.",
+                StringSchema::new("Chunk digest.").into())
+            )
+            .required("offset-list", ArraySchema::new(
+                "Chunk offset list.",
+                IntegerSchema::new("Corresponding chunk offsets.")
+                    .minimum(0)
+                    .into())
+            )
+    )
+}
+
+fn fixed_append (
+    param: Value,
+    _info: &ApiMethod,
+    rpcenv: &mut RpcEnvironment,
+) -> Result<Value, Error> {
+
+    let wid = tools::required_integer_param(&param, "wid")? as usize;
+    let digest_list = tools::required_array_param(&param, "digest-list")?;
+    let offset_list = tools::required_array_param(&param, "offset-list")?;
+
+    println!("DIGEST LIST LEN {}", digest_list.len());
+
+    if offset_list.len() != digest_list.len() {
+        bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
+    }
+
+    let env: &BackupEnvironment = rpcenv.as_ref();
+
+    for (i, item) in digest_list.iter().enumerate() {
+        let digest_str = item.as_str().unwrap();
+        let digest = crate::tools::hex_to_digest(digest_str)?;
+        let offset = offset_list[i].as_u64().unwrap();
+        let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
+        println!("DEBUG {} {}", offset, size);
+        env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
+
+        env.log(format!("successfully added chunk {} to fixed index {}", digest_str, wid));
+    }
+
+    Ok(Value::Null)
+}
+
 pub fn api_method_close_dynamic_index() -> ApiMethod {
     ApiMethod::new(
         close_dynamic_index,
@@ -315,6 +410,41 @@ fn close_dynamic_index (
     Ok(Value::Null)
 }
 
+pub fn api_method_close_fixed_index() -> ApiMethod {
+    ApiMethod::new(
+        close_fixed_index,
+        ObjectSchema::new("Close fixed index writer.")
+            .required("wid", IntegerSchema::new("Fixed writer ID.")
+                      .minimum(1)
+                      .maximum(256)
+            )
+            .required("chunk-count", IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
+                      .minimum(1)
+            )
+            .required("size", IntegerSchema::new("File size. This is used to verify that the server got all data.")
+                      .minimum(1)
+            )
+    )
+}
+
+fn close_fixed_index (
+    param: Value,
+    _info: &ApiMethod,
+    rpcenv: &mut RpcEnvironment,
+) -> Result<Value, Error> {
+
+    let wid = tools::required_integer_param(&param, "wid")? as usize;
+    let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
+    let size = tools::required_integer_param(&param, "size")? as u64;
+
+    let env: &BackupEnvironment = rpcenv.as_ref();
+
+    env.fixed_writer_close(wid, chunk_count, size)?;
+
+    env.log(format!("successfully closed fixed index {}", wid));
+
+    Ok(Value::Null)
+}
 
 fn finish_backup (
     _param: Value,
@@ -329,6 +459,18 @@ fn finish_backup (
     Ok(Value::Null)
 }
 
+pub fn api_method_dynamic_chunk_index() -> ApiAsyncMethod {
+    ApiAsyncMethod::new(
+        dynamic_chunk_index,
+        ObjectSchema::new(r###"
+Download the dynamic chunk index from the previous backup.
+Simply returns an empty list if this is the first backup.
+"### + ) + .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone()) + ) +} + fn dynamic_chunk_index( _parts: Parts, _req_body: Body, @@ -344,7 +486,7 @@ fn dynamic_chunk_index( let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned(); if !archive_name.ends_with(".pxar") { - bail!("wrong archive extension"); + bail!("wrong archive extension: '{}'", archive_name); } else { archive_name.push_str(".didx"); } @@ -389,3 +531,74 @@ fn dynamic_chunk_index( Ok(Box::new(future::ok(response))) } + +pub fn api_method_fixed_chunk_index() -> ApiAsyncMethod { + ApiAsyncMethod::new( + fixed_chunk_index, + ObjectSchema::new(r###" +Download the fixed chunk index from the previous backup. +Simply returns an empty list if this is the first backup. +"### + ) + .required("archive-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone()) + ) +} + +fn fixed_chunk_index( + _parts: Parts, + _req_body: Body, + param: Value, + _info: &ApiAsyncMethod, + rpcenv: Box, +) -> Result { + + let env: &BackupEnvironment = rpcenv.as_ref(); + + let mut archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned(); + + if !archive_name.ends_with(".img") { + bail!("wrong archive extension: '{}'", archive_name); + } else { + archive_name.push_str(".fidx"); + } + + let empty_response = { + Response::builder() + .status(StatusCode::OK) + .body(Body::empty())? + }; + + let last_backup = match &env.last_backup { + Some(info) => info, + None => return Ok(Box::new(future::ok(empty_response))), + }; + + let mut path = last_backup.backup_dir.relative_path(); + path.push(&archive_name); + + let index = match env.datastore.open_fixed_reader(path) { + Ok(index) => index, + Err(_) => { + env.log(format!("there is no last backup for archive '{}'", archive_name)); + return Ok(Box::new(future::ok(empty_response))); + } + }; + + let count = index.index_count(); + for pos in 0..count { + let digest = index.index_digest(pos).unwrap(); + let size = index.chunk_size as u32; + env.register_chunk(*digest, size)?; + } + + let reader = DigestListEncoder::new(Box::new(index)); + + let stream = WrappedReaderStream::new(reader); + + // fixme: set size, content type? 
+    let response = http::Response::builder()
+        .status(200)
+        .body(Body::wrap_stream(stream))?;
+
+    Ok(Box::new(future::ok(response)))
+}
diff --git a/src/api2/admin/datastore/backup/environment.rs b/src/api2/admin/datastore/backup/environment.rs
index da5b5f98..b99d9b29 100644
--- a/src/api2/admin/datastore/backup/environment.rs
+++ b/src/api2/admin/datastore/backup/environment.rs
@@ -18,10 +18,19 @@ struct DynamicWriterState {
     chunk_count: u64,
 }
 
+struct FixedWriterState {
+    name: String,
+    index: FixedIndexWriter,
+    size: usize,
+    chunk_size: u32,
+    chunk_count: u64,
+}
+
 struct SharedBackupState {
     finished: bool,
     uid_counter: usize,
     dynamic_writers: HashMap<usize, DynamicWriterState>,
+    fixed_writers: HashMap<usize, FixedWriterState>,
     known_chunks: HashMap<[u8;32], u32>,
 }
 
@@ -70,6 +79,7 @@ impl BackupEnvironment {
             finished: false,
             uid_counter: 0,
             dynamic_writers: HashMap::new(),
+            fixed_writers: HashMap::new(),
             known_chunks: HashMap::new(),
         };
 
@@ -122,6 +132,21 @@ impl BackupEnvironment {
         Ok(uid)
     }
 
+    /// Store the writer with an unique ID
+    pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
+        let mut state = self.state.lock().unwrap();
+
+        state.ensure_unfinished()?;
+
+        let uid = state.next_uid();
+
+        state.fixed_writers.insert(uid, FixedWriterState {
+            index, name, chunk_count: 0, size, chunk_size,
+        });
+
+        Ok(uid)
+    }
+
     /// Append chunk to dynamic writer
     pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
         let mut state = self.state.lock().unwrap();
@@ -146,6 +171,29 @@ impl BackupEnvironment {
         Ok(())
     }
 
+    /// Append chunk to fixed writer
+    pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
+        let mut state = self.state.lock().unwrap();
+
+        state.ensure_unfinished()?;
+
+        let mut data = match state.fixed_writers.get_mut(&wid) {
+            Some(data) => data,
+            None => bail!("fixed writer '{}' not registered", wid),
+        };
+
+        data.chunk_count += 1;
+
+        if size != data.chunk_size {
+            bail!("fixed writer '{}' - got unexpected chunk size ({} != {})", data.name, size, data.chunk_size);
+        }
+
+        let pos = (offset as usize)/(data.chunk_size as usize);
+        data.index.add_digest(pos, digest)?;
+
+        Ok(())
+    }
+
     /// Close dynamic writer
     pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
         let mut state = self.state.lock().unwrap();
@@ -170,6 +218,30 @@ impl BackupEnvironment {
         Ok(())
     }
 
+    /// Close fixed writer
+    pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64) -> Result<(), Error> {
+        let mut state = self.state.lock().unwrap();
+
+        state.ensure_unfinished()?;
+
+        let mut data = match state.fixed_writers.remove(&wid) {
+            Some(data) => data,
+            None => bail!("fixed writer '{}' not registered", wid),
+        };
+
+        if data.chunk_count != chunk_count {
+            bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
+        }
+
+        if data.size != (size as usize) {
+            bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
+        }
+
+        data.index.close()?;
+
+        Ok(())
+    }
+
     /// Mark backup as finished
     pub fn finish_backup(&self) -> Result<(), Error> {
         let mut state = self.state.lock().unwrap();
diff --git a/src/client/http_client.rs b/src/client/http_client.rs
index 0cb00731..d47b0deb 100644
--- a/src/client/http_client.rs
+++ b/src/client/http_client.rs
@@ -430,10 +430,12 @@ impl BackupClient {
         self.h2.clone().post("finish", None).map(|_| ())
     }
 
-    pub fn upload_dynamic_stream(
+    pub fn upload_stream(
         &self,
         archive_name: &str,
         stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
+        prefix: &str,
&str, + fixed_size: Option, ) -> impl Future { let known_chunks = Arc::new(Mutex::new(HashSet::new())); @@ -452,23 +454,30 @@ impl BackupClient { let h2_3 = self.h2.clone(); let h2_4 = self.h2.clone(); - let param = json!({ "archive-name": archive_name }); + let mut param = json!({ "archive-name": archive_name }); + if let Some(size) = fixed_size { + param["size"] = size.into(); + } - Self::download_chunk_list(h2, "dynamic_index", archive_name, known_chunks.clone()) + let index_path = format!("{}_index", prefix); + let index_path2 = index_path.clone(); + let close_path = format!("{}_close", prefix); + + Self::download_chunk_list(h2, &index_path, archive_name, known_chunks.clone()) .and_then(move |_| { - h2_2.post("dynamic_index", Some(param)) + h2_2.post(&index_path, Some(param)) }) .and_then(move |res| { let wid = res.as_u64().unwrap(); - Self::upload_stream(h2_3, wid, stream, known_chunks.clone()) + Self::upload_chunk_info_stream(h2_3, wid, stream, &index_path2, known_chunks.clone()) .and_then(move |(chunk_count, size, _speed)| { let param = json!({ "wid": wid , "chunk-count": chunk_count, "size": size, }); - h2_4.post("dynamic_close", Some(param)) - }) + h2_4.post(&close_path, Some(param)) + }) .map(|_| ()) }) } @@ -502,7 +511,7 @@ impl BackupClient { (verify_queue_tx, verify_result_rx) } - fn upload_chunk_queue(h2: H2Client, wid: u64) -> ( + fn upload_chunk_queue(h2: H2Client, wid: u64, path: String) -> ( mpsc::Sender<(MergedChunkInfo, Option)>, sync::oneshot::Receiver> ) { @@ -545,7 +554,7 @@ impl BackupClient { } println!("append chunks list len ({})", digest_list.len()); let param = json!({ "wid": wid, "digest-list": digest_list, "offset-list": offset_list }); - let mut request = H2Client::request_builder("localhost", "PUT", "dynamic_index", None).unwrap(); + let mut request = H2Client::request_builder("localhost", "PUT", &path, None).unwrap(); request.headers_mut().insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static("application/json")); let param_data = bytes::Bytes::from(param.to_string().as_bytes()); let upload_data = Some(param_data); @@ -609,10 +618,11 @@ impl BackupClient { }) } - fn upload_stream( + fn upload_chunk_info_stream( h2: H2Client, wid: u64, stream: impl Stream, + path: &str, known_chunks: Arc>>, ) -> impl Future { @@ -622,7 +632,7 @@ impl BackupClient { let stream_len = std::sync::Arc::new(AtomicUsize::new(0)); let stream_len2 = stream_len.clone(); - let (upload_queue, upload_result) = Self::upload_chunk_queue(h2.clone(), wid); + let (upload_queue, upload_result) = Self::upload_chunk_queue(h2.clone(), wid, path.to_owned()); let start_time = std::time::Instant::now();