diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs index 17eed23e..9f1167fe 100644 --- a/src/api2/tape/backup.rs +++ b/src/api2/tape/backup.rs @@ -1,5 +1,5 @@ use std::path::Path; -use std::sync::Arc; +use std::sync::{Mutex, Arc}; use anyhow::{bail, format_err, Error}; use serde_json::Value; @@ -508,33 +508,48 @@ pub fn backup_snapshot( } }; - let mut chunk_iter = snapshot_reader.chunk_iterator()?.peekable(); + let snapshot_reader = Arc::new(Mutex::new(snapshot_reader)); + + let (reader_thread, chunk_iter) = pool_writer.spawn_chunk_reader_thread( + datastore.clone(), + snapshot_reader.clone(), + )?; + + let mut chunk_iter = chunk_iter.peekable(); loop { worker.check_abort()?; // test is we have remaining chunks - if chunk_iter.peek().is_none() { - break; + match chunk_iter.peek() { + None => break, + Some(Ok(_)) => { /* Ok */ }, + Some(Err(err)) => bail!("{}", err), } let uuid = pool_writer.load_writable_media(worker)?; worker.check_abort()?; - let (leom, _bytes) = pool_writer.append_chunk_archive(worker, &datastore, &mut chunk_iter)?; + let (leom, _bytes) = pool_writer.append_chunk_archive(worker, &mut chunk_iter)?; if leom { pool_writer.set_media_status_full(&uuid)?; } } + if let Err(_) = reader_thread.join() { + bail!("chunk reader thread failed"); + } + worker.check_abort()?; let uuid = pool_writer.load_writable_media(worker)?; worker.check_abort()?; + let snapshot_reader = snapshot_reader.lock().unwrap(); + let (done, _bytes) = pool_writer.append_snapshot_archive(worker, &snapshot_reader)?; if !done { diff --git a/src/tape/pool_writer.rs b/src/tape/pool_writer.rs index 105fe695..2b21c374 100644 --- a/src/tape/pool_writer.rs +++ b/src/tape/pool_writer.rs @@ -1,8 +1,9 @@ use std::collections::HashSet; use std::path::Path; use std::time::SystemTime; +use std::sync::{Arc, Mutex}; -use anyhow::{bail, Error}; +use anyhow::{bail, format_err, Error}; use proxmox::tools::Uuid; @@ -10,6 +11,7 @@ use crate::{ task_log, backup::{ DataStore, + DataBlob, }, server::WorkerTask, tape::{ @@ -18,7 +20,6 @@ use crate::{ COMMIT_BLOCK_SIZE, TapeWrite, SnapshotReader, - SnapshotChunkIterator, MediaPool, MediaId, MediaCatalog, @@ -38,32 +39,192 @@ use crate::{ config::tape_encryption_keys::load_key_configs, }; +/// Helper to build and query sets of catalogs +pub struct CatalogBuilder { + // read only part + media_set_catalog: MediaSetCatalog, + // catalog to modify (latest in set) + catalog: Option, +} + +impl CatalogBuilder { + + /// Test if the catalog already contains a snapshot + pub fn contains_snapshot(&self, snapshot: &str) -> bool { + if let Some(ref catalog) = self.catalog { + if catalog.contains_snapshot(snapshot) { + return true; + } + } + self.media_set_catalog.contains_snapshot(snapshot) + } + + /// Test if the catalog already contains a chunk + pub fn contains_chunk(&self, digest: &[u8;32]) -> bool { + if let Some(ref catalog) = self.catalog { + if catalog.contains_chunk(digest) { + return true; + } + } + self.media_set_catalog.contains_chunk(digest) + } + + /// Add a new catalog, move the old on to the read-only set + pub fn append_catalog(&mut self, new_catalog: MediaCatalog) -> Result<(), Error> { + + // append current catalog to read-only set + if let Some(catalog) = self.catalog.take() { + self.media_set_catalog.append_catalog(catalog)?; + } + + // remove read-only version from set (in case it is there) + self.media_set_catalog.remove_catalog(&new_catalog.uuid()); + + self.catalog = Some(new_catalog); + + Ok(()) + } + + /// Register a snapshot + pub fn register_snapshot( + &mut self, + uuid: Uuid, // Uuid form MediaContentHeader + file_number: u64, + snapshot: &str, + ) -> Result<(), Error> { + match self.catalog { + Some(ref mut catalog) => { + catalog.register_snapshot(uuid, file_number, snapshot)?; + } + None => bail!("no catalog loaded - internal error"), + } + Ok(()) + } + + /// Register a chunk archive + pub fn register_chunk_archive( + &mut self, + uuid: Uuid, // Uuid form MediaContentHeader + file_number: u64, + chunk_list: &[[u8; 32]], + ) -> Result<(), Error> { + match self.catalog { + Some(ref mut catalog) => { + catalog.start_chunk_archive(uuid, file_number)?; + for digest in chunk_list { + catalog.register_chunk(digest)?; + } + catalog.end_chunk_archive()?; + } + None => bail!("no catalog loaded - internal error"), + } + Ok(()) + } + + /// Commit the catalog changes + pub fn commit(&mut self) -> Result<(), Error> { + if let Some(ref mut catalog) = self.catalog { + catalog.commit()?; + } + Ok(()) + } +} + +/// Chunk iterator which use a separate thread to read chunks +/// +/// The iterator skips duplicate chunks and chunks already in the +/// catalog. +pub struct NewChunksIterator { + rx: std::sync::mpsc::Receiver, Error>>, +} + +impl NewChunksIterator { + + /// Creates the iterator, spawning a new thread + /// + /// Make sure to join() the returnd thread handle. + pub fn spawn( + datastore: Arc, + snapshot_reader: Arc>, + catalog_builder: Arc>, + ) -> Result<(std::thread::JoinHandle<()>, Self), Error> { + + let (tx, rx) = std::sync::mpsc::sync_channel(3); + + let reader_thread = std::thread::spawn(move || { + + let snapshot_reader = snapshot_reader.lock().unwrap(); + + let mut chunk_index: HashSet<[u8;32]> = HashSet::new(); + + let result: Result<(), Error> = proxmox::try_block!({ + + let mut chunk_iter = snapshot_reader.chunk_iterator()?; + + loop { + let digest = match chunk_iter.next() { + None => { + tx.send(Ok(None)).unwrap(); + break; + } + Some(digest) => digest?, + }; + + if chunk_index.contains(&digest) { + continue; + } + + if catalog_builder.lock().unwrap().contains_chunk(&digest) { + continue; + }; + + let blob = datastore.load_chunk(&digest)?; + //println!("LOAD CHUNK {}", proxmox::tools::digest_to_hex(&digest)); + tx.send(Ok(Some((digest, blob)))).unwrap(); + + chunk_index.insert(digest); + } + + Ok(()) + }); + if let Err(err) = result { + tx.send(Err(err)).unwrap(); + } + }); + + Ok((reader_thread, Self { rx })) + } +} + +// We do not use Receiver::into_iter(). The manual implementation +// returns a simpler type. +impl Iterator for NewChunksIterator { + type Item = Result<([u8; 32], DataBlob), Error>; + + fn next(&mut self) -> Option { + match self.rx.recv() { + Ok(Ok(None)) => None, + Ok(Ok(Some((digest, blob)))) => Some(Ok((digest, blob))), + Ok(Err(err)) => Some(Err(err)), + Err(_) => Some(Err(format_err!("reader thread failed"))), + } + } +} struct PoolWriterState { drive: Box, - catalog: MediaCatalog, // tell if we already moved to EOM at_eom: bool, // bytes written after the last tape fush/sync bytes_written: usize, } -impl PoolWriterState { - - fn commit(&mut self) -> Result<(), Error> { - self.drive.sync()?; // sync all data to the tape - self.catalog.commit()?; // then commit the catalog - self.bytes_written = 0; - Ok(()) - } -} - /// Helper to manage a backup job, writing several tapes of a pool pub struct PoolWriter { pool: MediaPool, drive_name: String, status: Option, - media_set_catalog: MediaSetCatalog, + catalog_builder: Arc>, notify_email: Option, } @@ -97,11 +258,13 @@ impl PoolWriter { media_set_catalog.append_catalog(media_catalog)?; } + let catalog_builder = CatalogBuilder { media_set_catalog, catalog: None }; + Ok(Self { pool, drive_name: drive_name.to_string(), status: None, - media_set_catalog, + catalog_builder: Arc::new(Mutex::new(catalog_builder)), notify_email, }) } @@ -117,12 +280,7 @@ impl PoolWriter { } pub fn contains_snapshot(&self, snapshot: &str) -> bool { - if let Some(PoolWriterState { ref catalog, .. }) = self.status { - if catalog.contains_snapshot(snapshot) { - return true; - } - } - self.media_set_catalog.contains_snapshot(snapshot) + self.catalog_builder.lock().unwrap().contains_snapshot(snapshot) } /// Eject media and drop PoolWriterState (close drive) @@ -188,16 +346,17 @@ impl PoolWriter { /// This is done automatically during a backupsession, but needs to /// be called explicitly before dropping the PoolWriter pub fn commit(&mut self) -> Result<(), Error> { - if let Some(ref mut status) = self.status { - status.commit()?; + if let Some(PoolWriterState {ref mut drive, .. }) = self.status { + drive.sync()?; // sync all data to the tape } + self.catalog_builder.lock().unwrap().commit()?; // then commit the catalog Ok(()) } /// Load a writable media into the drive pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result { - let last_media_uuid = match self.status { - Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()), + let last_media_uuid = match self.catalog_builder.lock().unwrap().catalog { + Some(ref catalog) => Some(catalog.uuid().clone()), None => None, }; @@ -217,13 +376,11 @@ impl PoolWriter { task_log!(worker, "allocated new writable media '{}'", media.label_text()); - // remove read-only catalog (we store a writable version in status) - self.media_set_catalog.remove_catalog(&media_uuid); - - if let Some(PoolWriterState {mut drive, catalog, .. }) = self.status.take() { - self.media_set_catalog.append_catalog(catalog)?; - task_log!(worker, "eject current media"); - drive.eject_media()?; + if let Some(PoolWriterState {mut drive, .. }) = self.status.take() { + if last_media_uuid.is_some() { + task_log!(worker, "eject current media"); + drive.eject_media()?; + } } let (drive_config, _digest) = crate::config::drive::config()?; @@ -249,6 +406,8 @@ impl PoolWriter { media.id(), )?; + self.catalog_builder.lock().unwrap().append_catalog(catalog)?; + let media_set = media.media_set_label().clone().unwrap(); let encrypt_fingerprint = media_set @@ -258,19 +417,11 @@ impl PoolWriter { drive.set_encryption(encrypt_fingerprint)?; - self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 }); + self.status = Some(PoolWriterState { drive, at_eom: false, bytes_written: 0 }); Ok(media_uuid) } - /// uuid of currently loaded BackupMedia - pub fn current_media_uuid(&self) -> Result<&Uuid, Error> { - match self.status { - Some(PoolWriterState { ref catalog, ..}) => Ok(catalog.uuid()), - None => bail!("PoolWriter - no media loaded"), - } - } - /// Move to EOM (if not already there), then creates a new snapshot /// archive writing specified files (as .pxar) into it. On /// success, this return 'Ok(true)' and the media catalog gets @@ -308,7 +459,7 @@ impl PoolWriter { match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? { Some(content_uuid) => { - status.catalog.register_snapshot( + self.catalog_builder.lock().unwrap().register_snapshot( content_uuid, current_file_number, &snapshot_reader.snapshot().to_string(), @@ -324,7 +475,7 @@ impl PoolWriter { let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE; if !done || request_sync { - status.commit()?; + self.commit()?; } Ok((done, bytes_written)) @@ -337,8 +488,7 @@ impl PoolWriter { pub fn append_chunk_archive( &mut self, worker: &WorkerTask, - datastore: &DataStore, - chunk_iter: &mut std::iter::Peekable, + chunk_iter: &mut std::iter::Peekable, ) -> Result<(bool, usize), Error> { let status = match self.status { @@ -363,10 +513,7 @@ impl PoolWriter { let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive( worker, writer, - datastore, chunk_iter, - &self.media_set_catalog, - &status.catalog, MAX_CHUNK_ARCHIVE_SIZE, )?; @@ -383,63 +530,57 @@ impl PoolWriter { let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE; // register chunks in media_catalog - status.catalog.start_chunk_archive(content_uuid, current_file_number)?; - for digest in saved_chunks { - status.catalog.register_chunk(&digest)?; - } - status.catalog.end_chunk_archive()?; + self.catalog_builder.lock().unwrap() + .register_chunk_archive(content_uuid, current_file_number, &saved_chunks)?; if leom || request_sync { - status.commit()?; + self.commit()?; } Ok((leom, bytes_written)) } + + pub fn spawn_chunk_reader_thread( + &self, + datastore: Arc, + snapshot_reader: Arc>, + ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> { + NewChunksIterator::spawn( + datastore, + snapshot_reader, + Arc::clone(&self.catalog_builder), + ) + } } /// write up to of chunks fn write_chunk_archive<'a>( _worker: &WorkerTask, writer: Box, - datastore: &DataStore, - chunk_iter: &mut std::iter::Peekable, - media_set_catalog: &MediaSetCatalog, - media_catalog: &MediaCatalog, + chunk_iter: &mut std::iter::Peekable, max_size: usize, ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> { let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?; - let mut chunk_index: HashSet<[u8;32]> = HashSet::new(); - // we want to get the chunk list in correct order let mut chunk_list: Vec<[u8;32]> = Vec::new(); let mut leom = false; loop { - let digest = match chunk_iter.peek() { + let (digest, blob) = match chunk_iter.peek() { None => break, - Some(Ok(digest)) => *digest, + Some(Ok((digest, blob))) => (digest, blob), Some(Err(err)) => bail!("{}", err), }; - if media_catalog.contains_chunk(&digest) - || chunk_index.contains(&digest) - || media_set_catalog.contains_chunk(&digest) - { - chunk_iter.next(); // consume - continue; - } - - let blob = datastore.load_chunk(&digest)?; - //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size()); + //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size()); match writer.try_write_chunk(&digest, &blob) { Ok(true) => { + chunk_list.push(*digest); chunk_iter.next(); // consume - chunk_index.insert(digest); - chunk_list.push(digest); } Ok(false) => { // Note; we do not consume the chunk (no chunk_iter.next())