client: refactor catalog upload spawning

by pulling out Result type into separate struct

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
This commit is contained in:
Fabian Grünbichler 2021-01-25 14:42:48 +01:00 committed by Wolfgang Bumiller
parent 905a570489
commit 6d233161b0

View File

@@ -586,21 +586,21 @@ async fn start_garbage_collection(param: Value) -> Result<Value, Error> {
     Ok(Value::Null)
 }
 
+struct CatalogUploadResult {
+    catalog_writer: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
+    result: tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>,
+}
+
 fn spawn_catalog_upload(
     client: Arc<BackupWriter>,
     encrypt: bool,
-) -> Result<
-        (
-            Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
-            tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>
-        ), Error>
-{
+) -> Result<CatalogUploadResult, Error> {
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
     let catalog_stream = crate::tools::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512*1024;
     let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
 
-    let catalog = Arc::new(Mutex::new(CatalogWriter::new(crate::tools::StdChannelWriter::new(catalog_tx))?));
+    let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(crate::tools::StdChannelWriter::new(catalog_tx))?));
 
     let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
@@ -617,7 +617,7 @@ fn spawn_catalog_upload(
         let _ = catalog_result_tx.send(catalog_upload_result);
     });
 
-    Ok((catalog, catalog_result_rx))
+    Ok(CatalogUploadResult { catalog_writer, result: catalog_result_rx })
 }
 
 fn keyfile_parameters(param: &Value) -> Result<(Option<Vec<u8>>, CryptMode), Error> {
@@ -990,7 +990,7 @@ async fn create_backup(
     let mut manifest = BackupManifest::new(snapshot);
 
     let mut catalog = None;
-    let mut catalog_result_tx = None;
+    let mut catalog_result_rx = None;
 
     for (backup_type, filename, target, size) in upload_list {
         match backup_type {
@@ -1011,9 +1011,9 @@ async fn create_backup(
             BackupSpecificationType::PXAR => {
                 // start catalog upload on first use
                 if catalog.is_none() {
-                    let (cat, res) = spawn_catalog_upload(client.clone(), crypt_mode == CryptMode::Encrypt)?;
-                    catalog = Some(cat);
-                    catalog_result_tx = Some(res);
+                    let catalog_upload_res = spawn_catalog_upload(client.clone(), crypt_mode == CryptMode::Encrypt)?;
+                    catalog = Some(catalog_upload_res.catalog_writer);
+                    catalog_result_rx = Some(catalog_upload_res.result);
                 }
 
                 let catalog = catalog.as_ref().unwrap();
@@ -1065,7 +1065,7 @@ async fn create_backup(
         drop(catalog); // close upload stream
 
-        if let Some(catalog_result_rx) = catalog_result_tx {
+        if let Some(catalog_result_rx) = catalog_result_rx {
             let stats = catalog_result_rx.await??;
             manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum, crypt_mode)?;
         }