moving more code to pbs-datastore
prune and fixed/dynamic index

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
		
							
								
								
									
										508
									
								
								pbs-datastore/src/dynamic_index.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										508
									
								
								pbs-datastore/src/dynamic_index.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,508 @@ | ||||
| use std::fs::File; | ||||
| use std::io::{BufWriter, Seek, SeekFrom, Write}; | ||||
| use std::os::unix::io::AsRawFd; | ||||
| use std::path::{Path, PathBuf}; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use anyhow::{bail, format_err, Error}; | ||||
|  | ||||
| use proxmox::tools::io::ReadExt; | ||||
| use proxmox::tools::uuid::Uuid; | ||||
| use proxmox::tools::mmap::Mmap; | ||||
|  | ||||
| use pbs_tools::process_locker::ProcessLockSharedGuard; | ||||
|  | ||||
| use crate::Chunker; | ||||
| use crate::chunk_stat::ChunkStat; | ||||
| use crate::chunk_store::ChunkStore; | ||||
| use crate::data_blob::{DataBlob, DataChunkBuilder}; | ||||
| use crate::file_formats; | ||||
| use crate::index::{IndexFile, ChunkReadInfo}; | ||||
|  | ||||
/// Header format definition for dynamic index files (`.didx`)
///
/// The header occupies exactly one page (4096 bytes); the remainder of
/// the file is a flat array of `DynamicEntry` records.
#[repr(C)]
pub struct DynamicIndexHeader {
    /// File type magic number (compared against `DYNAMIC_SIZED_CHUNK_INDEX_1_0`).
    pub magic: [u8; 8],
    /// Unique identifier of this index file.
    pub uuid: [u8; 16],
    /// Creation time (epoch seconds; written little endian by `DynamicIndexWriter`).
    pub ctime: i64,
    /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)``
    pub index_csum: [u8; 32],
    // padding so the header fills a full page
    reserved: [u8; 4032], // overall size is one page (4096 bytes)
}
proxmox::static_assert_size!(DynamicIndexHeader, 4096);
// TODO: Once non-Copy unions are stabilized, use:
// union DynamicIndexHeader {
//     reserved: [u8; 4096],
//     pub data: DynamicIndexHeaderData,
// }
|  | ||||
| impl DynamicIndexHeader { | ||||
|     /// Convenience method to allocate a zero-initialized header struct. | ||||
|     pub fn zeroed() -> Box<Self> { | ||||
|         unsafe { | ||||
|             Box::from_raw(std::alloc::alloc_zeroed(std::alloc::Layout::new::<Self>()) as *mut Self) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn as_bytes(&self) -> &[u8] { | ||||
|         unsafe { | ||||
|             std::slice::from_raw_parts( | ||||
|                 self as *const Self as *const u8, | ||||
|                 std::mem::size_of::<Self>(), | ||||
|             ) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
/// On-disk layout of one dynamic index entry: the chunk's end offset
/// (stored little endian) followed by its 32-byte chunk digest.
#[derive(Clone, Debug)]
#[repr(C)]
pub struct DynamicEntry {
    end_le: u64,      // chunk end offset, little endian on disk
    digest: [u8; 32], // chunk digest
}

impl DynamicEntry {
    /// Chunk end offset converted to host byte order.
    #[inline]
    pub fn end(&self) -> u64 {
        u64::from_le(self.end_le)
    }
}
|  | ||||
/// Reader for dynamic index (`.didx`) files with a memory mapped entry table.
pub struct DynamicIndexReader {
    _file: File,               // held to keep the fd alive alongside the mapping
    pub size: usize,           // total file size in bytes (header + entry table)
    index: Mmap<DynamicEntry>, // memory mapped entry table following the header
    pub uuid: [u8; 16],        // uuid copied from the file header
    // NOTE(review): set to the time the index was *opened*, not the header's
    // ctime field (see `new()`) — confirm this is intended.
    pub ctime: i64,
    pub index_csum: [u8; 32],  // index checksum copied from the file header
}
|  | ||||
| impl DynamicIndexReader { | ||||
|     pub fn open(path: &Path) -> Result<Self, Error> { | ||||
|         File::open(path) | ||||
|             .map_err(Error::from) | ||||
|             .and_then(Self::new) | ||||
|             .map_err(|err| format_err!("Unable to open dynamic index {:?} - {}", path, err)) | ||||
|     } | ||||
|  | ||||
|     pub fn index(&self) -> &[DynamicEntry] { | ||||
|         &self.index | ||||
|     } | ||||
|  | ||||
|     pub fn new(mut file: std::fs::File) -> Result<Self, Error> { | ||||
|         // FIXME: This is NOT OUR job! Check the callers of this method and remove this! | ||||
|         file.seek(SeekFrom::Start(0))?; | ||||
|  | ||||
|         let header_size = std::mem::size_of::<DynamicIndexHeader>(); | ||||
|  | ||||
|         let rawfd = file.as_raw_fd(); | ||||
|         let stat = match nix::sys::stat::fstat(rawfd) { | ||||
|             Ok(stat) => stat, | ||||
|             Err(err) => bail!("fstat failed - {}", err), | ||||
|         }; | ||||
|  | ||||
|         let size = stat.st_size as usize; | ||||
|  | ||||
|         if size < header_size { | ||||
|             bail!("index too small ({})", stat.st_size); | ||||
|         } | ||||
|  | ||||
|         let header: Box<DynamicIndexHeader> = unsafe { file.read_host_value_boxed()? }; | ||||
|  | ||||
|         if header.magic != file_formats::DYNAMIC_SIZED_CHUNK_INDEX_1_0 { | ||||
|             bail!("got unknown magic number"); | ||||
|         } | ||||
|  | ||||
|         let ctime = proxmox::tools::time::epoch_i64(); | ||||
|  | ||||
|         let index_size = stat.st_size as usize - header_size; | ||||
|         let index_count = index_size / 40; | ||||
|         if index_count * 40 != index_size { | ||||
|             bail!("got unexpected file size"); | ||||
|         } | ||||
|  | ||||
|         let index = unsafe { | ||||
|             Mmap::map_fd( | ||||
|                 rawfd, | ||||
|                 header_size as u64, | ||||
|                 index_count, | ||||
|                 nix::sys::mman::ProtFlags::PROT_READ, | ||||
|                 nix::sys::mman::MapFlags::MAP_PRIVATE, | ||||
|             )? | ||||
|         }; | ||||
|  | ||||
|         Ok(Self { | ||||
|             _file: file, | ||||
|             size, | ||||
|             index, | ||||
|             ctime, | ||||
|             uuid: header.uuid, | ||||
|             index_csum: header.index_csum, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     #[inline] | ||||
|     #[allow(clippy::cast_ptr_alignment)] | ||||
|     pub fn chunk_end(&self, pos: usize) -> u64 { | ||||
|         if pos >= self.index.len() { | ||||
|             panic!("chunk index out of range"); | ||||
|         } | ||||
|         self.index[pos].end() | ||||
|     } | ||||
|  | ||||
|     #[inline] | ||||
|     fn chunk_digest(&self, pos: usize) -> &[u8; 32] { | ||||
|         if pos >= self.index.len() { | ||||
|             panic!("chunk index out of range"); | ||||
|         } | ||||
|         &self.index[pos].digest | ||||
|     } | ||||
|  | ||||
|     pub fn binary_search( | ||||
|         &self, | ||||
|         start_idx: usize, | ||||
|         start: u64, | ||||
|         end_idx: usize, | ||||
|         end: u64, | ||||
|         offset: u64, | ||||
|     ) -> Result<usize, Error> { | ||||
|         if (offset >= end) || (offset < start) { | ||||
|             bail!("offset out of range"); | ||||
|         } | ||||
|  | ||||
|         if end_idx == start_idx { | ||||
|             return Ok(start_idx); // found | ||||
|         } | ||||
|         let middle_idx = (start_idx + end_idx) / 2; | ||||
|         let middle_end = self.chunk_end(middle_idx); | ||||
|  | ||||
|         if offset < middle_end { | ||||
|             self.binary_search(start_idx, start, middle_idx, middle_end, offset) | ||||
|         } else { | ||||
|             self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl IndexFile for DynamicIndexReader { | ||||
|     fn index_count(&self) -> usize { | ||||
|         self.index.len() | ||||
|     } | ||||
|  | ||||
|     fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> { | ||||
|         if pos >= self.index.len() { | ||||
|             None | ||||
|         } else { | ||||
|             Some(unsafe { &*(self.chunk_digest(pos).as_ptr() as *const [u8; 32]) }) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn index_bytes(&self) -> u64 { | ||||
|         if self.index.is_empty() { | ||||
|             0 | ||||
|         } else { | ||||
|             self.chunk_end(self.index.len() - 1) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn compute_csum(&self) -> ([u8; 32], u64) { | ||||
|         let mut csum = openssl::sha::Sha256::new(); | ||||
|         let mut chunk_end = 0; | ||||
|         for pos in 0..self.index_count() { | ||||
|             let info = self.chunk_info(pos).unwrap(); | ||||
|             chunk_end = info.range.end; | ||||
|             csum.update(&chunk_end.to_le_bytes()); | ||||
|             csum.update(&info.digest); | ||||
|         } | ||||
|         let csum = csum.finish(); | ||||
|         (csum, chunk_end) | ||||
|     } | ||||
|  | ||||
|     fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo> { | ||||
|         if pos >= self.index.len() { | ||||
|             return None; | ||||
|         } | ||||
|         let start = if pos == 0 { 0 } else { self.index[pos - 1].end() }; | ||||
|  | ||||
|         let end = self.index[pos].end(); | ||||
|  | ||||
|         Some(ChunkReadInfo { | ||||
|             range: start..end, | ||||
|             digest: self.index[pos].digest, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     fn index_ctime(&self) -> i64 { | ||||
|         self.ctime | ||||
|     } | ||||
|  | ||||
|     fn index_size(&self) -> usize { | ||||
|         self.size as usize | ||||
|     } | ||||
|  | ||||
|     fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> { | ||||
|         let end_idx = self.index.len() - 1; | ||||
|         let end = self.chunk_end(end_idx); | ||||
|         let found_idx = self.binary_search(0, 0, end_idx, end, offset); | ||||
|         let found_idx = match found_idx { | ||||
|             Ok(i) => i, | ||||
|             Err(_) => return None | ||||
|         }; | ||||
|  | ||||
|         let found_start = if found_idx == 0 { | ||||
|             0 | ||||
|         } else { | ||||
|             self.chunk_end(found_idx - 1) | ||||
|         }; | ||||
|  | ||||
|         Some((found_idx, offset - found_start)) | ||||
|     } | ||||
| } | ||||
|  | ||||
/// Create dynamic index files (`.didx`)
///
/// Data is written to a temporary `.tmp_didx` file which `close()` renames
/// into place atomically; `Drop` removes the temp file if that never happens.
pub struct DynamicIndexWriter {
    store: Arc<ChunkStore>,           // chunk store receiving the data chunks
    _lock: ProcessLockSharedGuard,    // shared store lock held for the writer's lifetime
    writer: BufWriter<File>,          // buffered writer over the temp file
    closed: bool,                     // set once close() succeeded (or started)
    filename: PathBuf,                // final index path
    tmp_filename: PathBuf,            // temp path written to until close()
    csum: Option<openssl::sha::Sha256>, // running checksum, taken by close()
    pub uuid: [u8; 16],               // uuid written into the header
    pub ctime: i64,                   // creation time written into the header
}

impl Drop for DynamicIndexWriter {
    // Clean up the temp file if the index was never closed.
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
    }
}
|  | ||||
| impl DynamicIndexWriter { | ||||
|     pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> { | ||||
|         let shared_lock = store.try_shared_lock()?; | ||||
|  | ||||
|         let full_path = store.relative_path(path); | ||||
|         let mut tmp_path = full_path.clone(); | ||||
|         tmp_path.set_extension("tmp_didx"); | ||||
|  | ||||
|         let file = std::fs::OpenOptions::new() | ||||
|             .create(true) | ||||
|             .truncate(true) | ||||
|             .read(true) | ||||
|             .write(true) | ||||
|             .open(&tmp_path)?; | ||||
|  | ||||
|         let mut writer = BufWriter::with_capacity(1024 * 1024, file); | ||||
|  | ||||
|         let ctime = proxmox::tools::time::epoch_i64(); | ||||
|  | ||||
|         let uuid = Uuid::generate(); | ||||
|  | ||||
|         let mut header = DynamicIndexHeader::zeroed(); | ||||
|         header.magic = file_formats::DYNAMIC_SIZED_CHUNK_INDEX_1_0; | ||||
|         header.ctime = i64::to_le(ctime); | ||||
|         header.uuid = *uuid.as_bytes(); | ||||
|         // header.index_csum = [0u8; 32]; | ||||
|         writer.write_all(header.as_bytes())?; | ||||
|  | ||||
|         let csum = Some(openssl::sha::Sha256::new()); | ||||
|  | ||||
|         Ok(Self { | ||||
|             store, | ||||
|             _lock: shared_lock, | ||||
|             writer, | ||||
|             closed: false, | ||||
|             filename: full_path, | ||||
|             tmp_filename: tmp_path, | ||||
|             ctime, | ||||
|             uuid: *uuid.as_bytes(), | ||||
|             csum, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     // fixme: use add_chunk instead? | ||||
|     pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> { | ||||
|         self.store.insert_chunk(chunk, digest) | ||||
|     } | ||||
|  | ||||
|     pub fn close(&mut self) -> Result<[u8; 32], Error> { | ||||
|         if self.closed { | ||||
|             bail!( | ||||
|                 "cannot close already closed archive index file {:?}", | ||||
|                 self.filename | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         self.closed = true; | ||||
|  | ||||
|         self.writer.flush()?; | ||||
|  | ||||
|         let csum_offset = proxmox::offsetof!(DynamicIndexHeader, index_csum); | ||||
|         self.writer.seek(SeekFrom::Start(csum_offset as u64))?; | ||||
|  | ||||
|         let csum = self.csum.take().unwrap(); | ||||
|         let index_csum = csum.finish(); | ||||
|  | ||||
|         self.writer.write_all(&index_csum)?; | ||||
|         self.writer.flush()?; | ||||
|  | ||||
|         if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { | ||||
|             bail!("Atomic rename file {:?} failed - {}", self.filename, err); | ||||
|         } | ||||
|  | ||||
|         Ok(index_csum) | ||||
|     } | ||||
|  | ||||
|     // fixme: rename to add_digest | ||||
|     pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> { | ||||
|         if self.closed { | ||||
|             bail!( | ||||
|                 "cannot write to closed dynamic index file {:?}", | ||||
|                 self.filename | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8; 8]>(offset.to_le()) }; | ||||
|  | ||||
|         if let Some(ref mut csum) = self.csum { | ||||
|             csum.update(offset_le); | ||||
|             csum.update(digest); | ||||
|         } | ||||
|  | ||||
|         self.writer.write_all(offset_le)?; | ||||
|         self.writer.write_all(digest)?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
/// Writer which splits a binary stream into dynamic sized chunks
///
/// And store the resulting chunk list into the index file.
pub struct DynamicChunkWriter {
    index: DynamicIndexWriter, // receives one entry per finished chunk
    closed: bool,              // set once close() ran
    chunker: Chunker,          // decides where chunk boundaries fall (see Chunker::scan)
    stat: ChunkStat,           // accumulated size/count statistics
    chunk_offset: usize,       // bytes consumed from the input stream so far
    last_chunk: usize,         // stream offset where the current chunk started
    chunk_buffer: Vec<u8>,     // data of the chunk currently being assembled
}
|  | ||||
| impl DynamicChunkWriter { | ||||
|     pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self { | ||||
|         Self { | ||||
|             index, | ||||
|             closed: false, | ||||
|             chunker: Chunker::new(chunk_size), | ||||
|             stat: ChunkStat::new(0), | ||||
|             chunk_offset: 0, | ||||
|             last_chunk: 0, | ||||
|             chunk_buffer: Vec::with_capacity(chunk_size * 4), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn stat(&self) -> &ChunkStat { | ||||
|         &self.stat | ||||
|     } | ||||
|  | ||||
|     pub fn close(&mut self) -> Result<(), Error> { | ||||
|         if self.closed { | ||||
|             return Ok(()); | ||||
|         } | ||||
|  | ||||
|         self.closed = true; | ||||
|  | ||||
|         self.write_chunk_buffer()?; | ||||
|  | ||||
|         self.index.close()?; | ||||
|  | ||||
|         self.stat.size = self.chunk_offset as u64; | ||||
|  | ||||
|         // add size of index file | ||||
|         self.stat.size += | ||||
|             (self.stat.chunk_count * 40 + std::mem::size_of::<DynamicIndexHeader>()) as u64; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn write_chunk_buffer(&mut self) -> Result<(), Error> { | ||||
|         let chunk_size = self.chunk_buffer.len(); | ||||
|  | ||||
|         if chunk_size == 0 { | ||||
|             return Ok(()); | ||||
|         } | ||||
|  | ||||
|         let expected_chunk_size = self.chunk_offset - self.last_chunk; | ||||
|         if expected_chunk_size != self.chunk_buffer.len() { | ||||
|             bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size); | ||||
|         } | ||||
|  | ||||
|         self.stat.chunk_count += 1; | ||||
|  | ||||
|         self.last_chunk = self.chunk_offset; | ||||
|  | ||||
|         let (chunk, digest) = DataChunkBuilder::new(&self.chunk_buffer) | ||||
|             .compress(true) | ||||
|             .build()?; | ||||
|  | ||||
|         match self.index.insert_chunk(&chunk, &digest) { | ||||
|             Ok((is_duplicate, compressed_size)) => { | ||||
|                 self.stat.compressed_size += compressed_size; | ||||
|                 if is_duplicate { | ||||
|                     self.stat.duplicate_chunks += 1; | ||||
|                 } else { | ||||
|                     self.stat.disk_size += compressed_size; | ||||
|                 } | ||||
|  | ||||
|                 println!( | ||||
|                     "ADD CHUNK {:016x} {} {}% {} {}", | ||||
|                     self.chunk_offset, | ||||
|                     chunk_size, | ||||
|                     (compressed_size * 100) / (chunk_size as u64), | ||||
|                     is_duplicate, | ||||
|                     proxmox::tools::digest_to_hex(&digest) | ||||
|                 ); | ||||
|                 self.index.add_chunk(self.chunk_offset as u64, &digest)?; | ||||
|                 self.chunk_buffer.truncate(0); | ||||
|                 Ok(()) | ||||
|             } | ||||
|             Err(err) => { | ||||
|                 self.chunk_buffer.truncate(0); | ||||
|                 Err(err) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Write for DynamicChunkWriter { | ||||
|     fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> { | ||||
|         let chunker = &mut self.chunker; | ||||
|  | ||||
|         let pos = chunker.scan(data); | ||||
|  | ||||
|         if pos > 0 { | ||||
|             self.chunk_buffer.extend_from_slice(&data[0..pos]); | ||||
|             self.chunk_offset += pos; | ||||
|  | ||||
|             if let Err(err) = self.write_chunk_buffer() { | ||||
|                 return Err(std::io::Error::new( | ||||
|                     std::io::ErrorKind::Other, | ||||
|                     err.to_string(), | ||||
|                 )); | ||||
|             } | ||||
|             Ok(pos) | ||||
|         } else { | ||||
|             self.chunk_offset += data.len(); | ||||
|             self.chunk_buffer.extend_from_slice(data); | ||||
|             Ok(data.len()) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn flush(&mut self) -> std::result::Result<(), std::io::Error> { | ||||
|         Err(std::io::Error::new( | ||||
|             std::io::ErrorKind::Other, | ||||
|             "please use close() instead of flush()", | ||||
|         )) | ||||
|     } | ||||
| } | ||||
| @ -7,15 +7,17 @@ use std::io::{Seek, SeekFrom}; | ||||
| 
 | ||||
| use anyhow::{bail, format_err, Error}; | ||||
| 
 | ||||
| use pbs_datastore::chunk_stat::ChunkStat; | ||||
| use pbs_datastore::chunk_store::ChunkStore; | ||||
| use pbs_datastore::data_blob::ChunkInfo; | ||||
| use pbs_datastore::index::{ChunkReadInfo, IndexFile}; | ||||
| use pbs_tools::process_locker::ProcessLockSharedGuard; | ||||
| 
 | ||||
| use proxmox::tools::io::ReadExt; | ||||
| use proxmox::tools::Uuid; | ||||
| 
 | ||||
| use crate::chunk_stat::ChunkStat; | ||||
| use crate::chunk_store::ChunkStore; | ||||
| use crate::data_blob::ChunkInfo; | ||||
| use crate::file_formats; | ||||
| use crate::index::{ChunkReadInfo, IndexFile}; | ||||
| 
 | ||||
| /// Header format definition for fixed index files (`.fidx`)
 | ||||
| #[repr(C)] | ||||
| pub struct FixedIndexHeader { | ||||
| @ -81,7 +83,7 @@ impl FixedIndexReader { | ||||
| 
 | ||||
|         let header: Box<FixedIndexHeader> = unsafe { file.read_host_value_boxed()? }; | ||||
| 
 | ||||
|         if header.magic != super::FIXED_SIZED_CHUNK_INDEX_1_0 { | ||||
|         if header.magic != file_formats::FIXED_SIZED_CHUNK_INDEX_1_0 { | ||||
|             bail!("got unknown magic number"); | ||||
|         } | ||||
| 
 | ||||
| @ -286,7 +288,7 @@ impl FixedIndexWriter { | ||||
|         let buffer = vec![0u8; header_size]; | ||||
|         let header = unsafe { &mut *(buffer.as_ptr() as *mut FixedIndexHeader) }; | ||||
| 
 | ||||
|         header.magic = super::FIXED_SIZED_CHUNK_INDEX_1_0; | ||||
|         header.magic = file_formats::FIXED_SIZED_CHUNK_INDEX_1_0; | ||||
|         header.ctime = i64::to_le(ctime); | ||||
|         header.size = u64::to_le(size as u64); | ||||
|         header.chunk_size = u64::to_le(chunk_size as u64); | ||||
| @ -195,9 +195,13 @@ pub mod file_formats; | ||||
| pub mod index; | ||||
| pub mod key_derivation; | ||||
| pub mod manifest; | ||||
| pub mod prune; | ||||
| pub mod read_chunk; | ||||
| pub mod task; | ||||
|  | ||||
| pub mod dynamic_index; | ||||
| pub mod fixed_index; | ||||
|  | ||||
| pub use backup_info::{BackupDir, BackupGroup, BackupInfo}; | ||||
| pub use checksum_reader::ChecksumReader; | ||||
| pub use checksum_writer::ChecksumWriter; | ||||
|  | ||||
| @ -1,7 +1,8 @@ | ||||
| use anyhow::{Error}; | ||||
| use std::collections::{HashMap, HashSet}; | ||||
| use std::path::PathBuf; | ||||
| 
 | ||||
| use anyhow::{Error}; | ||||
| 
 | ||||
| use super::BackupInfo; | ||||
| 
 | ||||
| enum PruneMark { Keep, KeepPartial, Remove } | ||||
| @ -12,22 +12,26 @@ use lazy_static::lazy_static; | ||||
|  | ||||
| use proxmox::tools::fs::{replace_file, file_read_optional_string, CreateOptions, open_file_locked}; | ||||
|  | ||||
| use pbs_api_types::upid::UPID; | ||||
| use pbs_api_types::{Authid, GarbageCollectionStatus}; | ||||
| use pbs_datastore::{task_log, task_warn}; | ||||
| use pbs_datastore::DataBlob; | ||||
| use pbs_datastore::backup_info::{BackupGroup, BackupDir}; | ||||
| use pbs_datastore::chunk_store::ChunkStore; | ||||
| use pbs_datastore::dynamic_index::{DynamicIndexReader, DynamicIndexWriter}; | ||||
| use pbs_datastore::fixed_index::{FixedIndexReader, FixedIndexWriter}; | ||||
| use pbs_datastore::index::IndexFile; | ||||
| use pbs_datastore::manifest::{ | ||||
|     MANIFEST_BLOB_NAME, MANIFEST_LOCK_NAME, CLIENT_LOG_BLOB_NAME, | ||||
|     ArchiveType, BackupManifest, | ||||
|     archive_type, | ||||
| }; | ||||
| use pbs_datastore::task::TaskState; | ||||
| use pbs_tools::format::HumanByte; | ||||
| use pbs_tools::fs::{lock_dir_noblock, DirLockGuard}; | ||||
|  | ||||
| use super::backup_info::{BackupGroup, BackupDir}; | ||||
| use super::chunk_store::ChunkStore; | ||||
| use super::dynamic_index::{DynamicIndexReader, DynamicIndexWriter}; | ||||
| use super::fixed_index::{FixedIndexReader, FixedIndexWriter}; | ||||
| use super::manifest::{MANIFEST_BLOB_NAME, MANIFEST_LOCK_NAME, CLIENT_LOG_BLOB_NAME, BackupManifest}; | ||||
| use super::index::*; | ||||
| use super::{DataBlob, ArchiveType, archive_type}; | ||||
| use crate::config::datastore::{self, DataStoreConfig}; | ||||
| use crate::tools; | ||||
| use crate::api2::types::{Authid, GarbageCollectionStatus}; | ||||
| use crate::server::UPID; | ||||
|  | ||||
| lazy_static! { | ||||
|     static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new()); | ||||
|  | ||||
| @ -1,263 +1,16 @@ | ||||
| use std::fs::File; | ||||
| use std::io::{self, BufWriter, Seek, SeekFrom, Write}; | ||||
| use std::io::{self, Seek, SeekFrom}; | ||||
| use std::ops::Range; | ||||
| use std::os::unix::io::AsRawFd; | ||||
| use std::path::{Path, PathBuf}; | ||||
| use std::sync::{Arc, Mutex}; | ||||
| use std::task::Context; | ||||
| use std::pin::Pin; | ||||
|  | ||||
| use anyhow::{bail, format_err, Error}; | ||||
|  | ||||
| use proxmox::tools::io::ReadExt; | ||||
| use proxmox::tools::uuid::Uuid; | ||||
| use proxmox::tools::mmap::Mmap; | ||||
| use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation}; | ||||
|  | ||||
| use pbs_datastore::Chunker; | ||||
| use pbs_datastore::index::{IndexFile, ChunkReadInfo}; | ||||
| use pbs_datastore::chunk_stat::ChunkStat; | ||||
| use pbs_datastore::data_blob::{DataBlob, DataChunkBuilder}; | ||||
| use pbs_datastore::chunk_store::ChunkStore; | ||||
| use pbs_datastore::dynamic_index::DynamicIndexReader; | ||||
| use pbs_datastore::read_chunk::ReadChunk; | ||||
| use pbs_tools::process_locker::ProcessLockSharedGuard; | ||||
|  | ||||
| /// Header format definition for dynamic index files (`.dixd`) | ||||
| #[repr(C)] | ||||
| pub struct DynamicIndexHeader { | ||||
|     pub magic: [u8; 8], | ||||
|     pub uuid: [u8; 16], | ||||
|     pub ctime: i64, | ||||
|     /// Sha256 over the index ``SHA256(offset1||digest1||offset2||digest2||...)`` | ||||
|     pub index_csum: [u8; 32], | ||||
|     reserved: [u8; 4032], // overall size is one page (4096 bytes) | ||||
| } | ||||
| proxmox::static_assert_size!(DynamicIndexHeader, 4096); | ||||
| // TODO: Once non-Copy unions are stabilized, use: | ||||
| // union DynamicIndexHeader { | ||||
| //     reserved: [u8; 4096], | ||||
| //     pub data: DynamicIndexHeaderData, | ||||
| // } | ||||
|  | ||||
| impl DynamicIndexHeader { | ||||
|     /// Convenience method to allocate a zero-initialized header struct. | ||||
|     pub fn zeroed() -> Box<Self> { | ||||
|         unsafe { | ||||
|             Box::from_raw(std::alloc::alloc_zeroed(std::alloc::Layout::new::<Self>()) as *mut Self) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn as_bytes(&self) -> &[u8] { | ||||
|         unsafe { | ||||
|             std::slice::from_raw_parts( | ||||
|                 self as *const Self as *const u8, | ||||
|                 std::mem::size_of::<Self>(), | ||||
|             ) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Clone, Debug)] | ||||
| #[repr(C)] | ||||
| pub struct DynamicEntry { | ||||
|     end_le: u64, | ||||
|     digest: [u8; 32], | ||||
| } | ||||
|  | ||||
| impl DynamicEntry { | ||||
|     #[inline] | ||||
|     pub fn end(&self) -> u64 { | ||||
|         u64::from_le(self.end_le) | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct DynamicIndexReader { | ||||
|     _file: File, | ||||
|     pub size: usize, | ||||
|     index: Mmap<DynamicEntry>, | ||||
|     pub uuid: [u8; 16], | ||||
|     pub ctime: i64, | ||||
|     pub index_csum: [u8; 32], | ||||
| } | ||||
|  | ||||
| impl DynamicIndexReader { | ||||
|     pub fn open(path: &Path) -> Result<Self, Error> { | ||||
|         File::open(path) | ||||
|             .map_err(Error::from) | ||||
|             .and_then(Self::new) | ||||
|             .map_err(|err| format_err!("Unable to open dynamic index {:?} - {}", path, err)) | ||||
|     } | ||||
|  | ||||
|     pub fn new(mut file: std::fs::File) -> Result<Self, Error> { | ||||
|         // FIXME: This is NOT OUR job! Check the callers of this method and remove this! | ||||
|         file.seek(SeekFrom::Start(0))?; | ||||
|  | ||||
|         let header_size = std::mem::size_of::<DynamicIndexHeader>(); | ||||
|  | ||||
|         let rawfd = file.as_raw_fd(); | ||||
|         let stat = match nix::sys::stat::fstat(rawfd) { | ||||
|             Ok(stat) => stat, | ||||
|             Err(err) => bail!("fstat failed - {}", err), | ||||
|         }; | ||||
|  | ||||
|         let size = stat.st_size as usize; | ||||
|  | ||||
|         if size < header_size { | ||||
|             bail!("index too small ({})", stat.st_size); | ||||
|         } | ||||
|  | ||||
|         let header: Box<DynamicIndexHeader> = unsafe { file.read_host_value_boxed()? }; | ||||
|  | ||||
|         if header.magic != super::DYNAMIC_SIZED_CHUNK_INDEX_1_0 { | ||||
|             bail!("got unknown magic number"); | ||||
|         } | ||||
|  | ||||
|         let ctime = proxmox::tools::time::epoch_i64(); | ||||
|  | ||||
|         let index_size = stat.st_size as usize - header_size; | ||||
|         let index_count = index_size / 40; | ||||
|         if index_count * 40 != index_size { | ||||
|             bail!("got unexpected file size"); | ||||
|         } | ||||
|  | ||||
|         let index = unsafe { | ||||
|             Mmap::map_fd( | ||||
|                 rawfd, | ||||
|                 header_size as u64, | ||||
|                 index_count, | ||||
|                 nix::sys::mman::ProtFlags::PROT_READ, | ||||
|                 nix::sys::mman::MapFlags::MAP_PRIVATE, | ||||
|             )? | ||||
|         }; | ||||
|  | ||||
|         Ok(Self { | ||||
|             _file: file, | ||||
|             size, | ||||
|             index, | ||||
|             ctime, | ||||
|             uuid: header.uuid, | ||||
|             index_csum: header.index_csum, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     #[inline] | ||||
|     #[allow(clippy::cast_ptr_alignment)] | ||||
|     fn chunk_end(&self, pos: usize) -> u64 { | ||||
|         if pos >= self.index.len() { | ||||
|             panic!("chunk index out of range"); | ||||
|         } | ||||
|         self.index[pos].end() | ||||
|     } | ||||
|  | ||||
|     #[inline] | ||||
|     fn chunk_digest(&self, pos: usize) -> &[u8; 32] { | ||||
|         if pos >= self.index.len() { | ||||
|             panic!("chunk index out of range"); | ||||
|         } | ||||
|         &self.index[pos].digest | ||||
|     } | ||||
|  | ||||
|     // TODO: can we use std::slice::binary_search with Mmap now? | ||||
|     fn binary_search( | ||||
|         &self, | ||||
|         start_idx: usize, | ||||
|         start: u64, | ||||
|         end_idx: usize, | ||||
|         end: u64, | ||||
|         offset: u64, | ||||
|     ) -> Result<usize, Error> { | ||||
|         if (offset >= end) || (offset < start) { | ||||
|             bail!("offset out of range"); | ||||
|         } | ||||
|  | ||||
|         if end_idx == start_idx { | ||||
|             return Ok(start_idx); // found | ||||
|         } | ||||
|         let middle_idx = (start_idx + end_idx) / 2; | ||||
|         let middle_end = self.chunk_end(middle_idx); | ||||
|  | ||||
|         if offset < middle_end { | ||||
|             self.binary_search(start_idx, start, middle_idx, middle_end, offset) | ||||
|         } else { | ||||
|             self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl IndexFile for DynamicIndexReader { | ||||
|     fn index_count(&self) -> usize { | ||||
|         self.index.len() | ||||
|     } | ||||
|  | ||||
|     fn index_digest(&self, pos: usize) -> Option<&[u8; 32]> { | ||||
|         if pos >= self.index.len() { | ||||
|             None | ||||
|         } else { | ||||
|             Some(unsafe { &*(self.chunk_digest(pos).as_ptr() as *const [u8; 32]) }) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn index_bytes(&self) -> u64 { | ||||
|         if self.index.is_empty() { | ||||
|             0 | ||||
|         } else { | ||||
|             self.chunk_end(self.index.len() - 1) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn compute_csum(&self) -> ([u8; 32], u64) { | ||||
|         let mut csum = openssl::sha::Sha256::new(); | ||||
|         let mut chunk_end = 0; | ||||
|         for pos in 0..self.index_count() { | ||||
|             let info = self.chunk_info(pos).unwrap(); | ||||
|             chunk_end = info.range.end; | ||||
|             csum.update(&chunk_end.to_le_bytes()); | ||||
|             csum.update(&info.digest); | ||||
|         } | ||||
|         let csum = csum.finish(); | ||||
|         (csum, chunk_end) | ||||
|     } | ||||
|  | ||||
|     fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo> { | ||||
|         if pos >= self.index.len() { | ||||
|             return None; | ||||
|         } | ||||
|         let start = if pos == 0 { 0 } else { self.index[pos - 1].end() }; | ||||
|  | ||||
|         let end = self.index[pos].end(); | ||||
|  | ||||
|         Some(ChunkReadInfo { | ||||
|             range: start..end, | ||||
|             digest: self.index[pos].digest, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     fn index_ctime(&self) -> i64 { | ||||
|         self.ctime | ||||
|     } | ||||
|  | ||||
|     fn index_size(&self) -> usize { | ||||
|         self.size as usize | ||||
|     } | ||||
|  | ||||
|     fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> { | ||||
|         let end_idx = self.index.len() - 1; | ||||
|         let end = self.chunk_end(end_idx); | ||||
|         let found_idx = self.binary_search(0, 0, end_idx, end, offset); | ||||
|         let found_idx = match found_idx { | ||||
|             Ok(i) => i, | ||||
|             Err(_) => return None | ||||
|         }; | ||||
|  | ||||
|         let found_start = if found_idx == 0 { | ||||
|             0 | ||||
|         } else { | ||||
|             self.chunk_end(found_idx - 1) | ||||
|         }; | ||||
|  | ||||
|         Some((found_idx, offset - found_start)) | ||||
|     } | ||||
| } | ||||
| use pbs_datastore::index::IndexFile; | ||||
|  | ||||
| struct CachedChunk { | ||||
|     range: Range<u64>, | ||||
| @ -358,7 +111,7 @@ impl<S: ReadChunk> crate::tools::BufferedRead for BufferedDynamicReader<S> { | ||||
|  | ||||
|         // optimization for sequential read | ||||
|         if buffer_len > 0 | ||||
|             && ((self.buffered_chunk_idx + 1) < index.index.len()) | ||||
|             && ((self.buffered_chunk_idx + 1) < index.index().len()) | ||||
|             && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | ||||
|         { | ||||
|             let next_idx = self.buffered_chunk_idx + 1; | ||||
| @ -374,7 +127,7 @@ impl<S: ReadChunk> crate::tools::BufferedRead for BufferedDynamicReader<S> { | ||||
|             || (offset < self.buffered_chunk_start) | ||||
|             || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) | ||||
|         { | ||||
|             let end_idx = index.index.len() - 1; | ||||
|             let end_idx = index.index().len() - 1; | ||||
|             let end = index.chunk_end(end_idx); | ||||
|             let idx = index.binary_search(0, 0, end_idx, end, offset)?; | ||||
|             self.buffer_chunk(idx)?; | ||||
| @ -474,252 +227,3 @@ impl<R: ReadChunk> ReadAt for LocalDynamicReadAt<R> { | ||||
|         panic!("LocalDynamicReadAt::start_read_at returned Pending"); | ||||
|     } | ||||
| } | ||||
|  | ||||
|  | ||||
/// Create dynamic index files (`.didx`)
///
/// The index data is first written to a temporary `.tmp_didx` file and
/// atomically renamed into place when the writer is closed.
pub struct DynamicIndexWriter {
    store: Arc<ChunkStore>,
    // Shared chunk-store lock held for the lifetime of the writer.
    _lock: ProcessLockSharedGuard,
    writer: BufWriter<File>,
    // Set once close() ran; further writes/closes are rejected.
    closed: bool,
    // Final index file name.
    filename: PathBuf,
    // Temporary file name; removed on drop if the rename never happened.
    tmp_filename: PathBuf,
    // Running checksum over all (offset, digest) entries; taken on close().
    csum: Option<openssl::sha::Sha256>,
    pub uuid: [u8; 16],
    pub ctime: i64,
}
|  | ||||
| impl Drop for DynamicIndexWriter { | ||||
|     fn drop(&mut self) { | ||||
|         let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl DynamicIndexWriter { | ||||
|     pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> { | ||||
|         let shared_lock = store.try_shared_lock()?; | ||||
|  | ||||
|         let full_path = store.relative_path(path); | ||||
|         let mut tmp_path = full_path.clone(); | ||||
|         tmp_path.set_extension("tmp_didx"); | ||||
|  | ||||
|         let file = std::fs::OpenOptions::new() | ||||
|             .create(true) | ||||
|             .truncate(true) | ||||
|             .read(true) | ||||
|             .write(true) | ||||
|             .open(&tmp_path)?; | ||||
|  | ||||
|         let mut writer = BufWriter::with_capacity(1024 * 1024, file); | ||||
|  | ||||
|         let ctime = proxmox::tools::time::epoch_i64(); | ||||
|  | ||||
|         let uuid = Uuid::generate(); | ||||
|  | ||||
|         let mut header = DynamicIndexHeader::zeroed(); | ||||
|         header.magic = super::DYNAMIC_SIZED_CHUNK_INDEX_1_0; | ||||
|         header.ctime = i64::to_le(ctime); | ||||
|         header.uuid = *uuid.as_bytes(); | ||||
|         // header.index_csum = [0u8; 32]; | ||||
|         writer.write_all(header.as_bytes())?; | ||||
|  | ||||
|         let csum = Some(openssl::sha::Sha256::new()); | ||||
|  | ||||
|         Ok(Self { | ||||
|             store, | ||||
|             _lock: shared_lock, | ||||
|             writer, | ||||
|             closed: false, | ||||
|             filename: full_path, | ||||
|             tmp_filename: tmp_path, | ||||
|             ctime, | ||||
|             uuid: *uuid.as_bytes(), | ||||
|             csum, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     // fixme: use add_chunk instead? | ||||
|     pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> { | ||||
|         self.store.insert_chunk(chunk, digest) | ||||
|     } | ||||
|  | ||||
|     pub fn close(&mut self) -> Result<[u8; 32], Error> { | ||||
|         if self.closed { | ||||
|             bail!( | ||||
|                 "cannot close already closed archive index file {:?}", | ||||
|                 self.filename | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         self.closed = true; | ||||
|  | ||||
|         self.writer.flush()?; | ||||
|  | ||||
|         let csum_offset = proxmox::offsetof!(DynamicIndexHeader, index_csum); | ||||
|         self.writer.seek(SeekFrom::Start(csum_offset as u64))?; | ||||
|  | ||||
|         let csum = self.csum.take().unwrap(); | ||||
|         let index_csum = csum.finish(); | ||||
|  | ||||
|         self.writer.write_all(&index_csum)?; | ||||
|         self.writer.flush()?; | ||||
|  | ||||
|         if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { | ||||
|             bail!("Atomic rename file {:?} failed - {}", self.filename, err); | ||||
|         } | ||||
|  | ||||
|         Ok(index_csum) | ||||
|     } | ||||
|  | ||||
|     // fixme: rename to add_digest | ||||
|     pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> { | ||||
|         if self.closed { | ||||
|             bail!( | ||||
|                 "cannot write to closed dynamic index file {:?}", | ||||
|                 self.filename | ||||
|             ); | ||||
|         } | ||||
|  | ||||
|         let offset_le: &[u8; 8] = unsafe { &std::mem::transmute::<u64, [u8; 8]>(offset.to_le()) }; | ||||
|  | ||||
|         if let Some(ref mut csum) = self.csum { | ||||
|             csum.update(offset_le); | ||||
|             csum.update(digest); | ||||
|         } | ||||
|  | ||||
|         self.writer.write_all(offset_le)?; | ||||
|         self.writer.write_all(digest)?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
/// Writer which splits a binary stream into dynamic sized chunks
///
/// And stores the resulting chunk list in the index file.
pub struct DynamicChunkWriter {
    index: DynamicIndexWriter,
    // Set once close() ran; close() is idempotent afterwards.
    closed: bool,
    // Content-defined chunk boundary scanner.
    chunker: Chunker,
    // Accumulated statistics (sizes, duplicate counts, ...).
    stat: ChunkStat,
    // Total number of bytes consumed from the input stream so far.
    chunk_offset: usize,
    // Stream offset at which the previously flushed chunk ended.
    last_chunk: usize,
    // Data buffered for the chunk currently being assembled.
    chunk_buffer: Vec<u8>,
}
|  | ||||
| impl DynamicChunkWriter { | ||||
|     pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self { | ||||
|         Self { | ||||
|             index, | ||||
|             closed: false, | ||||
|             chunker: Chunker::new(chunk_size), | ||||
|             stat: ChunkStat::new(0), | ||||
|             chunk_offset: 0, | ||||
|             last_chunk: 0, | ||||
|             chunk_buffer: Vec::with_capacity(chunk_size * 4), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn stat(&self) -> &ChunkStat { | ||||
|         &self.stat | ||||
|     } | ||||
|  | ||||
|     pub fn close(&mut self) -> Result<(), Error> { | ||||
|         if self.closed { | ||||
|             return Ok(()); | ||||
|         } | ||||
|  | ||||
|         self.closed = true; | ||||
|  | ||||
|         self.write_chunk_buffer()?; | ||||
|  | ||||
|         self.index.close()?; | ||||
|  | ||||
|         self.stat.size = self.chunk_offset as u64; | ||||
|  | ||||
|         // add size of index file | ||||
|         self.stat.size += | ||||
|             (self.stat.chunk_count * 40 + std::mem::size_of::<DynamicIndexHeader>()) as u64; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn write_chunk_buffer(&mut self) -> Result<(), Error> { | ||||
|         let chunk_size = self.chunk_buffer.len(); | ||||
|  | ||||
|         if chunk_size == 0 { | ||||
|             return Ok(()); | ||||
|         } | ||||
|  | ||||
|         let expected_chunk_size = self.chunk_offset - self.last_chunk; | ||||
|         if expected_chunk_size != self.chunk_buffer.len() { | ||||
|             bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size); | ||||
|         } | ||||
|  | ||||
|         self.stat.chunk_count += 1; | ||||
|  | ||||
|         self.last_chunk = self.chunk_offset; | ||||
|  | ||||
|         let (chunk, digest) = DataChunkBuilder::new(&self.chunk_buffer) | ||||
|             .compress(true) | ||||
|             .build()?; | ||||
|  | ||||
|         match self.index.insert_chunk(&chunk, &digest) { | ||||
|             Ok((is_duplicate, compressed_size)) => { | ||||
|                 self.stat.compressed_size += compressed_size; | ||||
|                 if is_duplicate { | ||||
|                     self.stat.duplicate_chunks += 1; | ||||
|                 } else { | ||||
|                     self.stat.disk_size += compressed_size; | ||||
|                 } | ||||
|  | ||||
|                 println!( | ||||
|                     "ADD CHUNK {:016x} {} {}% {} {}", | ||||
|                     self.chunk_offset, | ||||
|                     chunk_size, | ||||
|                     (compressed_size * 100) / (chunk_size as u64), | ||||
|                     is_duplicate, | ||||
|                     proxmox::tools::digest_to_hex(&digest) | ||||
|                 ); | ||||
|                 self.index.add_chunk(self.chunk_offset as u64, &digest)?; | ||||
|                 self.chunk_buffer.truncate(0); | ||||
|                 Ok(()) | ||||
|             } | ||||
|             Err(err) => { | ||||
|                 self.chunk_buffer.truncate(0); | ||||
|                 Err(err) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Write for DynamicChunkWriter { | ||||
|     fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> { | ||||
|         let chunker = &mut self.chunker; | ||||
|  | ||||
|         let pos = chunker.scan(data); | ||||
|  | ||||
|         if pos > 0 { | ||||
|             self.chunk_buffer.extend_from_slice(&data[0..pos]); | ||||
|             self.chunk_offset += pos; | ||||
|  | ||||
|             if let Err(err) = self.write_chunk_buffer() { | ||||
|                 return Err(std::io::Error::new( | ||||
|                     std::io::ErrorKind::Other, | ||||
|                     err.to_string(), | ||||
|                 )); | ||||
|             } | ||||
|             Ok(pos) | ||||
|         } else { | ||||
|             self.chunk_offset += data.len(); | ||||
|             self.chunk_buffer.extend_from_slice(data); | ||||
|             Ok(data.len()) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn flush(&mut self) -> std::result::Result<(), std::io::Error> { | ||||
|         Err(std::io::Error::new( | ||||
|             std::io::ErrorKind::Other, | ||||
|             "please use close() instead of flush()", | ||||
|         )) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -1,146 +1,4 @@ | ||||
| //! This module implements the data storage and access layer. | ||||
| //! | ||||
| //! # Data formats | ||||
| //! | ||||
| //! PBS splits large files into chunks, and stores them deduplicated using | ||||
| //! a content addressable storage format. | ||||
| //! | ||||
| //! Backup snapshots are stored as folders containing a manifest file and | ||||
| //! potentially one or more index or blob files. | ||||
| //! | ||||
| //! The manifest contains hashes of all other files and can be signed by | ||||
| //! the client. | ||||
| //! | ||||
| //! Blob files contain data directly. They are used for config files and | ||||
| //! the like. | ||||
| //! | ||||
| //! Index files are used to reconstruct an original file. They contain a | ||||
| //! list of SHA256 checksums. The `DynamicIndex*` format is able to deal | ||||
| //! with dynamic chunk sizes (CT and host backups), whereas the | ||||
| //! `FixedIndex*` format is an optimization to store a list of equal sized | ||||
| //! chunks (VMs, whole block devices). | ||||
| //! | ||||
| //! A chunk is defined as a binary blob, which is stored inside a | ||||
| //! [ChunkStore](struct.ChunkStore.html) instead of the backup directory | ||||
| //! directly, and can be addressed by its SHA256 digest. | ||||
| //! | ||||
| //! | ||||
| //! # Garbage Collection (GC) | ||||
| //! | ||||
| //! Deleting backups is as easy as deleting the corresponding .idx files. | ||||
| //! However, this does not free up any storage, because those files just | ||||
| //! contain references to chunks. | ||||
| //! | ||||
| //! To free up some storage, we run a garbage collection process at | ||||
| //! regular intervals. The collector uses a mark and sweep approach. In | ||||
| //! the first phase, it scans all .idx files to mark used chunks. The | ||||
| //! second phase then removes all unmarked chunks from the store. | ||||
| //! | ||||
| //! The locking mechanisms mentioned below make sure that we are the only | ||||
| //! process running GC. We still want to be able to create backups during | ||||
| //! GC, so there may be multiple backup threads/tasks running, either | ||||
| //! started before GC, or while GC is running. | ||||
| //! | ||||
| //! ## `atime` based GC | ||||
| //! | ||||
| //! The idea here is to mark chunks by updating the `atime` (access | ||||
| //! timestamp) on the chunk file. This is quite simple and does not need | ||||
| //! additional RAM. | ||||
| //! | ||||
| //! One minor problem is that recent Linux versions use the `relatime` | ||||
| //! mount flag by default for performance reasons (and we want that). When | ||||
| //! enabled, `atime` data is written to the disk only if the file has been | ||||
| //! modified since the `atime` data was last updated (`mtime`), or if the | ||||
| //! file was last accessed more than a certain amount of time ago (by | ||||
| //! default 24h). So we may only delete chunks with `atime` older than 24 | ||||
| //! hours. | ||||
| //! | ||||
| //! Another problem arises from running backups. The mark phase does not | ||||
| //! find any chunks from those backups, because there is no .idx file for | ||||
| //! them (created after the backup). Chunks created or touched by those | ||||
| //! backups may have an `atime` as old as the start time of those backups. | ||||
| //! Please note that the backup start time may predate the GC start time. | ||||
| //! So we may only delete chunks older than the start time of those | ||||
| //! running backup jobs, which might be more than 24h back (this is the | ||||
| //! reason why ProcessLocker exclusive locks only have to be exclusive | ||||
| //! between processes, since within one we can determine the age of the | ||||
| //! oldest shared lock). | ||||
| //! | ||||
| //! ## Store `marks` in RAM using a HASH | ||||
| //! | ||||
| //! Might be better. Under investigation. | ||||
| //! | ||||
| //! | ||||
| //! # Locking | ||||
| //! | ||||
| //! Since PBS allows multiple potentially interfering operations at the | ||||
| //! same time (e.g. garbage collect, prune, multiple backup creations | ||||
| //! (only in separate groups), forget, ...), these need to lock against | ||||
| //! each other in certain scenarios. There is no overarching global lock | ||||
| //! though, instead always the finest grained lock possible is used, | ||||
| //! because running these operations concurrently is treated as a feature | ||||
| //! on its own. | ||||
| //! | ||||
| //! ## Inter-process Locking | ||||
| //! | ||||
| //! We need to be able to restart the proxmox-backup service daemons, so | ||||
| //! that we can update the software without rebooting the host. But such | ||||
| //! restarts must not abort running backup jobs, so we need to keep the | ||||
| //! old service running until those jobs are finished. This implies that | ||||
| //! we need some kind of locking for modifying chunks and indices in the | ||||
| //! ChunkStore. | ||||
| //! | ||||
| //! Please note that it is perfectly valid to have multiple | ||||
| //! parallel ChunkStore writers, even when they write the same chunk | ||||
| //! (because the chunk would have the same name and the same data, and | ||||
| //! writes are completed atomically via a rename). The only problem is | ||||
| //! garbage collection, because we need to avoid deleting chunks which are | ||||
| //! still referenced. | ||||
| //! | ||||
| //! To do this we use the | ||||
| //! [ProcessLocker](../tools/struct.ProcessLocker.html). | ||||
| //! | ||||
| //! ### ChunkStore-wide | ||||
| //! | ||||
| //! * Create Index Files: | ||||
| //! | ||||
| //!   Acquire shared lock for ChunkStore. | ||||
| //! | ||||
| //!   Note: When creating .idx files, we create a temporary .tmp file, | ||||
| //!   then do an atomic rename. | ||||
| //! | ||||
| //! * Garbage Collect: | ||||
| //! | ||||
| //!   Acquire exclusive lock for ChunkStore. If we have | ||||
| //!   already a shared lock for the ChunkStore, try to upgrade that | ||||
| //!   lock. | ||||
| //! | ||||
| //! Exclusive locks only work _between processes_. It is valid to have an | ||||
| //! exclusive and one or more shared locks held within one process. Writing | ||||
| //! chunks within one process is synchronized using the gc_mutex. | ||||
| //! | ||||
| //! On server restart, we stop any running GC in the old process to avoid | ||||
| //! having the exclusive lock held for too long. | ||||
| //! | ||||
| //! ## Locking table | ||||
| //! | ||||
| //! Below table shows all operations that play a role in locking, and which | ||||
| //! mechanisms are used to make their concurrent usage safe. | ||||
| //! | ||||
| //! | starting ><br>v during | read index file | create index file | GC mark | GC sweep | update manifest | forget | prune | create backup | verify | reader api | | ||||
| //! |-|-|-|-|-|-|-|-|-|-|-| | ||||
| //! | **read index file** | / | / | / | / | / | mmap stays valid, oldest_shared_lock prevents GC | see forget column | / | / | / | | ||||
| //! | **create index file** | / | / | / | / | / | / | / | /, happens at the end, after all chunks are touched | /, only happens without a manifest | / | | ||||
| //! | **GC mark** | / | Datastore process-lock shared | gc_mutex, exclusive ProcessLocker | gc_mutex | /, GC only cares about index files, not manifests | tells GC about removed chunks | see forget column | /, index files don’t exist yet | / | / | | ||||
| //! | **GC sweep** | / | Datastore process-lock shared | gc_mutex, exclusive ProcessLocker | gc_mutex | / | /, chunks already marked | see forget column | chunks get touched; chunk_store.mutex; oldest PL lock | / | / | | ||||
| //! | **update manifest** | / | / | / | / | update_manifest lock | update_manifest lock, remove dir under lock | see forget column | /, “write manifest” happens at the end | /, can call “write manifest”, see that column | / | | ||||
| //! | **forget** | / | / | removed_during_gc mutex is held during unlink | marking done, doesn’t matter if forgotten now | update_manifest lock, forget waits for lock | /, unlink is atomic | causes forget to fail, but that’s OK | running backup has snapshot flock | /, potentially detects missing folder | shared snap flock | | ||||
| //! | **prune** | / | / | see forget row | see forget row | see forget row | causes warn in prune, but no error | see forget column | running and last non-running can’t be pruned | see forget row | shared snap flock | | ||||
| //! | **create backup** | / | only time this happens, thus has snapshot flock | / | chunks get touched; chunk_store.mutex; oldest PL lock | no lock, but cannot exist beforehand | snapshot flock, can’t be forgotten | running and last non-running can’t be pruned | snapshot group flock, only one running per group | /, won’t be verified since manifest missing | / | | ||||
| //! | **verify** | / | / | / | / | see “update manifest” row | /, potentially detects missing folder | see forget column | / | /, but useless (“update manifest” protects itself) | / | | ||||
//! | **reader api** | / | / | / | /, open snap can’t be forgotten, so ref must exist | / | prevented by shared snap flock | prevented by shared snap flock | / | / | /, lock is shared |
| //! * / = no interaction | ||||
| //! * shared/exclusive from POV of 'starting' process | ||||
| //! Server/client-specific parts for what's otherwise in pbs-datastore. | ||||
|  | ||||
| use anyhow::{bail, Error}; | ||||
|  | ||||
| @ -216,6 +74,13 @@ pub use pbs_datastore::key_derivation; | ||||
| pub use pbs_datastore::key_derivation::*; | ||||
| pub use pbs_datastore::manifest; | ||||
| pub use pbs_datastore::manifest::*; | ||||
| pub use pbs_datastore::prune; | ||||
| pub use pbs_datastore::prune::*; | ||||
|  | ||||
| pub use pbs_datastore::dynamic_index::*; | ||||
| pub use pbs_datastore::fixed_index; | ||||
| pub use pbs_datastore::fixed_index::*; | ||||
|  | ||||
| pub use pbs_datastore::read_chunk::*; | ||||
|  | ||||
| mod chunk_stream; | ||||
| @ -225,15 +90,10 @@ pub use chunk_stream::*; | ||||
| mod read_chunk; | ||||
| pub use read_chunk::*; | ||||
|  | ||||
| mod fixed_index; | ||||
| pub use fixed_index::*; | ||||
|  | ||||
| // Split | ||||
| mod dynamic_index; | ||||
| pub use dynamic_index::*; | ||||
|  | ||||
| mod prune; | ||||
| pub use prune::*; | ||||
|  | ||||
| mod datastore; | ||||
| pub use datastore::*; | ||||
|  | ||||
|  | ||||
		Reference in New Issue
	
	Block a user