commit 4de1c42c20 (parent 429bc9d0a2)
Author: Thomas Lamprecht <t.lamprecht@proxmox.com>
Date:   2022-04-10 17:49:03 +02:00

    tape: rust fmt

    Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>

29 changed files with 1183 additions and 1116 deletions
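Every hunk below is formatting-only: per the subject this is evidently a rustfmt pass over the tape code, so nested one-item-per-line use trees are collapsed and sorted, over-long calls and messages are re-wrapped, and `[u8;32]` becomes `[u8; 32]`, with no behavioral change. (The per-file paths shown below are inferred from the module contents; the Gitea page does not carry them through.) A minimal standalone illustration of the same rewrites, using invented identifiers and assuming rustfmt's default style:

// Illustration only: the rewrites rustfmt applies throughout this commit,
// shown on std types so the snippet compiles on its own.
//
// Before:
//
//     use std::{
//         collections::{
//             HashSet,
//         },
//     };
//
//     fn contains(set: &HashSet<[u8;32]>, digest: &[u8;32]) -> bool { set.contains(digest) }
//
// After rustfmt: merged import, `[u8; 32]` with a space, and the function
// body wrapped onto its own lines.
use std::collections::HashSet;

fn contains(set: &HashSet<[u8; 32]>, digest: &[u8; 32]) -> bool {
    set.contains(digest)
}

fn main() {
    let mut set = HashSet::new();
    set.insert([0u8; 32]);
    assert!(contains(&set, &[0u8; 32]));
}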

src/tape/pool_writer/catalog_set.rs

@@ -2,12 +2,7 @@ use anyhow::{bail, Error};
use proxmox_uuid::Uuid;

-use crate::{
-    tape::{
-        MediaCatalog,
-        MediaSetCatalog,
-    },
-};
+use crate::tape::{MediaCatalog, MediaSetCatalog};
/// Helper to build and query sets of catalogs
///
@@ -20,7 +15,6 @@ pub struct CatalogSet {
}
impl CatalogSet {
/// Create empty instance
pub fn new() -> Self {
Self {
@@ -45,7 +39,7 @@
    }

    /// Test if the catalog already contains a chunk
-    pub fn contains_chunk(&self, store: &str, digest: &[u8;32]) -> bool {
+    pub fn contains_chunk(&self, store: &str, digest: &[u8; 32]) -> bool {
if let Some(ref catalog) = self.catalog {
if catalog.contains_chunk(store, digest) {
return true;
@@ -56,7 +50,6 @@
    /// Add a new catalog, move the old one to the read-only set
pub fn append_catalog(&mut self, new_catalog: MediaCatalog) -> Result<(), Error> {
// append current catalog to read-only set
if let Some(catalog) = self.catalog.take() {
self.media_set_catalog.append_catalog(catalog)?;
@@ -77,7 +70,7 @@
        file_number: u64,
        store: &str,
        snapshot: &str,
-    ) -> Result<(), Error> {
+    ) -> Result<(), Error> {
match self.catalog {
Some(ref mut catalog) => {
catalog.register_snapshot(uuid, file_number, store, snapshot)?;
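The CatalogSet touched above pairs one writable catalog with a set of sealed read-only ones; lookups consult both, and append_catalog() seals the current catalog before installing the new one. A standalone sketch of that structure, with simplified stand-in types rather than the real MediaCatalog/MediaSetCatalog (the real active catalog is an Option, omitted here):

use std::collections::HashSet;

type Digest = [u8; 32];

#[derive(Default)]
struct Catalog {
    chunks: HashSet<(String, Digest)>, // (datastore name, chunk digest)
}

#[derive(Default)]
struct CatalogSetModel {
    active: Catalog,         // plays the role of CatalogSet::catalog
    read_only: Vec<Catalog>, // plays the role of the MediaSetCatalog
}

impl CatalogSetModel {
    // Mirrors contains_chunk(): check the writable catalog first, then the
    // read-only set.
    fn contains_chunk(&self, store: &str, digest: &Digest) -> bool {
        let key = (store.to_string(), *digest);
        self.active.chunks.contains(&key)
            || self.read_only.iter().any(|c| c.chunks.contains(&key))
    }

    // Mirrors append_catalog(): seal the current catalog into the read-only
    // set and make the new one writable.
    fn append_catalog(&mut self, new_catalog: Catalog) {
        let old = std::mem::replace(&mut self.active, new_catalog);
        self.read_only.push(old);
    }
}

fn main() {
    let mut set = CatalogSetModel::default();
    set.active.chunks.insert(("store1".to_string(), [0u8; 32]));
    set.append_catalog(Catalog::default());
    // still found, now via the read-only set
    assert!(set.contains_chunk("store1", &[0u8; 32]));
}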

src/tape/pool_writer/mod.rs

@@ -4,47 +4,29 @@ pub use catalog_set::*;
mod new_chunks_iterator;
pub use new_chunks_iterator::*;

-use std::path::Path;
use std::fs::File;
-use std::time::SystemTime;
+use std::path::Path;
use std::sync::{Arc, Mutex};
+use std::time::SystemTime;

use anyhow::{bail, Error};

-use proxmox_uuid::Uuid;
use proxmox_sys::{task_log, task_warn};
+use proxmox_uuid::Uuid;

use pbs_config::tape_encryption_keys::load_key_configs;
-use pbs_tape::{
-    TapeWrite,
-    sg_tape::tape_alert_flags_critical,
-};
use pbs_datastore::{DataStore, SnapshotReader};
+use pbs_tape::{sg_tape::tape_alert_flags_critical, TapeWrite};

use proxmox_rest_server::WorkerTask;

-use crate::{
-    tape::{
-        TAPE_STATUS_DIR,
-        MAX_CHUNK_ARCHIVE_SIZE,
-        COMMIT_BLOCK_SIZE,
-        MediaPool,
-        MediaId,
-        MediaCatalog,
-        file_formats::{
-            MediaSetLabel,
-            ChunkArchiveWriter,
-            tape_write_snapshot_archive,
-            tape_write_catalog,
-        },
-        drive::{
-            TapeDriver,
-            request_and_load_media,
-            media_changer,
-        },
-    },
+use crate::tape::{
+    drive::{media_changer, request_and_load_media, TapeDriver},
+    file_formats::{
+        tape_write_catalog, tape_write_snapshot_archive, ChunkArchiveWriter, MediaSetLabel,
+    },
+    MediaCatalog, MediaId, MediaPool, COMMIT_BLOCK_SIZE, MAX_CHUNK_ARCHIVE_SIZE, TAPE_STATUS_DIR,
};
struct PoolWriterState {
drive: Box<dyn TapeDriver>,
// Media Uuid from loaded media
@@ -65,7 +47,6 @@ pub struct PoolWriter {
}
impl PoolWriter {
pub fn new(
mut pool: MediaPool,
drive_name: &str,
@@ -73,16 +54,11 @@
        notify_email: Option<String>,
        force_media_set: bool,
    ) -> Result<Self, Error> {
        let current_time = proxmox_time::epoch_i64();

        let new_media_set_reason = pool.start_write_session(current_time, force_media_set)?;
        if let Some(reason) = new_media_set_reason {
-            task_log!(
-                worker,
-                "starting new media set - reason: {}",
-                reason,
-            );
+            task_log!(worker, "starting new media set - reason: {}", reason,);
        }

        let media_set_uuid = pool.current_media_set().uuid();
@@ -93,12 +69,8 @@
        // load all catalogs read-only at start
        for media_uuid in pool.current_media_list()? {
            let media_info = pool.lookup_media(media_uuid).unwrap();
-            let media_catalog = MediaCatalog::open(
-                Path::new(TAPE_STATUS_DIR),
-                media_info.id(),
-                false,
-                false,
-            )?;
+            let media_catalog =
+                MediaCatalog::open(Path::new(TAPE_STATUS_DIR), media_info.id(), false, false)?;
            catalog_set.append_read_only_catalog(media_catalog)?;
        }
@@ -108,7 +80,7 @@
            status: None,
            catalog_set: Arc::new(Mutex::new(catalog_set)),
            notify_email,
-        })
+        })
    }
pub fn pool(&mut self) -> &mut MediaPool {
@@ -122,7 +94,10 @@
    }

    pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool {
-        self.catalog_set.lock().unwrap().contains_snapshot(store, snapshot)
+        self.catalog_set
+            .lock()
+            .unwrap()
+            .contains_snapshot(store, snapshot)
    }
/// Eject media and drop PoolWriterState (close drive)
@@ -155,7 +130,6 @@
let (drive_config, _digest) = pbs_config::drive::config()?;
if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? {
if let Some(ref mut status) = status {
task_log!(worker, "rewind media");
// rewind first so that the unload command later does not run into a timeout
@@ -167,14 +141,25 @@
                let media = self.pool.lookup_media(media_uuid)?;
                let label_text = media.label_text();
                if let Some(slot) = changer.export_media(label_text)? {
-                    task_log!(worker, "exported media '{}' to import/export slot {}", label_text, slot);
+                    task_log!(
+                        worker,
+                        "exported media '{}' to import/export slot {}",
+                        label_text,
+                        slot
+                    );
                } else {
-                    task_warn!(worker, "export failed - media '{}' is not online or in different drive", label_text);
+                    task_warn!(
+                        worker,
+                        "export failed - media '{}' is not online or in different drive",
+                        label_text
+                    );
                }
            }
        } else if let Some(mut status) = status {
-            task_log!(worker, "standalone drive - ejecting media instead of export");
+            task_log!(
+                worker,
+                "standalone drive - ejecting media instead of export"
+            );
            status.drive.eject_media()?;
        }
@@ -186,7 +171,7 @@
    /// This is done automatically during a backup session, but needs to
    /// be called explicitly before dropping the PoolWriter
    pub fn commit(&mut self) -> Result<(), Error> {
-        if let Some(PoolWriterState {ref mut drive, .. }) = self.status {
+        if let Some(PoolWriterState { ref mut drive, .. }) = self.status {
drive.sync()?; // sync all data to the tape
}
self.catalog_set.lock().unwrap().commit()?; // then commit the catalog
@@ -196,7 +181,7 @@
    /// Load a writable media into the drive
    pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result<Uuid, Error> {
        let last_media_uuid = match self.status {
-            Some(PoolWriterState { ref media_uuid, ..}) => Some(media_uuid.clone()),
+            Some(PoolWriterState { ref media_uuid, .. }) => Some(media_uuid.clone()),
None => None,
};
@@ -214,9 +199,13 @@
            return Ok(media_uuid);
        }

-        task_log!(worker, "allocated new writable media '{}'", media.label_text());
+        task_log!(
+            worker,
+            "allocated new writable media '{}'",
+            media.label_text()
+        );

-        if let Some(PoolWriterState {mut drive, .. }) = self.status.take() {
+        if let Some(PoolWriterState { mut drive, .. }) = self.status.take() {
if last_media_uuid.is_some() {
task_log!(worker, "eject current media");
drive.eject_media()?;
@@ -225,8 +214,13 @@
        let (drive_config, _digest) = pbs_config::drive::config()?;

-        let (mut drive, old_media_id) =
-            request_and_load_media(worker, &drive_config, &self.drive_name, media.label(), &self.notify_email)?;
+        let (mut drive, old_media_id) = request_and_load_media(
+            worker,
+            &drive_config,
+            &self.drive_name,
+            media.label(),
+            &self.notify_email,
+        )?;
// test for critical tape alert flags
if let Ok(alert_flags) = drive.tape_alert_flags() {
@@ -234,7 +228,10 @@
                task_log!(worker, "TapeAlertFlags: {:?}", alert_flags);
                if tape_alert_flags_critical(alert_flags) {
                    self.pool.set_media_status_damaged(&media_uuid)?;
-                    bail!("aborting due to critical tape alert flags: {:?}", alert_flags);
+                    bail!(
+                        "aborting due to critical tape alert flags: {:?}",
+                        alert_flags
+                    );
}
}
}
@@ -273,15 +270,12 @@
    }

    fn open_catalog_file(uuid: &Uuid) -> Result<File, Error> {
        let status_path = Path::new(TAPE_STATUS_DIR);
        let mut path = status_path.to_owned();
        path.push(uuid.to_string());
        path.set_extension("log");

-        let file = std::fs::OpenOptions::new()
-            .read(true)
-            .open(&path)?;
+        let file = std::fs::OpenOptions::new().read(true).open(&path)?;
Ok(file)
}
@@ -289,11 +283,7 @@
    // Check if tape is loaded, then move to EOM (if not already there)
    //
    // Returns the tape position at EOM.
-    fn prepare_tape_write(
-        status: &mut PoolWriterState,
-        worker: &WorkerTask,
-    ) -> Result<u64, Error> {
+    fn prepare_tape_write(status: &mut PoolWriterState, worker: &WorkerTask) -> Result<u64, Error> {
if !status.at_eom {
task_log!(worker, "moving to end of media");
status.drive.move_to_eom(true)?;
@@ -302,7 +292,10 @@
        let current_file_number = status.drive.current_file_number()?;

        if current_file_number < 2 {
-            bail!("got strange file position number from drive ({})", current_file_number);
+            bail!(
+                "got strange file position number from drive ({})",
+                current_file_number
+            );
}
Ok(current_file_number)
@@ -315,11 +308,7 @@
    /// on the media (return value 'Ok(false, _)'). In that case, the
    /// archive is marked incomplete. The caller should mark the media
    /// as full and try again using another media.
-    pub fn append_catalog_archive(
-        &mut self,
-        worker: &WorkerTask,
-    ) -> Result<bool, Error> {
+    pub fn append_catalog_archive(&mut self, worker: &WorkerTask) -> Result<bool, Error> {
let status = match self.status {
Some(ref mut status) => status,
None => bail!("PoolWriter - no media loaded"),
@@ -354,30 +343,21 @@
        let mut file = Self::open_catalog_file(uuid)?;

-        let done = tape_write_catalog(
-            writer.as_mut(),
-            uuid,
-            media_set.uuid(),
-            seq_nr,
-            &mut file,
-        )?.is_some();
+        let done = tape_write_catalog(writer.as_mut(), uuid, media_set.uuid(), seq_nr, &mut file)?
+            .is_some();

        Ok(done)
    }

    // Append catalogs for all previous media in set (without last)
-    fn append_media_set_catalogs(
-        &mut self,
-        worker: &WorkerTask,
-    ) -> Result<(), Error> {
+    fn append_media_set_catalogs(&mut self, worker: &WorkerTask) -> Result<(), Error> {
        let media_set = self.pool.current_media_set();

        let mut media_list = &media_set.media_list()[..];
        if media_list.len() < 2 {
            return Ok(());
        }
-        media_list = &media_list[..(media_list.len()-1)];
+        media_list = &media_list[..(media_list.len() - 1)];
let status = match self.status {
Some(ref mut status) => status,
@@ -387,7 +367,6 @@
Self::prepare_tape_write(status, worker)?;
for (seq_nr, uuid) in media_list.iter().enumerate() {
let uuid = match uuid {
None => bail!("got incomplete media list - internal error"),
Some(uuid) => uuid,
@@ -399,13 +378,9 @@
            task_log!(worker, "write catalog for previous media: {}", uuid);

-            if tape_write_catalog(
-                writer.as_mut(),
-                uuid,
-                media_set.uuid(),
-                seq_nr,
-                &mut file,
-            )?.is_none() {
+            if tape_write_catalog(writer.as_mut(), uuid, media_set.uuid(), seq_nr, &mut file)?
+                .is_none()
+            {
bail!("got EOM while writing start catalog");
}
}
@@ -428,7 +403,6 @@
worker: &WorkerTask,
snapshot_reader: &SnapshotReader,
) -> Result<(bool, usize), Error> {
let status = match self.status {
Some(ref mut status) => status,
None => bail!("PoolWriter - no media loaded"),
@@ -474,7 +448,6 @@
chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
store: &str,
) -> Result<(bool, usize), Error> {
let status = match self.status {
Some(ref mut status) => status,
None => bail!("PoolWriter - no media loaded"),
@@ -486,30 +459,29 @@
        let start_time = SystemTime::now();

-        let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive(
-            worker,
-            writer,
-            chunk_iter,
-            store,
-            MAX_CHUNK_ARCHIVE_SIZE,
-        )?;
+        let (saved_chunks, content_uuid, leom, bytes_written) =
+            write_chunk_archive(worker, writer, chunk_iter, store, MAX_CHUNK_ARCHIVE_SIZE)?;

        status.bytes_written += bytes_written;

-        let elapsed = start_time.elapsed()?.as_secs_f64();
+        let elapsed = start_time.elapsed()?.as_secs_f64();

        task_log!(
            worker,
            "wrote {} chunks ({:.2} MB at {:.2} MB/s)",
            saved_chunks.len(),
-            bytes_written as f64 /1_000_000.0,
-            (bytes_written as f64)/(1_000_000.0*elapsed),
+            bytes_written as f64 / 1_000_000.0,
+            (bytes_written as f64) / (1_000_000.0 * elapsed),
        );

        let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE;

        // register chunks in media_catalog
-        self.catalog_set.lock().unwrap()
-            .register_chunk_archive(content_uuid, current_file_number, store, &saved_chunks)?;
+        self.catalog_set.lock().unwrap().register_chunk_archive(
+            content_uuid,
+            current_file_number,
+            store,
+            &saved_chunks,
+        )?;
if leom || request_sync {
self.commit()?;
@@ -523,11 +495,7 @@
        datastore: Arc<DataStore>,
        snapshot_reader: Arc<Mutex<SnapshotReader>>,
    ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
-        NewChunksIterator::spawn(
-            datastore,
-            snapshot_reader,
-            Arc::clone(&self.catalog_set),
-        )
+        NewChunksIterator::spawn(datastore, snapshot_reader, Arc::clone(&self.catalog_set))
}
}
@@ -538,12 +506,11 @@ fn write_chunk_archive<'a>(
    chunk_iter: &mut std::iter::Peekable<NewChunksIterator>,
    store: &str,
    max_size: usize,
-) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> {
+) -> Result<(Vec<[u8; 32]>, Uuid, bool, usize), Error> {
    let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, store, true)?;

    // we want to get the chunk list in correct order
-    let mut chunk_list: Vec<[u8;32]> = Vec::new();
+    let mut chunk_list: Vec<[u8; 32]> = Vec::new();
let mut leom = false;
@@ -589,7 +556,6 @@ fn update_media_set_label(
old_set: Option<MediaSetLabel>,
media_id: &MediaId,
) -> Result<(MediaCatalog, bool), Error> {
let media_catalog;
let new_set = match media_id.media_set_label {
@@ -602,7 +568,10 @@
            match config_map.get(fingerprint) {
                Some(key_config) => Some(key_config.clone()),
                None => {
-                    bail!("unable to find tape encryption key config '{}'", fingerprint);
+                    bail!(
+                        "unable to find tape encryption key config '{}'",
+                        fingerprint
+                    );
}
}
} else {
@@ -621,10 +590,14 @@
        Some(media_set_label) => {
            if new_set.uuid == media_set_label.uuid {
                if new_set.seq_nr != media_set_label.seq_nr {
-                    bail!("got media with wrong media sequence number ({} != {})",
-                          new_set.seq_nr,media_set_label.seq_nr);
+                    bail!(
+                        "got media with wrong media sequence number ({} != {})",
+                        new_set.seq_nr,
+                        media_set_label.seq_nr
+                    );
                }
-                if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint {
+                if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint
+                {
bail!("detected changed encryption fingerprint - internal error");
}
media_catalog = MediaCatalog::open(status_path, media_id, true, false)?;

src/tape/pool_writer/new_chunks_iterator.rs

@@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex};
use anyhow::{format_err, Error};

-use pbs_datastore::{DataStore, DataBlob, SnapshotReader};
+use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
use crate::tape::CatalogSet;
@@ -16,7 +16,6 @@ pub struct NewChunksIterator {
}

impl NewChunksIterator {
    /// Creates the iterator, spawning a new thread
    ///
    /// Make sure to join() the returned thread handle.
@@ -25,19 +24,16 @@
        snapshot_reader: Arc<Mutex<SnapshotReader>>,
        catalog_set: Arc<Mutex<CatalogSet>>,
    ) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
        let (tx, rx) = std::sync::mpsc::sync_channel(3);

        let reader_thread = std::thread::spawn(move || {
            let snapshot_reader = snapshot_reader.lock().unwrap();

-            let mut chunk_index: HashSet<[u8;32]> = HashSet::new();
+            let mut chunk_index: HashSet<[u8; 32]> = HashSet::new();
let datastore_name = snapshot_reader.datastore_name().to_string();
let result: Result<(), Error> = proxmox_lang::try_block!({
let mut chunk_iter = snapshot_reader.chunk_iterator(move |digest| {
catalog_set
.lock()
@@ -61,7 +57,7 @@
                let blob = datastore.load_chunk(&digest)?;
                //println!("LOAD CHUNK {}", hex::encode(&digest));
                match tx.send(Ok(Some((digest, blob)))) {
-                    Ok(()) => {}
+                    Ok(()) => {}
Err(err) => {
eprintln!("could not send chunk to reader thread: {}", err);
break;
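
The file above implements read-ahead for the tape writer: a spawned thread walks the snapshot, skips chunks the catalogs already know (the chunk_index HashSet), and ships the rest through a bounded channel so only a few loaded chunks are ever in flight. The same pattern in a standalone form; the types are simplified stand-ins, where the real code sends DataBlob values and threads errors through the channel as well:

use std::collections::HashSet;
use std::sync::mpsc;
use std::thread;

type Digest = [u8; 32];

fn main() {
    // Capacity 3 mirrors the sync_channel(3) above: the producer blocks once
    // three chunks are queued, giving back-pressure instead of unbounded memory.
    let (tx, rx) = mpsc::sync_channel::<Option<(Digest, Vec<u8>)>>(3);

    let producer = thread::spawn(move || {
        let mut seen: HashSet<Digest> = HashSet::new();
        for i in 0..5u8 {
            let digest = [i % 2; 32]; // only two distinct digests in this demo
            if !seen.insert(digest) {
                continue; // duplicate chunk: already scheduled for this media set
            }
            let blob = vec![i; 8]; // stand-in for the loaded chunk data
            if tx.send(Some((digest, blob))).is_err() {
                break; // consumer hung up
            }
        }
        let _ = tx.send(None); // end-of-stream marker
    });

    while let Ok(Some((digest, blob))) = rx.recv() {
        println!("got chunk {:02x}.. ({} bytes)", digest[0], blob.len());
    }
    producer.join().unwrap();
}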