From 166a48f903bdee747c476ee8e718eb505d8e87c6 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Fri, 19 Mar 2021 08:19:13 +0100 Subject: [PATCH] tape: cleanup - split PoolWriter into several files --- src/tape/pool_writer/catalog_set.rs | 118 +++++++++++ .../{pool_writer.rs => pool_writer/mod.rs} | 192 +----------------- src/tape/pool_writer/new_chunks_iterator.rs | 99 +++++++++ 3 files changed, 226 insertions(+), 183 deletions(-) create mode 100644 src/tape/pool_writer/catalog_set.rs rename src/tape/{pool_writer.rs => pool_writer/mod.rs} (78%) create mode 100644 src/tape/pool_writer/new_chunks_iterator.rs diff --git a/src/tape/pool_writer/catalog_set.rs b/src/tape/pool_writer/catalog_set.rs new file mode 100644 index 00000000..fbca3e97 --- /dev/null +++ b/src/tape/pool_writer/catalog_set.rs @@ -0,0 +1,118 @@ +use anyhow::{bail, Error}; + +use proxmox::tools::Uuid; + +use crate::{ + tape::{ + MediaCatalog, + MediaSetCatalog, + }, +}; + +/// Helper to build and query sets of catalogs +/// +/// Similar to MediaSetCatalog, but allows to modify the last catalog. +pub struct CatalogSet { + // read only part + pub media_set_catalog: MediaSetCatalog, + // catalog to modify (latest in set) + pub catalog: Option, +} + +impl CatalogSet { + + /// Create empty instance + pub fn new() -> Self { + Self { + media_set_catalog: MediaSetCatalog::new(), + catalog: None, + } + } + + /// Add catalog to the read-only set + pub fn append_read_only_catalog(&mut self, catalog: MediaCatalog) -> Result<(), Error> { + self.media_set_catalog.append_catalog(catalog) + } + + /// Test if the catalog already contains a snapshot + pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool { + if let Some(ref catalog) = self.catalog { + if catalog.contains_snapshot(store, snapshot) { + return true; + } + } + self.media_set_catalog.contains_snapshot(store, snapshot) + } + + /// Test if the catalog already contains a chunk + pub fn contains_chunk(&self, store: &str, digest: &[u8;32]) -> bool { + if let Some(ref catalog) = self.catalog { + if catalog.contains_chunk(store, digest) { + return true; + } + } + self.media_set_catalog.contains_chunk(store, digest) + } + + /// Add a new catalog, move the old on to the read-only set + pub fn append_catalog(&mut self, new_catalog: MediaCatalog) -> Result<(), Error> { + + // append current catalog to read-only set + if let Some(catalog) = self.catalog.take() { + self.media_set_catalog.append_catalog(catalog)?; + } + + // remove read-only version from set (in case it is there) + self.media_set_catalog.remove_catalog(&new_catalog.uuid()); + + self.catalog = Some(new_catalog); + + Ok(()) + } + + /// Register a snapshot + pub fn register_snapshot( + &mut self, + uuid: Uuid, // Uuid form MediaContentHeader + file_number: u64, + store: &str, + snapshot: &str, + ) -> Result<(), Error> { + match self.catalog { + Some(ref mut catalog) => { + catalog.register_snapshot(uuid, file_number, store, snapshot)?; + } + None => bail!("no catalog loaded - internal error"), + } + Ok(()) + } + + /// Register a chunk archive + pub fn register_chunk_archive( + &mut self, + uuid: Uuid, // Uuid form MediaContentHeader + file_number: u64, + store: &str, + chunk_list: &[[u8; 32]], + ) -> Result<(), Error> { + match self.catalog { + Some(ref mut catalog) => { + catalog.start_chunk_archive(uuid, file_number, store)?; + for digest in chunk_list { + catalog.register_chunk(digest)?; + } + catalog.end_chunk_archive()?; + } + None => bail!("no catalog loaded - internal error"), + } + Ok(()) + } + + /// Commit the catalog changes + pub fn commit(&mut self) -> Result<(), Error> { + if let Some(ref mut catalog) = self.catalog { + catalog.commit()?; + } + Ok(()) + } +} diff --git a/src/tape/pool_writer.rs b/src/tape/pool_writer/mod.rs similarity index 78% rename from src/tape/pool_writer.rs rename to src/tape/pool_writer/mod.rs index 5cd7a926..8cfbe645 100644 --- a/src/tape/pool_writer.rs +++ b/src/tape/pool_writer/mod.rs @@ -1,10 +1,15 @@ -use std::collections::HashSet; +mod catalog_set; +pub use catalog_set::*; + +mod new_chunks_iterator; +pub use new_chunks_iterator::*; + use std::path::Path; use std::fs::File; use std::time::SystemTime; use std::sync::{Arc, Mutex}; -use anyhow::{bail, format_err, Error}; +use anyhow::{bail, Error}; use proxmox::tools::Uuid; @@ -12,7 +17,6 @@ use crate::{ task_log, backup::{ DataStore, - DataBlob, }, server::WorkerTask, tape::{ @@ -24,7 +28,6 @@ use crate::{ MediaPool, MediaId, MediaCatalog, - MediaSetCatalog, file_formats::{ MediaSetLabel, ChunkArchiveWriter, @@ -41,181 +44,6 @@ use crate::{ config::tape_encryption_keys::load_key_configs, }; -/// Helper to build and query sets of catalogs -pub struct CatalogSet { - // read only part - media_set_catalog: MediaSetCatalog, - // catalog to modify (latest in set) - catalog: Option, -} - -impl CatalogSet { - - /// Test if the catalog already contains a snapshot - pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool { - if let Some(ref catalog) = self.catalog { - if catalog.contains_snapshot(store, snapshot) { - return true; - } - } - self.media_set_catalog.contains_snapshot(store, snapshot) - } - - /// Test if the catalog already contains a chunk - pub fn contains_chunk(&self, store: &str, digest: &[u8;32]) -> bool { - if let Some(ref catalog) = self.catalog { - if catalog.contains_chunk(store, digest) { - return true; - } - } - self.media_set_catalog.contains_chunk(store, digest) - } - - /// Add a new catalog, move the old on to the read-only set - pub fn append_catalog(&mut self, new_catalog: MediaCatalog) -> Result<(), Error> { - - // append current catalog to read-only set - if let Some(catalog) = self.catalog.take() { - self.media_set_catalog.append_catalog(catalog)?; - } - - // remove read-only version from set (in case it is there) - self.media_set_catalog.remove_catalog(&new_catalog.uuid()); - - self.catalog = Some(new_catalog); - - Ok(()) - } - - /// Register a snapshot - pub fn register_snapshot( - &mut self, - uuid: Uuid, // Uuid form MediaContentHeader - file_number: u64, - store: &str, - snapshot: &str, - ) -> Result<(), Error> { - match self.catalog { - Some(ref mut catalog) => { - catalog.register_snapshot(uuid, file_number, store, snapshot)?; - } - None => bail!("no catalog loaded - internal error"), - } - Ok(()) - } - - /// Register a chunk archive - pub fn register_chunk_archive( - &mut self, - uuid: Uuid, // Uuid form MediaContentHeader - file_number: u64, - store: &str, - chunk_list: &[[u8; 32]], - ) -> Result<(), Error> { - match self.catalog { - Some(ref mut catalog) => { - catalog.start_chunk_archive(uuid, file_number, store)?; - for digest in chunk_list { - catalog.register_chunk(digest)?; - } - catalog.end_chunk_archive()?; - } - None => bail!("no catalog loaded - internal error"), - } - Ok(()) - } - - /// Commit the catalog changes - pub fn commit(&mut self) -> Result<(), Error> { - if let Some(ref mut catalog) = self.catalog { - catalog.commit()?; - } - Ok(()) - } -} - -/// Chunk iterator which use a separate thread to read chunks -/// -/// The iterator skips duplicate chunks and chunks already in the -/// catalog. -pub struct NewChunksIterator { - rx: std::sync::mpsc::Receiver, Error>>, -} - -impl NewChunksIterator { - - /// Creates the iterator, spawning a new thread - /// - /// Make sure to join() the returnd thread handle. - pub fn spawn( - datastore: Arc, - snapshot_reader: Arc>, - catalog_set: Arc>, - ) -> Result<(std::thread::JoinHandle<()>, Self), Error> { - - let (tx, rx) = std::sync::mpsc::sync_channel(3); - - let reader_thread = std::thread::spawn(move || { - - let snapshot_reader = snapshot_reader.lock().unwrap(); - - let mut chunk_index: HashSet<[u8;32]> = HashSet::new(); - - let datastore_name = snapshot_reader.datastore_name(); - - let result: Result<(), Error> = proxmox::try_block!({ - - let mut chunk_iter = snapshot_reader.chunk_iterator()?; - - loop { - let digest = match chunk_iter.next() { - None => { - tx.send(Ok(None)).unwrap(); - break; - } - Some(digest) => digest?, - }; - - if chunk_index.contains(&digest) { - continue; - } - - if catalog_set.lock().unwrap().contains_chunk(&datastore_name, &digest) { - continue; - }; - - let blob = datastore.load_chunk(&digest)?; - //println!("LOAD CHUNK {}", proxmox::tools::digest_to_hex(&digest)); - tx.send(Ok(Some((digest, blob)))).unwrap(); - - chunk_index.insert(digest); - } - - Ok(()) - }); - if let Err(err) = result { - tx.send(Err(err)).unwrap(); - } - }); - - Ok((reader_thread, Self { rx })) - } -} - -// We do not use Receiver::into_iter(). The manual implementation -// returns a simpler type. -impl Iterator for NewChunksIterator { - type Item = Result<([u8; 32], DataBlob), Error>; - - fn next(&mut self) -> Option { - match self.rx.recv() { - Ok(Ok(None)) => None, - Ok(Ok(Some((digest, blob)))) => Some(Ok((digest, blob))), - Ok(Err(err)) => Some(Err(err)), - Err(_) => Some(Err(format_err!("reader thread failed"))), - } - } -} struct PoolWriterState { drive: Box, @@ -259,7 +87,7 @@ impl PoolWriter { let media_set_uuid = pool.current_media_set().uuid(); task_log!(worker, "media set uuid: {}", media_set_uuid); - let mut media_set_catalog = MediaSetCatalog::new(); + let mut catalog_set = CatalogSet::new(); // load all catalogs read-only at start for media_uuid in pool.current_media_list()? { @@ -270,11 +98,9 @@ impl PoolWriter { false, false, )?; - media_set_catalog.append_catalog(media_catalog)?; + catalog_set.append_read_only_catalog(media_catalog)?; } - let catalog_set = CatalogSet { media_set_catalog, catalog: None }; - Ok(Self { pool, drive_name: drive_name.to_string(), diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs new file mode 100644 index 00000000..56491356 --- /dev/null +++ b/src/tape/pool_writer/new_chunks_iterator.rs @@ -0,0 +1,99 @@ +use std::collections::HashSet; +use std::sync::{Arc, Mutex}; + +use anyhow::{format_err, Error}; + +use crate::{ + backup::{ + DataStore, + DataBlob, + }, + tape::{ + CatalogSet, + SnapshotReader, + + }, +}; + +/// Chunk iterator which use a separate thread to read chunks +/// +/// The iterator skips duplicate chunks and chunks already in the +/// catalog. +pub struct NewChunksIterator { + rx: std::sync::mpsc::Receiver, Error>>, +} + +impl NewChunksIterator { + + /// Creates the iterator, spawning a new thread + /// + /// Make sure to join() the returnd thread handle. + pub fn spawn( + datastore: Arc, + snapshot_reader: Arc>, + catalog_set: Arc>, + ) -> Result<(std::thread::JoinHandle<()>, Self), Error> { + + let (tx, rx) = std::sync::mpsc::sync_channel(3); + + let reader_thread = std::thread::spawn(move || { + + let snapshot_reader = snapshot_reader.lock().unwrap(); + + let mut chunk_index: HashSet<[u8;32]> = HashSet::new(); + + let datastore_name = snapshot_reader.datastore_name(); + + let result: Result<(), Error> = proxmox::try_block!({ + + let mut chunk_iter = snapshot_reader.chunk_iterator()?; + + loop { + let digest = match chunk_iter.next() { + None => { + tx.send(Ok(None)).unwrap(); + break; + } + Some(digest) => digest?, + }; + + if chunk_index.contains(&digest) { + continue; + } + + if catalog_set.lock().unwrap().contains_chunk(&datastore_name, &digest) { + continue; + }; + + let blob = datastore.load_chunk(&digest)?; + //println!("LOAD CHUNK {}", proxmox::tools::digest_to_hex(&digest)); + tx.send(Ok(Some((digest, blob)))).unwrap(); + + chunk_index.insert(digest); + } + + Ok(()) + }); + if let Err(err) = result { + tx.send(Err(err)).unwrap(); + } + }); + + Ok((reader_thread, Self { rx })) + } +} + +// We do not use Receiver::into_iter(). The manual implementation +// returns a simpler type. +impl Iterator for NewChunksIterator { + type Item = Result<([u8; 32], DataBlob), Error>; + + fn next(&mut self) -> Option { + match self.rx.recv() { + Ok(Ok(None)) => None, + Ok(Ok(Some((digest, blob)))) => Some(Ok((digest, blob))), + Ok(Err(err)) => Some(Err(err)), + Err(_) => Some(Err(format_err!("reader thread failed"))), + } + } +}