sync/pull: make namespace aware
Allow pulling all groups from a certain source namespace, and possibly sub namespaces until max-depth, into a target namespace. If any sub-namespaces get pulled, they will be mapped relatively from the source parent namespace to the target parent namespace. Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com> Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
parent
31aa38b684
commit
c06c1b4bd7
@ -1060,9 +1060,22 @@ pub struct DatastoreWithNamespace {
|
|||||||
impl fmt::Display for DatastoreWithNamespace {
|
impl fmt::Display for DatastoreWithNamespace {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
if self.ns.is_root() {
|
if self.ns.is_root() {
|
||||||
write!(f, "{}", self.store)
|
write!(f, "datastore {}, root namespace", self.store)
|
||||||
} else {
|
} else {
|
||||||
write!(f, "{}/{}", self.store, self.ns)
|
write!(f, "datastore '{}', namespace '{}'", self.store, self.ns)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DatastoreWithNamespace {
|
||||||
|
pub fn acl_path(&self) -> Vec<&str> {
|
||||||
|
let mut path: Vec<&str> = vec!["datastore", &self.store];
|
||||||
|
|
||||||
|
if self.ns.is_root() {
|
||||||
|
path
|
||||||
|
} else {
|
||||||
|
path.extend(self.ns.inner.iter().map(|comp| comp.as_str()));
|
||||||
|
path
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,15 +9,15 @@ use proxmox_schema::*;
|
|||||||
use crate::{
|
use crate::{
|
||||||
Authid, BackupNamespace, BackupType, RateLimitConfig, Userid, BACKUP_GROUP_SCHEMA,
|
Authid, BackupNamespace, BackupType, RateLimitConfig, Userid, BACKUP_GROUP_SCHEMA,
|
||||||
BACKUP_NAMESPACE_SCHEMA, DATASTORE_SCHEMA, DRIVE_NAME_SCHEMA, MEDIA_POOL_NAME_SCHEMA,
|
BACKUP_NAMESPACE_SCHEMA, DATASTORE_SCHEMA, DRIVE_NAME_SCHEMA, MEDIA_POOL_NAME_SCHEMA,
|
||||||
PROXMOX_SAFE_ID_FORMAT, REMOTE_ID_SCHEMA, SINGLE_LINE_COMMENT_SCHEMA,
|
NS_MAX_DEPTH_SCHEMA, PROXMOX_SAFE_ID_FORMAT, REMOTE_ID_SCHEMA, SINGLE_LINE_COMMENT_SCHEMA,
|
||||||
};
|
};
|
||||||
|
|
||||||
const_regex! {
|
const_regex! {
|
||||||
|
|
||||||
/// Regex for verification jobs 'DATASTORE:ACTUAL_JOB_ID'
|
/// Regex for verification jobs 'DATASTORE:ACTUAL_JOB_ID'
|
||||||
pub VERIFICATION_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):");
|
pub VERIFICATION_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):");
|
||||||
/// Regex for sync jobs 'REMOTE:REMOTE_DATASTORE:LOCAL_DATASTORE:ACTUAL_JOB_ID'
|
/// Regex for sync jobs 'REMOTE:REMOTE_DATASTORE:LOCAL_DATASTORE:(?:LOCAL_NS_ANCHOR:)ACTUAL_JOB_ID'
|
||||||
pub SYNC_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):");
|
pub SYNC_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(?:", BACKUP_NS_RE!(), r"):");
|
||||||
}
|
}
|
||||||
|
|
||||||
pub const JOB_ID_SCHEMA: Schema = StringSchema::new("Job ID.")
|
pub const JOB_ID_SCHEMA: Schema = StringSchema::new("Job ID.")
|
||||||
@ -413,6 +413,10 @@ pub const GROUP_FILTER_LIST_SCHEMA: Schema =
|
|||||||
store: {
|
store: {
|
||||||
schema: DATASTORE_SCHEMA,
|
schema: DATASTORE_SCHEMA,
|
||||||
},
|
},
|
||||||
|
ns: {
|
||||||
|
type: BackupNamespace,
|
||||||
|
optional: true,
|
||||||
|
},
|
||||||
"owner": {
|
"owner": {
|
||||||
type: Authid,
|
type: Authid,
|
||||||
optional: true,
|
optional: true,
|
||||||
@ -423,10 +427,18 @@ pub const GROUP_FILTER_LIST_SCHEMA: Schema =
|
|||||||
"remote-store": {
|
"remote-store": {
|
||||||
schema: DATASTORE_SCHEMA,
|
schema: DATASTORE_SCHEMA,
|
||||||
},
|
},
|
||||||
|
"remote-ns": {
|
||||||
|
type: BackupNamespace,
|
||||||
|
optional: true,
|
||||||
|
},
|
||||||
"remove-vanished": {
|
"remove-vanished": {
|
||||||
schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
|
schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
|
||||||
optional: true,
|
optional: true,
|
||||||
},
|
},
|
||||||
|
"max-depth": {
|
||||||
|
schema: NS_MAX_DEPTH_SCHEMA,
|
||||||
|
optional: true,
|
||||||
|
},
|
||||||
comment: {
|
comment: {
|
||||||
optional: true,
|
optional: true,
|
||||||
schema: SINGLE_LINE_COMMENT_SCHEMA,
|
schema: SINGLE_LINE_COMMENT_SCHEMA,
|
||||||
@ -452,11 +464,17 @@ pub struct SyncJobConfig {
|
|||||||
pub id: String,
|
pub id: String,
|
||||||
pub store: String,
|
pub store: String,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub ns: Option<BackupNamespace>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub owner: Option<Authid>,
|
pub owner: Option<Authid>,
|
||||||
pub remote: String,
|
pub remote: String,
|
||||||
pub remote_store: String,
|
pub remote_store: String,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub remote_ns: Option<BackupNamespace>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub remove_vanished: Option<bool>,
|
pub remove_vanished: Option<bool>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub max_depth: usize,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub comment: Option<String>,
|
pub comment: Option<String>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
use ::serde::{Deserialize, Serialize};
|
use ::serde::{Deserialize, Serialize};
|
||||||
use anyhow::{bail, Error};
|
use anyhow::{bail, Error};
|
||||||
use hex::FromHex;
|
use hex::FromHex;
|
||||||
|
use pbs_api_types::BackupNamespace;
|
||||||
|
use pbs_api_types::MAX_NAMESPACE_DEPTH;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
|
use proxmox_router::{http_bail, Permission, Router, RpcEnvironment};
|
||||||
@ -25,11 +27,21 @@ pub fn check_sync_job_read_access(
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(ref ns) = job.ns {
|
||||||
|
let ns_privs = user_info.lookup_privs(auth_id, &["datastore", &job.store, &ns.to_string()]);
|
||||||
|
if ns_privs & PRIV_DATASTORE_AUDIT == 0 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
|
let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
|
||||||
remote_privs & PRIV_REMOTE_AUDIT != 0
|
remote_privs & PRIV_REMOTE_AUDIT != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// user can run the corresponding pull job
|
/// checks whether user can run the corresponding pull job
|
||||||
|
///
|
||||||
|
/// namespace creation/deletion ACL and backup group ownership checks happen in the pull code directly.
|
||||||
|
/// remote side checks/filters remote datastore/namespace/group access.
|
||||||
pub fn check_sync_job_modify_access(
|
pub fn check_sync_job_modify_access(
|
||||||
user_info: &CachedUserInfo,
|
user_info: &CachedUserInfo,
|
||||||
auth_id: &Authid,
|
auth_id: &Authid,
|
||||||
@ -40,6 +52,13 @@ pub fn check_sync_job_modify_access(
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(ref ns) = job.ns {
|
||||||
|
let ns_privs = user_info.lookup_privs(auth_id, &["datastore", &job.store, &ns.to_string()]);
|
||||||
|
if ns_privs & PRIV_DATASTORE_BACKUP == 0 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(true) = job.remove_vanished {
|
if let Some(true) = job.remove_vanished {
|
||||||
if datastore_privs & PRIV_DATASTORE_PRUNE == 0 {
|
if datastore_privs & PRIV_DATASTORE_PRUNE == 0 {
|
||||||
return false;
|
return false;
|
||||||
@ -198,6 +217,10 @@ pub enum DeletableProperty {
|
|||||||
rate_out,
|
rate_out,
|
||||||
/// Delete the burst_out property.
|
/// Delete the burst_out property.
|
||||||
burst_out,
|
burst_out,
|
||||||
|
/// Delete the ns property,
|
||||||
|
ns,
|
||||||
|
/// Delete the remote_ns property,
|
||||||
|
remote_ns,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[api(
|
#[api(
|
||||||
@ -283,10 +306,28 @@ pub fn update_sync_job(
|
|||||||
DeletableProperty::burst_out => {
|
DeletableProperty::burst_out => {
|
||||||
data.limit.burst_out = None;
|
data.limit.burst_out = None;
|
||||||
}
|
}
|
||||||
|
DeletableProperty::ns => {
|
||||||
|
data.ns = None;
|
||||||
|
}
|
||||||
|
DeletableProperty::remote_ns => {
|
||||||
|
data.remote_ns = None;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let check_max_depth = |ns: &BackupNamespace, depth| -> Result<(), Error> {
|
||||||
|
if ns.depth() + depth >= MAX_NAMESPACE_DEPTH {
|
||||||
|
bail!(
|
||||||
|
"namespace and recursion depth exceed limit: {} + {} >= {}",
|
||||||
|
ns.depth(),
|
||||||
|
depth,
|
||||||
|
MAX_NAMESPACE_DEPTH
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(comment) = update.comment {
|
if let Some(comment) = update.comment {
|
||||||
let comment = comment.trim().to_string();
|
let comment = comment.trim().to_string();
|
||||||
if comment.is_empty() {
|
if comment.is_empty() {
|
||||||
@ -299,12 +340,23 @@ pub fn update_sync_job(
|
|||||||
if let Some(store) = update.store {
|
if let Some(store) = update.store {
|
||||||
data.store = store;
|
data.store = store;
|
||||||
}
|
}
|
||||||
|
if let Some(ns) = update.ns {
|
||||||
|
check_max_depth(&ns, update.max_depth.unwrap_or(data.max_depth))?;
|
||||||
|
data.ns = Some(ns);
|
||||||
|
}
|
||||||
if let Some(remote) = update.remote {
|
if let Some(remote) = update.remote {
|
||||||
data.remote = remote;
|
data.remote = remote;
|
||||||
}
|
}
|
||||||
if let Some(remote_store) = update.remote_store {
|
if let Some(remote_store) = update.remote_store {
|
||||||
data.remote_store = remote_store;
|
data.remote_store = remote_store;
|
||||||
}
|
}
|
||||||
|
if let Some(remote_ns) = update.remote_ns {
|
||||||
|
check_max_depth(
|
||||||
|
&remote_ns,
|
||||||
|
update.max_depth.unwrap_or(data.max_depth),
|
||||||
|
)?;
|
||||||
|
data.remote_ns = Some(remote_ns);
|
||||||
|
}
|
||||||
if let Some(owner) = update.owner {
|
if let Some(owner) = update.owner {
|
||||||
data.owner = Some(owner);
|
data.owner = Some(owner);
|
||||||
}
|
}
|
||||||
@ -335,6 +387,15 @@ pub fn update_sync_job(
|
|||||||
if update.remove_vanished.is_some() {
|
if update.remove_vanished.is_some() {
|
||||||
data.remove_vanished = update.remove_vanished;
|
data.remove_vanished = update.remove_vanished;
|
||||||
}
|
}
|
||||||
|
if let Some(max_depth) = update.max_depth {
|
||||||
|
if let Some(ref ns) = data.ns {
|
||||||
|
check_max_depth(ns, max_depth)?;
|
||||||
|
}
|
||||||
|
if let Some(ref ns) = data.remote_ns {
|
||||||
|
check_max_depth(ns, max_depth)?;
|
||||||
|
}
|
||||||
|
data.max_depth = max_depth;
|
||||||
|
}
|
||||||
|
|
||||||
if !check_sync_job_modify_access(&user_info, &auth_id, &data) {
|
if !check_sync_job_modify_access(&user_info, &auth_id, &data) {
|
||||||
bail!("permission check failed");
|
bail!("permission check failed");
|
||||||
@ -453,10 +514,13 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
|
|||||||
id: "regular".to_string(),
|
id: "regular".to_string(),
|
||||||
remote: "remote0".to_string(),
|
remote: "remote0".to_string(),
|
||||||
remote_store: "remotestore1".to_string(),
|
remote_store: "remotestore1".to_string(),
|
||||||
|
remote_ns: None,
|
||||||
store: "localstore0".to_string(),
|
store: "localstore0".to_string(),
|
||||||
|
ns: None,
|
||||||
owner: Some(write_auth_id.clone()),
|
owner: Some(write_auth_id.clone()),
|
||||||
comment: None,
|
comment: None,
|
||||||
remove_vanished: None,
|
remove_vanished: None,
|
||||||
|
max_depth: 0,
|
||||||
group_filter: None,
|
group_filter: None,
|
||||||
schedule: None,
|
schedule: None,
|
||||||
limit: pbs_api_types::RateLimitConfig::default(), // no limit
|
limit: pbs_api_types::RateLimitConfig::default(), // no limit
|
||||||
|
@ -39,6 +39,7 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
|
|||||||
let remote = captures.get(1);
|
let remote = captures.get(1);
|
||||||
let remote_store = captures.get(2);
|
let remote_store = captures.get(2);
|
||||||
let local_store = captures.get(3);
|
let local_store = captures.get(3);
|
||||||
|
let local_ns = captures.get(4).map(|m| m.as_str());
|
||||||
|
|
||||||
if let (Some(remote), Some(remote_store), Some(local_store)) =
|
if let (Some(remote), Some(remote_store), Some(local_store)) =
|
||||||
(remote, remote_store, local_store)
|
(remote, remote_store, local_store)
|
||||||
@ -46,6 +47,7 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
|
|||||||
return check_pull_privs(
|
return check_pull_privs(
|
||||||
auth_id,
|
auth_id,
|
||||||
local_store.as_str(),
|
local_store.as_str(),
|
||||||
|
local_ns,
|
||||||
remote.as_str(),
|
remote.as_str(),
|
||||||
remote_store.as_str(),
|
remote_store.as_str(),
|
||||||
false,
|
false,
|
||||||
|
@ -9,9 +9,9 @@ use proxmox_schema::api;
|
|||||||
use proxmox_sys::task_log;
|
use proxmox_sys::task_log;
|
||||||
|
|
||||||
use pbs_api_types::{
|
use pbs_api_types::{
|
||||||
Authid, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
|
Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
|
||||||
GROUP_FILTER_LIST_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
|
GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE,
|
||||||
REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
|
PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
|
||||||
};
|
};
|
||||||
use pbs_config::CachedUserInfo;
|
use pbs_config::CachedUserInfo;
|
||||||
use proxmox_rest_server::WorkerTask;
|
use proxmox_rest_server::WorkerTask;
|
||||||
@ -22,13 +22,24 @@ use crate::server::pull::{pull_store, PullParameters};
|
|||||||
pub fn check_pull_privs(
|
pub fn check_pull_privs(
|
||||||
auth_id: &Authid,
|
auth_id: &Authid,
|
||||||
store: &str,
|
store: &str,
|
||||||
|
ns: Option<&str>,
|
||||||
remote: &str,
|
remote: &str,
|
||||||
remote_store: &str,
|
remote_store: &str,
|
||||||
delete: bool,
|
delete: bool,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let user_info = CachedUserInfo::new()?;
|
let user_info = CachedUserInfo::new()?;
|
||||||
|
|
||||||
user_info.check_privs(auth_id, &["datastore", store], PRIV_DATASTORE_BACKUP, false)?;
|
let local_store_ns_acl_path = match ns {
|
||||||
|
Some(ns) => vec!["datastore", store, ns],
|
||||||
|
None => vec!["datastore", store],
|
||||||
|
};
|
||||||
|
|
||||||
|
user_info.check_privs(
|
||||||
|
auth_id,
|
||||||
|
&local_store_ns_acl_path,
|
||||||
|
PRIV_DATASTORE_BACKUP,
|
||||||
|
false,
|
||||||
|
)?;
|
||||||
user_info.check_privs(
|
user_info.check_privs(
|
||||||
auth_id,
|
auth_id,
|
||||||
&["remote", remote, remote_store],
|
&["remote", remote, remote_store],
|
||||||
@ -37,7 +48,12 @@ pub fn check_pull_privs(
|
|||||||
)?;
|
)?;
|
||||||
|
|
||||||
if delete {
|
if delete {
|
||||||
user_info.check_privs(auth_id, &["datastore", store], PRIV_DATASTORE_PRUNE, false)?;
|
user_info.check_privs(
|
||||||
|
auth_id,
|
||||||
|
&local_store_ns_acl_path,
|
||||||
|
PRIV_DATASTORE_PRUNE,
|
||||||
|
false,
|
||||||
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -49,14 +65,17 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
|
|||||||
fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
|
fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
|
||||||
PullParameters::new(
|
PullParameters::new(
|
||||||
&sync_job.store,
|
&sync_job.store,
|
||||||
|
sync_job.ns.clone().unwrap_or_default(),
|
||||||
&sync_job.remote,
|
&sync_job.remote,
|
||||||
&sync_job.remote_store,
|
&sync_job.remote_store,
|
||||||
|
sync_job.remote_ns.clone().unwrap_or_default(),
|
||||||
sync_job
|
sync_job
|
||||||
.owner
|
.owner
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unwrap_or_else(|| Authid::root_auth_id())
|
.unwrap_or_else(|| Authid::root_auth_id())
|
||||||
.clone(),
|
.clone(),
|
||||||
sync_job.remove_vanished,
|
sync_job.remove_vanished,
|
||||||
|
sync_job.max_depth,
|
||||||
sync_job.group_filter.clone(),
|
sync_job.group_filter.clone(),
|
||||||
sync_job.limit.clone(),
|
sync_job.limit.clone(),
|
||||||
)
|
)
|
||||||
@ -71,10 +90,11 @@ pub fn do_sync_job(
|
|||||||
to_stdout: bool,
|
to_stdout: bool,
|
||||||
) -> Result<String, Error> {
|
) -> Result<String, Error> {
|
||||||
let job_id = format!(
|
let job_id = format!(
|
||||||
"{}:{}:{}:{}",
|
"{}:{}:{}:{}:{}",
|
||||||
sync_job.remote,
|
sync_job.remote,
|
||||||
sync_job.remote_store,
|
sync_job.remote_store,
|
||||||
sync_job.store,
|
sync_job.store,
|
||||||
|
sync_job.ns.clone().unwrap_or_default(),
|
||||||
job.jobname()
|
job.jobname()
|
||||||
);
|
);
|
||||||
let worker_type = job.jobtype().to_string();
|
let worker_type = job.jobtype().to_string();
|
||||||
@ -154,16 +174,28 @@ pub fn do_sync_job(
|
|||||||
store: {
|
store: {
|
||||||
schema: DATASTORE_SCHEMA,
|
schema: DATASTORE_SCHEMA,
|
||||||
},
|
},
|
||||||
|
ns: {
|
||||||
|
type: BackupNamespace,
|
||||||
|
optional: true,
|
||||||
|
},
|
||||||
remote: {
|
remote: {
|
||||||
schema: REMOTE_ID_SCHEMA,
|
schema: REMOTE_ID_SCHEMA,
|
||||||
},
|
},
|
||||||
"remote-store": {
|
"remote-store": {
|
||||||
schema: DATASTORE_SCHEMA,
|
schema: DATASTORE_SCHEMA,
|
||||||
},
|
},
|
||||||
|
"remote-ns": {
|
||||||
|
type: BackupNamespace,
|
||||||
|
optional: true,
|
||||||
|
},
|
||||||
"remove-vanished": {
|
"remove-vanished": {
|
||||||
schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
|
schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
|
||||||
optional: true,
|
optional: true,
|
||||||
},
|
},
|
||||||
|
"max-depth": {
|
||||||
|
schema: NS_MAX_DEPTH_SCHEMA,
|
||||||
|
optional: true,
|
||||||
|
},
|
||||||
"group-filter": {
|
"group-filter": {
|
||||||
schema: GROUP_FILTER_LIST_SCHEMA,
|
schema: GROUP_FILTER_LIST_SCHEMA,
|
||||||
optional: true,
|
optional: true,
|
||||||
@ -186,9 +218,12 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
|
|||||||
/// Sync store from other repository
|
/// Sync store from other repository
|
||||||
async fn pull(
|
async fn pull(
|
||||||
store: String,
|
store: String,
|
||||||
|
ns: Option<BackupNamespace>,
|
||||||
remote: String,
|
remote: String,
|
||||||
remote_store: String,
|
remote_store: String,
|
||||||
|
remote_ns: Option<BackupNamespace>,
|
||||||
remove_vanished: Option<bool>,
|
remove_vanished: Option<bool>,
|
||||||
|
max_depth: Option<usize>,
|
||||||
group_filter: Option<Vec<GroupFilter>>,
|
group_filter: Option<Vec<GroupFilter>>,
|
||||||
limit: RateLimitConfig,
|
limit: RateLimitConfig,
|
||||||
_info: &ApiMethod,
|
_info: &ApiMethod,
|
||||||
@ -197,14 +232,32 @@ async fn pull(
|
|||||||
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
|
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
|
||||||
let delete = remove_vanished.unwrap_or(false);
|
let delete = remove_vanished.unwrap_or(false);
|
||||||
|
|
||||||
check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?;
|
let ns = ns.unwrap_or_default();
|
||||||
|
let max_depth = max_depth.unwrap_or(0);
|
||||||
|
let ns_str = if ns.is_root() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(ns.to_string())
|
||||||
|
};
|
||||||
|
|
||||||
|
check_pull_privs(
|
||||||
|
&auth_id,
|
||||||
|
&store,
|
||||||
|
ns_str.as_deref(),
|
||||||
|
&remote,
|
||||||
|
&remote_store,
|
||||||
|
delete,
|
||||||
|
)?;
|
||||||
|
|
||||||
let pull_params = PullParameters::new(
|
let pull_params = PullParameters::new(
|
||||||
&store,
|
&store,
|
||||||
|
ns,
|
||||||
&remote,
|
&remote,
|
||||||
&remote_store,
|
&remote_store,
|
||||||
|
remote_ns.unwrap_or_default(),
|
||||||
auth_id.clone(),
|
auth_id.clone(),
|
||||||
remove_vanished,
|
remove_vanished,
|
||||||
|
max_depth,
|
||||||
group_filter,
|
group_filter,
|
||||||
limit,
|
limit,
|
||||||
)?;
|
)?;
|
||||||
@ -217,7 +270,13 @@ async fn pull(
|
|||||||
auth_id.to_string(),
|
auth_id.to_string(),
|
||||||
true,
|
true,
|
||||||
move |worker| async move {
|
move |worker| async move {
|
||||||
task_log!(worker, "sync datastore '{}' start", store);
|
task_log!(
|
||||||
|
worker,
|
||||||
|
"pull datastore '{}' from '{}/{}'",
|
||||||
|
store,
|
||||||
|
remote,
|
||||||
|
remote_store,
|
||||||
|
);
|
||||||
|
|
||||||
let pull_future = pull_store(&worker, &client, &pull_params);
|
let pull_future = pull_store(&worker, &client, &pull_params);
|
||||||
let future = select! {
|
let future = select! {
|
||||||
@ -227,7 +286,7 @@ async fn pull(
|
|||||||
|
|
||||||
let _ = future?;
|
let _ = future?;
|
||||||
|
|
||||||
task_log!(worker, "sync datastore '{}' end", store);
|
task_log!(worker, "pull datastore '{}' end", store);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
},
|
},
|
||||||
|
@ -12,8 +12,9 @@ use proxmox_sys::fs::CreateOptions;
|
|||||||
use pbs_api_types::percent_encoding::percent_encode_component;
|
use pbs_api_types::percent_encoding::percent_encode_component;
|
||||||
use pbs_api_types::{
|
use pbs_api_types::{
|
||||||
BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
|
BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
|
||||||
GROUP_FILTER_LIST_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, REMOTE_ID_SCHEMA,
|
GROUP_FILTER_LIST_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, NS_MAX_DEPTH_SCHEMA,
|
||||||
REMOVE_VANISHED_BACKUPS_SCHEMA, UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
|
REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, UPID_SCHEMA,
|
||||||
|
VERIFICATION_OUTDATED_AFTER_SCHEMA,
|
||||||
};
|
};
|
||||||
use pbs_client::{display_task_log, view_task_result};
|
use pbs_client::{display_task_log, view_task_result};
|
||||||
use pbs_config::sync;
|
use pbs_config::sync;
|
||||||
@ -234,19 +235,31 @@ fn task_mgmt_cli() -> CommandLineInterface {
|
|||||||
#[api(
|
#[api(
|
||||||
input: {
|
input: {
|
||||||
properties: {
|
properties: {
|
||||||
"local-store": {
|
"store": {
|
||||||
schema: DATASTORE_SCHEMA,
|
schema: DATASTORE_SCHEMA,
|
||||||
},
|
},
|
||||||
|
"ns": {
|
||||||
|
type: BackupNamespace,
|
||||||
|
optional: true,
|
||||||
|
},
|
||||||
remote: {
|
remote: {
|
||||||
schema: REMOTE_ID_SCHEMA,
|
schema: REMOTE_ID_SCHEMA,
|
||||||
},
|
},
|
||||||
"remote-store": {
|
"remote-store": {
|
||||||
schema: DATASTORE_SCHEMA,
|
schema: DATASTORE_SCHEMA,
|
||||||
},
|
},
|
||||||
|
"remote-ns": {
|
||||||
|
type: BackupNamespace,
|
||||||
|
optional: true,
|
||||||
|
},
|
||||||
"remove-vanished": {
|
"remove-vanished": {
|
||||||
schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
|
schema: REMOVE_VANISHED_BACKUPS_SCHEMA,
|
||||||
optional: true,
|
optional: true,
|
||||||
},
|
},
|
||||||
|
"max-depth": {
|
||||||
|
schema: NS_MAX_DEPTH_SCHEMA,
|
||||||
|
optional: true,
|
||||||
|
},
|
||||||
"group-filter": {
|
"group-filter": {
|
||||||
schema: GROUP_FILTER_LIST_SCHEMA,
|
schema: GROUP_FILTER_LIST_SCHEMA,
|
||||||
optional: true,
|
optional: true,
|
||||||
@ -266,8 +279,11 @@ fn task_mgmt_cli() -> CommandLineInterface {
|
|||||||
async fn pull_datastore(
|
async fn pull_datastore(
|
||||||
remote: String,
|
remote: String,
|
||||||
remote_store: String,
|
remote_store: String,
|
||||||
local_store: String,
|
remote_ns: Option<BackupNamespace>,
|
||||||
|
store: String,
|
||||||
|
ns: Option<BackupNamespace>,
|
||||||
remove_vanished: Option<bool>,
|
remove_vanished: Option<bool>,
|
||||||
|
max_depth: Option<usize>,
|
||||||
group_filter: Option<Vec<GroupFilter>>,
|
group_filter: Option<Vec<GroupFilter>>,
|
||||||
limit: RateLimitConfig,
|
limit: RateLimitConfig,
|
||||||
param: Value,
|
param: Value,
|
||||||
@ -277,12 +293,24 @@ async fn pull_datastore(
|
|||||||
let client = connect_to_localhost()?;
|
let client = connect_to_localhost()?;
|
||||||
|
|
||||||
let mut args = json!({
|
let mut args = json!({
|
||||||
"store": local_store,
|
"store": store,
|
||||||
"remote": remote,
|
"remote": remote,
|
||||||
"remote-store": remote_store,
|
"remote-store": remote_store,
|
||||||
"limit": limit,
|
"limit": limit,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if remote_ns.is_some() {
|
||||||
|
args["remote-ns"] = json!(remote_ns);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ns.is_some() {
|
||||||
|
args["local-ns"] = json!(ns);
|
||||||
|
}
|
||||||
|
|
||||||
|
if max_depth.is_some() {
|
||||||
|
args["max-depth"] = json!(max_depth);
|
||||||
|
}
|
||||||
|
|
||||||
if group_filter.is_some() {
|
if group_filter.is_some() {
|
||||||
args["group-filter"] = json!(group_filter);
|
args["group-filter"] = json!(group_filter);
|
||||||
}
|
}
|
||||||
@ -406,14 +434,13 @@ async fn run() -> Result<(), Error> {
|
|||||||
.insert(
|
.insert(
|
||||||
"pull",
|
"pull",
|
||||||
CliCommand::new(&API_METHOD_PULL_DATASTORE)
|
CliCommand::new(&API_METHOD_PULL_DATASTORE)
|
||||||
.arg_param(&["remote", "remote-store", "local-store"])
|
.arg_param(&["remote", "remote-store", "store"])
|
||||||
.completion_cb(
|
.completion_cb("store", pbs_config::datastore::complete_datastore_name)
|
||||||
"local-store",
|
.completion_cb("ns", complete_sync_local_datastore_namespace)
|
||||||
pbs_config::datastore::complete_datastore_name,
|
|
||||||
)
|
|
||||||
.completion_cb("remote", pbs_config::remote::complete_remote_name)
|
.completion_cb("remote", pbs_config::remote::complete_remote_name)
|
||||||
.completion_cb("remote-store", complete_remote_datastore_name)
|
.completion_cb("remote-store", complete_remote_datastore_name)
|
||||||
.completion_cb("group-filter", complete_remote_datastore_group_filter),
|
.completion_cb("group-filter", complete_remote_datastore_group_filter)
|
||||||
|
.completion_cb("remote-ns", complete_remote_datastore_namespace),
|
||||||
)
|
)
|
||||||
.insert(
|
.insert(
|
||||||
"verify",
|
"verify",
|
||||||
@ -507,6 +534,12 @@ fn get_remote_ns(param: &HashMap<String, String>) -> Option<BackupNamespace> {
|
|||||||
if let Some(ns_str) = param.get("remote-ns") {
|
if let Some(ns_str) = param.get("remote-ns") {
|
||||||
BackupNamespace::from_str(ns_str).ok()
|
BackupNamespace::from_str(ns_str).ok()
|
||||||
} else {
|
} else {
|
||||||
|
if let Some(id) = param.get("id") {
|
||||||
|
let job = get_sync_job(id).ok();
|
||||||
|
if let Some(ref job) = job {
|
||||||
|
return job.remote_ns.clone();
|
||||||
|
}
|
||||||
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -103,12 +103,14 @@ pub fn sync_job_commands() -> CommandLineInterface {
|
|||||||
.completion_cb("id", pbs_config::sync::complete_sync_job_id)
|
.completion_cb("id", pbs_config::sync::complete_sync_job_id)
|
||||||
.completion_cb("schedule", pbs_config::datastore::complete_calendar_event)
|
.completion_cb("schedule", pbs_config::datastore::complete_calendar_event)
|
||||||
.completion_cb("store", pbs_config::datastore::complete_datastore_name)
|
.completion_cb("store", pbs_config::datastore::complete_datastore_name)
|
||||||
|
.completion_cb("ns", crate::complete_sync_local_datastore_namespace)
|
||||||
.completion_cb("remote", pbs_config::remote::complete_remote_name)
|
.completion_cb("remote", pbs_config::remote::complete_remote_name)
|
||||||
.completion_cb("remote-store", crate::complete_remote_datastore_name)
|
.completion_cb("remote-store", crate::complete_remote_datastore_name)
|
||||||
.completion_cb(
|
.completion_cb(
|
||||||
"group-filter",
|
"group-filter",
|
||||||
crate::complete_remote_datastore_group_filter,
|
crate::complete_remote_datastore_group_filter,
|
||||||
),
|
)
|
||||||
|
.completion_cb("remote-ns", crate::complete_remote_datastore_namespace),
|
||||||
)
|
)
|
||||||
.insert(
|
.insert(
|
||||||
"update",
|
"update",
|
||||||
@ -117,11 +119,13 @@ pub fn sync_job_commands() -> CommandLineInterface {
|
|||||||
.completion_cb("id", pbs_config::sync::complete_sync_job_id)
|
.completion_cb("id", pbs_config::sync::complete_sync_job_id)
|
||||||
.completion_cb("schedule", pbs_config::datastore::complete_calendar_event)
|
.completion_cb("schedule", pbs_config::datastore::complete_calendar_event)
|
||||||
.completion_cb("store", pbs_config::datastore::complete_datastore_name)
|
.completion_cb("store", pbs_config::datastore::complete_datastore_name)
|
||||||
|
.completion_cb("ns", crate::complete_sync_local_datastore_namespace)
|
||||||
.completion_cb("remote-store", crate::complete_remote_datastore_name)
|
.completion_cb("remote-store", crate::complete_remote_datastore_name)
|
||||||
.completion_cb(
|
.completion_cb(
|
||||||
"group-filter",
|
"group-filter",
|
||||||
crate::complete_remote_datastore_group_filter,
|
crate::complete_remote_datastore_group_filter,
|
||||||
),
|
)
|
||||||
|
.completion_cb("remote-ns", crate::complete_remote_datastore_namespace),
|
||||||
)
|
)
|
||||||
.insert(
|
.insert(
|
||||||
"remove",
|
"remove",
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
//! Sync datastore from remote server
|
//! Sync datastore from remote server
|
||||||
|
|
||||||
|
use std::cmp::min;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::io::{Seek, SeekFrom};
|
use std::io::{Seek, SeekFrom};
|
||||||
@ -9,14 +10,16 @@ use std::time::SystemTime;
|
|||||||
|
|
||||||
use anyhow::{bail, format_err, Error};
|
use anyhow::{bail, format_err, Error};
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
|
use pbs_config::CachedUserInfo;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
use proxmox_router::HttpError;
|
use proxmox_router::HttpError;
|
||||||
use proxmox_sys::task_log;
|
use proxmox_sys::task_log;
|
||||||
|
|
||||||
use pbs_api_types::{
|
use pbs_api_types::{
|
||||||
Authid, BackupNamespace, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote,
|
Authid, BackupNamespace, DatastoreWithNamespace, GroupFilter, GroupListItem, NamespaceListItem,
|
||||||
SnapshotListItem,
|
Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
|
||||||
|
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY,
|
||||||
};
|
};
|
||||||
|
|
||||||
use pbs_client::{
|
use pbs_client::{
|
||||||
@ -43,10 +46,16 @@ pub struct PullParameters {
|
|||||||
source: BackupRepository,
|
source: BackupRepository,
|
||||||
/// Local store that is pulled into
|
/// Local store that is pulled into
|
||||||
store: Arc<DataStore>,
|
store: Arc<DataStore>,
|
||||||
|
/// Remote namespace
|
||||||
|
remote_ns: BackupNamespace,
|
||||||
|
/// Local namespace (anchor)
|
||||||
|
ns: BackupNamespace,
|
||||||
/// Owner of synced groups (needs to match local owner of pre-existing groups)
|
/// Owner of synced groups (needs to match local owner of pre-existing groups)
|
||||||
owner: Authid,
|
owner: Authid,
|
||||||
/// Whether to remove groups which exist locally, but not on the remote end
|
/// Whether to remove groups which exist locally, but not on the remote end
|
||||||
remove_vanished: bool,
|
remove_vanished: bool,
|
||||||
|
/// How many levels of sub-namespaces to pull (0 == no recursion)
|
||||||
|
max_depth: usize,
|
||||||
/// Filters for reducing the pull scope
|
/// Filters for reducing the pull scope
|
||||||
group_filter: Option<Vec<GroupFilter>>,
|
group_filter: Option<Vec<GroupFilter>>,
|
||||||
/// Rate limits for all transfers from `remote`
|
/// Rate limits for all transfers from `remote`
|
||||||
@ -60,15 +69,20 @@ impl PullParameters {
|
|||||||
/// [BackupRepository] with `remote_store`.
|
/// [BackupRepository] with `remote_store`.
|
||||||
pub fn new(
|
pub fn new(
|
||||||
store: &str,
|
store: &str,
|
||||||
|
ns: BackupNamespace,
|
||||||
remote: &str,
|
remote: &str,
|
||||||
remote_store: &str,
|
remote_store: &str,
|
||||||
|
remote_ns: BackupNamespace,
|
||||||
owner: Authid,
|
owner: Authid,
|
||||||
remove_vanished: Option<bool>,
|
remove_vanished: Option<bool>,
|
||||||
|
max_depth: usize,
|
||||||
group_filter: Option<Vec<GroupFilter>>,
|
group_filter: Option<Vec<GroupFilter>>,
|
||||||
limit: RateLimitConfig,
|
limit: RateLimitConfig,
|
||||||
) -> Result<Self, Error> {
|
) -> Result<Self, Error> {
|
||||||
let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
|
let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
|
||||||
|
|
||||||
|
let max_depth = min(max_depth, MAX_NAMESPACE_DEPTH - remote_ns.depth());
|
||||||
|
|
||||||
let (remote_config, _digest) = pbs_config::remote::config()?;
|
let (remote_config, _digest) = pbs_config::remote::config()?;
|
||||||
let remote: Remote = remote_config.lookup("remote", remote)?;
|
let remote: Remote = remote_config.lookup("remote", remote)?;
|
||||||
|
|
||||||
@ -83,10 +97,13 @@ impl PullParameters {
|
|||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
remote,
|
remote,
|
||||||
|
remote_ns,
|
||||||
|
ns,
|
||||||
source,
|
source,
|
||||||
store,
|
store,
|
||||||
owner,
|
owner,
|
||||||
remove_vanished,
|
remove_vanished,
|
||||||
|
max_depth,
|
||||||
group_filter,
|
group_filter,
|
||||||
limit,
|
limit,
|
||||||
})
|
})
|
||||||
@ -96,6 +113,14 @@ impl PullParameters {
|
|||||||
pub async fn client(&self) -> Result<HttpClient, Error> {
|
pub async fn client(&self) -> Result<HttpClient, Error> {
|
||||||
crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
|
crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns DatastoreWithNamespace with namespace (or local namespace anchor).
|
||||||
|
pub fn store_with_ns(&self, ns: BackupNamespace) -> DatastoreWithNamespace {
|
||||||
|
DatastoreWithNamespace {
|
||||||
|
store: self.store.name().to_string(),
|
||||||
|
ns,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn pull_index_chunks<I: IndexFile>(
|
async fn pull_index_chunks<I: IndexFile>(
|
||||||
@ -240,14 +265,12 @@ async fn pull_single_archive(
|
|||||||
worker: &WorkerTask,
|
worker: &WorkerTask,
|
||||||
reader: &BackupReader,
|
reader: &BackupReader,
|
||||||
chunk_reader: &mut RemoteChunkReader,
|
chunk_reader: &mut RemoteChunkReader,
|
||||||
tgt_store: Arc<DataStore>,
|
snapshot: &pbs_datastore::BackupDir,
|
||||||
snapshot: &pbs_api_types::BackupDir,
|
|
||||||
archive_info: &FileInfo,
|
archive_info: &FileInfo,
|
||||||
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let archive_name = &archive_info.filename;
|
let archive_name = &archive_info.filename;
|
||||||
let mut path = tgt_store.base_path();
|
let mut path = snapshot.full_path();
|
||||||
path.push(snapshot.to_string());
|
|
||||||
path.push(archive_name);
|
path.push(archive_name);
|
||||||
|
|
||||||
let mut tmp_path = path.clone();
|
let mut tmp_path = path.clone();
|
||||||
@ -274,7 +297,7 @@ async fn pull_single_archive(
|
|||||||
pull_index_chunks(
|
pull_index_chunks(
|
||||||
worker,
|
worker,
|
||||||
chunk_reader.clone(),
|
chunk_reader.clone(),
|
||||||
tgt_store.clone(),
|
snapshot.datastore().clone(),
|
||||||
index,
|
index,
|
||||||
downloaded_chunks,
|
downloaded_chunks,
|
||||||
)
|
)
|
||||||
@ -290,7 +313,7 @@ async fn pull_single_archive(
|
|||||||
pull_index_chunks(
|
pull_index_chunks(
|
||||||
worker,
|
worker,
|
||||||
chunk_reader.clone(),
|
chunk_reader.clone(),
|
||||||
tgt_store.clone(),
|
snapshot.datastore().clone(),
|
||||||
index,
|
index,
|
||||||
downloaded_chunks,
|
downloaded_chunks,
|
||||||
)
|
)
|
||||||
@ -347,18 +370,13 @@ async fn try_client_log_download(
|
|||||||
async fn pull_snapshot(
|
async fn pull_snapshot(
|
||||||
worker: &WorkerTask,
|
worker: &WorkerTask,
|
||||||
reader: Arc<BackupReader>,
|
reader: Arc<BackupReader>,
|
||||||
tgt_store: Arc<DataStore>,
|
snapshot: &pbs_datastore::BackupDir,
|
||||||
snapshot: &pbs_api_types::BackupDir,
|
|
||||||
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let snapshot_relative_path = snapshot.to_string();
|
let mut manifest_name = snapshot.full_path();
|
||||||
|
|
||||||
let mut manifest_name = tgt_store.base_path();
|
|
||||||
manifest_name.push(&snapshot_relative_path);
|
|
||||||
manifest_name.push(MANIFEST_BLOB_NAME);
|
manifest_name.push(MANIFEST_BLOB_NAME);
|
||||||
|
|
||||||
let mut client_log_name = tgt_store.base_path();
|
let mut client_log_name = snapshot.full_path();
|
||||||
client_log_name.push(&snapshot_relative_path);
|
|
||||||
client_log_name.push(CLIENT_LOG_BLOB_NAME);
|
client_log_name.push(CLIENT_LOG_BLOB_NAME);
|
||||||
|
|
||||||
let mut tmp_manifest_name = manifest_name.clone();
|
let mut tmp_manifest_name = manifest_name.clone();
|
||||||
@ -424,8 +442,7 @@ async fn pull_snapshot(
|
|||||||
let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
|
let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
|
||||||
|
|
||||||
for item in manifest.files() {
|
for item in manifest.files() {
|
||||||
let mut path = tgt_store.base_path();
|
let mut path = snapshot.full_path();
|
||||||
path.push(&snapshot_relative_path);
|
|
||||||
path.push(&item.filename);
|
path.push(&item.filename);
|
||||||
|
|
||||||
if path.exists() {
|
if path.exists() {
|
||||||
@ -474,7 +491,6 @@ async fn pull_snapshot(
|
|||||||
worker,
|
worker,
|
||||||
&reader,
|
&reader,
|
||||||
&mut chunk_reader,
|
&mut chunk_reader,
|
||||||
tgt_store.clone(),
|
|
||||||
snapshot,
|
snapshot,
|
||||||
item,
|
item,
|
||||||
downloaded_chunks.clone(),
|
downloaded_chunks.clone(),
|
||||||
@ -491,38 +507,37 @@ async fn pull_snapshot(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// cleanup - remove stale files
|
// cleanup - remove stale files
|
||||||
tgt_store.cleanup_backup_dir(snapshot, &manifest)?;
|
snapshot
|
||||||
|
.datastore()
|
||||||
|
.cleanup_backup_dir(snapshot, &manifest)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Pulls a `snapshot` into `tgt_store`, differentiating between new snapshots (removed on error)
|
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
|
||||||
/// and existing ones (kept even on error).
|
///
|
||||||
|
/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
|
||||||
|
/// pointing to the local datastore and target namespace.
|
||||||
async fn pull_snapshot_from(
|
async fn pull_snapshot_from(
|
||||||
worker: &WorkerTask,
|
worker: &WorkerTask,
|
||||||
reader: Arc<BackupReader>,
|
reader: Arc<BackupReader>,
|
||||||
tgt_store: Arc<DataStore>,
|
snapshot: &pbs_datastore::BackupDir,
|
||||||
snapshot: &pbs_api_types::BackupDir,
|
|
||||||
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
// FIXME: Namespace support requires source AND target namespace
|
let (_path, is_new, _snap_lock) = snapshot
|
||||||
let ns = BackupNamespace::root();
|
.datastore()
|
||||||
let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&ns, snapshot)?;
|
.create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
|
||||||
|
|
||||||
let snapshot_path = snapshot.to_string();
|
let snapshot_path = snapshot.to_string();
|
||||||
if is_new {
|
if is_new {
|
||||||
task_log!(worker, "sync snapshot {:?}", snapshot_path);
|
task_log!(worker, "sync snapshot {:?}", snapshot_path);
|
||||||
|
|
||||||
if let Err(err) = pull_snapshot(
|
if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
|
||||||
worker,
|
if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
|
||||||
reader,
|
snapshot.backup_ns(),
|
||||||
tgt_store.clone(),
|
snapshot.as_ref(),
|
||||||
snapshot,
|
true,
|
||||||
downloaded_chunks,
|
) {
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
if let Err(cleanup_err) = tgt_store.remove_backup_dir(&ns, snapshot, true) {
|
|
||||||
task_log!(worker, "cleanup error - {}", cleanup_err);
|
task_log!(worker, "cleanup error - {}", cleanup_err);
|
||||||
}
|
}
|
||||||
return Err(err);
|
return Err(err);
|
||||||
@ -530,14 +545,7 @@ async fn pull_snapshot_from(
|
|||||||
task_log!(worker, "sync snapshot {:?} done", snapshot_path);
|
task_log!(worker, "sync snapshot {:?} done", snapshot_path);
|
||||||
} else {
|
} else {
|
||||||
task_log!(worker, "re-sync snapshot {:?}", snapshot_path);
|
task_log!(worker, "re-sync snapshot {:?}", snapshot_path);
|
||||||
pull_snapshot(
|
pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
|
||||||
worker,
|
|
||||||
reader,
|
|
||||||
tgt_store.clone(),
|
|
||||||
snapshot,
|
|
||||||
downloaded_chunks,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
task_log!(worker, "re-sync snapshot {:?} done", snapshot_path);
|
task_log!(worker, "re-sync snapshot {:?} done", snapshot_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -590,13 +598,18 @@ impl std::fmt::Display for SkipInfo {
|
|||||||
/// Pulls a group according to `params`.
|
/// Pulls a group according to `params`.
|
||||||
///
|
///
|
||||||
/// Pulling a group consists of the following steps:
|
/// Pulling a group consists of the following steps:
|
||||||
/// - Query the list of snapshots available for this group on the remote, sort by snapshot time
|
/// - Query the list of snapshots available for this group in the source namespace on the remote
|
||||||
|
/// - Sort by snapshot time
|
||||||
/// - Get last snapshot timestamp on local datastore
|
/// - Get last snapshot timestamp on local datastore
|
||||||
/// - Iterate over list of snapshots
|
/// - Iterate over list of snapshots
|
||||||
/// -- Recreate client/BackupReader
|
/// -- Recreate client/BackupReader
|
||||||
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
|
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
|
||||||
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
|
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
|
||||||
///
|
///
|
||||||
|
/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
|
||||||
|
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
|
||||||
|
/// namespace support yet.
|
||||||
|
///
|
||||||
/// Permission checks:
|
/// Permission checks:
|
||||||
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
|
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
|
||||||
/// - local group owner is already checked by pull_store
|
/// - local group owner is already checked by pull_store
|
||||||
@ -605,21 +618,25 @@ async fn pull_group(
|
|||||||
client: &HttpClient,
|
client: &HttpClient,
|
||||||
params: &PullParameters,
|
params: &PullParameters,
|
||||||
group: &pbs_api_types::BackupGroup,
|
group: &pbs_api_types::BackupGroup,
|
||||||
|
remote_ns: BackupNamespace,
|
||||||
progress: &mut StoreProgress,
|
progress: &mut StoreProgress,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
// FIXME: Namespace support
|
|
||||||
let ns = BackupNamespace::root();
|
|
||||||
|
|
||||||
let path = format!(
|
let path = format!(
|
||||||
"api2/json/admin/datastore/{}/snapshots",
|
"api2/json/admin/datastore/{}/snapshots",
|
||||||
params.source.store()
|
params.source.store()
|
||||||
);
|
);
|
||||||
|
|
||||||
let args = json!({
|
let mut args = json!({
|
||||||
"backup-type": group.ty,
|
"backup-type": group.ty,
|
||||||
"backup-id": group.id,
|
"backup-id": group.id,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if !remote_ns.is_root() {
|
||||||
|
args["backup-ns"] = serde_json::to_value(&remote_ns)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let target_ns = remote_ns.map_prefix(¶ms.remote_ns, ¶ms.ns)?;
|
||||||
|
|
||||||
let mut result = client.get(&path, Some(args)).await?;
|
let mut result = client.get(&path, Some(args)).await?;
|
||||||
let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
|
let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
|
||||||
|
|
||||||
@ -629,7 +646,7 @@ async fn pull_group(
|
|||||||
|
|
||||||
let fingerprint = client.fingerprint();
|
let fingerprint = client.fingerprint();
|
||||||
|
|
||||||
let last_sync = params.store.last_successful_backup(&ns, group)?;
|
let last_sync = params.store.last_successful_backup(&target_ns, group)?;
|
||||||
|
|
||||||
let mut remote_snapshots = std::collections::HashSet::new();
|
let mut remote_snapshots = std::collections::HashSet::new();
|
||||||
|
|
||||||
@ -684,20 +701,15 @@ async fn pull_group(
|
|||||||
new_client,
|
new_client,
|
||||||
None,
|
None,
|
||||||
params.source.store(),
|
params.source.store(),
|
||||||
&ns,
|
&remote_ns,
|
||||||
&snapshot,
|
&snapshot,
|
||||||
true,
|
true,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let result = pull_snapshot_from(
|
let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
|
||||||
worker,
|
|
||||||
reader,
|
let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
|
||||||
params.store.clone(),
|
|
||||||
&snapshot,
|
|
||||||
downloaded_chunks.clone(),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
progress.done_snapshots = pos as u64 + 1;
|
progress.done_snapshots = pos as u64 + 1;
|
||||||
task_log!(worker, "percentage done: {}", progress);
|
task_log!(worker, "percentage done: {}", progress);
|
||||||
@ -706,7 +718,7 @@ async fn pull_group(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if params.remove_vanished {
|
if params.remove_vanished {
|
||||||
let group = params.store.backup_group(ns.clone(), group.clone());
|
let group = params.store.backup_group(target_ns.clone(), group.clone());
|
||||||
let local_list = group.list_backups()?;
|
let local_list = group.list_backups()?;
|
||||||
for info in local_list {
|
for info in local_list {
|
||||||
let backup_time = info.backup_dir.backup_time();
|
let backup_time = info.backup_dir.backup_time();
|
||||||
@ -728,7 +740,7 @@ async fn pull_group(
|
|||||||
);
|
);
|
||||||
params
|
params
|
||||||
.store
|
.store
|
||||||
.remove_backup_dir(&ns, info.backup_dir.as_ref(), false)?;
|
.remove_backup_dir(&target_ns, info.backup_dir.as_ref(), false)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -739,37 +751,290 @@ async fn pull_group(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn query_namespaces(
|
||||||
|
client: &HttpClient,
|
||||||
|
params: &PullParameters,
|
||||||
|
) -> Result<Vec<BackupNamespace>, Error> {
|
||||||
|
let path = format!(
|
||||||
|
"api2/json/admin/datastore/{}/namespace",
|
||||||
|
params.source.store()
|
||||||
|
);
|
||||||
|
let data = json!({
|
||||||
|
"max-depth": params.max_depth,
|
||||||
|
});
|
||||||
|
let mut result = client
|
||||||
|
.get(&path, Some(data))
|
||||||
|
.await
|
||||||
|
.map_err(|err| format_err!("Failed to retrieve namespaces from remote - {}", err))?;
|
||||||
|
let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
|
||||||
|
|
||||||
|
// parents first
|
||||||
|
list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
|
||||||
|
|
||||||
|
Ok(list.iter().map(|item| item.ns.clone()).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_ns_privs(
|
||||||
|
store_with_ns: &DatastoreWithNamespace,
|
||||||
|
owner: &Authid,
|
||||||
|
privs: u64,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let user_info = CachedUserInfo::new()?;
|
||||||
|
|
||||||
|
// TODO re-sync with API, maybe find common place?
|
||||||
|
|
||||||
|
let user_privs = user_info.lookup_privs(owner, &store_with_ns.acl_path());
|
||||||
|
|
||||||
|
if (user_privs & privs) == 0 {
|
||||||
|
bail!("no permission to modify parent/datastore.");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_and_create_ns(
|
||||||
|
params: &PullParameters,
|
||||||
|
store_with_ns: &DatastoreWithNamespace,
|
||||||
|
) -> Result<bool, Error> {
|
||||||
|
let ns = &store_with_ns.ns;
|
||||||
|
let mut created = false;
|
||||||
|
|
||||||
|
if !ns.is_root() && !params.store.namespace_path(&ns).exists() {
|
||||||
|
let mut parent = ns.clone();
|
||||||
|
let name = parent.pop();
|
||||||
|
|
||||||
|
let parent = params.store_with_ns(parent);
|
||||||
|
|
||||||
|
if let Err(err) = check_ns_privs(&parent, ¶ms.owner, PRIV_DATASTORE_MODIFY) {
|
||||||
|
bail!(
|
||||||
|
"Not allowed to create namespace {} - {}",
|
||||||
|
store_with_ns,
|
||||||
|
err,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Some(name) = name {
|
||||||
|
if let Err(err) = params.store.create_namespace(&parent.ns, name) {
|
||||||
|
bail!(
|
||||||
|
"sync namespace {} failed - namespace creation failed: {}",
|
||||||
|
&store_with_ns,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
created = true;
|
||||||
|
} else {
|
||||||
|
bail!(
|
||||||
|
"sync namespace {} failed - namespace creation failed - couldn't determine parent namespace",
|
||||||
|
&store_with_ns,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO re-sync with API, maybe find common place?
|
||||||
|
if let Err(err) = check_ns_privs(&store_with_ns, ¶ms.owner, PRIV_DATASTORE_BACKUP) {
|
||||||
|
bail!("sync namespace {} failed - {}", &store_with_ns, err);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(created)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
|
||||||
|
let parent = local_ns.clone().parent();
|
||||||
|
check_ns_privs(
|
||||||
|
¶ms.store_with_ns(parent),
|
||||||
|
¶ms.owner,
|
||||||
|
PRIV_DATASTORE_MODIFY,
|
||||||
|
)?;
|
||||||
|
params.store.remove_namespace_recursive(local_ns)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_and_remove_vanished_ns(
|
||||||
|
worker: &WorkerTask,
|
||||||
|
params: &PullParameters,
|
||||||
|
synced_ns: HashSet<BackupNamespace>,
|
||||||
|
) -> Result<bool, Error> {
|
||||||
|
let mut errors = false;
|
||||||
|
let user_info = CachedUserInfo::new()?;
|
||||||
|
|
||||||
|
let mut local_ns_list: Vec<BackupNamespace> = params
|
||||||
|
.store
|
||||||
|
.recursive_iter_backup_ns_ok(params.ns.clone(), Some(params.max_depth))?
|
||||||
|
.filter(|ns| {
|
||||||
|
let store_with_ns = params.store_with_ns(ns.clone());
|
||||||
|
let user_privs = user_info.lookup_privs(¶ms.owner, &store_with_ns.acl_path());
|
||||||
|
user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// children first!
|
||||||
|
local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
|
||||||
|
|
||||||
|
for local_ns in local_ns_list {
|
||||||
|
if local_ns == params.ns {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if synced_ns.contains(&local_ns) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if local_ns.is_root() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
match check_and_remove_ns(params, &local_ns) {
|
||||||
|
Ok(true) => task_log!(worker, "Removed namespace {}", local_ns),
|
||||||
|
Ok(false) => task_log!(
|
||||||
|
worker,
|
||||||
|
"Did not remove namespace {} - protected snapshots remain",
|
||||||
|
local_ns
|
||||||
|
),
|
||||||
|
Err(err) => {
|
||||||
|
task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err);
|
||||||
|
errors = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(errors)
|
||||||
|
}
|
||||||
|
|
||||||
/// Pulls a store according to `params`.
|
/// Pulls a store according to `params`.
|
||||||
///
|
///
|
||||||
/// Pulling a store consists of the following steps:
|
/// Pulling a store consists of the following steps:
|
||||||
/// - Query list of groups on the remote
|
/// - Query list of namespaces on the remote
|
||||||
|
/// - Iterate list
|
||||||
|
/// -- create sub-NS if needed (and allowed)
|
||||||
|
/// -- attempt to pull each NS in turn
|
||||||
|
/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote
|
||||||
|
///
|
||||||
|
/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is
|
||||||
|
/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces.
|
||||||
|
///
|
||||||
|
/// Permission checks:
|
||||||
|
/// - access to local datastore, namespace anchor and remote entry need to be checked at call site
|
||||||
|
/// - remote namespaces are filtered by remote
|
||||||
|
/// - creation and removal of sub-NS checked here
|
||||||
|
/// - access to sub-NS checked here
|
||||||
|
pub async fn pull_store(
|
||||||
|
worker: &WorkerTask,
|
||||||
|
client: &HttpClient,
|
||||||
|
params: &PullParameters,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
// explicit create shared lock to prevent GC on newly created chunks
|
||||||
|
let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
|
||||||
|
|
||||||
|
let namespaces = if params.remote_ns.is_root() && params.max_depth == 0 {
|
||||||
|
vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
|
||||||
|
} else {
|
||||||
|
query_namespaces(client, params).await?
|
||||||
|
};
|
||||||
|
|
||||||
|
let (mut groups, mut snapshots) = (0, 0);
|
||||||
|
let mut synced_ns = HashSet::with_capacity(namespaces.len());
|
||||||
|
let mut errors = false;
|
||||||
|
|
||||||
|
for namespace in namespaces {
|
||||||
|
let source_store_ns = DatastoreWithNamespace {
|
||||||
|
store: params.source.store().to_owned(),
|
||||||
|
ns: namespace.clone(),
|
||||||
|
};
|
||||||
|
let target_ns = namespace.map_prefix(¶ms.remote_ns, ¶ms.ns)?;
|
||||||
|
let target_store_ns = params.store_with_ns(target_ns.clone());
|
||||||
|
|
||||||
|
task_log!(worker, "----");
|
||||||
|
task_log!(
|
||||||
|
worker,
|
||||||
|
"Syncing {} into {}",
|
||||||
|
source_store_ns,
|
||||||
|
target_store_ns
|
||||||
|
);
|
||||||
|
|
||||||
|
synced_ns.insert(target_ns.clone());
|
||||||
|
|
||||||
|
match check_and_create_ns(params, &target_store_ns) {
|
||||||
|
Ok(true) => task_log!(worker, "Created namespace {}", target_ns),
|
||||||
|
Ok(false) => {}
|
||||||
|
Err(err) => {
|
||||||
|
task_log!(
|
||||||
|
worker,
|
||||||
|
"Cannot sync {} into {} - {}",
|
||||||
|
source_store_ns,
|
||||||
|
target_store_ns,
|
||||||
|
err,
|
||||||
|
);
|
||||||
|
errors = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match pull_ns(worker, client, params, namespace.clone(), target_ns).await {
|
||||||
|
Ok((ns_progress, ns_errors)) => {
|
||||||
|
errors |= ns_errors;
|
||||||
|
|
||||||
|
if params.max_depth > 0 {
|
||||||
|
groups += ns_progress.done_groups;
|
||||||
|
snapshots += ns_progress.done_snapshots;
|
||||||
|
task_log!(
|
||||||
|
worker,
|
||||||
|
"Finished syncing namespace {}, current progress: {} groups, {} snapshots",
|
||||||
|
namespace,
|
||||||
|
groups,
|
||||||
|
snapshots,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
errors = true;
|
||||||
|
task_log!(
|
||||||
|
worker,
|
||||||
|
"Encountered errors while syncing namespace {} - {}",
|
||||||
|
namespace,
|
||||||
|
err,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if params.remove_vanished {
|
||||||
|
errors |= check_and_remove_vanished_ns(worker, params, synced_ns)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if errors {
|
||||||
|
bail!("sync failed with some errors.");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Pulls a namespace according to `params`.
|
||||||
|
///
|
||||||
|
/// Pulling a namespace consists of the following steps:
|
||||||
|
/// - Query list of groups on the remote (in `source_ns`)
|
||||||
/// - Filter list according to configured group filters
|
/// - Filter list according to configured group filters
|
||||||
/// - Iterate list and attempt to pull each group in turn
|
/// - Iterate list and attempt to pull each group in turn
|
||||||
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
|
/// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are
|
||||||
/// not or no longer available on the remote
|
/// not or no longer available on the remote
|
||||||
///
|
///
|
||||||
/// Permission checks:
|
/// Permission checks:
|
||||||
/// - access to local datastore and remote entry need to be checked at call site
|
/// - remote namespaces are filtered by remote
|
||||||
/// - remote groups are filtered by remote
|
|
||||||
/// - owner check for vanished groups done here
|
/// - owner check for vanished groups done here
|
||||||
pub async fn pull_store(
|
pub async fn pull_ns(
|
||||||
worker: &WorkerTask,
|
worker: &WorkerTask,
|
||||||
client: &HttpClient,
|
client: &HttpClient,
|
||||||
params: &PullParameters,
|
params: &PullParameters,
|
||||||
) -> Result<(), Error> {
|
source_ns: BackupNamespace,
|
||||||
// FIXME: Namespace support requires source AND target namespace
|
target_ns: BackupNamespace,
|
||||||
let ns = BackupNamespace::root();
|
) -> Result<(StoreProgress, bool), Error> {
|
||||||
let local_ns = BackupNamespace::root();
|
|
||||||
|
|
||||||
// explicit create shared lock to prevent GC on newly created chunks
|
|
||||||
let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
|
|
||||||
|
|
||||||
// FIXME: Namespaces! AND: If we make this API call recurse down namespaces we need to do the
|
|
||||||
// same down in the `remove_vanished` case!
|
|
||||||
let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
|
let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
|
||||||
|
|
||||||
|
let args = if !source_ns.is_root() {
|
||||||
|
Some(json!({
|
||||||
|
"backup-ns": source_ns,
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let mut result = client
|
let mut result = client
|
||||||
.get(&path, None)
|
.get(&path, args)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
|
.map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
|
||||||
|
|
||||||
@ -789,6 +1054,7 @@ pub async fn pull_store(
|
|||||||
filters.iter().any(|filter| group.matches(filter))
|
filters.iter().any(|filter| group.matches(filter))
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Get groups with target NS set
|
||||||
let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
|
let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
|
||||||
|
|
||||||
let list = if let Some(ref group_filter) = ¶ms.group_filter {
|
let list = if let Some(ref group_filter) = ¶ms.group_filter {
|
||||||
@ -826,7 +1092,7 @@ pub async fn pull_store(
|
|||||||
let (owner, _lock_guard) =
|
let (owner, _lock_guard) =
|
||||||
match params
|
match params
|
||||||
.store
|
.store
|
||||||
.create_locked_backup_group(&ns, &group, ¶ms.owner)
|
.create_locked_backup_group(&target_ns, &group, ¶ms.owner)
|
||||||
{
|
{
|
||||||
Ok(result) => result,
|
Ok(result) => result,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
@ -852,7 +1118,16 @@ pub async fn pull_store(
|
|||||||
owner
|
owner
|
||||||
);
|
);
|
||||||
errors = true; // do not stop here, instead continue
|
errors = true; // do not stop here, instead continue
|
||||||
} else if let Err(err) = pull_group(worker, client, params, &group, &mut progress).await {
|
} else if let Err(err) = pull_group(
|
||||||
|
worker,
|
||||||
|
client,
|
||||||
|
params,
|
||||||
|
&group,
|
||||||
|
source_ns.clone(),
|
||||||
|
&mut progress,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
task_log!(worker, "sync group {} failed - {}", &group, err,);
|
task_log!(worker, "sync group {} failed - {}", &group, err,);
|
||||||
errors = true; // do not stop here, instead continue
|
errors = true; // do not stop here, instead continue
|
||||||
}
|
}
|
||||||
@ -860,13 +1135,12 @@ pub async fn pull_store(
|
|||||||
|
|
||||||
if params.remove_vanished {
|
if params.remove_vanished {
|
||||||
let result: Result<(), Error> = proxmox_lang::try_block!({
|
let result: Result<(), Error> = proxmox_lang::try_block!({
|
||||||
// FIXME: See above comment about namespaces & recursion
|
for local_group in params.store.iter_backup_groups(target_ns.clone())? {
|
||||||
for local_group in params.store.iter_backup_groups(Default::default())? {
|
|
||||||
let local_group = local_group?;
|
let local_group = local_group?;
|
||||||
if new_groups.contains(local_group.as_ref()) {
|
if new_groups.contains(local_group.as_ref()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let owner = params.store.get_owner(&local_ns, &local_group.group())?;
|
let owner = params.store.get_owner(&target_ns, local_group.group())?;
|
||||||
if check_backup_owner(&owner, ¶ms.owner).is_err() {
|
if check_backup_owner(&owner, ¶ms.owner).is_err() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -881,7 +1155,10 @@ pub async fn pull_store(
|
|||||||
local_group.backup_type(),
|
local_group.backup_type(),
|
||||||
local_group.backup_id()
|
local_group.backup_id()
|
||||||
);
|
);
|
||||||
match params.store.remove_backup_group(&ns, local_group.as_ref()) {
|
match params
|
||||||
|
.store
|
||||||
|
.remove_backup_group(&target_ns, local_group.as_ref())
|
||||||
|
{
|
||||||
Ok(true) => {}
|
Ok(true) => {}
|
||||||
Ok(false) => {
|
Ok(false) => {
|
||||||
task_log!(
|
task_log!(
|
||||||
@ -904,9 +1181,5 @@ pub async fn pull_store(
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if errors {
|
Ok((progress, errors))
|
||||||
bail!("sync failed with some errors.");
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user