api2/tape/restore: add optional snapshots to 'restore'

this makes it possible to restore only some snapshots from a tape media-set
instead of the whole set. If the user selects only a small part, this will
probably be faster (and it definitely uses less space on the target
datastores).

the user has to provide a list of snapshots to restore in the form of
'store:type/group/id'
e.g. 'mystore:ct/100/2021-01-01T00:00:00Z'

we achieve this by first restoring the indices to a temp dir, retrieving
the list of chunks they reference, and then using the catalog to generate
the list of media/files that we need to (partially) restore.

finally, we copy the snapshots to the correct dir in the datastore,
and clean up the temp dir

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
Dominik Csapak 2021-05-11 12:50:04 +02:00 committed by Dietmar Maurer
parent fa9507020a
commit ff99780303

View File

@ -1,6 +1,6 @@
use std::path::Path; use std::path::{Path, PathBuf};
use std::ffi::OsStr; use std::ffi::OsStr;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet, BTreeMap};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::io::{Seek, SeekFrom}; use std::io::{Seek, SeekFrom};
use std::sync::Arc; use std::sync::Arc;
@ -40,6 +40,7 @@ use crate::{
UPID_SCHEMA, UPID_SCHEMA,
Authid, Authid,
Userid, Userid,
TAPE_RESTORE_SNAPSHOT_SCHEMA,
}, },
config::{ config::{
self, self,
@ -51,9 +52,14 @@ use crate::{
}, },
}, },
backup::{ backup::{
ArchiveType,
archive_type,
IndexFile,
MANIFEST_BLOB_NAME, MANIFEST_BLOB_NAME,
CryptMode, CryptMode,
DataStore, DataStore,
DynamicIndexReader,
FixedIndexReader,
BackupDir, BackupDir,
DataBlob, DataBlob,
BackupManifest, BackupManifest,
@ -69,6 +75,7 @@ use crate::{
MediaId, MediaId,
MediaSet, MediaSet,
MediaCatalog, MediaCatalog,
MediaSetCatalog,
Inventory, Inventory,
lock_media_set, lock_media_set,
file_formats::{ file_formats::{
@ -95,6 +102,8 @@ use crate::{
}, },
}; };
const RESTORE_TMP_DIR: &str = "/var/tmp/proxmox-backup";
pub struct DataStoreMap { pub struct DataStoreMap {
map: HashMap<String, Arc<DataStore>>, map: HashMap<String, Arc<DataStore>>,
default: Option<Arc<DataStore>>, default: Option<Arc<DataStore>>,
@ -200,6 +209,14 @@ pub const ROUTER: Router = Router::new().post(&API_METHOD_RESTORE);
type: Userid, type: Userid,
optional: true, optional: true,
}, },
"snapshots": {
description: "List of snapshots.",
type: Array,
optional: true,
items: {
schema: TAPE_RESTORE_SNAPSHOT_SCHEMA,
},
},
owner: { owner: {
type: Authid, type: Authid,
optional: true, optional: true,
@ -222,6 +239,7 @@ pub fn restore(
drive: String, drive: String,
media_set: String, media_set: String,
notify_user: Option<Userid>, notify_user: Option<Userid>,
snapshots: Option<Vec<String>>,
owner: Option<Authid>, owner: Option<Authid>,
rpcenv: &mut dyn RpcEnvironment, rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> { ) -> Result<Value, Error> {
@ -271,6 +289,7 @@ pub fn restore(
.map(|s| s.to_string()) .map(|s| s.to_string())
.collect::<Vec<String>>() .collect::<Vec<String>>()
.join(", "); .join(", ");
let upid_str = WorkerTask::new_thread( let upid_str = WorkerTask::new_thread(
"tape-restore", "tape-restore",
Some(taskid), Some(taskid),
@ -288,10 +307,23 @@ pub fn restore(
.and_then(|userid| lookup_user_email(userid)) .and_then(|userid| lookup_user_email(userid))
.or_else(|| lookup_user_email(&auth_id.clone().into())); .or_else(|| lookup_user_email(&auth_id.clone().into()));
task_log!(worker, "Mediaset '{}'", media_set);
task_log!(worker, "Restore mediaset '{}'", media_set);
task_log!(worker, "Pool: {}", pool); task_log!(worker, "Pool: {}", pool);
let res = restore_worker(
let res = if let Some(snapshots) = snapshots {
restore_list_worker(
worker.clone(),
snapshots,
inventory,
media_set_uuid,
drive_config,
&drive,
store_map,
restore_owner,
email,
)
} else {
restore_full_worker(
worker.clone(), worker.clone(),
inventory, inventory,
media_set_uuid, media_set_uuid,
@ -299,9 +331,13 @@ pub fn restore(
&drive, &drive,
store_map, store_map,
restore_owner, restore_owner,
email email,
); )
};
if res.is_ok() {
task_log!(worker, "Restore mediaset '{}' done", media_set); task_log!(worker, "Restore mediaset '{}' done", media_set);
}
if let Err(err) = set_tape_device_state(&drive, "") { if let Err(err) = set_tape_device_state(&drive, "") {
task_log!( task_log!(
@ -319,7 +355,7 @@ pub fn restore(
Ok(upid_str.into()) Ok(upid_str.into())
} }
fn restore_worker( fn restore_full_worker(
worker: Arc<WorkerTask>, worker: Arc<WorkerTask>,
inventory: Inventory, inventory: Inventory,
media_set_uuid: Uuid, media_set_uuid: Uuid,
@ -406,6 +442,477 @@ fn restore_worker(
Ok(()) Ok(())
} }
/// Restore a selected list of snapshots from a tape media-set.
///
/// Each entry of `snapshots` has the form `"<source_datastore>:<type>/<group>/<id>"`.
/// The restore runs in three phases (plus setup):
///   1. restore the snapshot archives to a temp dir below `RESTORE_TMP_DIR`
///      and collect the chunk digests their indices reference
///   2. restore the chunks that are not yet present directly into the
///      mapped target datastores
///   3. copy the snapshot files from the temp dir into their final place
///
/// Snapshots already present on the target datastore are skipped. The temp
/// dir is removed afterwards, even when the restore failed.
fn restore_list_worker(
    worker: Arc<WorkerTask>,
    snapshots: Vec<String>,
    inventory: Inventory,
    media_set_uuid: Uuid,
    drive_config: SectionConfigData,
    drive_name: &str,
    store_map: DataStoreMap,
    restore_owner: &Authid,
    email: Option<String>,
) -> Result<(), Error> {
    // per-media-set temp dir, removed again at the end of this function
    let base_path: PathBuf = format!("{}/{}", RESTORE_TMP_DIR, media_set_uuid).into();
    std::fs::create_dir_all(&base_path)?;

    // combined catalog over all members of the media set
    let catalog = get_media_set_catalog(&inventory, &media_set_uuid)?;

    // shared chunk store locks, held until the end of the restore
    let mut datastore_locks = Vec::new();
    // media uuid => file numbers to restore from that media (BTreeMap: sorted by uuid)
    let mut snapshot_file_hash: BTreeMap<Uuid, Vec<u64>> = BTreeMap::new();
    // "store:snapshot" => lock guarding the newly created snapshot dir
    let mut snapshot_locks = HashMap::new();

    let res = proxmox::try_block!({
        // assemble snapshot files/locks
        for store_snapshot in snapshots.iter() {
            // split "<source_datastore>:<snapshot>" at the first ':'
            let mut split = store_snapshot.splitn(2, ':');
            let source_datastore = split
                .next()
                .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
            let snapshot = split
                .next()
                .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
            let backup_dir: BackupDir = snapshot.parse()?;

            let datastore = store_map.get_datastore(source_datastore).ok_or_else(|| {
                format_err!(
                    "could not find mapping for source datastore: {}",
                    source_datastore
                )
            })?;

            let (owner, _group_lock) =
                datastore.create_locked_backup_group(backup_dir.group(), &restore_owner)?;
            if restore_owner != &owner {
                // only the owner is allowed to create additional snapshots
                bail!(
                    "restore '{}' failed - owner check failed ({} != {})",
                    snapshot,
                    restore_owner,
                    owner
                );
            }

            // find media + file number of this snapshot in the catalog;
            // missing snapshots only produce a warning, not a hard error
            let (media_id, file_num) = if let Some((media_uuid, file_num)) =
                catalog.lookup_snapshot(&source_datastore, &snapshot)
            {
                let media_id = inventory.lookup_media(media_uuid).unwrap();
                (media_id, file_num)
            } else {
                task_warn!(
                    worker,
                    "did not find snapshot '{}' in media set {}",
                    snapshot,
                    media_set_uuid
                );
                continue;
            };

            let (_rel_path, is_new, snap_lock) = datastore.create_locked_backup_dir(&backup_dir)?;

            if !is_new {
                // snapshot dir already existed on the target - nothing to do
                task_log!(
                    worker,
                    "found snapshot {} on target datastore, skipping...",
                    snapshot
                );
                continue;
            }

            snapshot_locks.insert(store_snapshot.to_string(), snap_lock);

            let shared_store_lock = datastore.try_shared_chunk_store_lock()?;
            datastore_locks.push(shared_store_lock);

            let file_list = snapshot_file_hash
                .entry(media_id.label.uuid.clone())
                .or_insert_with(Vec::new);
            file_list.push(file_num);

            task_log!(
                worker,
                "found snapshot {} on {}: file {}",
                snapshot,
                media_id.label.label_text,
                file_num
            );
        }

        if snapshot_file_hash.is_empty() {
            task_log!(worker, "nothing to restore, skipping remaining phases...");
            return Ok(());
        }

        task_log!(worker, "Phase 1: temporarily restore snapshots to temp dir");
        // source datastore name => digests referenced by the restored indices
        let mut datastore_chunk_map: HashMap<String, HashSet<[u8; 32]>> = HashMap::new();
        for (media_uuid, file_list) in snapshot_file_hash.iter_mut() {
            let media_id = inventory.lookup_media(media_uuid).unwrap();
            let (drive, info) = request_and_load_media(
                &worker,
                &drive_config,
                &drive_name,
                &media_id.label,
                &email,
            )?;
            // sort so the tape only has to seek forward
            file_list.sort_unstable();
            restore_snapshots_to_tmpdir(
                worker.clone(),
                &base_path,
                file_list,
                drive,
                &info,
                &media_set_uuid,
                &mut datastore_chunk_map,
            ).map_err(|err| format_err!("could not restore snapshots to tmpdir: {}", err))?;
        }

        // sorted media_uuid => (sorted file_num => (set of digests)))
        let mut media_file_chunk_map: BTreeMap<Uuid, BTreeMap<u64, HashSet<[u8; 32]>>> = BTreeMap::new();

        for (source_datastore, chunks) in datastore_chunk_map.into_iter() {
            let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
                format_err!(
                    "could not find mapping for source datastore: {}",
                    source_datastore
                )
            })?;
            for digest in chunks.into_iter() {
                // we only want to restore chunks that we do not have yet
                if !datastore.cond_touch_chunk(&digest, false)? {
                    if let Some((uuid, nr)) = catalog.lookup_chunk(&source_datastore, &digest) {
                        let file = media_file_chunk_map.entry(uuid.clone()).or_insert_with(BTreeMap::new);
                        let chunks = file.entry(nr).or_insert_with(HashSet::new);
                        chunks.insert(digest);
                    }
                }
            }
        }

        // we do not need it anymore, saves memory
        drop(catalog);

        if !media_file_chunk_map.is_empty() {
            task_log!(worker, "Phase 2: restore chunks to datastores");
        } else {
            task_log!(worker, "all chunks exist already, skipping phase 2...");
        }

        for (media_uuid, file_chunk_map) in media_file_chunk_map.iter_mut() {
            let media_id = inventory.lookup_media(media_uuid).unwrap();
            let (mut drive, _info) = request_and_load_media(
                &worker,
                &drive_config,
                &drive_name,
                &media_id.label,
                &email,
            )?;
            restore_file_chunk_map(worker.clone(), &mut drive, &store_map, file_chunk_map)?;
        }

        task_log!(
            worker,
            "Phase 3: copy snapshots from temp dir to datastores"
        );
        for (store_snapshot, _lock) in snapshot_locks.into_iter() {
            proxmox::try_block!({
                // re-parse "<source_datastore>:<snapshot>" (same format as above)
                let mut split = store_snapshot.splitn(2, ':');
                let source_datastore = split
                    .next()
                    .ok_or_else(|| format_err!("invalid snapshot: {}", store_snapshot))?;
                let snapshot = split
                    .next()
                    .ok_or_else(|| format_err!("invalid snapshot:{}", store_snapshot))?;
                let backup_dir: BackupDir = snapshot.parse()?;

                let datastore = store_map
                    .get_datastore(&source_datastore)
                    .ok_or_else(|| format_err!("unexpected source datastore: {}", source_datastore))?;

                let mut tmp_path = base_path.clone();
                tmp_path.push(&source_datastore);
                tmp_path.push(snapshot);

                let path = datastore.snapshot_path(&backup_dir);

                // copy every file of the snapshot into the (locked) target dir
                for entry in std::fs::read_dir(tmp_path)? {
                    let entry = entry?;
                    let mut new_path = path.clone();
                    new_path.push(entry.file_name());
                    std::fs::copy(entry.path(), new_path)?;
                }
                task_log!(worker, "Restore snapshot '{}' done", snapshot);
                Ok(())
            }).map_err(|err: Error| format_err!("could not copy {}: {}", store_snapshot, err))?;
        }
        Ok(())
    });

    // best-effort cleanup of the temp dir; failure only produces a warning
    match std::fs::remove_dir_all(&base_path) {
        Ok(()) => {}
        Err(err) => task_warn!(worker, "error cleaning up: {}", err),
    }

    res
}
fn get_media_set_catalog(
inventory: &Inventory,
media_set_uuid: &Uuid,
) -> Result<MediaSetCatalog, Error> {
let status_path = Path::new(TAPE_STATUS_DIR);
let members = inventory.compute_media_set_members(media_set_uuid)?;
let media_list = members.media_list();
let mut catalog = MediaSetCatalog::new();
for (seq_nr, media_uuid) in media_list.iter().enumerate() {
match media_uuid {
None => {
bail!(
"media set {} is incomplete (missing member {}).",
media_set_uuid,
seq_nr
);
}
Some(media_uuid) => {
let media_id = inventory.lookup_media(media_uuid).unwrap();
let media_catalog = MediaCatalog::open(status_path, &media_id, false, false)?;
catalog.append_catalog(media_catalog)?;
}
}
}
Ok(catalog)
}
/// Restore the snapshot archives at the given tape file numbers into a
/// temporary directory below `path` (`<path>/<datastore>/<snapshot>`).
///
/// While extracting, collect the chunk digests referenced by the restored
/// fixed/dynamic indices into `chunks_list`, keyed by source datastore, so
/// the caller can later decide which chunks still have to be read from
/// tape. Blob archives reference no chunks and are skipped for that purpose.
///
/// Fails when the media does not carry the expected media set label or a
/// file on tape is not a snapshot archive.
fn restore_snapshots_to_tmpdir(
    worker: Arc<WorkerTask>,
    // takes `&Path` instead of `&PathBuf` (clippy::ptr_arg);
    // callers passing `&PathBuf` still work via deref coercion
    path: &Path,
    file_list: &[u64],
    mut drive: Box<dyn TapeDriver>,
    media_id: &MediaId,
    media_set_uuid: &Uuid,
    chunks_list: &mut HashMap<String, HashSet<[u8; 32]>>,
) -> Result<(), Error> {
    // verify the media really belongs to the requested media set
    match media_id.media_set_label {
        None => {
            bail!(
                "missing media set label on media {} ({})",
                media_id.label.label_text,
                media_id.label.uuid
            );
        }
        Some(ref set) => {
            if set.uuid != *media_set_uuid {
                bail!(
                    "wrong media set label on media {} ({} != {})",
                    media_id.label.label_text,
                    media_id.label.uuid,
                    media_set_uuid
                );
            }
            // enable decryption if the media set has an encryption key fingerprint
            let encrypt_fingerprint = set.encryption_key_fingerprint.clone().map(|fp| {
                task_log!(worker, "Encryption key fingerprint: {}", fp);
                (fp, set.uuid.clone())
            });

            drive.set_encryption(encrypt_fingerprint)?;
        }
    }

    for file_num in file_list {
        // position the drive at the requested file if not already there
        let current_file_number = drive.current_file_number()?;
        if current_file_number != *file_num {
            task_log!(worker, "was at file {}, moving to {}", current_file_number, file_num);
            drive.move_to_file(*file_num)?;
            let current_file_number = drive.current_file_number()?;
            task_log!(worker, "now at file {}", current_file_number);
        }
        let mut reader = drive.read_next_file()?;
        // fixed-size binary content header read straight from the tape stream
        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
            bail!("missing MediaContentHeader");
        }

        match header.content_magic {
            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1 => {
                let header_data = reader.read_exact_allocated(header.size as usize)?;

                let archive_header: SnapshotArchiveHeader = serde_json::from_slice(&header_data)
                    .map_err(|err| {
                        format_err!("unable to parse snapshot archive header - {}", err)
                    })?;

                let source_datastore = archive_header.store;
                let snapshot = archive_header.snapshot;

                task_log!(
                    worker,
                    "File {}: snapshot archive {}:{}",
                    file_num,
                    source_datastore,
                    snapshot
                );

                let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;

                let mut tmp_path = path.to_path_buf();
                tmp_path.push(&source_datastore);
                tmp_path.push(snapshot);
                std::fs::create_dir_all(&tmp_path)?;

                let chunks = chunks_list
                    .entry(source_datastore)
                    .or_insert_with(HashSet::new);
                let manifest = try_restore_snapshot_archive(worker.clone(), &mut decoder, &tmp_path)?;
                // collect all chunk digests referenced by the restored index files
                for item in manifest.files() {
                    let mut archive_path = tmp_path.to_owned();
                    archive_path.push(&item.filename);

                    let index: Box<dyn IndexFile> = match archive_type(&item.filename)? {
                        ArchiveType::DynamicIndex => {
                            Box::new(DynamicIndexReader::open(&archive_path)?)
                        }
                        ArchiveType::FixedIndex => {
                            Box::new(FixedIndexReader::open(&archive_path)?)
                        }
                        ArchiveType::Blob => continue, // blobs reference no chunks
                    };
                    for i in 0..index.index_count() {
                        if let Some(digest) = index.index_digest(i) {
                            chunks.insert(*digest);
                        }
                    }
                }
            }
            other => bail!("unexpected file type: {:?}", other),
        }
    }

    Ok(())
}
/// Restore the chunks listed in `file_chunk_map` from tape into their
/// mapped target datastores.
///
/// `file_chunk_map` maps tape file numbers to the set of chunk digests
/// expected in that chunk archive; iterating the `BTreeMap` visits the
/// files in ascending order, so the drive only seeks forward.
fn restore_file_chunk_map(
    worker: Arc<WorkerTask>,
    drive: &mut Box<dyn TapeDriver>,
    store_map: &DataStoreMap,
    file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
) -> Result<(), Error> {
    for (nr, chunk_map) in file_chunk_map.iter_mut() {
        // position the drive at the wanted file if not already there
        let current_file_number = drive.current_file_number()?;
        if current_file_number != *nr {
            task_log!(worker, "was at file {}, moving to {}", current_file_number, nr);
            drive.move_to_file(*nr)?;
            let current_file_number = drive.current_file_number()?;
            task_log!(worker, "now at file {}", current_file_number);
        }
        let mut reader = drive.read_next_file()?;
        // fixed-size binary content header read straight from the tape stream
        let header: MediaContentHeader = unsafe { reader.read_le_value()? };
        if header.magic != PROXMOX_BACKUP_CONTENT_HEADER_MAGIC_1_0 {
            bail!("missing MediaContentHeader");
        }

        match header.content_magic {
            PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1 => {
                let header_data = reader.read_exact_allocated(header.size as usize)?;

                let archive_header: ChunkArchiveHeader = serde_json::from_slice(&header_data)
                    .map_err(|err| format_err!("unable to parse chunk archive header - {}", err))?;

                let source_datastore = archive_header.store;

                task_log!(
                    worker,
                    "File {}: chunk archive for datastore '{}'",
                    nr,
                    source_datastore
                );

                let datastore = store_map.get_datastore(&source_datastore).ok_or_else(|| {
                    format_err!("unexpected chunk archive for store: {}", source_datastore)
                })?;

                // restore only the chunks from chunk_map out of this archive
                let count = restore_partial_chunk_archive(worker.clone(), reader, datastore.clone(), chunk_map)?;
                task_log!(worker, "restored {} chunks", count);
            }
            _ => bail!("unexpected content magic {:?}", header.content_magic),
        }
    }

    Ok(())
}
/// Restore chunks from a chunk archive on tape, but only those whose
/// digest is contained in `chunk_list`.
///
/// Digests are removed from `chunk_list` as they are seen; reading stops
/// early once the list is empty. Chunks already present in the datastore
/// are only touched (via `cond_touch_chunk`), not rewritten. Returns the
/// number of chunks handed to the writer pool.
fn restore_partial_chunk_archive<'a>(
    worker: Arc<WorkerTask>,
    reader: Box<dyn 'a + TapeRead>,
    datastore: Arc<DataStore>,
    chunk_list: &mut HashSet<[u8; 32]>,
) -> Result<usize, Error> {
    let mut decoder = ChunkArchiveDecoder::new(reader);

    let mut count = 0;

    let start_time = std::time::SystemTime::now();
    // bytes actually written (only newly inserted chunks count)
    let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
    let bytes2 = bytes.clone();

    // verify and insert chunks with 4 parallel writer threads
    let writer_pool = ParallelHandler::new(
        "tape restore chunk writer",
        4,
        move |(chunk, digest): (DataBlob, [u8; 32])| {
            // only insert chunks the datastore does not have yet
            if !datastore.cond_touch_chunk(&digest, false)? {
                bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
                chunk.verify_crc()?;
                if chunk.crypt_mode()? == CryptMode::None {
                    chunk.decode(None, Some(&digest))?; // verify digest
                }

                datastore.insert_chunk(&chunk, &digest)?;
            }
            Ok(())
        },
    );

    let verify_and_write_channel = writer_pool.channel();

    // `while let` instead of a manual loop/match; `?` propagates decoder errors
    while let Some((digest, blob)) = decoder.next_chunk()? {
        worker.check_abort()?;

        if chunk_list.remove(&digest) {
            // `[u8; 32]` is `Copy` - no `.clone()` needed (clippy::clone_on_copy)
            verify_and_write_channel.send((blob, digest))?;
            count += 1;
        }

        // all wanted chunks seen - skip the rest of the archive
        if chunk_list.is_empty() {
            break;
        }
    }

    // close the channel so the pool can drain and finish
    drop(verify_and_write_channel);

    writer_pool.complete()?;

    let elapsed = start_time.elapsed()?.as_secs_f64();

    let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);

    task_log!(
        worker,
        "restored {} bytes ({:.2} MB/s)",
        bytes,
        (bytes as f64) / (1_000_000.0 * elapsed)
    );

    Ok(count)
}
/// Request and restore complete media without using existing catalog (create catalog instead) /// Request and restore complete media without using existing catalog (create catalog instead)
pub fn request_and_restore_media( pub fn request_and_restore_media(
worker: Arc<WorkerTask>, worker: Arc<WorkerTask>,