//! Sync datastore from remote server use std::convert::TryFrom; use anyhow::{format_err, Error}; use futures::{future::FutureExt, select}; use proxmox_router::{ApiMethod, Permission, Router, RpcEnvironment}; use proxmox_schema::api; use proxmox_sys::task_log; use pbs_api_types::{ Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA, GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, }; use pbs_config::CachedUserInfo; use proxmox_rest_server::WorkerTask; use crate::server::jobstate::Job; use crate::server::pull::{pull_store, PullParameters}; pub fn check_pull_privs( auth_id: &Authid, store: &str, ns: Option<&str>, remote: &str, remote_store: &str, delete: bool, ) -> Result<(), Error> { let user_info = CachedUserInfo::new()?; let local_store_ns_acl_path = match ns { Some(ns) => vec!["datastore", store, ns], None => vec!["datastore", store], }; user_info.check_privs( auth_id, &local_store_ns_acl_path, PRIV_DATASTORE_BACKUP, false, )?; user_info.check_privs( auth_id, &["remote", remote, remote_store], PRIV_REMOTE_READ, false, )?; if delete { user_info.check_privs( auth_id, &local_store_ns_acl_path, PRIV_DATASTORE_PRUNE, false, )?; } Ok(()) } impl TryFrom<&SyncJobConfig> for PullParameters { type Error = Error; fn try_from(sync_job: &SyncJobConfig) -> Result { PullParameters::new( &sync_job.store, sync_job.ns.clone().unwrap_or_default(), &sync_job.remote, &sync_job.remote_store, sync_job.remote_ns.clone().unwrap_or_default(), sync_job .owner .as_ref() .unwrap_or_else(|| Authid::root_auth_id()) .clone(), sync_job.remove_vanished, sync_job.max_depth, sync_job.group_filter.clone(), sync_job.limit.clone(), ) } } pub fn do_sync_job( mut job: Job, sync_job: SyncJobConfig, auth_id: &Authid, schedule: Option, to_stdout: bool, ) -> Result { let job_id = format!( "{}:{}:{}:{}:{}", sync_job.remote, sync_job.remote_store, sync_job.store, sync_job.ns.clone().unwrap_or_default(), job.jobname() ); let worker_type = job.jobtype().to_string(); let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store); let upid_str = WorkerTask::spawn( &worker_type, Some(job_id.clone()), auth_id.to_string(), to_stdout, move |worker| async move { job.start(&worker.upid().to_string())?; let worker2 = worker.clone(); let sync_job2 = sync_job.clone(); let worker_future = async move { let pull_params = PullParameters::try_from(&sync_job)?; let client = pull_params.client().await?; task_log!(worker, "Starting datastore sync job '{}'", job_id); if let Some(event_str) = schedule { task_log!(worker, "task triggered by schedule '{}'", event_str); } task_log!( worker, "sync datastore '{}' from '{}/{}'", sync_job.store, sync_job.remote, sync_job.remote_store, ); pull_store(&worker, &client, pull_params).await?; task_log!(worker, "sync job '{}' end", &job_id); Ok(()) }; let mut abort_future = worker2 .abort_future() .map(|_| Err(format_err!("sync aborted"))); let result = select! { worker = worker_future.fuse() => worker, abort = abort_future => abort, }; let status = worker2.create_state(&result); match job.finish(status) { Ok(_) => {} Err(err) => { eprintln!("could not finish job state: {}", err); } } if let Some(email) = email { if let Err(err) = crate::server::send_sync_status(&email, notify, &sync_job2, &result) { eprintln!("send sync notification failed: {}", err); } } result }, )?; Ok(upid_str) } #[api( input: { properties: { store: { schema: DATASTORE_SCHEMA, }, ns: { type: BackupNamespace, optional: true, }, remote: { schema: REMOTE_ID_SCHEMA, }, "remote-store": { schema: DATASTORE_SCHEMA, }, "remote-ns": { type: BackupNamespace, optional: true, }, "remove-vanished": { schema: REMOVE_VANISHED_BACKUPS_SCHEMA, optional: true, }, "max-depth": { schema: NS_MAX_DEPTH_REDUCED_SCHEMA, optional: true, }, "group-filter": { schema: GROUP_FILTER_LIST_SCHEMA, optional: true, }, limit: { type: RateLimitConfig, flatten: true, } }, }, access: { // Note: used parameters are no uri parameters, so we need to test inside function body description: r###"The user needs Datastore.Backup privilege on '/datastore/{store}', and needs to own the backup group. Remote.Read is required on '/remote/{remote}/{remote-store}'. The delete flag additionally requires the Datastore.Prune privilege on '/datastore/{store}'. "###, permission: &Permission::Anybody, }, )] /// Sync store from other repository async fn pull( store: String, ns: Option, remote: String, remote_store: String, remote_ns: Option, remove_vanished: Option, max_depth: Option, group_filter: Option>, limit: RateLimitConfig, _info: &ApiMethod, rpcenv: &mut dyn RpcEnvironment, ) -> Result { let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; let delete = remove_vanished.unwrap_or(false); let ns = ns.unwrap_or_default(); let ns_str = if ns.is_root() { None } else { Some(ns.to_string()) }; check_pull_privs( &auth_id, &store, ns_str.as_deref(), &remote, &remote_store, delete, )?; let pull_params = PullParameters::new( &store, ns, &remote, &remote_store, remote_ns.unwrap_or_default(), auth_id.clone(), remove_vanished, max_depth, group_filter, limit, )?; let client = pull_params.client().await?; // fixme: set to_stdout to false? // FIXME: add namespace to worker id? let upid_str = WorkerTask::spawn( "sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move { task_log!( worker, "pull datastore '{}' from '{}/{}'", store, remote, remote_store, ); let pull_future = pull_store(&worker, &client, pull_params); let future = select! { success = pull_future.fuse() => success, abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort, }; let _ = future?; task_log!(worker, "pull datastore '{}' end", store); Ok(()) }, )?; Ok(upid_str) } pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);