diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs index 2a1dcb40..e3a22813 100644 --- a/src/api2/config/remote.rs +++ b/src/api2/config/remote.rs @@ -277,7 +277,7 @@ pub fn delete_remote(name: String, digest: Option) -> Result<(), Error> } /// Helper to get client for remote.cfg entry -pub async fn remote_client(remote: Remote) -> Result { +pub async fn remote_client(remote: &Remote) -> Result { let options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone()); let client = HttpClient::new( @@ -322,7 +322,7 @@ pub async fn scan_remote_datastores(name: String) -> Result Result<(HttpClient, BackupRepository, Arc), Error> { +impl TryFrom<&SyncJobConfig> for PullParameters { + type Error = Error; - let tgt_store = DataStore::lookup_datastore(store)?; - - let (remote_config, _digest) = pbs_config::remote::config()?; - let remote: Remote = remote_config.lookup("remote", remote)?; - - let src_repo = BackupRepository::new( - Some(remote.config.auth_id.clone()), - Some(remote.config.host.clone()), - remote.config.port, - remote_store.to_string(), - ); - - let client = crate::api2::config::remote::remote_client(remote).await?; - - Ok((client, src_repo, tgt_store)) + fn try_from(sync_job: &SyncJobConfig) -> Result { + PullParameters::new( + &sync_job.store, + &sync_job.remote, + &sync_job.remote_store, + sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(), + sync_job.remove_vanished, + ) + } } pub fn do_sync_job( @@ -94,9 +85,8 @@ pub fn do_sync_job( let worker_future = async move { - let delete = sync_job.remove_vanished.unwrap_or(true); - let sync_owner = sync_job.owner.unwrap_or_else(|| Authid::root_auth_id().clone()); - let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?; + let pull_params = PullParameters::try_from(&sync_job)?; + let client = pull_params.client().await?; task_log!(worker, "Starting datastore sync job '{}'", job_id); if let Some(event_str) = schedule { @@ -110,7 +100,7 @@ pub fn do_sync_job( sync_job.remote_store, ); - pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?; + pull_store(&worker, &client, &pull_params).await?; task_log!(worker, "sync job '{}' end", &job_id); @@ -187,14 +177,21 @@ async fn pull ( check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?; - let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?; + let pull_params = PullParameters::new( + &store, + &remote, + &remote_store, + auth_id.clone(), + remove_vanished, + )?; + let client = pull_params.client().await?; // fixme: set to_stdout to false? let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move { task_log!(worker, "sync datastore '{}' start", store); - let pull_future = pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, auth_id); + let pull_future = pull_store(&worker, &client, &pull_params); let future = select!{ success = pull_future.fuse() => success, abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort, diff --git a/src/server/pull.rs b/src/server/pull.rs index 5c3f9a18..2c454e2d 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -13,7 +13,7 @@ use http::StatusCode; use proxmox_router::HttpError; -use pbs_api_types::{Authid, SnapshotListItem, GroupListItem}; +use pbs_api_types::{Authid, GroupListItem, Remote, SnapshotListItem}; use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress}; use pbs_datastore::data_blob::DataBlob; use pbs_datastore::dynamic_index::DynamicIndexReader; @@ -33,6 +33,44 @@ use crate::tools::ParallelHandler; // fixme: delete vanished groups // Todo: correctly lock backup groups +pub struct PullParameters { + remote: Remote, + source: BackupRepository, + store: Arc, + owner: Authid, + remove_vanished: bool, +} + +impl PullParameters { + pub fn new( + store: &str, + remote: &str, + remote_store: &str, + owner: Authid, + remove_vanished: Option, + ) -> Result { + let store = DataStore::lookup_datastore(store)?; + + let (remote_config, _digest) = pbs_config::remote::config()?; + let remote: Remote = remote_config.lookup("remote", remote)?; + + let remove_vanished = remove_vanished.unwrap_or(true); + + let source = BackupRepository::new( + Some(remote.config.auth_id.clone()), + Some(remote.config.host.clone()), + remote.config.port, + remote_store.to_string(), + ); + + Ok(Self { remote, source, store, owner, remove_vanished }) + } + + pub async fn client(&self) -> Result { + crate::api2::config::remote::remote_client(&self.remote).await + } +} + async fn pull_index_chunks( worker: &WorkerTask, chunk_reader: RemoteChunkReader, @@ -503,13 +541,11 @@ impl std::fmt::Display for SkipInfo { pub async fn pull_group( worker: &WorkerTask, client: &HttpClient, - src_repo: &BackupRepository, - tgt_store: Arc, + params: &PullParameters, group: &BackupGroup, - delete: bool, progress: &mut StoreProgress, ) -> Result<(), Error> { - let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store()); + let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store()); let args = json!({ "backup-type": group.backup_type(), @@ -525,7 +561,7 @@ pub async fn pull_group( let fingerprint = client.fingerprint(); - let last_sync = tgt_store.last_successful_backup(group)?; + let last_sync = params.store.last_successful_backup(group)?; let mut remote_snapshots = std::collections::HashSet::new(); @@ -566,16 +602,16 @@ pub async fn pull_group( let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone()); let new_client = HttpClient::new( - src_repo.host(), - src_repo.port(), - src_repo.auth_id(), + params.source.host(), + params.source.port(), + params.source.auth_id(), options, )?; let reader = BackupReader::start( new_client, None, - src_repo.store(), + params.source.store(), snapshot.group().backup_type(), snapshot.group().backup_id(), backup_time, @@ -586,7 +622,7 @@ pub async fn pull_group( let result = pull_snapshot_from( worker, reader, - tgt_store.clone(), + params.store.clone(), &snapshot, downloaded_chunks.clone(), ) @@ -598,14 +634,14 @@ pub async fn pull_group( result?; // stop on error } - if delete { - let local_list = group.list_backups(&tgt_store.base_path())?; + if params.remove_vanished { + let local_list = group.list_backups(¶ms.store.base_path())?; for info in local_list { let backup_time = info.backup_dir.backup_time(); if remote_snapshots.contains(&backup_time) { continue; } - if info.backup_dir.is_protected(tgt_store.base_path()) { + if info.backup_dir.is_protected(params.store.base_path()) { task_log!( worker, "don't delete vanished snapshot {:?} (protected)", @@ -614,7 +650,7 @@ pub async fn pull_group( continue; } task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path()); - tgt_store.remove_backup_dir(&info.backup_dir, false)?; + params.store.remove_backup_dir(&info.backup_dir, false)?; } } @@ -628,15 +664,12 @@ pub async fn pull_group( pub async fn pull_store( worker: &WorkerTask, client: &HttpClient, - src_repo: &BackupRepository, - tgt_store: Arc, - delete: bool, - auth_id: Authid, + params: &PullParameters, ) -> Result<(), Error> { // explicit create shared lock to prevent GC on newly created chunks - let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; + let _shared_store_lock = params.store.try_shared_chunk_store_lock()?; - let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store()); + let path = format!("api2/json/admin/datastore/{}/groups", params.source.store()); let mut result = client .get(&path, None) @@ -675,7 +708,7 @@ pub async fn pull_store( progress.done_snapshots = 0; progress.group_snapshots = 0; - let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) { + let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, ¶ms.owner) { Ok(result) => result, Err(err) => { task_log!( @@ -689,21 +722,19 @@ pub async fn pull_store( }; // permission check - if auth_id != owner { + if params.owner != owner { // only the owner is allowed to create additional snapshots task_log!( worker, "sync group {} failed - owner check failed ({} != {})", - &group, auth_id, owner + &group, params.owner, owner ); errors = true; // do not stop here, instead continue } else if let Err(err) = pull_group( worker, client, - src_repo, - tgt_store.clone(), + params, &group, - delete, &mut progress, ) .await @@ -717,9 +748,9 @@ pub async fn pull_store( } } - if delete { + if params.remove_vanished { let result: Result<(), Error> = proxmox_lang::try_block!({ - let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?; + let local_groups = BackupInfo::list_backup_groups(¶ms.store.base_path())?; for local_group in local_groups { if new_groups.contains(&local_group) { continue; @@ -730,7 +761,7 @@ pub async fn pull_store( local_group.backup_type(), local_group.backup_id() ); - match tgt_store.remove_backup_group(&local_group) { + match params.store.remove_backup_group(&local_group) { Ok(true) => {}, Ok(false) => { task_log!(worker, "kept some protected snapshots of group '{}'", local_group);