pull/sync: extract passed along vars into struct

this is basically the sync job config without ID and some stuff
converted already, and a convenient helper to generate the http client
from it.

Suggested-by: Dominik Csapak <d.csapak@proxmox.com>
Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
Reviewed-by: Dominik Csapak <d.csapak@proxmox.com>
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
Fabian Grünbichler 2021-10-28 15:00:52 +02:00 committed by Thomas Lamprecht
parent e2e7560d5e
commit 6e9e6c7a54
3 changed files with 91 additions and 63 deletions

View File

@ -277,7 +277,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
} }
/// Helper to get client for remote.cfg entry /// Helper to get client for remote.cfg entry
pub async fn remote_client(remote: Remote) -> Result<HttpClient, Error> { pub async fn remote_client(remote: &Remote) -> Result<HttpClient, Error> {
let options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone()); let options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone());
let client = HttpClient::new( let client = HttpClient::new(
@ -322,7 +322,7 @@ pub async fn scan_remote_datastores(name: String) -> Result<Vec<DataStoreListIte
api_err) api_err)
}; };
let client = remote_client(remote) let client = remote_client(&remote)
.await .await
.map_err(map_remote_err)?; .map_err(map_remote_err)?;
let api_res = client let api_res = client

View File

@ -1,5 +1,5 @@
//! Sync datastore from remote server //! Sync datastore from remote server
use std::sync::{Arc}; use std::convert::TryFrom;
use anyhow::{format_err, Error}; use anyhow::{format_err, Error};
use futures::{select, future::FutureExt}; use futures::{select, future::FutureExt};
@ -7,18 +7,18 @@ use futures::{select, future::FutureExt};
use proxmox_schema::api; use proxmox_schema::api;
use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission}; use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
use pbs_client::{HttpClient, BackupRepository};
use pbs_api_types::{ use pbs_api_types::{
Remote, Authid, SyncJobConfig, Authid, SyncJobConfig,
DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
}; };
use pbs_tools::task_log; use pbs_tools::task_log;
use proxmox_rest_server::WorkerTask; use proxmox_rest_server::WorkerTask;
use pbs_config::CachedUserInfo; use pbs_config::CachedUserInfo;
use pbs_datastore::DataStore;
use crate::server::{jobstate::Job, pull::pull_store}; use crate::server::pull::{PullParameters, pull_store};
use crate::server::jobstate::Job;
pub fn check_pull_privs( pub fn check_pull_privs(
auth_id: &Authid, auth_id: &Authid,
@ -40,27 +40,18 @@ pub fn check_pull_privs(
Ok(()) Ok(())
} }
pub async fn get_pull_parameters( impl TryFrom<&SyncJobConfig> for PullParameters {
store: &str, type Error = Error;
remote: &str,
remote_store: &str,
) -> Result<(HttpClient, BackupRepository, Arc<DataStore>), Error> {
let tgt_store = DataStore::lookup_datastore(store)?; fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
PullParameters::new(
let (remote_config, _digest) = pbs_config::remote::config()?; &sync_job.store,
let remote: Remote = remote_config.lookup("remote", remote)?; &sync_job.remote,
&sync_job.remote_store,
let src_repo = BackupRepository::new( sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(),
Some(remote.config.auth_id.clone()), sync_job.remove_vanished,
Some(remote.config.host.clone()), )
remote.config.port, }
remote_store.to_string(),
);
let client = crate::api2::config::remote::remote_client(remote).await?;
Ok((client, src_repo, tgt_store))
} }
pub fn do_sync_job( pub fn do_sync_job(
@ -94,9 +85,8 @@ pub fn do_sync_job(
let worker_future = async move { let worker_future = async move {
let delete = sync_job.remove_vanished.unwrap_or(true); let pull_params = PullParameters::try_from(&sync_job)?;
let sync_owner = sync_job.owner.unwrap_or_else(|| Authid::root_auth_id().clone()); let client = pull_params.client().await?;
let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?;
task_log!(worker, "Starting datastore sync job '{}'", job_id); task_log!(worker, "Starting datastore sync job '{}'", job_id);
if let Some(event_str) = schedule { if let Some(event_str) = schedule {
@ -110,7 +100,7 @@ pub fn do_sync_job(
sync_job.remote_store, sync_job.remote_store,
); );
pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?; pull_store(&worker, &client, &pull_params).await?;
task_log!(worker, "sync job '{}' end", &job_id); task_log!(worker, "sync job '{}' end", &job_id);
@ -187,14 +177,21 @@ async fn pull (
check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?; check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?;
let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?; let pull_params = PullParameters::new(
&store,
&remote,
&remote_store,
auth_id.clone(),
remove_vanished,
)?;
let client = pull_params.client().await?;
// fixme: set to_stdout to false? // fixme: set to_stdout to false?
let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move { let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
task_log!(worker, "sync datastore '{}' start", store); task_log!(worker, "sync datastore '{}' start", store);
let pull_future = pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, auth_id); let pull_future = pull_store(&worker, &client, &pull_params);
let future = select!{ let future = select!{
success = pull_future.fuse() => success, success = pull_future.fuse() => success,
abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort, abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,

View File

@ -13,7 +13,7 @@ use http::StatusCode;
use proxmox_router::HttpError; use proxmox_router::HttpError;
use pbs_api_types::{Authid, SnapshotListItem, GroupListItem}; use pbs_api_types::{Authid, GroupListItem, Remote, SnapshotListItem};
use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress}; use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress};
use pbs_datastore::data_blob::DataBlob; use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader; use pbs_datastore::dynamic_index::DynamicIndexReader;
@ -33,6 +33,44 @@ use crate::tools::ParallelHandler;
// fixme: delete vanished groups // fixme: delete vanished groups
// Todo: correctly lock backup groups // Todo: correctly lock backup groups
pub struct PullParameters {
remote: Remote,
source: BackupRepository,
store: Arc<DataStore>,
owner: Authid,
remove_vanished: bool,
}
impl PullParameters {
pub fn new(
store: &str,
remote: &str,
remote_store: &str,
owner: Authid,
remove_vanished: Option<bool>,
) -> Result<Self, Error> {
let store = DataStore::lookup_datastore(store)?;
let (remote_config, _digest) = pbs_config::remote::config()?;
let remote: Remote = remote_config.lookup("remote", remote)?;
let remove_vanished = remove_vanished.unwrap_or(true);
let source = BackupRepository::new(
Some(remote.config.auth_id.clone()),
Some(remote.config.host.clone()),
remote.config.port,
remote_store.to_string(),
);
Ok(Self { remote, source, store, owner, remove_vanished })
}
pub async fn client(&self) -> Result<HttpClient, Error> {
crate::api2::config::remote::remote_client(&self.remote).await
}
}
async fn pull_index_chunks<I: IndexFile>( async fn pull_index_chunks<I: IndexFile>(
worker: &WorkerTask, worker: &WorkerTask,
chunk_reader: RemoteChunkReader, chunk_reader: RemoteChunkReader,
@ -503,13 +541,11 @@ impl std::fmt::Display for SkipInfo {
pub async fn pull_group( pub async fn pull_group(
worker: &WorkerTask, worker: &WorkerTask,
client: &HttpClient, client: &HttpClient,
src_repo: &BackupRepository, params: &PullParameters,
tgt_store: Arc<DataStore>,
group: &BackupGroup, group: &BackupGroup,
delete: bool,
progress: &mut StoreProgress, progress: &mut StoreProgress,
) -> Result<(), Error> { ) -> Result<(), Error> {
let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store()); let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store());
let args = json!({ let args = json!({
"backup-type": group.backup_type(), "backup-type": group.backup_type(),
@ -525,7 +561,7 @@ pub async fn pull_group(
let fingerprint = client.fingerprint(); let fingerprint = client.fingerprint();
let last_sync = tgt_store.last_successful_backup(group)?; let last_sync = params.store.last_successful_backup(group)?;
let mut remote_snapshots = std::collections::HashSet::new(); let mut remote_snapshots = std::collections::HashSet::new();
@ -566,16 +602,16 @@ pub async fn pull_group(
let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone()); let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
let new_client = HttpClient::new( let new_client = HttpClient::new(
src_repo.host(), params.source.host(),
src_repo.port(), params.source.port(),
src_repo.auth_id(), params.source.auth_id(),
options, options,
)?; )?;
let reader = BackupReader::start( let reader = BackupReader::start(
new_client, new_client,
None, None,
src_repo.store(), params.source.store(),
snapshot.group().backup_type(), snapshot.group().backup_type(),
snapshot.group().backup_id(), snapshot.group().backup_id(),
backup_time, backup_time,
@ -586,7 +622,7 @@ pub async fn pull_group(
let result = pull_snapshot_from( let result = pull_snapshot_from(
worker, worker,
reader, reader,
tgt_store.clone(), params.store.clone(),
&snapshot, &snapshot,
downloaded_chunks.clone(), downloaded_chunks.clone(),
) )
@ -598,14 +634,14 @@ pub async fn pull_group(
result?; // stop on error result?; // stop on error
} }
if delete { if params.remove_vanished {
let local_list = group.list_backups(&tgt_store.base_path())?; let local_list = group.list_backups(&params.store.base_path())?;
for info in local_list { for info in local_list {
let backup_time = info.backup_dir.backup_time(); let backup_time = info.backup_dir.backup_time();
if remote_snapshots.contains(&backup_time) { if remote_snapshots.contains(&backup_time) {
continue; continue;
} }
if info.backup_dir.is_protected(tgt_store.base_path()) { if info.backup_dir.is_protected(params.store.base_path()) {
task_log!( task_log!(
worker, worker,
"don't delete vanished snapshot {:?} (protected)", "don't delete vanished snapshot {:?} (protected)",
@ -614,7 +650,7 @@ pub async fn pull_group(
continue; continue;
} }
task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path()); task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
tgt_store.remove_backup_dir(&info.backup_dir, false)?; params.store.remove_backup_dir(&info.backup_dir, false)?;
} }
} }
@ -628,15 +664,12 @@ pub async fn pull_group(
pub async fn pull_store( pub async fn pull_store(
worker: &WorkerTask, worker: &WorkerTask,
client: &HttpClient, client: &HttpClient,
src_repo: &BackupRepository, params: &PullParameters,
tgt_store: Arc<DataStore>,
delete: bool,
auth_id: Authid,
) -> Result<(), Error> { ) -> Result<(), Error> {
// explicit create shared lock to prevent GC on newly created chunks // explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?; let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store()); let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
let mut result = client let mut result = client
.get(&path, None) .get(&path, None)
@ -675,7 +708,7 @@ pub async fn pull_store(
progress.done_snapshots = 0; progress.done_snapshots = 0;
progress.group_snapshots = 0; progress.group_snapshots = 0;
let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) { let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, &params.owner) {
Ok(result) => result, Ok(result) => result,
Err(err) => { Err(err) => {
task_log!( task_log!(
@ -689,21 +722,19 @@ pub async fn pull_store(
}; };
// permission check // permission check
if auth_id != owner { if params.owner != owner {
// only the owner is allowed to create additional snapshots // only the owner is allowed to create additional snapshots
task_log!( task_log!(
worker, worker,
"sync group {} failed - owner check failed ({} != {})", "sync group {} failed - owner check failed ({} != {})",
&group, auth_id, owner &group, params.owner, owner
); );
errors = true; // do not stop here, instead continue errors = true; // do not stop here, instead continue
} else if let Err(err) = pull_group( } else if let Err(err) = pull_group(
worker, worker,
client, client,
src_repo, params,
tgt_store.clone(),
&group, &group,
delete,
&mut progress, &mut progress,
) )
.await .await
@ -717,9 +748,9 @@ pub async fn pull_store(
} }
} }
if delete { if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({ let result: Result<(), Error> = proxmox_lang::try_block!({
let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?; let local_groups = BackupInfo::list_backup_groups(&params.store.base_path())?;
for local_group in local_groups { for local_group in local_groups {
if new_groups.contains(&local_group) { if new_groups.contains(&local_group) {
continue; continue;
@ -730,7 +761,7 @@ pub async fn pull_store(
local_group.backup_type(), local_group.backup_type(),
local_group.backup_id() local_group.backup_id()
); );
match tgt_store.remove_backup_group(&local_group) { match params.store.remove_backup_group(&local_group) {
Ok(true) => {}, Ok(true) => {},
Ok(false) => { Ok(false) => {
task_log!(worker, "kept some protected snapshots of group '{}'", local_group); task_log!(worker, "kept some protected snapshots of group '{}'", local_group);