diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs index c97de61a..9cf36b37 100644 --- a/src/api2/tape/drive.rs +++ b/src/api2/tape/drive.rs @@ -1336,7 +1336,7 @@ pub fn catalog_media( drive.read_label()?; // skip over labels - we already read them above let mut checked_chunks = HashMap::new(); - restore_media(&worker, &mut drive, &media_id, None, &mut checked_chunks, verbose)?; + restore_media(worker, &mut drive, &media_id, None, &mut checked_chunks, verbose)?; Ok(()) }, diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs index 2614c68a..1dd6ba11 100644 --- a/src/api2/tape/restore.rs +++ b/src/api2/tape/restore.rs @@ -334,7 +334,7 @@ pub fn restore( for media_id in media_id_list.iter() { request_and_restore_media( - &worker, + worker.clone(), media_id, &drive_config, &drive, @@ -368,7 +368,7 @@ pub fn restore( /// Request and restore complete media without using existing catalog (create catalog instead) pub fn request_and_restore_media( - worker: &WorkerTask, + worker: Arc, media_id: &MediaId, drive_config: &SectionConfigData, drive_name: &str, @@ -388,7 +388,7 @@ pub fn request_and_restore_media( .and_then(|userid| lookup_user_email(userid)) .or_else(|| lookup_user_email(&authid.clone().into())); - let (mut drive, info) = request_and_load_media(worker, &drive_config, &drive_name, &media_id.label, &email)?; + let (mut drive, info) = request_and_load_media(&worker, &drive_config, &drive_name, &media_id.label, &email)?; match info.media_set_label { None => { @@ -424,7 +424,7 @@ pub fn request_and_restore_media( /// /// Only create the catalog if target is None. pub fn restore_media( - worker: &WorkerTask, + worker: Arc, drive: &mut Box, media_id: &MediaId, target: Option<(&DataStoreMap, &Authid)>, @@ -452,7 +452,7 @@ pub fn restore_media( Ok(reader) => reader, }; - restore_archive(worker, reader, current_file_number, target, &mut catalog, checked_chunks_map, verbose)?; + restore_archive(worker.clone(), reader, current_file_number, target, &mut catalog, checked_chunks_map, verbose)?; } MediaCatalog::finish_temporary_database(status_path, &media_id.label.uuid, true)?; @@ -461,7 +461,7 @@ pub fn restore_media( } fn restore_archive<'a>( - worker: &WorkerTask, + worker: Arc, mut reader: Box, current_file_number: u64, target: Option<(&DataStoreMap, &Authid)>, @@ -520,7 +520,7 @@ fn restore_archive<'a>( if is_new { task_log!(worker, "restore snapshot {}", backup_dir); - match restore_snapshot_archive(worker, reader, &path, &datastore, checked_chunks) { + match restore_snapshot_archive(worker.clone(), reader, &path, &datastore, checked_chunks) { Err(err) => { std::fs::remove_dir_all(&path)?; bail!("restore snapshot {} failed - {}", backup_dir, err); @@ -574,9 +574,9 @@ fn restore_archive<'a>( .or_insert(HashSet::new()); let chunks = if let Some(datastore) = datastore { - restore_chunk_archive(worker, reader, datastore, checked_chunks, verbose)? + restore_chunk_archive(worker.clone(), reader, datastore, checked_chunks, verbose)? } else { - scan_chunk_archive(worker, reader, verbose)? + scan_chunk_archive(worker.clone(), reader, verbose)? }; if let Some(chunks) = chunks { @@ -619,7 +619,7 @@ fn restore_archive<'a>( // Read chunk archive without restoring data - just record contained chunks fn scan_chunk_archive<'a>( - worker: &WorkerTask, + worker: Arc, reader: Box, verbose: bool, ) -> Result>, Error> { @@ -642,7 +642,7 @@ fn scan_chunk_archive<'a>( // check if this is an aborted stream without end marker if let Ok(false) = reader.has_end_marker() { - worker.log("missing stream end marker".to_string()); + task_log!(worker, "missing stream end marker"); return Ok(None); } @@ -664,7 +664,7 @@ fn scan_chunk_archive<'a>( } fn restore_chunk_archive<'a>( - worker: &WorkerTask, + worker: Arc, reader: Box, datastore: Arc, checked_chunks: &mut HashSet<[u8;32]>, @@ -676,25 +676,38 @@ fn restore_chunk_archive<'a>( let mut decoder = ChunkArchiveDecoder::new(reader); let datastore2 = datastore.clone(); + let start_time = std::time::SystemTime::now(); + let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0)); + let bytes2 = bytes.clone(); + + let worker2 = worker.clone(); + let writer_pool = ParallelHandler::new( "tape restore chunk writer", 4, move |(chunk, digest): (DataBlob, [u8; 32])| { - // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest)); - chunk.verify_crc()?; - if chunk.crypt_mode()? == CryptMode::None { - chunk.decode(None, Some(&digest))?; // verify digest - } + let chunk_exists = datastore2.cond_touch_chunk(&digest, false)?; + if !chunk_exists { + if verbose { + task_log!(worker2, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest)); + } + bytes2.fetch_add(chunk.raw_size(), std::sync::atomic::Ordering::SeqCst); + // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest)); + chunk.verify_crc()?; + if chunk.crypt_mode()? == CryptMode::None { + chunk.decode(None, Some(&digest))?; // verify digest + } - datastore2.insert_chunk(&chunk, &digest)?; + datastore2.insert_chunk(&chunk, &digest)?; + } else if verbose { + task_log!(worker2, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest)); + } Ok(()) }, ); let verify_and_write_channel = writer_pool.channel(); - let start_time = std::time::SystemTime::now(); - let mut bytes = 0; loop { let (digest, blob) = match decoder.next_chunk() { @@ -710,7 +723,7 @@ fn restore_chunk_archive<'a>( // check if this is an aborted stream without end marker if let Ok(false) = reader.has_end_marker() { - worker.log("missing stream end marker".to_string()); + task_log!(worker, "missing stream end marker"); return Ok(None); } @@ -721,17 +734,10 @@ fn restore_chunk_archive<'a>( worker.check_abort()?; - let chunk_exists = datastore.cond_touch_chunk(&digest, false)?; - if !chunk_exists { - if verbose { - task_log!(worker, "Insert chunk: {}", proxmox::tools::digest_to_hex(&digest)); - } - bytes += blob.raw_size(); + if !checked_chunks.contains(&digest) { verify_and_write_channel.send((blob, digest.clone()))?; - } else if verbose { - task_log!(worker, "Found existing chunk: {}", proxmox::tools::digest_to_hex(&digest)); + checked_chunks.insert(digest.clone()); } - checked_chunks.insert(digest.clone()); chunks.push(digest); } @@ -741,6 +747,8 @@ fn restore_chunk_archive<'a>( let elapsed = start_time.elapsed()?.as_secs_f64(); + let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst); + task_log!( worker, "restored {} bytes ({:.2} MB/s)", @@ -752,7 +760,7 @@ fn restore_chunk_archive<'a>( } fn restore_snapshot_archive<'a>( - worker: &WorkerTask, + worker: Arc, reader: Box, snapshot_path: &Path, datastore: &DataStore, @@ -782,7 +790,7 @@ fn restore_snapshot_archive<'a>( } fn try_restore_snapshot_archive( - worker: &WorkerTask, + worker: Arc, decoder: &mut pxar::decoder::sync::Decoder, snapshot_path: &Path, _datastore: &DataStore,