client: factor out UploadOptions

to reduce function signature complexity.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
This commit is contained in:
Fabian Grünbichler 2021-01-25 14:42:52 +01:00 committed by Wolfgang Bumiller
parent 9cc1415ef5
commit e43b9175c0
2 changed files with 80 additions and 43 deletions

View File

@ -280,7 +280,6 @@ pub async fn api_datastore_latest_snapshot(
async fn backup_directory<P: AsRef<Path>>( async fn backup_directory<P: AsRef<Path>>(
client: &BackupWriter, client: &BackupWriter,
previous_manifest: Option<Arc<BackupManifest>>,
dir_path: P, dir_path: P,
archive_name: &str, archive_name: &str,
chunk_size: Option<usize>, chunk_size: Option<usize>,
@ -290,8 +289,7 @@ async fn backup_directory<P: AsRef<Path>>(
catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>, catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
exclude_pattern: Vec<MatchEntry>, exclude_pattern: Vec<MatchEntry>,
entries_max: usize, entries_max: usize,
compress: bool, upload_options: UploadOptions,
encrypt: bool,
) -> Result<BackupStats, Error> { ) -> Result<BackupStats, Error> {
let pxar_stream = PxarBackupStream::open( let pxar_stream = PxarBackupStream::open(
@ -317,8 +315,12 @@ async fn backup_directory<P: AsRef<Path>>(
} }
}); });
if upload_options.fixed_size.is_some() {
bail!("cannot backup directory with fixed chunk size!");
}
let stats = client let stats = client
.upload_stream(previous_manifest, archive_name, stream, "dynamic", None, compress, encrypt) .upload_stream(archive_name, stream, upload_options)
.await?; .await?;
Ok(stats) Ok(stats)
@ -326,14 +328,10 @@ async fn backup_directory<P: AsRef<Path>>(
async fn backup_image<P: AsRef<Path>>( async fn backup_image<P: AsRef<Path>>(
client: &BackupWriter, client: &BackupWriter,
previous_manifest: Option<Arc<BackupManifest>>,
image_path: P, image_path: P,
archive_name: &str, archive_name: &str,
image_size: u64,
chunk_size: Option<usize>, chunk_size: Option<usize>,
compress: bool, upload_options: UploadOptions,
encrypt: bool,
_verbose: bool,
) -> Result<BackupStats, Error> { ) -> Result<BackupStats, Error> {
let path = image_path.as_ref().to_owned(); let path = image_path.as_ref().to_owned();
@ -345,8 +343,12 @@ async fn backup_image<P: AsRef<Path>>(
let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024)); let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
if upload_options.fixed_size.is_none() {
bail!("cannot backup image with dynamic chunk size!");
}
let stats = client let stats = client
.upload_stream(previous_manifest, archive_name, stream, "fixed", Some(image_size), compress, encrypt) .upload_stream(archive_name, stream, upload_options)
.await?; .await?;
Ok(stats) Ok(stats)
@ -604,9 +606,15 @@ fn spawn_catalog_upload(
let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel(); let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
let upload_options = UploadOptions {
encrypt,
compress: true,
..UploadOptions::default()
};
tokio::spawn(async move { tokio::spawn(async move {
let catalog_upload_result = client let catalog_upload_result = client
.upload_stream(None, CATALOG_NAME, catalog_chunk_stream, "dynamic", None, true, encrypt) .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
.await; .await;
if let Err(ref err) = catalog_upload_result { if let Err(ref err) = catalog_upload_result {
@ -995,16 +1003,28 @@ async fn create_backup(
for (backup_type, filename, target, size) in upload_list { for (backup_type, filename, target, size) in upload_list {
match backup_type { match backup_type {
BackupSpecificationType::CONFIG => { BackupSpecificationType::CONFIG => {
let upload_options = UploadOptions {
compress: true,
encrypt: crypt_mode == CryptMode::Encrypt,
..UploadOptions::default()
};
println!("Upload config file '{}' to '{}' as {}", filename, repo, target); println!("Upload config file '{}' to '{}' as {}", filename, repo, target);
let stats = client let stats = client
.upload_blob_from_file(&filename, &target, true, crypt_mode == CryptMode::Encrypt) .upload_blob_from_file(&filename, &target, upload_options)
.await?; .await?;
manifest.add_file(target, stats.size, stats.csum, crypt_mode)?; manifest.add_file(target, stats.size, stats.csum, crypt_mode)?;
} }
BackupSpecificationType::LOGFILE => { // fixme: remove - not needed anymore ? BackupSpecificationType::LOGFILE => { // fixme: remove - not needed anymore ?
let upload_options = UploadOptions {
compress: true,
encrypt: crypt_mode == CryptMode::Encrypt,
..UploadOptions::default()
};
println!("Upload log file '{}' to '{}' as {}", filename, repo, target); println!("Upload log file '{}' to '{}' as {}", filename, repo, target);
let stats = client let stats = client
.upload_blob_from_file(&filename, &target, true, crypt_mode == CryptMode::Encrypt) .upload_blob_from_file(&filename, &target, upload_options)
.await?; .await?;
manifest.add_file(target, stats.size, stats.csum, crypt_mode)?; manifest.add_file(target, stats.size, stats.csum, crypt_mode)?;
} }
@ -1019,9 +1039,15 @@ async fn create_backup(
println!("Upload directory '{}' to '{}' as {}", filename, repo, target); println!("Upload directory '{}' to '{}' as {}", filename, repo, target);
catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?; catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
let upload_options = UploadOptions {
previous_manifest: previous_manifest.clone(),
compress: true,
encrypt: crypt_mode == CryptMode::Encrypt,
..UploadOptions::default()
};
let stats = backup_directory( let stats = backup_directory(
&client, &client,
previous_manifest.clone(),
&filename, &filename,
&target, &target,
chunk_size_opt, chunk_size_opt,
@ -1031,24 +1057,27 @@ async fn create_backup(
catalog.clone(), catalog.clone(),
pattern_list.clone(), pattern_list.clone(),
entries_max as usize, entries_max as usize,
true, upload_options,
crypt_mode == CryptMode::Encrypt,
).await?; ).await?;
manifest.add_file(target, stats.size, stats.csum, crypt_mode)?; manifest.add_file(target, stats.size, stats.csum, crypt_mode)?;
catalog.lock().unwrap().end_directory()?; catalog.lock().unwrap().end_directory()?;
} }
BackupSpecificationType::IMAGE => { BackupSpecificationType::IMAGE => {
println!("Upload image '{}' to '{:?}' as {}", filename, repo, target); println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
let upload_options = UploadOptions {
previous_manifest: previous_manifest.clone(),
fixed_size: Some(size),
compress: true,
encrypt: crypt_mode == CryptMode::Encrypt,
};
let stats = backup_image( let stats = backup_image(
&client, &client,
previous_manifest.clone(),
&filename, &filename,
&target, &target,
size,
chunk_size_opt, chunk_size_opt,
true, upload_options,
crypt_mode == CryptMode::Encrypt,
verbose,
).await?; ).await?;
manifest.add_file(target, stats.size, stats.csum, crypt_mode)?; manifest.add_file(target, stats.size, stats.csum, crypt_mode)?;
} }
@ -1074,8 +1103,9 @@ async fn create_backup(
if let Some(rsa_encrypted_key) = rsa_encrypted_key { if let Some(rsa_encrypted_key) = rsa_encrypted_key {
let target = ENCRYPTED_KEY_BLOB_NAME; let target = ENCRYPTED_KEY_BLOB_NAME;
println!("Upload RSA encoded key to '{:?}' as {}", repo, target); println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
let options = UploadOptions { compress: false, encrypt: false, ..UploadOptions::default() };
let stats = client let stats = client
.upload_blob_from_data(rsa_encrypted_key, target, false, false) .upload_blob_from_data(rsa_encrypted_key, target, options)
.await?; .await?;
manifest.add_file(target.to_string(), stats.size, stats.csum, crypt_mode)?; manifest.add_file(target.to_string(), stats.size, stats.csum, crypt_mode)?;
@ -1087,8 +1117,9 @@ async fn create_backup(
if verbose { println!("Upload index.json to '{}'", repo) }; if verbose { println!("Upload index.json to '{}'", repo) };
let options = UploadOptions { compress: true, encrypt: false, ..UploadOptions::default() };
client client
.upload_blob_from_data(manifest.into_bytes(), MANIFEST_BLOB_NAME, true, false) .upload_blob_from_data(manifest.into_bytes(), MANIFEST_BLOB_NAME, options)
.await?; .await?;
client.finish().await?; client.finish().await?;

View File

@ -39,6 +39,15 @@ pub struct BackupStats {
pub csum: [u8; 32], pub csum: [u8; 32],
} }
/// Options for uploading blobs/streams to the server
#[derive(Default, Clone)]
pub struct UploadOptions {
pub previous_manifest: Option<Arc<BackupManifest>>,
pub compress: bool,
pub encrypt: bool,
pub fixed_size: Option<u64>,
}
type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>; type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>; type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
@ -168,13 +177,12 @@ impl BackupWriter {
&self, &self,
data: Vec<u8>, data: Vec<u8>,
file_name: &str, file_name: &str,
compress: bool, options: UploadOptions,
encrypt: bool,
) -> Result<BackupStats, Error> { ) -> Result<BackupStats, Error> {
let blob = match (encrypt, &self.crypt_config) { let blob = match (options.encrypt, &self.crypt_config) {
(false, _) => DataBlob::encode(&data, None, compress)?, (false, _) => DataBlob::encode(&data, None, options.compress)?,
(true, None) => bail!("requested encryption without a crypt config"), (true, None) => bail!("requested encryption without a crypt config"),
(true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), compress)?, (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), options.compress)?,
}; };
let raw_data = blob.into_inner(); let raw_data = blob.into_inner();
@ -190,8 +198,7 @@ impl BackupWriter {
&self, &self,
src_path: P, src_path: P,
file_name: &str, file_name: &str,
compress: bool, options: UploadOptions,
encrypt: bool,
) -> Result<BackupStats, Error> { ) -> Result<BackupStats, Error> {
let src_path = src_path.as_ref(); let src_path = src_path.as_ref();
@ -206,34 +213,33 @@ impl BackupWriter {
.await .await
.map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?; .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
self.upload_blob_from_data(contents, file_name, compress, encrypt).await self.upload_blob_from_data(contents, file_name, options).await
} }
pub async fn upload_stream( pub async fn upload_stream(
&self, &self,
previous_manifest: Option<Arc<BackupManifest>>,
archive_name: &str, archive_name: &str,
stream: impl Stream<Item = Result<bytes::BytesMut, Error>>, stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
prefix: &str, options: UploadOptions,
fixed_size: Option<u64>,
compress: bool,
encrypt: bool,
) -> Result<BackupStats, Error> { ) -> Result<BackupStats, Error> {
let known_chunks = Arc::new(Mutex::new(HashSet::new())); let known_chunks = Arc::new(Mutex::new(HashSet::new()));
let mut param = json!({ "archive-name": archive_name }); let mut param = json!({ "archive-name": archive_name });
if let Some(size) = fixed_size { let prefix = if let Some(size) = options.fixed_size {
param["size"] = size.into(); param["size"] = size.into();
} "fixed"
} else {
"dynamic"
};
if encrypt && self.crypt_config.is_none() { if options.encrypt && self.crypt_config.is_none() {
bail!("requested encryption without a crypt config"); bail!("requested encryption without a crypt config");
} }
let index_path = format!("{}_index", prefix); let index_path = format!("{}_index", prefix);
let close_path = format!("{}_close", prefix); let close_path = format!("{}_close", prefix);
if let Some(manifest) = previous_manifest { if let Some(manifest) = options.previous_manifest {
// try, but ignore errors // try, but ignore errors
match archive_type(archive_name) { match archive_type(archive_name) {
Ok(ArchiveType::FixedIndex) => { Ok(ArchiveType::FixedIndex) => {
@ -255,8 +261,8 @@ impl BackupWriter {
stream, stream,
&prefix, &prefix,
known_chunks.clone(), known_chunks.clone(),
if encrypt { self.crypt_config.clone() } else { None }, if options.encrypt { self.crypt_config.clone() } else { None },
compress, options.compress,
self.verbose, self.verbose,
) )
.await?; .await?;