tape: add PoolWriter
This commit is contained in:
		| @ -45,6 +45,9 @@ pub use chunk_archive::*; | ||||
| mod snapshot_archive; | ||||
| pub use snapshot_archive::*; | ||||
|  | ||||
| mod pool_writer; | ||||
| pub use pool_writer::*; | ||||
|  | ||||
| /// Directory path where we store all tape status information | ||||
| pub const TAPE_STATUS_DIR: &str = "/var/lib/proxmox-backup/tape"; | ||||
|  | ||||
|  | ||||
							
								
								
									
										392
									
								
								src/tape/pool_writer.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										392
									
								
								src/tape/pool_writer.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,392 @@ | ||||
| use std::collections::HashSet; | ||||
| use std::path::Path; | ||||
|  | ||||
| use anyhow::{bail, Error}; | ||||
|  | ||||
| use proxmox::{ | ||||
|     tools::Uuid, | ||||
|     api::section_config::SectionConfigData, | ||||
| }; | ||||
|  | ||||
| use crate::{ | ||||
|     backup::{ | ||||
|         DataStore, | ||||
|     }, | ||||
|     tape::{ | ||||
|         TAPE_STATUS_DIR, | ||||
|         MAX_CHUNK_ARCHIVE_SIZE, | ||||
|         COMMIT_BLOCK_SIZE, | ||||
|         TapeDriver, | ||||
|         TapeWrite, | ||||
|         ChunkArchiveWriter, | ||||
|         SnapshotReader, | ||||
|         SnapshotChunkIterator, | ||||
|         MediaPool, | ||||
|         MediaId, | ||||
|         MediaCatalog, | ||||
|         MediaSetCatalog, | ||||
|         tape_write_snapshot_archive, | ||||
|         request_and_load_media, | ||||
|     }, | ||||
| }; | ||||
|  | ||||
|  | ||||
| struct PoolWriterState { | ||||
|     drive: Box<dyn TapeDriver>, | ||||
|     catalog: MediaCatalog, | ||||
|     // tell if we already moved to EOM | ||||
|     at_eom: bool, | ||||
|     // bytes written after the last tape fush/sync | ||||
|     bytes_written: usize, | ||||
| } | ||||
|  | ||||
| impl PoolWriterState { | ||||
|  | ||||
|     fn commit(&mut self) -> Result<(), Error> { | ||||
|         self.drive.sync()?; // sync all data to the tape | ||||
|         self.catalog.commit()?; // then commit the catalog | ||||
|         self.bytes_written = 0; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Helper to manage a backup job, writing several tapes of a pool | ||||
| pub struct PoolWriter { | ||||
|     pool: MediaPool, | ||||
|     drive_name: String, | ||||
|     status: Option<PoolWriterState>, | ||||
|     media_set_catalog: MediaSetCatalog, | ||||
| } | ||||
|  | ||||
| impl PoolWriter { | ||||
|  | ||||
|     pub fn new(mut pool: MediaPool, drive_name: &str) -> Result<Self, Error> { | ||||
|  | ||||
|         let current_time = proxmox::tools::time::epoch_i64(); | ||||
|  | ||||
|         pool.start_write_session(current_time)?; | ||||
|  | ||||
|         let mut media_set_catalog = MediaSetCatalog::new(); | ||||
|  | ||||
|         // load all catalogs read-only at start | ||||
|         for media_uuid in pool.current_media_list()? { | ||||
|             let media_catalog = MediaCatalog::open( | ||||
|                 Path::new(TAPE_STATUS_DIR), | ||||
|                 &media_uuid, | ||||
|                 false, | ||||
|                 false, | ||||
|             )?; | ||||
|             media_set_catalog.append_catalog(media_catalog)?; | ||||
|         } | ||||
|  | ||||
|         Ok(Self { | ||||
|             pool, | ||||
|             drive_name: drive_name.to_string(), | ||||
|             status: None, | ||||
|             media_set_catalog, | ||||
|          }) | ||||
|     } | ||||
|  | ||||
|     pub fn pool(&mut self) -> &mut MediaPool { | ||||
|         &mut self.pool | ||||
|     } | ||||
|  | ||||
|     /// Set media status to FULL (persistent - stores pool status) | ||||
|     pub fn set_media_status_full(&mut self, uuid: &Uuid) -> Result<(), Error> { | ||||
|         self.pool.set_media_status_full(&uuid)?; | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn contains_snapshot(&self, snapshot: &str) -> bool { | ||||
|         if let Some(PoolWriterState { ref catalog, .. }) = self.status { | ||||
|             if catalog.contains_snapshot(snapshot) { | ||||
|                 return true; | ||||
|             } | ||||
|         } | ||||
|         self.media_set_catalog.contains_snapshot(snapshot) | ||||
|     } | ||||
|  | ||||
|     /// commit changes to tape and catalog | ||||
|     /// | ||||
|     /// This is done automatically during a backupsession, but needs to | ||||
|     /// be called explicitly before dropping the PoolWriter | ||||
|     pub fn commit(&mut self) -> Result<(), Error> { | ||||
|         if let Some(ref mut status) = self.status { | ||||
|             status.commit()?; | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     /// Load a writable media into the drive | ||||
|     pub fn load_writable_media(&mut self) -> Result<Uuid, Error> { | ||||
|         let last_media_uuid = match self.status { | ||||
|             Some(PoolWriterState { ref catalog, .. }) => Some(catalog.uuid().clone()), | ||||
|             None => None, | ||||
|         }; | ||||
|  | ||||
|         let current_time = proxmox::tools::time::epoch_i64(); | ||||
|         let media_uuid = self.pool.alloc_writable_media(current_time)?; | ||||
|  | ||||
|         let media = self.pool.lookup_media(&media_uuid).unwrap(); | ||||
|  | ||||
|         let media_changed = match last_media_uuid { | ||||
|             Some(ref last_media_uuid) => last_media_uuid != &media_uuid, | ||||
|             None => true, | ||||
|         }; | ||||
|  | ||||
|         if !media_changed { | ||||
|             return Ok(media_uuid); | ||||
|         } | ||||
|  | ||||
|         // remove read-only catalog (we store a writable version in status) | ||||
|         self.media_set_catalog.remove_catalog(&media_uuid); | ||||
|  | ||||
|         if let Some(PoolWriterState {mut drive, catalog, .. }) = self.status.take() { | ||||
|             self.media_set_catalog.append_catalog(catalog)?; | ||||
|             drive.eject_media()?; | ||||
|         } | ||||
|  | ||||
|         let (drive_config, _digest) = crate::config::drive::config()?; | ||||
|         let (drive, catalog) = drive_load_and_label_media(&drive_config, &self.drive_name, &media.id())?; | ||||
|         self.status = Some(PoolWriterState { drive, catalog, at_eom: false, bytes_written: 0 }); | ||||
|  | ||||
|         Ok(media_uuid) | ||||
|     } | ||||
|  | ||||
|     /// uuid of currently loaded BackupMedia | ||||
|     pub fn current_media_uuid(&self) -> Result<&Uuid, Error> { | ||||
|         match self.status { | ||||
|             Some(PoolWriterState { ref catalog, ..}) => Ok(catalog.uuid()), | ||||
|             None => bail!("PoolWriter - no media loaded"), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Move to EOM (if not aleady there), then creates a new snapshot | ||||
|     /// archive writing specified files (as .pxar) into it. On | ||||
|     /// success, this return 'Ok(true)' and the media catalog gets | ||||
|     /// updated. | ||||
|  | ||||
|     /// Please note that this may fail when there is not enough space | ||||
|     /// on the media (return value 'Ok(false, _)'). In that case, the | ||||
|     /// archive is marked incomplete, and we do not use it. The caller | ||||
|     /// should mark the media as full and try again using another | ||||
|     /// media. | ||||
|     pub fn append_snapshot_archive( | ||||
|         &mut self, | ||||
|         snapshot_reader: &SnapshotReader, | ||||
|     ) -> Result<(bool, usize), Error> { | ||||
|  | ||||
|         let status = match self.status { | ||||
|             Some(ref mut status) => status, | ||||
|             None => bail!("PoolWriter - no media loaded"), | ||||
|         }; | ||||
|  | ||||
|         if !status.at_eom { | ||||
|             status.drive.move_to_eom()?; | ||||
|             status.at_eom = true; | ||||
|         } | ||||
|  | ||||
|         let current_file_number = status.drive.current_file_number()?; | ||||
|         if current_file_number < 2 { | ||||
|             bail!("got strange file position number from drive ({})", current_file_number); | ||||
|         } | ||||
|  | ||||
|         let (done, bytes_written) = { | ||||
|             let mut writer: Box<dyn TapeWrite> = status.drive.write_file()?; | ||||
|  | ||||
|             match tape_write_snapshot_archive(writer.as_mut(), snapshot_reader)? { | ||||
|                 Some(content_uuid) => { | ||||
|                     status.catalog.register_snapshot( | ||||
|                         content_uuid, | ||||
|                         current_file_number, | ||||
|                         &snapshot_reader.snapshot().to_string(), | ||||
|                     )?; | ||||
|                     (true, writer.bytes_written()) | ||||
|                 } | ||||
|                 None => (false, writer.bytes_written()), | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         status.bytes_written += bytes_written; | ||||
|  | ||||
|         let request_sync = if status.bytes_written >= COMMIT_BLOCK_SIZE { true } else { false }; | ||||
|  | ||||
|         if !done || request_sync { | ||||
|             status.commit()?; | ||||
|         } | ||||
|  | ||||
|         Ok((done, bytes_written)) | ||||
|     } | ||||
|  | ||||
|     /// Move to EOM (if not aleady there), then creates a new chunk | ||||
|     /// archive and writes chunks from 'chunk_iter'. This stops when | ||||
|     /// it detect LEOM or when we reach max archive size | ||||
|     /// (4GB). Written chunks are registered in the media catalog. | ||||
|     pub fn append_chunk_archive( | ||||
|         &mut self, | ||||
|         datastore: &DataStore, | ||||
|         chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>, | ||||
|     ) -> Result<(bool, usize), Error> { | ||||
|  | ||||
|         let status = match self.status { | ||||
|             Some(ref mut status) => status, | ||||
|             None => bail!("PoolWriter - no media loaded"), | ||||
|         }; | ||||
|  | ||||
|         if !status.at_eom { | ||||
|             status.drive.move_to_eom()?; | ||||
|             status.at_eom = true; | ||||
|         } | ||||
|  | ||||
|         let current_file_number = status.drive.current_file_number()?; | ||||
|         if current_file_number < 2 { | ||||
|             bail!("got strange file position number from drive ({})", current_file_number); | ||||
|         } | ||||
|         let writer = status.drive.write_file()?; | ||||
|  | ||||
|         let (saved_chunks, content_uuid, leom, bytes_written) = write_chunk_archive( | ||||
|             writer, | ||||
|             datastore, | ||||
|             chunk_iter, | ||||
|             &self.media_set_catalog, | ||||
|             &status.catalog, | ||||
|             MAX_CHUNK_ARCHIVE_SIZE, | ||||
|         )?; | ||||
|  | ||||
|         status.bytes_written += bytes_written; | ||||
|  | ||||
|         let request_sync = if status.bytes_written >= COMMIT_BLOCK_SIZE { true } else { false }; | ||||
|  | ||||
|         // register chunks in media_catalog | ||||
|         status.catalog.start_chunk_archive(content_uuid, current_file_number)?; | ||||
|         for digest in saved_chunks { | ||||
|             status.catalog.register_chunk(&digest)?; | ||||
|         } | ||||
|         status.catalog.end_chunk_archive()?; | ||||
|  | ||||
|         if leom || request_sync { | ||||
|             status.commit()?; | ||||
|         } | ||||
|  | ||||
|         Ok((leom, bytes_written)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// write up to <max_size> of chunks | ||||
| fn write_chunk_archive<'a>( | ||||
|     writer: Box<dyn 'a + TapeWrite>, | ||||
|     datastore: &DataStore, | ||||
|     chunk_iter: &mut std::iter::Peekable<SnapshotChunkIterator>, | ||||
|     media_set_catalog: &MediaSetCatalog, | ||||
|     media_catalog: &MediaCatalog, | ||||
|     max_size: usize, | ||||
| ) -> Result<(Vec<[u8;32]>, Uuid, bool, usize), Error> { | ||||
|  | ||||
|     let (mut writer, content_uuid) = ChunkArchiveWriter::new(writer, true)?; | ||||
|  | ||||
|     let mut chunk_index: HashSet<[u8;32]> = HashSet::new(); | ||||
|  | ||||
|     // we want to get the chunk list in correct order | ||||
|     let mut chunk_list: Vec<[u8;32]> = Vec::new(); | ||||
|  | ||||
|     let mut leom = false; | ||||
|  | ||||
|     loop { | ||||
|         let digest = match chunk_iter.next() { | ||||
|             None => break, | ||||
|             Some(digest) => digest?, | ||||
|         }; | ||||
|         if media_catalog.contains_chunk(&digest) | ||||
|             || chunk_index.contains(&digest) | ||||
|             || media_set_catalog.contains_chunk(&digest) | ||||
|         { | ||||
|             continue; | ||||
|         } | ||||
|  | ||||
|         let blob = datastore.load_chunk(&digest)?; | ||||
|         println!("CHUNK {} size {}", proxmox::tools::digest_to_hex(&digest), blob.raw_size()); | ||||
|  | ||||
|         match writer.try_write_chunk(&digest, &blob) { | ||||
|             Ok(true) => { | ||||
|                 chunk_index.insert(digest); | ||||
|                 chunk_list.push(digest); | ||||
|             } | ||||
|             Ok(false) => { | ||||
|                 leom = true; | ||||
|                 break; | ||||
|             } | ||||
|             Err(err) => bail!("write chunk failed - {}", err), | ||||
|         } | ||||
|  | ||||
|         if writer.bytes_written() > max_size { | ||||
|             println!("Chunk Archive max size reached, closing archive"); | ||||
|             break; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     writer.finish()?; | ||||
|  | ||||
|     Ok((chunk_list, content_uuid, leom, writer.bytes_written())) | ||||
| } | ||||
|  | ||||
| // Requests and load 'media' into the drive. Then compare the media | ||||
| // set label. If the tabe is empty, or the existing set label does not | ||||
| // match the expected media set, overwrite the media set label. | ||||
| fn drive_load_and_label_media( | ||||
|     drive_config: &SectionConfigData, | ||||
|     drive_name: &str, | ||||
|     media_id: &MediaId, | ||||
| ) -> Result<(Box<dyn TapeDriver>, MediaCatalog), Error> { | ||||
|  | ||||
|     let (mut tmp_drive, info) = | ||||
|         request_and_load_media(&drive_config, &drive_name, &media_id.label)?; | ||||
|  | ||||
|     let media_catalog; | ||||
|  | ||||
|     let new_set = match media_id.media_set_label { | ||||
|         None => { | ||||
|             bail!("got media without media set - internal error"); | ||||
|         } | ||||
|         Some(ref set) => set, | ||||
|     }; | ||||
|  | ||||
|     let status_path = Path::new(TAPE_STATUS_DIR); | ||||
|  | ||||
|     match &info.media_set_label { | ||||
|         None => { | ||||
|             println!("wrinting new media set label"); | ||||
|             tmp_drive.write_media_set_label(new_set)?; | ||||
|  | ||||
|             let info = MediaId { | ||||
|                 label: info.label, | ||||
|                 media_set_label: Some(new_set.clone()), | ||||
|             }; | ||||
|             media_catalog = MediaCatalog::overwrite(status_path, &info, true)?; | ||||
|         } | ||||
|         Some(media_set_label) => { | ||||
|             if new_set.uuid == media_set_label.uuid { | ||||
|                 if new_set.seq_nr != media_set_label.seq_nr { | ||||
|                     bail!("got media with wrong media sequence number ({} != {}", | ||||
|                           new_set.seq_nr,media_set_label.seq_nr); | ||||
|                 } | ||||
|                 media_catalog = MediaCatalog::open(status_path, &media_id.label.uuid, true, false)?; | ||||
|             } else { | ||||
|                 println!("wrinting new media set label (overwrite '{}/{}')", | ||||
|                          media_set_label.uuid.to_string(), media_set_label.seq_nr); | ||||
|  | ||||
|                 tmp_drive.write_media_set_label(new_set)?; | ||||
|  | ||||
|                 let info = MediaId { | ||||
|                     label: info.label, | ||||
|                     media_set_label: Some(new_set.clone()), | ||||
|                 }; | ||||
|                 media_catalog = MediaCatalog::overwrite(status_path, &info, true)?; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // todo: verify last content/media_catalog somehow? | ||||
|     tmp_drive.move_to_eom()?; | ||||
|  | ||||
|     Ok((tmp_drive, media_catalog)) | ||||
| } | ||||
		Reference in New Issue
	
	Block a user