tape restore: split restore_chunk_archive

Split out a separate function scan_chunk_archive() for catalog restores.

Note: Required, because we need to optimize restore_chunk_archive() to
write datastore in separate threads (else thape drive will stop during restore)
This commit is contained in:
Dietmar Maurer 2021-04-30 12:09:50 +02:00
parent 8fb24a2c0a
commit 87bf9f569f
1 changed files with 69 additions and 21 deletions

View File

@ -150,12 +150,12 @@ impl DataStoreMap {
set
}
fn get_datastore(&self, source: &str) -> Option<&DataStore> {
fn get_datastore(&self, source: &str) -> Option<Arc<DataStore>> {
if let Some(store) = self.map.get(source) {
return Some(&store);
return Some(Arc::clone(store));
}
if let Some(ref store) = self.default {
return Some(&store);
return Some(Arc::clone(store));
}
return None;
@ -575,10 +575,16 @@ fn restore_archive<'a>(
if datastore.is_some() || target.is_none() {
let checked_chunks = checked_chunks_map
.entry(datastore.map(|d| d.name()).unwrap_or("_unused_").to_string())
.entry(datastore.as_ref().map(|d| d.name()).unwrap_or("_unused_").to_string())
.or_insert(HashSet::new());
if let Some(chunks) = restore_chunk_archive(worker, reader, datastore, checked_chunks, verbose)? {
let chunks = if let Some(datastore) = datastore {
restore_chunk_archive(worker, reader, datastore, checked_chunks, verbose)?
} else {
scan_chunk_archive(worker, reader, verbose)?
};
if let Some(chunks) = chunks {
catalog.start_chunk_archive(
Uuid::from(header.uuid),
current_file_number,
@ -616,10 +622,56 @@ fn restore_archive<'a>(
Ok(())
}
// Read chunk archive without restoring data - just record contained chunks
fn scan_chunk_archive<'a>(
worker: &WorkerTask,
reader: Box<dyn 'a + TapeRead>,
verbose: bool,
) -> Result<Option<Vec<[u8;32]>>, Error> {
let mut chunks = Vec::new();
let mut decoder = ChunkArchiveDecoder::new(reader);
loop {
let digest = match decoder.next_chunk() {
Ok(Some((digest, _blob))) => digest,
Ok(None) => break,
Err(err) => {
let reader = decoder.reader();
// check if this stream is marked incomplete
if let Ok(true) = reader.is_incomplete() {
return Ok(Some(chunks));
}
// check if this is an aborted stream without end marker
if let Ok(false) = reader.has_end_marker() {
worker.log("missing stream end marker".to_string());
return Ok(None);
}
// else the archive is corrupt
return Err(err);
}
};
worker.check_abort()?;
if verbose {
task_log!(worker, "Found chunk: {}", proxmox::tools::digest_to_hex(&digest));
}
chunks.push(digest);
}
Ok(Some(chunks))
}
fn restore_chunk_archive<'a>(
worker: &WorkerTask,
reader: Box<dyn 'a + TapeRead>,
datastore: Option<&DataStore>,
datastore: Arc<DataStore>,
checked_chunks: &mut HashSet<[u8;32]>,
verbose: bool,
) -> Result<Option<Vec<[u8;32]>>, Error> {
@ -653,25 +705,21 @@ fn restore_chunk_archive<'a>(
worker.check_abort()?;
if let Some(datastore) = datastore {
let chunk_exists = datastore.cond_touch_chunk(&digest, false)?;
if !chunk_exists {
blob.verify_crc()?;
let chunk_exists = datastore.cond_touch_chunk(&digest, false)?;
if !chunk_exists {
blob.verify_crc()?;
if blob.crypt_mode()? == CryptMode::None {
blob.decode(None, Some(&digest))?; // verify digest
}
if verbose {
task_log!(worker, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest));
}
datastore.insert_chunk(&blob, &digest)?;
} else if verbose {
task_log!(worker, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest));
if blob.crypt_mode()? == CryptMode::None {
blob.decode(None, Some(&digest))?; // verify digest
}
checked_chunks.insert(digest.clone());
if verbose {
task_log!(worker, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest));
}
datastore.insert_chunk(&blob, &digest)?;
} else if verbose {
task_log!(worker, "Found chunk: {}", proxmox::tools::digest_to_hex(&digest));
task_log!(worker, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest));
}
checked_chunks.insert(digest.clone());
chunks.push(digest);
}