tape: speedup backup by doing read/write in parallel

This commit is contained in:
Dietmar Maurer 2021-03-16 08:11:57 +01:00
parent 7c1666289d
commit 5c4755ad08
2 changed files with 235 additions and 79 deletions

View File

@ -1,5 +1,5 @@
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::{Mutex, Arc};
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use serde_json::Value; use serde_json::Value;
@ -508,33 +508,48 @@ pub fn backup_snapshot(
} }
}; };
let mut chunk_iter = snapshot_reader.chunk_iterator()?.peekable(); let snapshot_reader = Arc::new(Mutex::new(snapshot_reader));
let (reader_thread, chunk_iter) = pool_writer.spawn_chunk_reader_thread(
datastore.clone(),
snapshot_reader.clone(),
)?;
let mut chunk_iter = chunk_iter.peekable();
loop { loop {
worker.check_abort()?; worker.check_abort()?;
// test is we have remaining chunks // test is we have remaining chunks
if chunk_iter.peek().is_none() { match chunk_iter.peek() {
break; None => break,
Some(Ok(_)) => { /* Ok */ },
Some(Err(err)) => bail!("{}", err),
} }
let uuid = pool_writer.load_writable_media(worker)?; let uuid = pool_writer.load_writable_media(worker)?;
worker.check_abort()?; worker.check_abort()?;
let (leom, _bytes) = pool_writer.append_chunk_archive(worker, &datastore, &mut chunk_iter)?; let (leom, _bytes) = pool_writer.append_chunk_archive(worker, &mut chunk_iter)?;
if leom { if leom {
pool_writer.set_media_status_full(&uuid)?; pool_writer.set_media_status_full(&uuid)?;
} }
} }
if let Err(_) = reader_thread.join() {
bail!("chunk reader thread failed");
}
worker.check_abort()?; worker.check_abort()?;
let uuid = pool_writer.load_writable_media(worker)?; let uuid = pool_writer.load_writable_media(worker)?;
worker.check_abort()?; worker.check_abort()?;
let snapshot_reader = snapshot_reader.lock().unwrap();
let (done, _bytes) = pool_writer.append_snapshot_archive(worker, &snapshot_reader)?; let (done, _bytes) = pool_writer.append_snapshot_archive(worker, &snapshot_reader)?;
if !done { if !done {

View File

@ -1,8 +1,9 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::path::Path; use std::path::Path;
use std::time::SystemTime; use std::time::SystemTime;
use std::sync::{Arc, Mutex};
use anyhow::{bail, Error}; use anyhow::{bail, format_err, Error};
use proxmox::tools::Uuid; use proxmox::tools::Uuid;
@ -10,6 +11,7 @@ use crate::{
task_log, task_log,
backup::{ backup::{
DataStore, DataStore,
DataBlob,
}, },
server::WorkerTask, server::WorkerTask,
tape::{ tape::{
@ -18,7 +20,6 @@ use crate::{
COMMIT_BLOCK_SIZE, COMMIT_BLOCK_SIZE,
TapeWrite, TapeWrite,
SnapshotReader, SnapshotReader,
SnapshotChunkIterator,
MediaPool, MediaPool,
MediaId, MediaId,
MediaCatalog, MediaCatalog,
@ -38,32 +39,192 @@ use crate::{
config::tape_encryption_keys::load_key_configs, config::tape_encryption_keys::load_key_configs,
}; };
/// Helper to build and query sets of catalogs
pub struct CatalogBuilder {
// read only part
media_set_catalog: MediaSetCatalog,
// catalog to modify (latest in set)
catalog: Option<MediaCatalog>,
}
impl CatalogBuilder {
/// Test if the catalog already contains a snapshot
pub fn contains_snapshot(&self, snapshot: &str) -> bool {
if let Some(ref catalog) = self.catalog {
if catalog.contains_snapshot(snapshot) {
return true;
}
}
self.media_set_catalog.contains_snapshot(snapshot)
}
/// Test if the catalog already contains a chunk
pub fn contains_chunk(&self, digest: &[u8;32]) -> bool {
if let Some(ref catalog) = self.catalog {
if catalog.contains_chunk(digest) {
return true;
}
}
self.media_set_catalog.contains_chunk(digest)
}
/// Add a new catalog, move the old on to the read-only set
pub fn append_catalog(&mut self, new_catalog: MediaCatalog) -> Result<(), Error> {
// append current catalog to read-only set
if let Some(catalog) = self.catalog.take() {
self.media_set_catalog.append_catalog(catalog)?;
}
// remove read-only version from set (in case it is there)
self.media_set_catalog.remove_catalog(&new_catalog.uuid());
self.catalog = Some(new_catalog);
Ok(())
}
/// Register a snapshot
pub fn register_snapshot(
&mut self,
uuid: Uuid, // Uuid form MediaContentHeader
file_number: u64,
snapshot: &str,
) -> Result<(), Error> {
match self.catalog {
Some(ref mut catalog) => {
catalog.register_snapshot(uuid, file_number, snapshot)?;
}
None => bail!("no catalog loaded - internal error"),
}
Ok(())
}
/// Register a chunk archive
pub fn register_chunk_archive(
&mut self,
uuid: Uuid, // Uuid form MediaContentHeader
file_number: u64,
chunk_list: &[[u8; 32]],
) -> Result<(), Error> {
match self.catalog {
Some(ref mut catalog) => {
catalog.start_chunk_archive(uuid, file_number)?;
for digest in chunk_list {
catalog.register_chunk(digest)?;
}
catalog.end_chunk_archive()?;
}
None => bail!("no catalog loaded - internal error"),
}
Ok(())
}
/// Commit the catalog changes
pub fn commit(&mut self) -> Result<(), Error> {
if let Some(ref mut catalog) = self.catalog {
catalog.commit()?;
}
Ok(())
}
}
/// Chunk iterator which use a separate thread to read chunks
///
/// The iterator skips duplicate chunks and chunks already in the
/// catalog.
pub struct NewChunksIterator {
rx: std::sync::mpsc::Receiver<Result<Option<([u8; 32], DataBlob)>, Error>>,
}
impl NewChunksIterator {
/// Creates the iterator, spawning a new thread
///
/// Make sure to join() the returnd thread handle.
pub fn spawn(
datastore: Arc<DataStore>,
snapshot_reader: Arc<Mutex<SnapshotReader>>,
catalog_builder: Arc<Mutex<CatalogBuilder>>,
) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
let (tx, rx) = std::sync::mpsc::sync_channel(3);
let reader_thread = std::thread::spawn(move || {
let snapshot_reader = snapshot_reader.lock().unwrap();
let mut chunk_index: HashSet<[u8;32]> = HashSet::new();
let result: Result<(), Error> = proxmox::try_block!({
let mut chunk_iter = snapshot_reader.chunk_iterator()?;
loop {
let digest = match chunk_iter.next() {
None => {
tx.send(Ok(None)).unwrap();
break;
}
Some(digest) => digest?,
};
if chunk_index.contains(&digest) {
continue;
}
if catalog_builder.lock().unwrap().contains_chunk(&digest) {
continue;
};
let blob = datastore.load_chunk(&digest)?;
//println!("LOAD CHUNK {}", proxmox::tools::digest_to_hex(&digest));
tx.send(Ok(Some((digest, blob)))).unwrap();
chunk_index.insert(digest);
}
Ok(())
});
if let Err(err) = result {
tx.send(Err(err)).unwrap();
}
});
Ok((reader_thread, Self { rx }))
}
}
// We do not use Receiver::into_iter(). The manual implementation
// returns a simpler type.
impl Iterator for NewChunksIterator {
type Item = Result<([u8; 32], DataBlob), Error>;
fn next(&mut self) -> Option<Self::Item> {
match self.rx.recv() {
Ok(Ok(None)) => None,
Ok(Ok(Some((digest, blob)))) => Some(Ok((digest, blob))),
Ok(Err(err)) => Some(Err(err)),
Err(_) => Some(Err(format_err!("reader thread failed"))),
}
}
}
struct PoolWriterState { struct PoolWriterState {
drive: Box<dyn TapeDriver>, drive: Box<dyn TapeDriver>,
catalog: MediaCatalog,
// tell if we already moved to EOM // tell if we already moved to EOM
at_eom: bool, at_eom: bool,
// bytes written after the last tape fush/sync // bytes written after the last tape fush/sync
bytes_written: usize, bytes_written: usize,
} }
impl PoolWriterState {
fn commit(&mut self) -> Result<(), Error> {
self.drive.sync()?; // sync all data to the tape
self.catalog.commit()?; // then commit the catalog
self.bytes_written = 0;
Ok(())
}
}
/// Helper to manage a backup job, writing several tapes of a pool /// Helper to manage a backup job, writing several tapes of a pool
pub struct PoolWriter { pub struct PoolWriter {
pool: MediaPool, pool: MediaPool,
drive_name: String, drive_name: String,
status: Option<PoolWriterState>, status: Option<PoolWriterState>,
media_set_catalog: MediaSetCatalog, catalog_builder: Arc<Mutex<CatalogBuilder>>,
notify_email: Option<String>, notify_email: Option<String>,
} }
@ -97,11 +258,13 @@ impl PoolWriter {
media_set_catalog.append_catalog(media_catalog)?; media_set_catalog.append_catalog(media_catalog)?;
} }
let catalog_builder = CatalogBuilder { media_set_catalog, catalog: None };
Ok(Self { Ok(Self {
pool, pool,
drive_name: drive_name.to_string(), drive_name: drive_name.to_string(),
status: None, status: None,
media_set_catalog, catalog_builder: Arc::new(Mutex::new(catalog_builder)),
notify_email, notify_email,
}) })
} }
@ -117,12 +280,7 @@ impl PoolWriter {
} }
pub fn contains_snapshot(&self, snapshot: &str) -> bool { pub fn contains_snapshot(&self, snapshot: &str) -> bool {
if let Some(PoolWriterState { ref catalog, .. }) = self.status { self.catalog_builder.lock().unwrap().contains_snapshot(snapshot)
if catalog.contains_snapshot(snapshot) {
return true;
}
}
self.media_set_catalog.contains_snapshot(snapshot)
} }
/// Eject media and drop PoolWriterState (close drive) /// Eject media and drop PoolWriterState (close drive)
@ -188,16 +346,17 @@ impl PoolWriter {
/// This is done automatically during a backupsession, but needs to /// This is done automatically during a backupsession, but needs to
/// be called explicitly before dropping the PoolWriter /// be called explicitly before dropping the PoolWriter
pub fn commit(&mut self) -> Result<(), Error> { pub fn commit(&mut self) -> Result<(), Error> {
if let Some(ref mut status) = self.status { if let Some(PoolWriterState {ref mut drive, .. }) = self.status {
status.commit()?; drive.sync()?; // sync all data to the tape
} }
self.catalog_builder.lock().unwrap().commit()?; // then commit the catalog
Ok(()) Ok(())
} }
/// Load a writable media into the drive /// Load a writable media into the drive
pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> { pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
let last_media_uuid = match self.status { let last_media_uuid = match self.catalog_builder.lock().unwrap().catalog {
Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()), Some(ref catalog) => Some(catalog.uuid().clone()),
None => None, None => None,
}; };
@ -217,14 +376,12 @@ impl PoolWriter {
task_log!(worker, "allocated new writable media '{}'", media.label_text()); task_log!(worker, "allocated new writable media '{}'", media.label_text());
// remove read-only catalog (we store a writable version in status) if let Some(PoolWriterState {mut drive, .. }) = self.status.take() {
self.media_set_catalog.remove_catalog(&media_uuid); if last_media_uuid.is_some() {
if let Some(PoolWriterState {mut drive, catalog, .. }) = self.status.take() {
self.media_set_catalog.append_catalog(catalog)?;
task_log!(worker, "eject current media"); task_log!(worker, "eject current media");
drive.eject_media()?; drive.eject_media()?;
} }
}
let (drive_config, _digest) = crate::config::drive::config()?; let (drive_config, _digest) = crate::config::drive::config()?;
@ -249,6 +406,8 @@ impl PoolWriter {
media.id(), media.id(),
)?; )?;
self.catalog_builder.lock().unwrap().append_catalog(catalog)?;
let media_set = media.media_set_label().clone().unwrap(); let media_set = media.media_set_label().clone().unwrap();
let encrypt_fingerprint = media_set let encrypt_fingerprint = media_set
@ -258,19 +417,11 @@ impl PoolWriter {
drive.set_encryption(encrypt_fingerprint)?; drive.set_encryption(encrypt_fingerprint)?;
self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 }); self.status = Some(PoolWriterState { drive, at_eom: false, bytes_written: 0 });
Ok(media_uuid) Ok(media_uuid)
} }
/// uuid of currently loaded BackupMedia
pub fn current_media_uuid(&self) -> Result<&Uuid, Error> {
match self.status {
Some(PoolWriterState { ref catalog, ..}) => Ok(catalog.uuid()),
None => bail!("PoolWriter - no media loaded"),
}
}
/// Move to EOM (if not already there), then creates a new snapshot /// Move to EOM (if not already there), then creates a new snapshot
/// archive writing specified files (as .pxar) into it. On /// archive writing specified files (as .pxar) into it. On
/// success, this return 'Ok(true)' and the media catalog gets /// success, this return 'Ok(true)' and the media catalog gets
@ -308,7 +459,7 @@ impl PoolWriter {
match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? { match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? {
Some(content_uuid) => { Some(content_uuid) => {
status.catalog.register_snapshot( self.catalog_builder.lock().unwrap().register_snapshot(
content_uuid, content_uuid,
current_file_number, current_file_number,
&snapshot_reader.snapshot().to_string(), &snapshot_reader.snapshot().to_string(),
@ -324,7 +475,7 @@ impl PoolWriter {
let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE; let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
if !done || request_sync { if !done || request_sync {
status.commit()?; self.commit()?;
} }
Ok((done, bytes_written)) Ok((done, bytes_written))
@ -337,8 +488,7 @@ impl PoolWriter {
pub fn append_chunk_archive( pub fn append_chunk_archive(
&mut self, &mut self,
worker: &WorkerTask, worker: &WorkerTask,
datastore: &DataStore, chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
) -> Result<(bool, usize), Error> { ) -> Result<(bool, usize), Error> {
let status = match self.status { let status = match self.status {
@ -363,10 +513,7 @@ impl PoolWriter {
let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive( let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
worker, worker,
writer, writer,
datastore,
chunk_iter, chunk_iter,
&self.media_set_catalog,
&status.catalog,
MAX_CHUNK_ARCHIVE_SIZE, MAX_CHUNK_ARCHIVE_SIZE,
)?; )?;
@ -383,63 +530,57 @@ impl PoolWriter {
let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE; let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;
// register chunks in media_catalog // register chunks in media_catalog
status.catalog.start_chunk_archive(content_uuid, current_file_number)?; self.catalog_builder.lock().unwrap()
for digest in saved_chunks { .register_chunk_archive(content_uuid, current_file_number, &saved_chunks)?;
status.catalog.register_chunk(&digest)?;
}
status.catalog.end_chunk_archive()?;
if leom || request_sync { if leom || request_sync {
status.commit()?; self.commit()?;
} }
Ok((leom, bytes_written)) Ok((leom, bytes_written))
} }
pub fn spawn_chunk_reader_thread(
&self,
datastore: Arc<DataStore>,
snapshot_reader: Arc<Mutex<SnapshotReader>>,
) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
NewChunksIterator::spawn(
datastore,
snapshot_reader,
Arc::clone(&self.catalog_builder),
)
}
} }
/// write up to <max_size> of chunks /// write up to <max_size> of chunks
fn write_chunk_archive<'a>( fn write_chunk_archive<'a>(
_worker: &WorkerTask, _worker: &WorkerTask,
writer: Box<dyn 'a + TapeWrite>, writer: Box<dyn 'a + TapeWrite>,
datastore: &DataStore, chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>,
media_set_catalog: &MediaSetCatalog,
media_catalog: &MediaCatalog,
max_size: usize, max_size: usize,
) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> { ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?; let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?;
let mut chunk_index: HashSet<[u8;32]> = HashSet::new();
// we want to get the chunk list in correct order // we want to get the chunk list in correct order
let mut chunk_list: Vec<[u8;32]> = Vec::new(); let mut chunk_list: Vec<[u8;32]> = Vec::new();
let mut leom = false; let mut leom = false;
loop { loop {
let digest = match chunk_iter.peek() { let (digest, blob) = match chunk_iter.peek() {
None => break, None => break,
Some(Ok(digest)) => *digest, Some(Ok((digest, blob))) => (digest, blob),
Some(Err(err)) => bail!("{}", err), Some(Err(err)) => bail!("{}", err),
}; };
if media_catalog.contains_chunk(&digest) //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size());
|| chunk_index.contains(&digest)
|| media_set_catalog.contains_chunk(&digest)
{
chunk_iter.next(); // consume
continue;
}
let blob = datastore.load_chunk(&digest)?;
//println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size());
match writer.try_write_chunk(&digest, &blob) { match writer.try_write_chunk(&digest, &blob) {
Ok(true) => { Ok(true) => {
chunk_list.push(*digest);
chunk_iter.next(); // consume chunk_iter.next(); // consume
chunk_index.insert(digest);
chunk_list.push(digest);
} }
Ok(false) => { Ok(false) => {
// Note; we do not consume the chunk (no chunk_iter.next()) // Note; we do not consume the chunk (no chunk_iter.next())