tape: add BlockeReader/BlockedWriter streams
This is the basic format used to write data to tapes.
This commit is contained in:
		
							
								
								
									
										309
									
								
								src/tape/helpers/blocked_reader.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										309
									
								
								src/tape/helpers/blocked_reader.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,309 @@ | ||||
| use std::io::Read; | ||||
|  | ||||
| use crate::tape::{ | ||||
|     TapeRead, | ||||
|     tape_device_read_block, | ||||
|     file_formats::{ | ||||
|         PROXMOX_TAPE_BLOCK_HEADER_MAGIC_1_0, | ||||
|         BlockHeader, | ||||
|         BlockHeaderFlags, | ||||
|     }, | ||||
| }; | ||||
|  | ||||
| /// Read a block stream generated by 'BlockWriter'. | ||||
| /// | ||||
| /// This class implements 'TapeRead'. It always read whole blocks from | ||||
| /// the underlying reader, and does additional error checks: | ||||
| /// | ||||
| /// - check magic number (detect streams not written by 'BlockWriter') | ||||
| /// - check block size | ||||
| /// - check block sequence numbers | ||||
| /// | ||||
| /// The reader consumes the EOF mark after the data stream (if read to | ||||
| /// the end of the stream). | ||||
| pub struct BlockedReader<R> { | ||||
|     reader: R, | ||||
|     buffer: Box<BlockHeader>, | ||||
|     seq_nr: u32, | ||||
|     found_end_marker: bool, | ||||
|     incomplete: bool, | ||||
|     got_eod: bool, | ||||
|     read_error: bool, | ||||
|     read_pos: usize, | ||||
| } | ||||
|  | ||||
| impl <R: Read> BlockedReader<R> { | ||||
|  | ||||
|     /// Create a new BlockedReader instance. | ||||
|     /// | ||||
|     /// This tries to read the first block, and returns None if we are | ||||
|     /// at EOT. | ||||
|     pub fn open(mut reader: R) -> Result<Option<Self>, std::io::Error> { | ||||
|  | ||||
|         let mut buffer = BlockHeader::new(); | ||||
|  | ||||
|         if !Self::read_block_frame(&mut buffer, &mut reader)? { | ||||
|             return Ok(None); | ||||
|         } | ||||
|  | ||||
|         let (_size, found_end_marker) = Self::check_buffer(&buffer, 0)?; | ||||
|  | ||||
|         let mut incomplete = false; | ||||
|         if found_end_marker { | ||||
|             incomplete = buffer.flags.contains(BlockHeaderFlags::INCOMPLETE); | ||||
|         } | ||||
|         Ok(Some(Self { | ||||
|             reader, | ||||
|             buffer, | ||||
|             found_end_marker, | ||||
|             incomplete, | ||||
|             seq_nr: 1, | ||||
|             got_eod: false, | ||||
|             read_error: false, | ||||
|             read_pos: 0, | ||||
|         })) | ||||
|     } | ||||
|  | ||||
|     fn check_buffer(buffer: &BlockHeader, seq_nr: u32) -> Result<(usize, bool), std::io::Error> { | ||||
|  | ||||
|         if buffer.magic != PROXMOX_TAPE_BLOCK_HEADER_MAGIC_1_0 { | ||||
|             proxmox::io_bail!("detected tape block with wrong magic number - not written by proxmox tape"); | ||||
|         } | ||||
|  | ||||
|         if seq_nr != buffer.seq_nr() { | ||||
|             proxmox::io_bail!( | ||||
|                 "detected tape block with wrong seqence number ({} != {})", | ||||
|                 seq_nr, buffer.seq_nr()) | ||||
|         } | ||||
|  | ||||
|         let size = buffer.size(); | ||||
|         let found_end_marker = buffer.flags.contains(BlockHeaderFlags::END_OF_STREAM); | ||||
|  | ||||
|         if size > buffer.payload.len() { | ||||
|             proxmox::io_bail!("detected tape block with wrong payload size ({} > {}", size, buffer.payload.len()); | ||||
|         } else if size == 0 { | ||||
|             if !found_end_marker{ | ||||
|                 proxmox::io_bail!("detected tape block with zero payload size"); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|  | ||||
|         Ok((size, found_end_marker)) | ||||
|     } | ||||
|  | ||||
|     fn read_block_frame(buffer: &mut BlockHeader, reader: &mut R) -> Result<bool, std::io::Error> { | ||||
|  | ||||
|         let data = unsafe { | ||||
|             std::slice::from_raw_parts_mut( | ||||
|                 (buffer as *mut BlockHeader) as *mut u8, | ||||
|                 BlockHeader::SIZE, | ||||
|             ) | ||||
|         }; | ||||
|  | ||||
|         tape_device_read_block(reader, data) | ||||
|     } | ||||
|  | ||||
|     fn read_block(&mut self) -> Result<usize, std::io::Error> { | ||||
|  | ||||
|         if !Self::read_block_frame(&mut self.buffer, &mut self.reader)? { | ||||
|             self.got_eod = true; | ||||
|             self.read_pos = self.buffer.payload.len(); | ||||
|             if !self.found_end_marker { | ||||
|                 proxmox::io_bail!("detected tape stream without end marker"); | ||||
|             } | ||||
|             return Ok(0); // EOD | ||||
|         } | ||||
|  | ||||
|         let (size, found_end_marker) = Self::check_buffer(&self.buffer, self.seq_nr)?; | ||||
|         self.seq_nr += 1; | ||||
|  | ||||
|         if found_end_marker { // consume EOF mark | ||||
|             self.found_end_marker = true; | ||||
|             self.incomplete = self.buffer.flags.contains(BlockHeaderFlags::INCOMPLETE); | ||||
|             let mut tmp_buf = [0u8; 512]; // use a small buffer for testing EOF | ||||
|             if tape_device_read_block(&mut self.reader, &mut tmp_buf)? { | ||||
|                 proxmox::io_bail!("detected tape block after stream end marker"); | ||||
|             } else { | ||||
|                 self.got_eod = true; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         self.read_pos = 0; | ||||
|  | ||||
|         Ok(size) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl <R: Read> TapeRead for BlockedReader<R> { | ||||
|  | ||||
|     fn is_incomplete(&self) -> Result<bool, std::io::Error> { | ||||
|         if !self.got_eod { | ||||
|             proxmox::io_bail!("is_incomplete failed: EOD not reached"); | ||||
|         } | ||||
|         if !self.found_end_marker { | ||||
|             proxmox::io_bail!("is_incomplete failed: no end marker found"); | ||||
|         } | ||||
|  | ||||
|         Ok(self.incomplete) | ||||
|     } | ||||
|  | ||||
|     fn has_end_marker(&self) -> Result<bool, std::io::Error> { | ||||
|         if !self.got_eod { | ||||
|             proxmox::io_bail!("has_end_marker failed: EOD not reached"); | ||||
|         } | ||||
|  | ||||
|         Ok(self.found_end_marker) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl <R: Read> Read for BlockedReader<R> { | ||||
|  | ||||
|     fn read(&mut self, buffer: &mut [u8]) -> Result<usize, std::io::Error> { | ||||
|  | ||||
|          if self.read_error { | ||||
|             proxmox::io_bail!("detected read after error - internal error"); | ||||
|         } | ||||
|  | ||||
|         let mut buffer_size = self.buffer.size(); | ||||
|         let mut rest = (buffer_size as isize) - (self.read_pos as isize); | ||||
|  | ||||
|         if rest <= 0 && !self.got_eod { // try to refill buffer | ||||
|             buffer_size = match self.read_block() { | ||||
|                 Ok(len) => len, | ||||
|                 err => { | ||||
|                     self.read_error = true; | ||||
|                     return err; | ||||
|                 } | ||||
|             }; | ||||
|             rest = buffer_size as isize; | ||||
|         } | ||||
|  | ||||
|         if rest <= 0 { | ||||
|             return Ok(0); | ||||
|         } else { | ||||
|             let copy_len = if (buffer.len() as isize) < rest { | ||||
|                 buffer.len() | ||||
|             } else { | ||||
|                 rest as usize | ||||
|             }; | ||||
|             buffer[..copy_len].copy_from_slice( | ||||
|                 &self.buffer.payload[self.read_pos..(self.read_pos + copy_len)]); | ||||
|             self.read_pos += copy_len; | ||||
|             return Ok(copy_len); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod test { | ||||
|     use std::io::Read; | ||||
|     use anyhow::Error; | ||||
|     use crate::tape::{ | ||||
|         TapeWrite, | ||||
|         file_formats::PROXMOX_TAPE_BLOCK_SIZE, | ||||
|         helpers::{ | ||||
|             BlockedReader, | ||||
|             BlockedWriter, | ||||
|         }, | ||||
|     }; | ||||
|  | ||||
|     fn write_and_verify(data: &[u8]) -> Result<(), Error> { | ||||
|  | ||||
|         let mut tape_data = Vec::new(); | ||||
|  | ||||
|         let mut writer = BlockedWriter::new(&mut tape_data); | ||||
|  | ||||
|         writer.write_all(data)?; | ||||
|  | ||||
|         writer.finish(false)?; | ||||
|  | ||||
|         assert_eq!( | ||||
|             tape_data.len(), | ||||
|             ((data.len() + PROXMOX_TAPE_BLOCK_SIZE)/PROXMOX_TAPE_BLOCK_SIZE) | ||||
|                 *PROXMOX_TAPE_BLOCK_SIZE | ||||
|         ); | ||||
|  | ||||
|         let reader = &mut &tape_data[..]; | ||||
|         let mut reader = BlockedReader::open(reader)?.unwrap(); | ||||
|  | ||||
|         let mut read_data = Vec::with_capacity(PROXMOX_TAPE_BLOCK_SIZE); | ||||
|         reader.read_to_end(&mut read_data)?; | ||||
|  | ||||
|         assert_eq!(data.len(), read_data.len()); | ||||
|  | ||||
|         assert_eq!(data, &read_data[..]); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn empty_stream() -> Result<(), Error> { | ||||
|         write_and_verify(b"") | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn small_data() -> Result<(), Error> { | ||||
|         write_and_verify(b"ABC") | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn large_data() -> Result<(), Error> { | ||||
|         let data = proxmox::sys::linux::random_data(1024*1024*5)?; | ||||
|         write_and_verify(&data) | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn no_data() -> Result<(), Error> { | ||||
|         let tape_data = Vec::new(); | ||||
|         let reader = &mut &tape_data[..]; | ||||
|         let reader = BlockedReader::open(reader)?; | ||||
|         assert!(reader.is_none()); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn no_end_marker() -> Result<(), Error> { | ||||
|         let mut tape_data = Vec::new(); | ||||
|         { | ||||
|             let mut writer = BlockedWriter::new(&mut tape_data); | ||||
|             // write at least one block | ||||
|             let data = proxmox::sys::linux::random_data(PROXMOX_TAPE_BLOCK_SIZE)?; | ||||
|             writer.write_all(&data)?; | ||||
|             // but do not call finish here | ||||
|         } | ||||
|         let reader = &mut &tape_data[..]; | ||||
|         let mut reader = BlockedReader::open(reader)?.unwrap(); | ||||
|  | ||||
|         let mut data = Vec::with_capacity(PROXMOX_TAPE_BLOCK_SIZE); | ||||
|         assert!(reader.read_to_end(&mut data).is_err()); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn small_read_buffer() -> Result<(), Error> { | ||||
|         let mut tape_data = Vec::new(); | ||||
|  | ||||
|         let mut writer = BlockedWriter::new(&mut tape_data); | ||||
|  | ||||
|         writer.write_all(b"ABC")?; | ||||
|  | ||||
|         writer.finish(false)?; | ||||
|  | ||||
|         let reader = &mut &tape_data[..]; | ||||
|         let mut reader = BlockedReader::open(reader)?.unwrap(); | ||||
|  | ||||
|         let mut buf = [0u8; 1]; | ||||
|         assert_eq!(reader.read(&mut buf)?, 1, "wrong byte count"); | ||||
|         assert_eq!(&buf, b"A"); | ||||
|         assert_eq!(reader.read(&mut buf)?, 1, "wrong byte count"); | ||||
|         assert_eq!(&buf, b"B"); | ||||
|         assert_eq!(reader.read(&mut buf)?, 1, "wrong byte count"); | ||||
|         assert_eq!(&buf, b"C"); | ||||
|         assert_eq!(reader.read(&mut buf)?, 0, "wrong byte count"); | ||||
|         assert_eq!(reader.read(&mut buf)?, 0, "wrong byte count"); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										124
									
								
								src/tape/helpers/blocked_writer.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										124
									
								
								src/tape/helpers/blocked_writer.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,124 @@ | ||||
| use std::io::Write; | ||||
|  | ||||
| use proxmox::tools::vec; | ||||
|  | ||||
| use crate::tape::{ | ||||
|     TapeWrite, | ||||
|     tape_device_write_block, | ||||
|     file_formats::{ | ||||
|         BlockHeader, | ||||
|         BlockHeaderFlags, | ||||
|     }, | ||||
| }; | ||||
|  | ||||
| /// Assemble and write blocks of data | ||||
| /// | ||||
| /// This type implement 'TapeWrite'. Data written is assembled to | ||||
| /// equally sized blocks (see 'BlockHeader'), which are then written | ||||
| /// to the underlying writer. | ||||
| pub struct BlockedWriter<W> { | ||||
|     writer: W, | ||||
|     buffer: Box<BlockHeader>, | ||||
|     buffer_pos: usize, | ||||
|     seq_nr: u32, | ||||
|     logical_end_of_media: bool, | ||||
|     bytes_written: usize, | ||||
| } | ||||
|  | ||||
| impl <W: Write> BlockedWriter<W> { | ||||
|  | ||||
|     /// Allow access to underlying writer | ||||
|     pub fn writer_ref_mut(&mut self) -> &mut W { | ||||
|         &mut self.writer | ||||
|     } | ||||
|  | ||||
|     /// Creates a new instance. | ||||
|     pub fn new(writer: W) -> Self { | ||||
|         Self { | ||||
|             writer, | ||||
|             buffer: BlockHeader::new(), | ||||
|             buffer_pos: 0, | ||||
|             seq_nr: 0, | ||||
|             logical_end_of_media: false, | ||||
|             bytes_written: 0, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn write_block(buffer: &BlockHeader, writer: &mut W) -> Result<bool, std::io::Error> { | ||||
|  | ||||
|         let data = unsafe { | ||||
|             std::slice::from_raw_parts( | ||||
|                 (buffer as *const BlockHeader) as *const u8, | ||||
|                 BlockHeader::SIZE, | ||||
|             ) | ||||
|         }; | ||||
|         tape_device_write_block(writer, data) | ||||
|     } | ||||
|  | ||||
|     fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> { | ||||
|  | ||||
|         if data.is_empty() { return Ok(0); } | ||||
|  | ||||
|         let rest = self.buffer.payload.len() - self.buffer_pos; | ||||
|         let bytes = if data.len() < rest { data.len() } else { rest }; | ||||
|         self.buffer.payload[self.buffer_pos..(self.buffer_pos+bytes)] | ||||
|             .copy_from_slice(&data[..bytes]); | ||||
|  | ||||
|         let rest = rest - bytes; | ||||
|  | ||||
|         if rest == 0 { | ||||
|             self.buffer.flags = BlockHeaderFlags::empty(); | ||||
|             self.buffer.set_size(self.buffer.payload.len()); | ||||
|             self.buffer.set_seq_nr(self.seq_nr); | ||||
|             self.seq_nr += 1; | ||||
|             let leom = Self::write_block(&self.buffer, &mut self.writer)?; | ||||
|             if leom { self.logical_end_of_media = true; } | ||||
|             self.buffer_pos = 0; | ||||
|             self.bytes_written += BlockHeader::SIZE; | ||||
|  | ||||
|         } else { | ||||
|             self.buffer_pos = self.buffer_pos + bytes; | ||||
|         } | ||||
|  | ||||
|         Ok(bytes) | ||||
|     } | ||||
|  | ||||
| } | ||||
|  | ||||
| impl <W: Write> TapeWrite for BlockedWriter<W> { | ||||
|  | ||||
|     fn write_all(&mut self, mut data: &[u8]) -> Result<bool, std::io::Error> { | ||||
|         while !data.is_empty() { | ||||
|             match self.write(data) { | ||||
|                 Ok(n) => data = &data[n..], | ||||
|                 Err(e) => return Err(e), | ||||
|             } | ||||
|         } | ||||
|         Ok(self.logical_end_of_media) | ||||
|     } | ||||
|  | ||||
|     fn bytes_written(&self) -> usize { | ||||
|         self.bytes_written | ||||
|     } | ||||
|  | ||||
|     /// flush last block, set END_OF_STREAM flag | ||||
|     /// | ||||
|     /// Note: This may write an empty block just including the | ||||
|     /// END_OF_STREAM flag. | ||||
|     fn finish(&mut self, incomplete: bool) -> Result<bool, std::io::Error> { | ||||
|         vec::clear(&mut self.buffer.payload[self.buffer_pos..]); | ||||
|         self.buffer.flags = BlockHeaderFlags::END_OF_STREAM; | ||||
|         if incomplete { self.buffer.flags |= BlockHeaderFlags::INCOMPLETE; } | ||||
|         self.buffer.set_size(self.buffer_pos); | ||||
|         self.buffer.set_seq_nr(self.seq_nr); | ||||
|         self.seq_nr += 1; | ||||
|         self.bytes_written += BlockHeader::SIZE; | ||||
|         Self::write_block(&self.buffer, &mut self.writer) | ||||
|     } | ||||
|  | ||||
|     /// Returns if the writer already detected the logical end of media | ||||
|     fn logical_end_of_media(&self) -> bool { | ||||
|         self.logical_end_of_media | ||||
|     } | ||||
|  | ||||
| } | ||||
| @ -3,3 +3,9 @@ pub use emulate_tape_writer::*; | ||||
|  | ||||
| mod emulate_tape_reader; | ||||
| pub use emulate_tape_reader::*; | ||||
|  | ||||
| mod blocked_reader; | ||||
| pub use blocked_reader::*; | ||||
|  | ||||
| mod blocked_writer; | ||||
| pub use blocked_writer::*; | ||||
|  | ||||
		Reference in New Issue
	
	Block a user