src/api2/backup.rs: replace upload_config with upload_blob

This commit is contained in:
Dietmar Maurer 2019-06-23 09:48:23 +02:00
parent a38c5d4d12
commit cb08ac3efe
5 changed files with 50 additions and 31 deletions

View File

@ -154,8 +154,8 @@ pub fn backup_api() -> Router {
let router = Router::new()
.subdir(
"config", Router::new()
.upload(api_method_upload_config())
"blob", Router::new()
.upload(api_method_upload_blob())
)
.subdir(
"dynamic_chunk", Router::new()

View File

@ -86,8 +86,8 @@ pub fn api_method_upload_fixed_chunk() -> ApiAsyncMethod {
.maximum(1024*1024*16)
)
.required("encoded-size", IntegerSchema::new("Encoded chunk size.")
.minimum(9)
// fixme: .maximum(1024*1024*16+40)
.minimum((std::mem::size_of::<DataChunkHeader>() as isize)+1)
.maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataChunkHeader>() as isize))
)
)
}
@ -142,8 +142,8 @@ pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod {
.maximum(1024*1024*16)
)
.required("encoded-size", IntegerSchema::new("Encoded chunk size.")
.minimum(9)
// fixme: .maximum(1024*1024*16+40)
.minimum((std::mem::size_of::<DataChunkHeader>() as isize) +1)
.maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataChunkHeader>() as isize))
)
)
}
@ -222,19 +222,19 @@ fn upload_speedtest(
Ok(Box::new(resp))
}
pub fn api_method_upload_config() -> ApiAsyncMethod {
pub fn api_method_upload_blob() -> ApiAsyncMethod {
ApiAsyncMethod::new(
upload_config,
ObjectSchema::new("Upload configuration file.")
upload_blob,
ObjectSchema::new("Upload binary blob file.")
.required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
.required("size", IntegerSchema::new("File size.")
.minimum(1)
.maximum(1024*1024*16)
.required("encoded-size", IntegerSchema::new("Encoded blob size.")
.minimum((std::mem::size_of::<DataBlobHeader>() as isize) +1)
.maximum(1024*1024*16+(std::mem::size_of::<EncryptedDataBlobHeader>() as isize))
)
)
}
fn upload_config(
fn upload_blob(
_parts: Parts,
req_body: Body,
param: Value,
@ -243,13 +243,9 @@ fn upload_config(
) -> Result<BoxFut, Error> {
let mut file_name = tools::required_string_param(&param, "file-name")?.to_owned();
let size = tools::required_integer_param(&param, "size")? as usize;
let encoded_size = tools::required_integer_param(&param, "encoded-size")? as usize;
if !file_name.ends_with(".conf") {
bail!("wrong config file extension: '{}'", file_name);
} else {
file_name.push_str(".zstd");
}
file_name.push_str(".blob");
let env: &BackupEnvironment = rpcenv.as_ref();
@ -262,17 +258,24 @@ fn upload_config(
let resp = req_body
.map_err(Error::from)
.concat2()
.fold(Vec::new(), |mut acc, chunk| {
acc.extend_from_slice(&*chunk);
Ok::<_, Error>(acc)
})
.and_then(move |data| {
if size != data.len() {
bail!("got configuration file with unexpected length ({} != {})", size, data.len());
if encoded_size != data.len() {
bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
}
let data = zstd::block::compress(&data, 0)?;
let orig_len = data.len(); // fixme:
tools::file_set_contents(&path, &data, None)?;
let mut blob = DataBlob::from_raw(data)?;
// always comput CRC at server side
blob.set_crc(blob.compute_crc());
env2.debug(format!("upload config {:?} ({} bytes, comp: {})", path, size, data.len()));
tools::file_set_contents(&path, blob.raw_data(), None)?;
env2.debug(format!("upload blob {:?} ({} bytes, comp: {})", path, orig_len, encoded_size));
Ok(())
})

View File

@ -24,6 +24,11 @@ impl DataBlob {
&self.raw_data
}
/// Consume self and returns raw_data
pub fn into_inner(self) -> Vec<u8> {
self.raw_data
}
/// accessor to chunk type (magic number)
pub fn magic(&self) -> &[u8; 8] {
self.raw_data[0..8].try_into().unwrap()
@ -42,7 +47,7 @@ impl DataBlob {
}
/// compute the CRC32 checksum
pub fn compute_crc(&mut self) -> u32 {
pub fn compute_crc(&self) -> u32 {
let mut hasher = crc32fast::Hasher::new();
let start = std::mem::size_of::<DataBlobHeader>(); // start after HEAD
hasher.update(&self.raw_data[start..]);

View File

@ -482,7 +482,7 @@ fn create_backup(
match backup_type {
BackupType::CONFIG => {
println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target);
client.upload_config(&filename, &target).wait()?;
client.upload_blob(&filename, &target, crypt_config.clone(), true).wait()?;
}
BackupType::PXAR => {
println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);

View File

@ -452,10 +452,12 @@ impl BackupClient {
self.canceller.take().unwrap().cancel();
}
pub fn upload_config<P: AsRef<std::path::Path>>(
pub fn upload_blob<P: AsRef<std::path::Path>>(
&self,
src_path: P,
file_name: &str,
crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
) -> impl Future<Item=(), Error=Error> {
let h2 = self.h2.clone();
@ -464,13 +466,22 @@ impl BackupClient {
let task = tokio::fs::File::open(src_path.clone())
.map_err(move |err| format_err!("unable to open file {:?} - {}", src_path, err))
.and_then(|file| {
.and_then(move |file| {
let contents = vec![];
tokio::io::read_to_end(file, contents)
.map_err(Error::from)
.and_then(move |(_, contents)| {
let param = json!({"size": contents.len(), "file-name": file_name });
h2.upload("config", Some(param), contents)
let blob = if let Some(ref crypt_config) = crypt_config {
DataBlob::encode(&contents, Some(crypt_config), compress)?
} else {
DataBlob::encode(&contents, None, compress)?
};
let raw_data = blob.into_inner();
Ok(raw_data)
})
.and_then(move |raw_data| {
let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
h2.upload("blob", Some(param), raw_data)
.map(|_| {})
})
});