tape restore: write datastore in separate thread

This commit is contained in:
Dietmar Maurer 2021-04-30 12:35:11 +02:00
parent 87bf9f569f
commit 5400fe171c
1 changed files with 26 additions and 9 deletions

View File

@ -32,7 +32,7 @@ use crate::{
task_log, task_log,
task_warn, task_warn,
task::TaskState, task::TaskState,
tools::compute_file_csum, tools::{compute_file_csum, ParallelHandler},
api2::types::{ api2::types::{
DATASTORE_MAP_ARRAY_SCHEMA, DATASTORE_MAP_ARRAY_SCHEMA,
DATASTORE_MAP_LIST_SCHEMA, DATASTORE_MAP_LIST_SCHEMA,
@ -680,6 +680,24 @@ fn restore_chunk_archive<'a>(
let mut decoder = ChunkArchiveDecoder::new(reader); let mut decoder = ChunkArchiveDecoder::new(reader);
let datastore2 = datastore.clone();
let writer_pool = ParallelHandler::new(
"tape restore chunk writer",
4,
move |(chunk, digest): (DataBlob, [u8; 32])| {
// println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
chunk.verify_crc()?;
if chunk.crypt_mode()? == CryptMode::None {
chunk.decode(None, Some(&digest))?; // verify digest
}
datastore2.insert_chunk(&chunk, &digest)?;
Ok(())
},
);
let verify_and_write_channel = writer_pool.channel();
loop { loop {
let (digest, blob) = match decoder.next_chunk() { let (digest, blob) = match decoder.next_chunk() {
Ok(Some((digest, blob))) => (digest, blob), Ok(Some((digest, blob))) => (digest, blob),
@ -707,15 +725,10 @@ fn restore_chunk_archive<'a>(
let chunk_exists = datastore.cond_touch_chunk(&digest, false)?; let chunk_exists = datastore.cond_touch_chunk(&digest, false)?;
if !chunk_exists { if !chunk_exists {
blob.verify_crc()?;
if blob.crypt_mode()? == CryptMode::None {
blob.decode(None, Some(&digest))?; // verify digest
}
if verbose { if verbose {
task_log!(worker, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest)); task_log!(worker, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest));
} }
datastore.insert_chunk(&blob, &digest)?; verify_and_write_channel.send((blob, digest.clone()))?;
} else if verbose { } else if verbose {
task_log!(worker, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest)); task_log!(worker, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest));
} }
@ -723,6 +736,10 @@ fn restore_chunk_archive<'a>(
chunks.push(digest); chunks.push(digest);
} }
drop(verify_and_write_channel);
writer_pool.complete()?;
Ok(Some(chunks)) Ok(Some(chunks))
} }