tape/restore: optimize chunk restore behaviour
by checking the 'checked_chunks' before trying to write to disk and by doing the existance check in the parallel handler. This way, we do not have to check the existance of a chunk multiple times (if multiple source datastores gets restored to the same target datastore) and also we do not have to wait on the stat before reading the next chunk. We have to change the &WorkerTask to an Arc though, otherwise we cannot log to the worker from the parallel handler Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
parent
4cba875379
commit
49f9aca627
@ -1336,7 +1336,7 @@ pub fn catalog_media(
|
|||||||
drive.read_label()?; // skip over labels - we already read them above
|
drive.read_label()?; // skip over labels - we already read them above
|
||||||
|
|
||||||
let mut checked_chunks = HashMap::new();
|
let mut checked_chunks = HashMap::new();
|
||||||
restore_media(&worker, &mut drive, &media_id, None, &mut checked_chunks, verbose)?;
|
restore_media(worker, &mut drive, &media_id, None, &mut checked_chunks, verbose)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
},
|
},
|
||||||
|
@ -334,7 +334,7 @@ pub fn restore(
|
|||||||
|
|
||||||
for media_id in media_id_list.iter() {
|
for media_id in media_id_list.iter() {
|
||||||
request_and_restore_media(
|
request_and_restore_media(
|
||||||
&worker,
|
worker.clone(),
|
||||||
media_id,
|
media_id,
|
||||||
&drive_config,
|
&drive_config,
|
||||||
&drive,
|
&drive,
|
||||||
@ -368,7 +368,7 @@ pub fn restore(
|
|||||||
|
|
||||||
/// Request and restore complete media without using existing catalog (create catalog instead)
|
/// Request and restore complete media without using existing catalog (create catalog instead)
|
||||||
pub fn request_and_restore_media(
|
pub fn request_and_restore_media(
|
||||||
worker: &WorkerTask,
|
worker: Arc<WorkerTask>,
|
||||||
media_id: &MediaId,
|
media_id: &MediaId,
|
||||||
drive_config: &SectionConfigData,
|
drive_config: &SectionConfigData,
|
||||||
drive_name: &str,
|
drive_name: &str,
|
||||||
@ -388,7 +388,7 @@ pub fn request_and_restore_media(
|
|||||||
.and_then(|userid| lookup_user_email(userid))
|
.and_then(|userid| lookup_user_email(userid))
|
||||||
.or_else(|| lookup_user_email(&authid.clone().into()));
|
.or_else(|| lookup_user_email(&authid.clone().into()));
|
||||||
|
|
||||||
let (mut drive, info) = request_and_load_media(worker, &drive_config, &drive_name, &media_id.label, &email)?;
|
let (mut drive, info) = request_and_load_media(&worker, &drive_config, &drive_name, &media_id.label, &email)?;
|
||||||
|
|
||||||
match info.media_set_label {
|
match info.media_set_label {
|
||||||
None => {
|
None => {
|
||||||
@ -424,7 +424,7 @@ pub fn request_and_restore_media(
|
|||||||
///
|
///
|
||||||
/// Only create the catalog if target is None.
|
/// Only create the catalog if target is None.
|
||||||
pub fn restore_media(
|
pub fn restore_media(
|
||||||
worker: &WorkerTask,
|
worker: Arc<WorkerTask>,
|
||||||
drive: &mut Box<dyn TapeDriver>,
|
drive: &mut Box<dyn TapeDriver>,
|
||||||
media_id: &MediaId,
|
media_id: &MediaId,
|
||||||
target: Option<(&DataStoreMap, &Authid)>,
|
target: Option<(&DataStoreMap, &Authid)>,
|
||||||
@ -452,7 +452,7 @@ pub fn restore_media(
|
|||||||
Ok(reader) => reader,
|
Ok(reader) => reader,
|
||||||
};
|
};
|
||||||
|
|
||||||
restore_archive(worker, reader, current_file_number, target, &mut catalog, checked_chunks_map, verbose)?;
|
restore_archive(worker.clone(), reader, current_file_number, target, &mut catalog, checked_chunks_map, verbose)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
MediaCatalog::finish_temporary_database(status_path, &media_id.label.uuid, true)?;
|
MediaCatalog::finish_temporary_database(status_path, &media_id.label.uuid, true)?;
|
||||||
@ -461,7 +461,7 @@ pub fn restore_media(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn restore_archive<'a>(
|
fn restore_archive<'a>(
|
||||||
worker: &WorkerTask,
|
worker: Arc<WorkerTask>,
|
||||||
mut reader: Box<dyn 'a + TapeRead>,
|
mut reader: Box<dyn 'a + TapeRead>,
|
||||||
current_file_number: u64,
|
current_file_number: u64,
|
||||||
target: Option<(&DataStoreMap, &Authid)>,
|
target: Option<(&DataStoreMap, &Authid)>,
|
||||||
@ -520,7 +520,7 @@ fn restore_archive<'a>(
|
|||||||
if is_new {
|
if is_new {
|
||||||
task_log!(worker, "restore snapshot {}", backup_dir);
|
task_log!(worker, "restore snapshot {}", backup_dir);
|
||||||
|
|
||||||
match restore_snapshot_archive(worker, reader, &path, &datastore, checked_chunks) {
|
match restore_snapshot_archive(worker.clone(), reader, &path, &datastore, checked_chunks) {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
std::fs::remove_dir_all(&path)?;
|
std::fs::remove_dir_all(&path)?;
|
||||||
bail!("restore snapshot {} failed - {}", backup_dir, err);
|
bail!("restore snapshot {} failed - {}", backup_dir, err);
|
||||||
@ -574,9 +574,9 @@ fn restore_archive<'a>(
|
|||||||
.or_insert(HashSet::new());
|
.or_insert(HashSet::new());
|
||||||
|
|
||||||
let chunks = if let Some(datastore) = datastore {
|
let chunks = if let Some(datastore) = datastore {
|
||||||
restore_chunk_archive(worker, reader, datastore, checked_chunks, verbose)?
|
restore_chunk_archive(worker.clone(), reader, datastore, checked_chunks, verbose)?
|
||||||
} else {
|
} else {
|
||||||
scan_chunk_archive(worker, reader, verbose)?
|
scan_chunk_archive(worker.clone(), reader, verbose)?
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(chunks) = chunks {
|
if let Some(chunks) = chunks {
|
||||||
@ -619,7 +619,7 @@ fn restore_archive<'a>(
|
|||||||
|
|
||||||
// Read chunk archive without restoring data - just record contained chunks
|
// Read chunk archive without restoring data - just record contained chunks
|
||||||
fn scan_chunk_archive<'a>(
|
fn scan_chunk_archive<'a>(
|
||||||
worker: &WorkerTask,
|
worker: Arc<WorkerTask>,
|
||||||
reader: Box<dyn 'a + TapeRead>,
|
reader: Box<dyn 'a + TapeRead>,
|
||||||
verbose: bool,
|
verbose: bool,
|
||||||
) -> Result<Option<Vec<[u8;32]>>, Error> {
|
) -> Result<Option<Vec<[u8;32]>>, Error> {
|
||||||
@ -642,7 +642,7 @@ fn scan_chunk_archive<'a>(
|
|||||||
|
|
||||||
// check if this is an aborted stream without end marker
|
// check if this is an aborted stream without end marker
|
||||||
if let Ok(false) = reader.has_end_marker() {
|
if let Ok(false) = reader.has_end_marker() {
|
||||||
worker.log("missing stream end marker".to_string());
|
task_log!(worker, "missing stream end marker");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -664,7 +664,7 @@ fn scan_chunk_archive<'a>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn restore_chunk_archive<'a>(
|
fn restore_chunk_archive<'a>(
|
||||||
worker: &WorkerTask,
|
worker: Arc<WorkerTask>,
|
||||||
reader: Box<dyn 'a + TapeRead>,
|
reader: Box<dyn 'a + TapeRead>,
|
||||||
datastore: Arc<DataStore>,
|
datastore: Arc<DataStore>,
|
||||||
checked_chunks: &mut HashSet<[u8;32]>,
|
checked_chunks: &mut HashSet<[u8;32]>,
|
||||||
@ -676,25 +676,38 @@ fn restore_chunk_archive<'a>(
|
|||||||
let mut decoder = ChunkArchiveDecoder::new(reader);
|
let mut decoder = ChunkArchiveDecoder::new(reader);
|
||||||
|
|
||||||
let datastore2 = datastore.clone();
|
let datastore2 = datastore.clone();
|
||||||
|
let start_time = std::time::SystemTime::now();
|
||||||
|
let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||||
|
let bytes2 = bytes.clone();
|
||||||
|
|
||||||
|
let worker2 = worker.clone();
|
||||||
|
|
||||||
let writer_pool = ParallelHandler::new(
|
let writer_pool = ParallelHandler::new(
|
||||||
"tape restore chunk writer",
|
"tape restore chunk writer",
|
||||||
4,
|
4,
|
||||||
move |(chunk, digest): (DataBlob, [u8; 32])| {
|
move |(chunk, digest): (DataBlob, [u8; 32])| {
|
||||||
// println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
|
let chunk_exists = datastore2.cond_touch_chunk(&digest, false)?;
|
||||||
chunk.verify_crc()?;
|
if !chunk_exists {
|
||||||
if chunk.crypt_mode()? == CryptMode::None {
|
if verbose {
|
||||||
chunk.decode(None, Some(&digest))?; // verify digest
|
task_log!(worker2, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest));
|
||||||
}
|
}
|
||||||
|
bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst);
|
||||||
|
// println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
|
||||||
|
chunk.verify_crc()?;
|
||||||
|
if chunk.crypt_mode()? == CryptMode::None {
|
||||||
|
chunk.decode(None, Some(&digest))?; // verify digest
|
||||||
|
}
|
||||||
|
|
||||||
datastore2.insert_chunk(&chunk, &digest)?;
|
datastore2.insert_chunk(&chunk, &digest)?;
|
||||||
|
} else if verbose {
|
||||||
|
task_log!(worker2, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest));
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let verify_and_write_channel = writer_pool.channel();
|
let verify_and_write_channel = writer_pool.channel();
|
||||||
|
|
||||||
let start_time = std::time::SystemTime::now();
|
|
||||||
let mut bytes = 0;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (digest, blob) = match decoder.next_chunk() {
|
let (digest, blob) = match decoder.next_chunk() {
|
||||||
@ -710,7 +723,7 @@ fn restore_chunk_archive<'a>(
|
|||||||
|
|
||||||
// check if this is an aborted stream without end marker
|
// check if this is an aborted stream without end marker
|
||||||
if let Ok(false) = reader.has_end_marker() {
|
if let Ok(false) = reader.has_end_marker() {
|
||||||
worker.log("missing stream end marker".to_string());
|
task_log!(worker, "missing stream end marker");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -721,17 +734,10 @@ fn restore_chunk_archive<'a>(
|
|||||||
|
|
||||||
worker.check_abort()?;
|
worker.check_abort()?;
|
||||||
|
|
||||||
let chunk_exists = datastore.cond_touch_chunk(&digest, false)?;
|
if !checked_chunks.contains(&digest) {
|
||||||
if !chunk_exists {
|
|
||||||
if verbose {
|
|
||||||
task_log!(worker, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest));
|
|
||||||
}
|
|
||||||
bytes += blob.raw_size();
|
|
||||||
verify_and_write_channel.send((blob, digest.clone()))?;
|
verify_and_write_channel.send((blob, digest.clone()))?;
|
||||||
} else if verbose {
|
checked_chunks.insert(digest.clone());
|
||||||
task_log!(worker, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest));
|
|
||||||
}
|
}
|
||||||
checked_chunks.insert(digest.clone());
|
|
||||||
chunks.push(digest);
|
chunks.push(digest);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -741,6 +747,8 @@ fn restore_chunk_archive<'a>(
|
|||||||
|
|
||||||
let elapsed = start_time.elapsed()?.as_secs_f64();
|
let elapsed = start_time.elapsed()?.as_secs_f64();
|
||||||
|
|
||||||
|
let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
|
||||||
|
|
||||||
task_log!(
|
task_log!(
|
||||||
worker,
|
worker,
|
||||||
"restored {} bytes ({:.2} MB/s)",
|
"restored {} bytes ({:.2} MB/s)",
|
||||||
@ -752,7 +760,7 @@ fn restore_chunk_archive<'a>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn restore_snapshot_archive<'a>(
|
fn restore_snapshot_archive<'a>(
|
||||||
worker: &WorkerTask,
|
worker: Arc<WorkerTask>,
|
||||||
reader: Box<dyn 'a + TapeRead>,
|
reader: Box<dyn 'a + TapeRead>,
|
||||||
snapshot_path: &Path,
|
snapshot_path: &Path,
|
||||||
datastore: &DataStore,
|
datastore: &DataStore,
|
||||||
@ -782,7 +790,7 @@ fn restore_snapshot_archive<'a>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn try_restore_snapshot_archive<R: pxar::decoder::SeqRead>(
|
fn try_restore_snapshot_archive<R: pxar::decoder::SeqRead>(
|
||||||
worker: &WorkerTask,
|
worker: Arc<WorkerTask>,
|
||||||
decoder: &mut pxar::decoder::sync::Decoder<R>,
|
decoder: &mut pxar::decoder::sync::Decoder<R>,
|
||||||
snapshot_path: &Path,
|
snapshot_path: &Path,
|
||||||
_datastore: &DataStore,
|
_datastore: &DataStore,
|
||||||
|
Loading…
Reference in New Issue
Block a user