src/api2/admin/datastore/backup.rs: use separate api entry points for chunk upload

So that we can provide better statistics (patches follows).
This commit is contained in:
Dietmar Maurer 2019-05-30 08:10:06 +02:00
parent 390e83c9b9
commit 642322b433
5 changed files with 129 additions and 21 deletions

View File

@ -151,8 +151,8 @@ fn backup_api() -> Router {
let router = Router::new() let router = Router::new()
.subdir( .subdir(
"upload_chunk", Router::new() "dynamic_chunk", Router::new()
.upload(api_method_upload_chunk()) .upload(api_method_upload_dynamic_chunk())
) )
.subdir( .subdir(
"dynamic_index", Router::new() "dynamic_index", Router::new()
@ -164,6 +164,10 @@ fn backup_api() -> Router {
"dynamic_close", Router::new() "dynamic_close", Router::new()
.post(api_method_close_dynamic_index()) .post(api_method_close_dynamic_index())
) )
.subdir(
"fixed_chunk", Router::new()
.upload(api_method_upload_fixed_chunk())
)
.subdir( .subdir(
"fixed_index", Router::new() "fixed_index", Router::new()
.download(api_method_fixed_chunk_index()) .download(api_method_fixed_chunk_index())

View File

@ -112,6 +112,54 @@ impl BackupEnvironment {
Ok(()) Ok(())
} }
pub fn register_fixed_chunk(
&self,
wid: usize,
digest: [u8; 32],
size: u32,
compressed_size: u32,
is_duplicate: bool,
) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let mut data = match state.fixed_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("fixed writer '{}' not registered", wid),
};
if size != data.chunk_size {
bail!("fixed writer '{}' - got unexpected chunk size ({} != {}", data.name, size, data.chunk_size);
}
state.known_chunks.insert(digest, size);
Ok(())
}
pub fn register_dynamic_chunk(
&self,
wid: usize,
digest: [u8; 32],
size: u32,
compressed_size: u32,
is_duplicate: bool,
) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
let mut data = match state.dynamic_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("dynamic writer '{}' not registered", wid),
};
state.known_chunks.insert(digest, size);
Ok(())
}
pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> { pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();

View File

@ -85,7 +85,7 @@ impl BackupService {
impl hyper::service::Service for BackupService { impl hyper::service::Service for BackupService {
type ReqBody = Body; type ReqBody = Body;
type ResBody = Body; type ResBody = Body;
type Error = hyper::Error; type Error = Error;
type Future = Box<Future<Item = Response<Body>, Error = Self::Error> + Send>; type Future = Box<Future<Item = Response<Body>, Error = Self::Error> + Send>;
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future { fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
@ -97,10 +97,10 @@ impl hyper::service::Service for BackupService {
match result { match result {
Ok(res) => { Ok(res) => {
Self::log_response(worker, method, &path, &res); Self::log_response(worker, method, &path, &res);
Ok::<_, hyper::Error>(res) Ok::<_, Error>(res)
} }
Err(err) => { Err(err) => {
if let Some(apierr) = err.downcast_ref::<HttpError>() { if let Some(apierr) = err.downcast_ref::<HttpError>() {
let mut resp = Response::new(Body::from(apierr.message.clone())); let mut resp = Response::new(Body::from(apierr.message.clone()));
resp.extensions_mut().insert(ErrorMessageExtension(apierr.message.clone())); resp.extensions_mut().insert(ErrorMessageExtension(apierr.message.clone()));
*resp.status_mut() = apierr.code; *resp.status_mut() = apierr.code;

View File

@ -28,10 +28,10 @@ impl UploadChunk {
} }
impl Future for UploadChunk { impl Future for UploadChunk {
type Item = ([u8; 32], u32); type Item = ([u8; 32], u32, u32, bool);
type Error = failure::Error; type Error = failure::Error;
fn poll(&mut self) -> Poll<([u8; 32], u32), failure::Error> { fn poll(&mut self) -> Poll<([u8; 32], u32, u32, bool), failure::Error> {
loop { loop {
match try_ready!(self.stream.poll()) { match try_ready!(self.stream.poll()) {
Some(chunk) => { Some(chunk) => {
@ -45,19 +45,23 @@ impl Future for UploadChunk {
bail!("uploaded chunk has unexpected size."); bail!("uploaded chunk has unexpected size.");
} }
let (_is_duplicate, digest, _compressed_size) = self.store.insert_chunk(&self.chunk)?; let (is_duplicate, digest, compressed_size) = self.store.insert_chunk(&self.chunk)?;
return Ok(Async::Ready((digest, self.size))) return Ok(Async::Ready((digest, self.size, compressed_size as u32, is_duplicate)))
} }
} }
} }
} }
} }
pub fn api_method_upload_chunk() -> ApiAsyncMethod { pub fn api_method_upload_fixed_chunk() -> ApiAsyncMethod {
ApiAsyncMethod::new( ApiAsyncMethod::new(
upload_chunk, upload_fixed_chunk,
ObjectSchema::new("Upload a new chunk.") ObjectSchema::new("Upload a new chunk.")
.required("wid", IntegerSchema::new("Fixed writer ID.")
.minimum(1)
.maximum(256)
)
.required("size", IntegerSchema::new("Chunk size.") .required("size", IntegerSchema::new("Chunk size.")
.minimum(1) .minimum(1)
.maximum(1024*1024*16) .maximum(1024*1024*16)
@ -65,7 +69,7 @@ pub fn api_method_upload_chunk() -> ApiAsyncMethod {
) )
} }
fn upload_chunk( fn upload_fixed_chunk(
_parts: Parts, _parts: Parts,
req_body: Body, req_body: Body,
param: Value, param: Value,
@ -73,6 +77,7 @@ fn upload_chunk(
rpcenv: Box<RpcEnvironment>, rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> { ) -> Result<BoxFut, Error> {
let wid = tools::required_integer_param(&param, "wid")? as usize;
let size = tools::required_integer_param(&param, "size")? as u32; let size = tools::required_integer_param(&param, "size")? as u32;
let env: &BackupEnvironment = rpcenv.as_ref(); let env: &BackupEnvironment = rpcenv.as_ref();
@ -83,8 +88,55 @@ fn upload_chunk(
.then(move |result| { .then(move |result| {
let env: &BackupEnvironment = rpcenv.as_ref(); let env: &BackupEnvironment = rpcenv.as_ref();
let result = result.and_then(|(digest, size)| { let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
env.register_chunk(digest, size)?; env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
let digest_str = tools::digest_to_hex(&digest);
env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
Ok(json!(digest_str))
});
Ok(env.format_response(result))
});
Ok(Box::new(resp))
}
pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod {
ApiAsyncMethod::new(
upload_dynamic_chunk,
ObjectSchema::new("Upload a new chunk.")
.required("wid", IntegerSchema::new("Dynamic writer ID.")
.minimum(1)
.maximum(256)
)
.required("size", IntegerSchema::new("Chunk size.")
.minimum(1)
.maximum(1024*1024*16)
)
)
}
fn upload_dynamic_chunk(
_parts: Parts,
req_body: Body,
param: Value,
_info: &ApiAsyncMethod,
rpcenv: Box<RpcEnvironment>,
) -> Result<BoxFut, Error> {
let wid = tools::required_integer_param(&param, "wid")? as usize;
let size = tools::required_integer_param(&param, "size")? as u32;
let env: &BackupEnvironment = rpcenv.as_ref();
let upload = UploadChunk::new(req_body, env.datastore.clone(), size);
let resp = upload
.then(move |result| {
let env: &BackupEnvironment = rpcenv.as_ref();
let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
let digest_str = tools::digest_to_hex(&digest); let digest_str = tools::digest_to_hex(&digest);
env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str)); env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
Ok(json!(digest_str)) Ok(json!(digest_str))

View File

@ -462,16 +462,17 @@ impl BackupClient {
} }
let index_path = format!("{}_index", prefix); let index_path = format!("{}_index", prefix);
let index_path2 = index_path.clone();
let close_path = format!("{}_close", prefix); let close_path = format!("{}_close", prefix);
let prefix = prefix.to_owned();
Self::download_chunk_list(h2, &index_path, archive_name, known_chunks.clone()) Self::download_chunk_list(h2, &index_path, archive_name, known_chunks.clone())
.and_then(move |_| { .and_then(move |_| {
h2_2.post(&index_path, Some(param)) h2_2.post(&index_path, Some(param))
}) })
.and_then(move |res| { .and_then(move |res| {
let wid = res.as_u64().unwrap(); let wid = res.as_u64().unwrap();
Self::upload_chunk_info_stream(h2_3, wid, stream, &index_path2, known_chunks.clone()) Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone())
.and_then(move |(chunk_count, size, _speed)| { .and_then(move |(chunk_count, size, _speed)| {
let param = json!({ let param = json!({
"wid": wid , "wid": wid ,
@ -513,7 +514,7 @@ impl BackupClient {
(verify_queue_tx, verify_result_rx) (verify_queue_tx, verify_result_rx)
} }
fn upload_chunk_queue(h2: H2Client, wid: u64, path: String) -> ( fn append_chunk_queue(h2: H2Client, wid: u64, path: String) -> (
mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>, mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>,
sync::oneshot::Receiver<Result<(), Error>> sync::oneshot::Receiver<Result<(), Error>>
) { ) {
@ -624,7 +625,7 @@ impl BackupClient {
h2: H2Client, h2: H2Client,
wid: u64, wid: u64,
stream: impl Stream<Item=ChunkInfo, Error=Error>, stream: impl Stream<Item=ChunkInfo, Error=Error>,
path: &str, prefix: &str,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> impl Future<Item=(usize, usize, usize), Error=Error> { ) -> impl Future<Item=(usize, usize, usize), Error=Error> {
@ -634,7 +635,10 @@ impl BackupClient {
let stream_len = std::sync::Arc::new(AtomicUsize::new(0)); let stream_len = std::sync::Arc::new(AtomicUsize::new(0));
let stream_len2 = stream_len.clone(); let stream_len2 = stream_len.clone();
let (upload_queue, upload_result) = Self::upload_chunk_queue(h2.clone(), wid, path.to_owned()); let append_chunk_path = format!("{}_index", prefix);
let upload_chunk_path = format!("{}_chunk", prefix);
let (upload_queue, upload_result) = Self::append_chunk_queue(h2.clone(), wid, append_chunk_path.to_owned());
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
@ -663,8 +667,8 @@ impl BackupClient {
println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest), println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest),
chunk_info.data.len(), offset); chunk_info.data.len(), offset);
let param = json!({ "size" : chunk_info.data.len() }); let param = json!({ "wid": wid, "size" : chunk_info.data.len() });
let request = H2Client::request_builder("localhost", "POST", "upload_chunk", Some(param)).unwrap(); let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param)).unwrap();
let upload_data = Some(chunk_info.data.freeze()); let upload_data = Some(chunk_info.data.freeze());
let new_info = MergedChunkInfo::Known(vec![(offset, digest)]); let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);