api: tape/restore: add optional namespace map to DataStoreMap

and change the interface from 'get_datastore' to 'get_targets'

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
Dominik Csapak 2022-05-05 15:54:50 +02:00
parent be97e0a55b
commit 6b61d319c5
1 changed files with 72 additions and 41 deletions

View File

@ -55,6 +55,7 @@ const RESTORE_TMP_DIR: &str = "/var/tmp/proxmox-backup";
pub struct DataStoreMap {
map: HashMap<String, Arc<DataStore>>,
default: Option<Arc<DataStore>>,
ns_map: Option<NamespaceMap>,
}
struct NamespaceMap {
@ -149,33 +150,50 @@ impl TryFrom<String> for DataStoreMap {
}
}
Ok(Self { map, default })
Ok(Self {
map,
default,
ns_map: None,
})
}
}
impl DataStoreMap {
fn used_datastores<'a>(&self) -> HashMap<&str, Arc<DataStore>> {
fn add_namespaces_maps(&mut self, mappings: Vec<String>) -> Result<bool, Error> {
let count = mappings.len();
let ns_map = NamespaceMap::try_from(mappings)?;
self.ns_map = Some(ns_map);
Ok(count > 0)
}
fn used_datastores(&self) -> HashMap<&str, (Arc<DataStore>, Option<HashSet<BackupNamespace>>)> {
let mut map = HashMap::new();
for (source, target) in self.map.iter() {
map.insert(source.as_str(), Arc::clone(target));
let ns = self.ns_map.as_ref().map(|map| map.used_namespaces(source));
map.insert(source.as_str(), (Arc::clone(target), ns));
}
if let Some(ref store) = self.default {
map.insert("", Arc::clone(store));
map.insert("", (Arc::clone(store), None));
}
map
}
fn get_datastore(&self, source: &str) -> Option<Arc<DataStore>> {
if let Some(store) = self.map.get(source) {
return Some(Arc::clone(store));
}
if let Some(ref store) = self.default {
return Some(Arc::clone(store));
fn get_targets(
&self,
source_ds: &str,
source_ns: &BackupNamespace,
) -> Option<(Arc<DataStore>, Option<Vec<BackupNamespace>>)> {
if let Some(store) = self.map.get(source_ds).or(self.default.as_ref()) {
let ns = self
.ns_map
.as_ref()
.map(|map| map.get_namespaces(source_ds, source_ns));
return Some((Arc::clone(store), ns));
}
return None;
None
}
}
@ -266,7 +284,7 @@ pub fn restore(
bail!("no datastores given");
}
for (_, target) in used_datastores.iter() {
for (_, (target, _)) in used_datastores.iter() {
check_datastore_privs(&user_info, target.name(), &auth_id, &owner)?;
}
@ -299,7 +317,7 @@ pub fn restore(
let taskid = used_datastores
.iter()
.map(|(_, t)| t.name().to_string())
.map(|(_, (t, _))| t.name().to_string())
.collect::<Vec<String>>()
.join(", ");
@ -415,7 +433,7 @@ fn restore_full_worker(
store_map
.used_datastores()
.into_iter()
.map(|(_, t)| String::from(t.name()))
.map(|(_, (t, _))| String::from(t.name()))
.collect::<Vec<String>>()
.join(", "),
);
@ -432,7 +450,7 @@ fn restore_full_worker(
);
let mut datastore_locks = Vec::new();
for (_, target) in store_map.used_datastores() {
for (_, (target, _)) in store_map.used_datastores() {
// explicit create shared lock to prevent GC on newly created chunks
let shared_store_lock = target.try_shared_chunk_store_lock()?;
datastore_locks.push(shared_store_lock);
@ -492,12 +510,15 @@ fn restore_list_worker(
.ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
let backup_dir: pbs_api_types::BackupDir = snapshot.parse()?;
let datastore = store_map.get_datastore(source_datastore).ok_or_else(|| {
format_err!(
"could not find mapping for source datastore: {}",
source_datastore
)
})?;
// FIXME ns
let (datastore, _) = store_map
.get_targets(source_datastore, &ns)
.ok_or_else(|| {
format_err!(
"could not find mapping for source datastore: {}",
source_datastore
)
})?;
let (owner, _group_lock) =
datastore.create_locked_backup_group(&ns, backup_dir.as_ref(), restore_owner)?;
@ -593,12 +614,14 @@ fn restore_list_worker(
BTreeMap::new();
for (source_datastore, chunks) in datastore_chunk_map.into_iter() {
let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
format_err!(
"could not find mapping for source datastore: {}",
source_datastore
)
})?;
let (datastore, _) = store_map
.get_targets(&source_datastore, &Default::default())
.ok_or_else(|| {
format_err!(
"could not find mapping for source datastore: {}",
source_datastore
)
})?;
for digest in chunks.into_iter() {
// we only want to restore chunks that we do not have yet
if !datastore.cond_touch_chunk(&digest, false)? {
@ -649,9 +672,15 @@ fn restore_list_worker(
.ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
let backup_dir: pbs_api_types::BackupDir = snapshot.parse()?;
let datastore = store_map.get_datastore(source_datastore).ok_or_else(|| {
format_err!("unexpected source datastore: {}", source_datastore)
})?;
// FIXME ns
let (datastore, _) =
store_map
.get_targets(source_datastore, &ns)
.ok_or_else(|| {
format_err!("unexpected source datastore: {}", source_datastore)
})?;
let ns = BackupNamespace::root();
let mut tmp_path = base_path.clone();
tmp_path.push(&source_datastore);
@ -871,9 +900,11 @@ fn restore_file_chunk_map(
source_datastore
);
let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
format_err!("unexpected chunk archive for store: {}", source_datastore)
})?;
let (datastore, _) = store_map
.get_targets(&source_datastore, &Default::default())
.ok_or_else(|| {
format_err!("unexpected chunk archive for store: {}", source_datastore)
})?;
let count = restore_partial_chunk_archive(
worker.clone(),
@ -1111,19 +1142,19 @@ fn restore_archive<'a>(
let backup_ns = BackupNamespace::root();
let backup_dir: pbs_api_types::BackupDir = snapshot.parse()?;
if let Some((store_map, authid)) = target.as_ref() {
if let Some(datastore) = store_map.get_datastore(&datastore_name) {
if let Some((store_map, restore_owner)) = target.as_ref() {
if let Some((datastore, _)) = store_map.get_targets(&datastore_name, &backup_ns) {
let (owner, _group_lock) = datastore.create_locked_backup_group(
&backup_ns,
backup_dir.as_ref(),
authid,
restore_owner,
)?;
if *authid != &owner {
if *restore_owner != &owner {
// only the owner is allowed to create additional snapshots
bail!(
"restore '{}' failed - owner check failed ({} != {})",
snapshot,
authid,
restore_owner,
owner
);
}
@ -1194,20 +1225,20 @@ fn restore_archive<'a>(
);
let datastore = target
.as_ref()
.and_then(|t| t.0.get_datastore(&source_datastore));
.and_then(|t| t.0.get_targets(&source_datastore, &Default::default()));
if datastore.is_some() || target.is_none() {
let checked_chunks = checked_chunks_map
.entry(
datastore
.as_ref()
.map(|d| d.name())
.map(|(d, _)| d.name())
.unwrap_or("_unused_")
.to_string(),
)
.or_insert(HashSet::new());
let chunks = if let Some(datastore) = datastore {
let chunks = if let Some((datastore, _)) = datastore {
restore_chunk_archive(
worker.clone(),
reader,