mod catalog_set; pub use catalog_set::*; mod new_chunks_iterator; pub use new_chunks_iterator::*; use std::path::Path; use std::fs::File; use std::time::SystemTime; use std::sync::{Arc, Mutex}; use anyhow::{bail, Error}; use proxmox::tools::Uuid; use pbs_datastore::task_log; use pbs_config::tape_encryption_keys::load_key_configs; use pbs_tape::{ TapeWrite, sg_tape::tape_alert_flags_critical, }; use crate::{ backup::{ DataStore, }, server::WorkerTask, tape::{ TAPE_STATUS_DIR, MAX_CHUNK_ARCHIVE_SIZE, COMMIT_BLOCK_SIZE, SnapshotReader, MediaPool, MediaId, MediaCatalog, file_formats::{ MediaSetLabel, ChunkArchiveWriter, tape_write_snapshot_archive, tape_write_catalog, }, drive::{ TapeDriver, request_and_load_media, media_changer, }, }, }; struct PoolWriterState { drive: Box, // Media Uuid from loaded media media_uuid: Uuid, // tell if we already moved to EOM at_eom: bool, // bytes written after the last tape fush/sync bytes_written: usize, } /// Helper to manage a backup job, writing several tapes of a pool pub struct PoolWriter { pool: MediaPool, drive_name: String, status: Option, catalog_set: Arc>, notify_email: Option, } impl PoolWriter { pub fn new( mut pool: MediaPool, drive_name: &str, worker: &WorkerTask, notify_email: Option, force_media_set: bool, ) -> Result { let current_time = proxmox::tools::time::epoch_i64(); let new_media_set_reason = pool.start_write_session(current_time, force_media_set)?; if let Some(reason) = new_media_set_reason { task_log!( worker, "starting new media set - reason: {}", reason, ); } let media_set_uuid = pool.current_media_set().uuid(); task_log!(worker, "media set uuid: {}", media_set_uuid); let mut catalog_set = CatalogSet::new(); // load all catalogs read-only at start for media_uuid in pool.current_media_list()? { let media_info = pool.lookup_media(media_uuid).unwrap(); let media_catalog = MediaCatalog::open( Path::new(TAPE_STATUS_DIR), media_info.id(), false, false, )?; catalog_set.append_read_only_catalog(media_catalog)?; } Ok(Self { pool, drive_name: drive_name.to_string(), status: None, catalog_set: Arc::new(Mutex::new(catalog_set)), notify_email, }) } pub fn pool(&mut self) -> &mut MediaPool { &mut self.pool } /// Set media status to FULL (persistent - stores pool status) pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> { self.pool.set_media_status_full(&uuid)?; Ok(()) } pub fn contains_snapshot(&self, store: &str, snapshot: &str) -> bool { self.catalog_set.lock().unwrap().contains_snapshot(store, snapshot) } /// Eject media and drop PoolWriterState (close drive) pub fn eject_media(&mut self, worker: &WorkerTask) -> Result<(), Error> { let mut status = match self.status.take() { Some(status) => status, None => return Ok(()), // no media loaded }; let (drive_config, _digest) = pbs_config::drive::config()?; if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? { worker.log("eject media"); status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster drop(status); // close drive worker.log("unload media"); changer.unload_media(None)?; //eject and unload } else { worker.log("standalone drive - ejecting media"); status.drive.eject_media()?; } Ok(()) } /// Export current media set and drop PoolWriterState (close drive) pub fn export_media_set(&mut self, worker: &WorkerTask) -> Result<(), Error> { let mut status = self.status.take(); let (drive_config, _digest) = pbs_config::drive::config()?; if let Some((mut changer, _)) = media_changer(&drive_config, &self.drive_name)? { if let Some(ref mut status) = status { worker.log("eject media"); status.drive.eject_media()?; // rewind and eject early, so that unload_media is faster } drop(status); // close drive worker.log("unload media"); changer.unload_media(None)?; for media_uuid in self.pool.current_media_list()? { let media = self.pool.lookup_media(media_uuid)?; let label_text = media.label_text(); if let Some(slot) = changer.export_media(label_text)? { worker.log(format!("exported media '{}' to import/export slot {}", label_text, slot)); } else { worker.warn(format!("export failed - media '{}' is not online", label_text)); } } } else if let Some(mut status) = status { worker.log("standalone drive - ejecting media instead of export"); status.drive.eject_media()?; } Ok(()) } /// commit changes to tape and catalog /// /// This is done automatically during a backupsession, but needs to /// be called explicitly before dropping the PoolWriter pub fn commit(&mut self) -> Result<(), Error> { if let Some(PoolWriterState {ref mut drive, .. }) = self.status { drive.sync()?; // sync all data to the tape } self.catalog_set.lock().unwrap().commit()?; // then commit the catalog Ok(()) } /// Load a writable media into the drive pub fn load_writable_media(&mut self, worker: &WorkerTask) -> Result { let last_media_uuid = match self.status { Some(PoolWriterState { ref media_uuid, ..}) => Some(media_uuid.clone()), None => None, }; let current_time = proxmox::tools::time::epoch_i64(); let media_uuid = self.pool.alloc_writable_media(current_time)?; let media = self.pool.lookup_media(&media_uuid).unwrap(); let media_changed = match last_media_uuid { Some(ref last_media_uuid) => last_media_uuid != &media_uuid, None => true, }; if !media_changed { return Ok(media_uuid); } task_log!(worker, "allocated new writable media '{}'", media.label_text()); if let Some(PoolWriterState {mut drive, .. }) = self.status.take() { if last_media_uuid.is_some() { task_log!(worker, "eject current media"); drive.eject_media()?; } } let (drive_config, _digest) = pbs_config::drive::config()?; let (mut drive, old_media_id) = request_and_load_media(worker, &drive_config, &self.drive_name, media.label(), &self.notify_email)?; // test for critical tape alert flags if let Ok(alert_flags) = drive.tape_alert_flags() { if !alert_flags.is_empty() { worker.log(format!("TapeAlertFlags: {:?}", alert_flags)); if tape_alert_flags_critical(alert_flags) { self.pool.set_media_status_damaged(&media_uuid)?; bail!("aborting due to critical tape alert flags: {:?}", alert_flags); } } } let (catalog, is_new_media) = update_media_set_label( worker, drive.as_mut(), old_media_id.media_set_label, media.id(), )?; self.catalog_set.lock().unwrap().append_catalog(catalog)?; let media_set = media.media_set_label().clone().unwrap(); let encrypt_fingerprint = media_set .encryption_key_fingerprint .clone() .map(|fp| (fp, media_set.uuid.clone())); drive.set_encryption(encrypt_fingerprint)?; self.status = Some(PoolWriterState { drive, media_uuid: media_uuid.clone(), at_eom: false, bytes_written: 0, }); if is_new_media { // add catalogs from previous media self.append_media_set_catalogs(worker)?; } Ok(media_uuid) } fn open_catalog_file(uuid: &Uuid) -> Result { let status_path = Path::new(TAPE_STATUS_DIR); let mut path = status_path.to_owned(); path.push(uuid.to_string()); path.set_extension("log"); let file = std::fs::OpenOptions::new() .read(true) .open(&path)?; Ok(file) } // Check it tape is loaded, then move to EOM (if not already there) // // Returns the tape position at EOM. fn prepare_tape_write( status: &mut PoolWriterState, worker: &WorkerTask, ) -> Result { if !status.at_eom { worker.log(String::from("moving to end of media")); status.drive.move_to_eom(true)?; status.at_eom = true; } let current_file_number = status.drive.current_file_number()?; if current_file_number < 2 { bail!("got strange file position number from drive ({})", current_file_number); } Ok(current_file_number) } /// Move to EOM (if not already there), then write the current /// catalog to the tape. On success, this return 'Ok(true)'. /// Please note that this may fail when there is not enough space /// on the media (return value 'Ok(false, _)'). In that case, the /// archive is marked incomplete. The caller should mark the media /// as full and try again using another media. pub fn append_catalog_archive( &mut self, worker: &WorkerTask, ) -> Result { let status = match self.status { Some(ref mut status) => status, None => bail!("PoolWriter - no media loaded"), }; Self::prepare_tape_write(status, worker)?; let catalog_set = self.catalog_set.lock().unwrap(); let catalog = match catalog_set.catalog { None => bail!("append_catalog_archive failed: no catalog - internal error"), Some(ref catalog) => catalog, }; let media_set = self.pool.current_media_set(); let media_list = media_set.media_list(); let uuid = match media_list.last() { None => bail!("got empty media list - internal error"), Some(None) => bail!("got incomplete media list - internal error"), Some(Some(last_uuid)) => { if last_uuid != catalog.uuid() { bail!("got wrong media - internal error"); } last_uuid } }; let seq_nr = media_list.len() - 1; let mut writer: Box = status.drive.write_file()?; let mut file = Self::open_catalog_file(uuid)?; let done = tape_write_catalog( writer.as_mut(), uuid, media_set.uuid(), seq_nr, &mut file, )?.is_some(); Ok(done) } // Append catalogs for all previous media in set (without last) fn append_media_set_catalogs( &mut self, worker: &WorkerTask, ) -> Result<(), Error> { let media_set = self.pool.current_media_set(); let mut media_list = &media_set.media_list()[..]; if media_list.len() < 2 { return Ok(()); } media_list = &media_list[..(media_list.len()-1)]; let status = match self.status { Some(ref mut status) => status, None => bail!("PoolWriter - no media loaded"), }; Self::prepare_tape_write(status, worker)?; for (seq_nr, uuid) in media_list.iter().enumerate() { let uuid = match uuid { None => bail!("got incomplete media list - internal error"), Some(uuid) => uuid, }; let mut writer: Box = status.drive.write_file()?; let mut file = Self::open_catalog_file(uuid)?; task_log!(worker, "write catalog for previous media: {}", uuid); if tape_write_catalog( writer.as_mut(), uuid, media_set.uuid(), seq_nr, &mut file, )?.is_none() { bail!("got EOM while writing start catalog"); } } Ok(()) } /// Move to EOM (if not already there), then creates a new snapshot /// archive writing specified files (as .pxar) into it. On /// success, this return 'Ok(true)' and the media catalog gets /// updated. /// Please note that this may fail when there is not enough space /// on the media (return value 'Ok(false, _)'). In that case, the /// archive is marked incomplete, and we do not use it. The caller /// should mark the media as full and try again using another /// media. pub fn append_snapshot_archive( &mut self, worker: &WorkerTask, snapshot_reader: &SnapshotReader, ) -> Result<(bool, usize), Error> { let status = match self.status { Some(ref mut status) => status, None => bail!("PoolWriter - no media loaded"), }; let current_file_number = Self::prepare_tape_write(status, worker)?; let (done, bytes_written) = { let mut writer: Box = status.drive.write_file()?; match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? { Some(content_uuid) => { self.catalog_set.lock().unwrap().register_snapshot( content_uuid, current_file_number, &snapshot_reader.datastore_name().to_string(), &snapshot_reader.snapshot().to_string(), )?; (true, writer.bytes_written()) } None => (false, writer.bytes_written()), } }; status.bytes_written += bytes_written; let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE; if !done || request_sync { self.commit()?; } Ok((done, bytes_written)) } /// Move to EOM (if not already there), then creates a new chunk /// archive and writes chunks from 'chunk_iter'. This stops when /// it detect LEOM or when we reach max archive size /// (4GB). Written chunks are registered in the media catalog. pub fn append_chunk_archive( &mut self, worker: &WorkerTask, chunk_iter: &mut std::iter::Peekable, store: &str, ) -> Result<(bool, usize), Error> { let status = match self.status { Some(ref mut status) => status, None => bail!("PoolWriter - no media loaded"), }; let current_file_number = Self::prepare_tape_write(status, worker)?; let writer = status.drive.write_file()?; let start_time = SystemTime::now(); let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive( worker, writer, chunk_iter, store, MAX_CHUNK_ARCHIVE_SIZE, )?; status.bytes_written += bytes_written; let elapsed = start_time.elapsed()?.as_secs_f64(); worker.log(format!( "wrote {} chunks ({:.2} MB at {:.2} MB/s)", saved_chunks.len(), bytes_written as f64 /1_000_000.0, (bytes_written as f64)/(1_000_000.0*elapsed), )); let request_sync = status.bytes_written >= COMMIT_BLOCK_SIZE; // register chunks in media_catalog self.catalog_set.lock().unwrap() .register_chunk_archive(content_uuid, current_file_number, store, &saved_chunks)?; if leom || request_sync { self.commit()?; } Ok((leom, bytes_written)) } pub fn spawn_chunk_reader_thread( &self, datastore: Arc, snapshot_reader: Arc>, ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> { NewChunksIterator::spawn( datastore, snapshot_reader, Arc::clone(&self.catalog_set), ) } } /// write up to of chunks fn write_chunk_archive<'a>( _worker: &WorkerTask, writer: Box, chunk_iter: &mut std::iter::Peekable, store: &str, max_size: usize, ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> { let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, store, true)?; // we want to get the chunk list in correct order let mut chunk_list: Vec<[u8;32]> = Vec::new(); let mut leom = false; loop { let (digest, blob) = match chunk_iter.peek() { None => break, Some(Ok((digest, blob))) => (digest, blob), Some(Err(err)) => bail!("{}", err), }; //println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(digest), blob.raw_size()); match writer.try_write_chunk(&digest, &blob) { Ok(true) => { chunk_list.push(*digest); chunk_iter.next(); // consume } Ok(false) => { // Note; we do not consume the chunk (no chunk_iter.next()) leom = true; break; } Err(err) => bail!("write chunk failed - {}", err), } if writer.bytes_written() > max_size { //worker.log("Chunk Archive max size reached, closing archive".to_string()); break; } } writer.finish()?; Ok((chunk_list, content_uuid, leom, writer.bytes_written())) } // Compare the media set label. If the media is empty, or the existing // set label does not match the expected media set, overwrite the // media set label. fn update_media_set_label( worker: &WorkerTask, drive: &mut dyn TapeDriver, old_set: Option, media_id: &MediaId, ) -> Result<(MediaCatalog, bool), Error> { let media_catalog; let new_set = match media_id.media_set_label { None => bail!("got media without media set - internal error"), Some(ref set) => set, }; let key_config = if let Some(ref fingerprint) = new_set.encryption_key_fingerprint { let (config_map, _digest) = load_key_configs()?; match config_map.get(fingerprint) { Some(key_config) => Some(key_config.clone()), None => { bail!("unable to find tape encryption key config '{}'", fingerprint); } } } else { None }; let status_path = Path::new(TAPE_STATUS_DIR); let new_media = match old_set { None => { worker.log("writing new media set label".to_string()); drive.write_media_set_label(new_set, key_config.as_ref())?; media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?; true } Some(media_set_label) => { if new_set.uuid == media_set_label.uuid { if new_set.seq_nr != media_set_label.seq_nr { bail!("got media with wrong media sequence number ({} != {}", new_set.seq_nr,media_set_label.seq_nr); } if new_set.encryption_key_fingerprint != media_set_label.encryption_key_fingerprint { bail!("detected changed encryption fingerprint - internal error"); } media_catalog = MediaCatalog::open(status_path, &media_id, true, false)?; // todo: verify last content/media_catalog somehow? false } else { worker.log( format!("writing new media set label (overwrite '{}/{}')", media_set_label.uuid.to_string(), media_set_label.seq_nr) ); drive.write_media_set_label(new_set, key_config.as_ref())?; media_catalog = MediaCatalog::overwrite(status_path, media_id, false)?; true } } }; Ok((media_catalog, new_media)) }