diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs
index ce4af830..2495fd06 100644
--- a/src/api2/admin/datastore/backup.rs
+++ b/src/api2/admin/datastore/backup.rs
@@ -151,8 +151,8 @@ fn backup_api() -> Router {
 
     let router = Router::new()
         .subdir(
-            "upload_chunk", Router::new()
-                .upload(api_method_upload_chunk())
+            "dynamic_chunk", Router::new()
+                .upload(api_method_upload_dynamic_chunk())
         )
         .subdir(
             "dynamic_index", Router::new()
@@ -164,6 +164,10 @@ fn backup_api() -> Router {
             "dynamic_close", Router::new()
                 .post(api_method_close_dynamic_index())
         )
+        .subdir(
+            "fixed_chunk", Router::new()
+                .upload(api_method_upload_fixed_chunk())
+        )
         .subdir(
             "fixed_index", Router::new()
                 .download(api_method_fixed_chunk_index())
diff --git a/src/api2/admin/datastore/backup/environment.rs b/src/api2/admin/datastore/backup/environment.rs
index 845a5946..0ebf7455 100644
--- a/src/api2/admin/datastore/backup/environment.rs
+++ b/src/api2/admin/datastore/backup/environment.rs
@@ -112,6 +112,54 @@ impl BackupEnvironment {
         Ok(())
     }
 
+    pub fn register_fixed_chunk(
+        &self,
+        wid: usize,
+        digest: [u8; 32],
+        size: u32,
+        compressed_size: u32,
+        is_duplicate: bool,
+    ) -> Result<(), Error> {
+        let mut state = self.state.lock().unwrap();
+
+        state.ensure_unfinished()?;
+
+        let mut data = match state.fixed_writers.get_mut(&wid) {
+            Some(data) => data,
+            None => bail!("fixed writer '{}' not registered", wid),
+        };
+
+        if size != data.chunk_size {
+            bail!("fixed writer '{}' - got unexpected chunk size ({} != {})", data.name, size, data.chunk_size);
+        }
+
+        state.known_chunks.insert(digest, size);
+
+        Ok(())
+    }
+
+    pub fn register_dynamic_chunk(
+        &self,
+        wid: usize,
+        digest: [u8; 32],
+        size: u32,
+        compressed_size: u32,
+        is_duplicate: bool,
+    ) -> Result<(), Error> {
+        let mut state = self.state.lock().unwrap();
+
+        state.ensure_unfinished()?;
+
+        let mut data = match state.dynamic_writers.get_mut(&wid) {
+            Some(data) => data,
+            None => bail!("dynamic writer '{}' not registered", wid),
+        };
+
+        state.known_chunks.insert(digest, size);
+
+        Ok(())
+    }
+
     pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
         let state = self.state.lock().unwrap();
 
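The only difference between the two registration paths above is the size check: a fixed writer accepts chunks of exactly its configured chunk_size, a dynamic writer accepts any size, and both record the digest in the shared known-chunks table so later uploads of the same data can be skipped. A minimal standalone sketch of that logic (the `FixedWriter` struct and the bare `HashMap` are hypothetical stand-ins for the writer state in environment.rs, not the patch's real types):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the per-writer state kept on the server.
struct FixedWriter {
    name: String,
    chunk_size: u32,
}

// digest -> (uncompressed) chunk size
type KnownChunks = HashMap<[u8; 32], u32>;

fn register_fixed(
    known: &mut KnownChunks,
    writer: &FixedWriter,
    digest: [u8; 32],
    size: u32,
) -> Result<(), String> {
    // A fixed index stores equally sized chunks only, so any other
    // size indicates a client-side error.
    if size != writer.chunk_size {
        return Err(format!(
            "fixed writer '{}' - got unexpected chunk size ({} != {})",
            writer.name, size, writer.chunk_size
        ));
    }
    // Remember the digest so future uploads of this chunk can be skipped.
    known.insert(digest, size);
    Ok(())
}

fn main() {
    let mut known = KnownChunks::new();
    let writer = FixedWriter { name: "disk.img".into(), chunk_size: 4096 };
    assert!(register_fixed(&mut known, &writer, [0u8; 32], 4096).is_ok());
    assert!(register_fixed(&mut known, &writer, [1u8; 32], 1234).is_err());
    println!("known chunks: {}", known.len());
}
```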
diff --git a/src/api2/admin/datastore/backup/service.rs b/src/api2/admin/datastore/backup/service.rs
index 325b1d69..ca9dd029 100644
--- a/src/api2/admin/datastore/backup/service.rs
+++ b/src/api2/admin/datastore/backup/service.rs
@@ -85,7 +85,7 @@ impl BackupService {
 impl hyper::service::Service for BackupService {
     type ReqBody = Body;
     type ResBody = Body;
-    type Error = hyper::Error;
+    type Error = Error;
     type Future = Box<Future<Item = Response<Body>, Error = Self::Error> + Send>;
 
     fn call(&mut self, req: Request<Body>) -> Self::Future {
@@ -97,10 +97,10 @@ impl hyper::service::Service for BackupService {
             match result {
                 Ok(res) => {
                     Self::log_response(worker, method, &path, &res);
-                    Ok::<_, hyper::Error>(res)
+                    Ok::<_, Error>(res)
                 }
                 Err(err) => {
                     if let Some(apierr) = err.downcast_ref::<HttpError>() {
                         let mut resp = Response::new(Body::from(apierr.message.clone()));
                         resp.extensions_mut().insert(ErrorMessageExtension(apierr.message.clone()));
                         *resp.status_mut() = apierr.code;
diff --git a/src/api2/admin/datastore/backup/upload_chunk.rs b/src/api2/admin/datastore/backup/upload_chunk.rs
index dd6c202f..ae0d3db2 100644
--- a/src/api2/admin/datastore/backup/upload_chunk.rs
+++ b/src/api2/admin/datastore/backup/upload_chunk.rs
@@ -28,10 +28,10 @@ impl UploadChunk {
 }
 
 impl Future for UploadChunk {
-    type Item = ([u8; 32], u32);
+    type Item = ([u8; 32], u32, u32, bool);
     type Error = failure::Error;
 
-    fn poll(&mut self) -> Poll<([u8; 32], u32), failure::Error> {
+    fn poll(&mut self) -> Poll<([u8; 32], u32, u32, bool), failure::Error> {
         loop {
             match try_ready!(self.stream.poll()) {
                 Some(chunk) => {
@@ -45,19 +45,23 @@ impl Future for UploadChunk {
                         bail!("uploaded chunk has unexpected size.");
                     }
 
-                    let (_is_duplicate, digest, _compressed_size) = self.store.insert_chunk(&self.chunk)?;
+                    let (is_duplicate, digest, compressed_size) = self.store.insert_chunk(&self.chunk)?;
 
-                    return Ok(Async::Ready((digest, self.size)))
+                    return Ok(Async::Ready((digest, self.size, compressed_size as u32, is_duplicate)))
                 }
             }
         }
     }
 }
 
-pub fn api_method_upload_chunk() -> ApiAsyncMethod {
+pub fn api_method_upload_fixed_chunk() -> ApiAsyncMethod {
     ApiAsyncMethod::new(
-        upload_chunk,
+        upload_fixed_chunk,
         ObjectSchema::new("Upload a new chunk.")
+            .required("wid", IntegerSchema::new("Fixed writer ID.")
+                .minimum(1)
+                .maximum(256)
+            )
             .required("size", IntegerSchema::new("Chunk size.")
                 .minimum(1)
                 .maximum(1024*1024*16)
@@ -65,7 +69,7 @@
     )
 }
 
-fn upload_chunk(
+fn upload_fixed_chunk(
     _parts: Parts,
     req_body: Body,
     param: Value,
@@ -73,6 +77,7 @@
     rpcenv: Box<RpcEnvironment>,
 ) -> Result<BoxFut, Error> {
 
+    let wid = tools::required_integer_param(&param, "wid")? as usize;
     let size = tools::required_integer_param(&param, "size")? as u32;
 
     let env: &BackupEnvironment = rpcenv.as_ref();
@@ -83,8 +88,55 @@
         .then(move |result| {
             let env: &BackupEnvironment = rpcenv.as_ref();
 
-            let result = result.and_then(|(digest, size)| {
-                env.register_chunk(digest, size)?;
+            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
+                env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
+                let digest_str = tools::digest_to_hex(&digest);
+                env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
+                Ok(json!(digest_str))
+            });
+
+            Ok(env.format_response(result))
+        });
+
+    Ok(Box::new(resp))
+}
+
+pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod {
+    ApiAsyncMethod::new(
+        upload_dynamic_chunk,
+        ObjectSchema::new("Upload a new chunk.")
+            .required("wid", IntegerSchema::new("Dynamic writer ID.")
+                .minimum(1)
+                .maximum(256)
+            )
+            .required("size", IntegerSchema::new("Chunk size.")
+                .minimum(1)
+                .maximum(1024*1024*16)
+            )
+    )
+}
+
+fn upload_dynamic_chunk(
+    _parts: Parts,
+    req_body: Body,
+    param: Value,
+    _info: &ApiAsyncMethod,
+    rpcenv: Box<RpcEnvironment>,
+) -> Result<BoxFut, Error> {
+
+    let wid = tools::required_integer_param(&param, "wid")? as usize;
+    let size = tools::required_integer_param(&param, "size")? as u32;
+
+    let env: &BackupEnvironment = rpcenv.as_ref();
+
+    let upload = UploadChunk::new(req_body, env.datastore.clone(), size);
+
+    let resp = upload
+        .then(move |result| {
+            let env: &BackupEnvironment = rpcenv.as_ref();
+
+            let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
+                env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
                 let digest_str = tools::digest_to_hex(&digest);
                 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
                 Ok(json!(digest_str))
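The widened future result is what makes the split handlers possible: one poll loop serves both endpoints, and everything writer-specific happens in the registration call afterwards. A compressed sketch of that flow, with `fake_insert_chunk` standing in for `DataStore::insert_chunk` (hypothetical; the real method writes and deduplicates chunks on disk):

```rust
// Hypothetical stand-in for DataStore::insert_chunk(): pretend nothing
// is a duplicate and compression halves the payload.
fn fake_insert_chunk(data: &[u8]) -> (bool, [u8; 32], u64) {
    (false, [0u8; 32], (data.len() / 2) as u64)
}

// Mirrors what UploadChunk::poll() now resolves to:
// (digest, size, compressed_size, is_duplicate).
fn upload_result(data: &[u8], expected: u32) -> Result<([u8; 32], u32, u32, bool), String> {
    if data.len() as u32 != expected {
        return Err("uploaded chunk has unexpected size.".into());
    }
    let (is_duplicate, digest, compressed_size) = fake_insert_chunk(data);
    Ok((digest, expected, compressed_size as u32, is_duplicate))
}

fn main() {
    let chunk = vec![0u8; 4096];
    let (digest, size, csize, dup) = upload_result(&chunk, 4096).unwrap();
    println!("{} bytes ({} compressed, duplicate: {}), digest starts {:02x?}",
        size, csize, dup, &digest[..4]);
}
```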
diff --git a/src/client/http_client.rs b/src/client/http_client.rs
index 0e39d0ad..8927ffb8 100644
--- a/src/client/http_client.rs
+++ b/src/client/http_client.rs
@@ -462,16 +462,17 @@ impl BackupClient {
         }
 
         let index_path = format!("{}_index", prefix);
-        let index_path2 = index_path.clone();
         let close_path = format!("{}_close", prefix);
 
+        let prefix = prefix.to_owned();
+
         Self::download_chunk_list(h2, &index_path, archive_name, known_chunks.clone())
             .and_then(move |_| {
                 h2_2.post(&index_path, Some(param))
             })
             .and_then(move |res| {
                 let wid = res.as_u64().unwrap();
-                Self::upload_chunk_info_stream(h2_3, wid, stream, &index_path2, known_chunks.clone())
+                Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone())
                     .and_then(move |(chunk_count, size, _speed)| {
                         let param = json!({
                             "wid": wid ,
@@ -513,7 +514,7 @@ impl BackupClient {
         (verify_queue_tx, verify_result_rx)
     }
 
-    fn upload_chunk_queue(h2: H2Client, wid: u64, path: String) -> (
+    fn append_chunk_queue(h2: H2Client, wid: u64, path: String) -> (
         mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
         sync::oneshot::Receiver<Result<(), Error>>
     ) {
@@ -624,7 +625,7 @@ impl BackupClient {
         h2: H2Client,
         wid: u64,
         stream: impl Stream<Item=ChunkInfo, Error=Error>,
-        path: &str,
+        prefix: &str,
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     ) -> impl Future<Item=(usize, usize, usize), Error=Error> {
 
@@ -634,7 +635,10 @@ impl BackupClient {
         let repeat = std::sync::Arc::new(AtomicUsize::new(0));
         let repeat2 = repeat.clone();
         let stream_len = std::sync::Arc::new(AtomicUsize::new(0));
         let stream_len2 = stream_len.clone();
-        let (upload_queue, upload_result) = Self::upload_chunk_queue(h2.clone(), wid, path.to_owned());
+        let append_chunk_path = format!("{}_index", prefix);
+        let upload_chunk_path = format!("{}_chunk", prefix);
+
+        let (upload_queue, upload_result) = Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned());
 
         let start_time = std::time::Instant::now();
@@ -663,8 +667,8 @@ impl BackupClient {
                             println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest),
                                      chunk_info.data.len(), offset);
 
-                            let param = json!({ "size" : chunk_info.data.len() });
-                            let request = H2Client::request_builder("localhost", "POST", "upload_chunk", Some(param)).unwrap();
+                            let param = json!({ "wid": wid, "size" : chunk_info.data.len() });
+                            let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param)).unwrap();
                             let upload_data = Some(chunk_info.data.freeze());
 
                             let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
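On the client side, the rename from `path` to `prefix` is what routes uploads to the new endpoints: a single prefix ("fixed" or "dynamic") now derives all per-writer paths instead of reusing the index path for chunk uploads. A small sketch of the naming scheme (`endpoint_paths` is a hypothetical helper; the patch inlines the `format!` calls):

```rust
// Derive the per-writer endpoint paths from a prefix, mirroring the
// format! calls added in http_client.rs.
fn endpoint_paths(prefix: &str) -> (String, String, String) {
    (
        format!("{}_index", prefix), // create the writer / append chunk digests
        format!("{}_chunk", prefix), // upload chunk data
        format!("{}_close", prefix), // finalize the index
    )
}

fn main() {
    for prefix in &["dynamic", "fixed"] {
        let (index, chunk, close) = endpoint_paths(prefix);
        println!("{}: {} / {} / {}", prefix, index, chunk, close);
    }
}
```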