diff --git a/pbs-datastore/src/backup_info.rs b/pbs-datastore/src/backup_info.rs index f9718988..1902e135 100644 --- a/pbs-datastore/src/backup_info.rs +++ b/pbs-datastore/src/backup_info.rs @@ -5,13 +5,8 @@ use std::str::FromStr; use anyhow::{bail, format_err, Error}; use pbs_api_types::{ - BACKUP_ID_REGEX, - BACKUP_TYPE_REGEX, - BACKUP_DATE_REGEX, - GROUP_PATH_REGEX, - SNAPSHOT_PATH_REGEX, - BACKUP_FILE_REGEX, - GroupFilter, + GroupFilter, BACKUP_DATE_REGEX, BACKUP_FILE_REGEX, BACKUP_ID_REGEX, BACKUP_TYPE_REGEX, + GROUP_PATH_REGEX, SNAPSHOT_PATH_REGEX, }; use super::manifest::MANIFEST_BLOB_NAME; @@ -96,7 +91,11 @@ impl BackupGroup { let protected = backup_dir.is_protected(base_path.to_owned()); - list.push(BackupInfo { backup_dir, files, protected }); + list.push(BackupInfo { + backup_dir, + files, + protected, + }); Ok(()) }, @@ -331,7 +330,11 @@ impl BackupInfo { let files = list_backup_files(libc::AT_FDCWD, &path)?; let protected = backup_dir.is_protected(base_path.to_owned()); - Ok(BackupInfo { backup_dir, files, protected }) + Ok(BackupInfo { + backup_dir, + files, + protected, + }) } /// Finds the latest backup inside a backup group @@ -399,9 +402,7 @@ impl BackupInfo { pub fn is_finished(&self) -> bool { // backup is considered unfinished if there is no manifest - self.files - .iter() - .any(|name| name == MANIFEST_BLOB_NAME) + self.files.iter().any(|name| name == MANIFEST_BLOB_NAME) } } diff --git a/pbs-datastore/src/cached_chunk_reader.rs b/pbs-datastore/src/cached_chunk_reader.rs index 0a383904..27c68584 100644 --- a/pbs-datastore/src/cached_chunk_reader.rs +++ b/pbs-datastore/src/cached_chunk_reader.rs @@ -10,8 +10,8 @@ use anyhow::Error; use futures::ready; use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; -use proxmox_lang::io_format_err; use proxmox_lang::error::io_err_other; +use proxmox_lang::io_format_err; use pbs_tools::async_lru_cache::{AsyncCacher, AsyncLruCache}; diff --git a/pbs-datastore/src/catalog.rs b/pbs-datastore/src/catalog.rs index 22454921..c07b71a6 100644 --- a/pbs-datastore/src/catalog.rs +++ b/pbs-datastore/src/catalog.rs @@ -1,7 +1,7 @@ use std::convert::TryFrom; use std::ffi::{CStr, CString, OsStr}; use std::fmt; -use std::io::{Read, Write, Seek, SeekFrom}; +use std::io::{Read, Seek, SeekFrom, Write}; use std::os::unix::ffi::OsStrExt; use anyhow::{bail, format_err, Error}; @@ -31,7 +31,7 @@ pub trait BackupCatalogWriter { } #[repr(u8)] -#[derive(Copy,Clone,PartialEq)] +#[derive(Copy, Clone, PartialEq)] pub enum CatalogEntryType { Directory = b'd', File = b'f', @@ -44,7 +44,7 @@ pub enum CatalogEntryType { } impl TryFrom for CatalogEntryType { - type Error=Error; + type Error = Error; fn try_from(value: u8) -> Result { Ok(match value { @@ -106,51 +106,55 @@ pub enum DirEntryAttribute { } impl DirEntry { - fn new(etype: CatalogEntryType, name: Vec, start: u64, size: u64, mtime: i64) -> Self { match etype { - CatalogEntryType::Directory => { - DirEntry { name, attr: DirEntryAttribute::Directory { start } } - } - CatalogEntryType::File => { - DirEntry { name, attr: DirEntryAttribute::File { size, mtime } } - } - CatalogEntryType::Symlink => { - DirEntry { name, attr: DirEntryAttribute::Symlink } - } - CatalogEntryType::Hardlink => { - DirEntry { name, attr: DirEntryAttribute::Hardlink } - } - CatalogEntryType::BlockDevice => { - DirEntry { name, attr: DirEntryAttribute::BlockDevice } - } - CatalogEntryType::CharDevice => { - DirEntry { name, attr: DirEntryAttribute::CharDevice } - } - CatalogEntryType::Fifo => { - DirEntry { name, attr: DirEntryAttribute::Fifo } - } - CatalogEntryType::Socket => { - DirEntry { name, attr: DirEntryAttribute::Socket } - } + CatalogEntryType::Directory => DirEntry { + name, + attr: DirEntryAttribute::Directory { start }, + }, + CatalogEntryType::File => DirEntry { + name, + attr: DirEntryAttribute::File { size, mtime }, + }, + CatalogEntryType::Symlink => DirEntry { + name, + attr: DirEntryAttribute::Symlink, + }, + CatalogEntryType::Hardlink => DirEntry { + name, + attr: DirEntryAttribute::Hardlink, + }, + CatalogEntryType::BlockDevice => DirEntry { + name, + attr: DirEntryAttribute::BlockDevice, + }, + CatalogEntryType::CharDevice => DirEntry { + name, + attr: DirEntryAttribute::CharDevice, + }, + CatalogEntryType::Fifo => DirEntry { + name, + attr: DirEntryAttribute::Fifo, + }, + CatalogEntryType::Socket => DirEntry { + name, + attr: DirEntryAttribute::Socket, + }, } } /// Get file mode bits for this entry to be used with the `MatchList` api. pub fn get_file_mode(&self) -> Option { - Some( - match self.attr { - DirEntryAttribute::Directory { .. } => pxar::mode::IFDIR, - DirEntryAttribute::File { .. } => pxar::mode::IFREG, - DirEntryAttribute::Symlink => pxar::mode::IFLNK, - DirEntryAttribute::Hardlink => return None, - DirEntryAttribute::BlockDevice => pxar::mode::IFBLK, - DirEntryAttribute::CharDevice => pxar::mode::IFCHR, - DirEntryAttribute::Fifo => pxar::mode::IFIFO, - DirEntryAttribute::Socket => pxar::mode::IFSOCK, - } - as u32 - ) + Some(match self.attr { + DirEntryAttribute::Directory { .. } => pxar::mode::IFDIR, + DirEntryAttribute::File { .. } => pxar::mode::IFREG, + DirEntryAttribute::Symlink => pxar::mode::IFLNK, + DirEntryAttribute::Hardlink => return None, + DirEntryAttribute::BlockDevice => pxar::mode::IFBLK, + DirEntryAttribute::CharDevice => pxar::mode::IFCHR, + DirEntryAttribute::Fifo => pxar::mode::IFIFO, + DirEntryAttribute::Socket => pxar::mode::IFSOCK, + } as u32) } /// Check if DirEntry is a directory @@ -170,60 +174,82 @@ struct DirInfo { } impl DirInfo { - fn new(name: CString) -> Self { - DirInfo { name, entries: Vec::new() } + DirInfo { + name, + entries: Vec::new(), + } } fn new_rootdir() -> Self { DirInfo::new(CString::new(b"/".to_vec()).unwrap()) } - fn encode_entry( - writer: &mut W, - entry: &DirEntry, - pos: u64, - ) -> Result<(), Error> { + fn encode_entry(writer: &mut W, entry: &DirEntry, pos: u64) -> Result<(), Error> { match entry { - DirEntry { name, attr: DirEntryAttribute::Directory { start } } => { + DirEntry { + name, + attr: DirEntryAttribute::Directory { start }, + } => { writer.write_all(&[CatalogEntryType::Directory as u8])?; catalog_encode_u64(writer, name.len() as u64)?; writer.write_all(name)?; catalog_encode_u64(writer, pos - start)?; } - DirEntry { name, attr: DirEntryAttribute::File { size, mtime } } => { + DirEntry { + name, + attr: DirEntryAttribute::File { size, mtime }, + } => { writer.write_all(&[CatalogEntryType::File as u8])?; catalog_encode_u64(writer, name.len() as u64)?; writer.write_all(name)?; catalog_encode_u64(writer, *size)?; catalog_encode_i64(writer, *mtime)?; } - DirEntry { name, attr: DirEntryAttribute::Symlink } => { + DirEntry { + name, + attr: DirEntryAttribute::Symlink, + } => { writer.write_all(&[CatalogEntryType::Symlink as u8])?; catalog_encode_u64(writer, name.len() as u64)?; writer.write_all(name)?; } - DirEntry { name, attr: DirEntryAttribute::Hardlink } => { + DirEntry { + name, + attr: DirEntryAttribute::Hardlink, + } => { writer.write_all(&[CatalogEntryType::Hardlink as u8])?; catalog_encode_u64(writer, name.len() as u64)?; writer.write_all(name)?; } - DirEntry { name, attr: DirEntryAttribute::BlockDevice } => { + DirEntry { + name, + attr: DirEntryAttribute::BlockDevice, + } => { writer.write_all(&[CatalogEntryType::BlockDevice as u8])?; catalog_encode_u64(writer, name.len() as u64)?; writer.write_all(name)?; } - DirEntry { name, attr: DirEntryAttribute::CharDevice } => { + DirEntry { + name, + attr: DirEntryAttribute::CharDevice, + } => { writer.write_all(&[CatalogEntryType::CharDevice as u8])?; catalog_encode_u64(writer, name.len() as u64)?; writer.write_all(name)?; } - DirEntry { name, attr: DirEntryAttribute::Fifo } => { + DirEntry { + name, + attr: DirEntryAttribute::Fifo, + } => { writer.write_all(&[CatalogEntryType::Fifo as u8])?; catalog_encode_u64(writer, name.len() as u64)?; writer.write_all(name)?; } - DirEntry { name, attr: DirEntryAttribute::Socket } => { + DirEntry { + name, + attr: DirEntryAttribute::Socket, + } => { writer.write_all(&[CatalogEntryType::Socket as u8])?; catalog_encode_u64(writer, name.len() as u64)?; writer.write_all(name)?; @@ -250,7 +276,6 @@ impl DirInfo { data: &[u8], mut callback: C, ) -> Result<(), Error> { - let mut cursor = data; let entries = catalog_decode_u64(&mut cursor)?; @@ -258,14 +283,17 @@ impl DirInfo { let mut name_buf = vec![0u8; 4096]; for _ in 0..entries { - - let mut buf = [ 0u8 ]; + let mut buf = [0u8]; cursor.read_exact(&mut buf)?; let etype = CatalogEntryType::try_from(buf[0])?; let name_len = catalog_decode_u64(&mut cursor)? as usize; if name_len >= name_buf.len() { - bail!("directory entry name too long ({} >= {})", name_len, name_buf.len()); + bail!( + "directory entry name too long ({} >= {})", + name_len, + name_buf.len() + ); } let name = &mut name_buf[0..name_len]; cursor.read_exact(name)?; @@ -280,9 +308,7 @@ impl DirInfo { let mtime = catalog_decode_i64(&mut cursor)?; callback(etype, name, 0, size, mtime)? } - _ => { - callback(etype, name, 0, 0, 0)? - } + _ => callback(etype, name, 0, 0, 0)?, }; if !cont { return Ok(()); @@ -309,11 +335,14 @@ pub struct CatalogWriter { pos: u64, } -impl CatalogWriter { - +impl CatalogWriter { /// Create a new CatalogWriter instance pub fn new(writer: W) -> Result { - let mut me = Self { writer, dirstack: vec![ DirInfo::new_rootdir() ], pos: 0 }; + let mut me = Self { + writer, + dirstack: vec![DirInfo::new_rootdir()], + pos: 0, + }; me.write_all(&PROXMOX_CATALOG_FILE_MAGIC_1_0)?; Ok(me) } @@ -346,8 +375,7 @@ impl CatalogWriter { } } -impl BackupCatalogWriter for CatalogWriter { - +impl BackupCatalogWriter for CatalogWriter { fn start_directory(&mut self, name: &CStr) -> Result<(), Error> { let new = DirInfo::new(name.to_owned()); self.dirstack.push(new); @@ -367,59 +395,107 @@ impl BackupCatalogWriter for CatalogWriter { } }; - let current = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let current = self + .dirstack + .last_mut() + .ok_or_else(|| format_err!("outside root"))?; let name = name.to_bytes().to_vec(); - current.entries.push(DirEntry { name, attr: DirEntryAttribute::Directory { start } }); + current.entries.push(DirEntry { + name, + attr: DirEntryAttribute::Directory { start }, + }); Ok(()) } fn add_file(&mut self, name: &CStr, size: u64, mtime: i64) -> Result<(), Error> { - let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let dir = self + .dirstack + .last_mut() + .ok_or_else(|| format_err!("outside root"))?; let name = name.to_bytes().to_vec(); - dir.entries.push(DirEntry { name, attr: DirEntryAttribute::File { size, mtime } }); + dir.entries.push(DirEntry { + name, + attr: DirEntryAttribute::File { size, mtime }, + }); Ok(()) } fn add_symlink(&mut self, name: &CStr) -> Result<(), Error> { - let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let dir = self + .dirstack + .last_mut() + .ok_or_else(|| format_err!("outside root"))?; let name = name.to_bytes().to_vec(); - dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Symlink }); + dir.entries.push(DirEntry { + name, + attr: DirEntryAttribute::Symlink, + }); Ok(()) } fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error> { - let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let dir = self + .dirstack + .last_mut() + .ok_or_else(|| format_err!("outside root"))?; let name = name.to_bytes().to_vec(); - dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Hardlink }); + dir.entries.push(DirEntry { + name, + attr: DirEntryAttribute::Hardlink, + }); Ok(()) } fn add_block_device(&mut self, name: &CStr) -> Result<(), Error> { - let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let dir = self + .dirstack + .last_mut() + .ok_or_else(|| format_err!("outside root"))?; let name = name.to_bytes().to_vec(); - dir.entries.push(DirEntry { name, attr: DirEntryAttribute::BlockDevice }); + dir.entries.push(DirEntry { + name, + attr: DirEntryAttribute::BlockDevice, + }); Ok(()) } fn add_char_device(&mut self, name: &CStr) -> Result<(), Error> { - let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let dir = self + .dirstack + .last_mut() + .ok_or_else(|| format_err!("outside root"))?; let name = name.to_bytes().to_vec(); - dir.entries.push(DirEntry { name, attr: DirEntryAttribute::CharDevice }); + dir.entries.push(DirEntry { + name, + attr: DirEntryAttribute::CharDevice, + }); Ok(()) } fn add_fifo(&mut self, name: &CStr) -> Result<(), Error> { - let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let dir = self + .dirstack + .last_mut() + .ok_or_else(|| format_err!("outside root"))?; let name = name.to_bytes().to_vec(); - dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Fifo }); + dir.entries.push(DirEntry { + name, + attr: DirEntryAttribute::Fifo, + }); Ok(()) } fn add_socket(&mut self, name: &CStr) -> Result<(), Error> { - let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let dir = self + .dirstack + .last_mut() + .ok_or_else(|| format_err!("outside root"))?; let name = name.to_bytes().to_vec(); - dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Socket }); + dir.entries.push(DirEntry { + name, + attr: DirEntryAttribute::Socket, + }); Ok(()) } } @@ -429,8 +505,7 @@ pub struct CatalogReader { reader: R, } -impl CatalogReader { - +impl CatalogReader { /// Create a new CatalogReader instance pub fn new(reader: R) -> Self { Self { reader } @@ -438,36 +513,35 @@ impl CatalogReader { /// Print whole catalog to stdout pub fn dump(&mut self) -> Result<(), Error> { - let root = self.root()?; match root { - DirEntry { attr: DirEntryAttribute::Directory { start }, .. }=> { - self.dump_dir(std::path::Path::new("./"), start) - } + DirEntry { + attr: DirEntryAttribute::Directory { start }, + .. + } => self.dump_dir(std::path::Path::new("./"), start), _ => unreachable!(), } } /// Get the root DirEntry - pub fn root(&mut self) -> Result { + pub fn root(&mut self) -> Result { // Root dir is special self.reader.seek(SeekFrom::Start(0))?; - let mut magic = [ 0u8; 8]; + let mut magic = [0u8; 8]; self.reader.read_exact(&mut magic)?; if magic != PROXMOX_CATALOG_FILE_MAGIC_1_0 { bail!("got unexpected magic number for catalog"); } self.reader.seek(SeekFrom::End(-8))?; let start = unsafe { self.reader.read_le_value::()? }; - Ok(DirEntry { name: b"".to_vec(), attr: DirEntryAttribute::Directory { start } }) + Ok(DirEntry { + name: b"".to_vec(), + attr: DirEntryAttribute::Directory { start }, + }) } /// Read all directory entries - pub fn read_dir( - &mut self, - parent: &DirEntry, - ) -> Result, Error> { - + pub fn read_dir(&mut self, parent: &DirEntry) -> Result, Error> { let start = match parent.attr { DirEntryAttribute::Directory { start } => start, _ => bail!("parent is not a directory - internal error"), @@ -487,10 +561,7 @@ impl CatalogReader { } /// Lookup a DirEntry from an absolute path - pub fn lookup_recursive( - &mut self, - path: &[u8], - ) -> Result { + pub fn lookup_recursive(&mut self, path: &[u8]) -> Result { let mut current = self.root()?; if path == b"/" { return Ok(current); @@ -500,13 +571,17 @@ impl CatalogReader { &path[1..] } else { path - }.split(|c| *c == b'/'); + } + .split(|c| *c == b'/'); for comp in components { if let Some(entry) = self.lookup(¤t, comp)? { current = entry; } else { - bail!("path {:?} not found in catalog", String::from_utf8_lossy(path)); + bail!( + "path {:?} not found in catalog", + String::from_utf8_lossy(path) + ); } } Ok(current) @@ -517,8 +592,7 @@ impl CatalogReader { &mut self, parent: &DirEntry, filename: &[u8], - ) -> Result, Error> { - + ) -> Result, Error> { let start = match parent.attr { DirEntryAttribute::Directory { start } => start, _ => bail!("parent is not a directory - internal error"), @@ -541,21 +615,21 @@ impl CatalogReader { } /// Read the raw directory info block from current reader position. - fn read_raw_dirinfo_block(&mut self, start: u64) -> Result, Error> { + fn read_raw_dirinfo_block(&mut self, start: u64) -> Result, Error> { self.reader.seek(SeekFrom::Start(start))?; let size = catalog_decode_u64(&mut self.reader)?; - if size < 1 { bail!("got small directory size {}", size) }; + if size < 1 { + bail!("got small directory size {}", size) + }; let data = self.reader.read_exact_allocated(size as usize)?; Ok(data) } /// Print the content of a directory to stdout pub fn dump_dir(&mut self, prefix: &std::path::Path, start: u64) -> Result<(), Error> { - let data = self.read_raw_dirinfo_block(start)?; DirInfo::parse(&data, |etype, name, offset, size, mtime| { - let mut path = std::path::PathBuf::from(prefix); let name: &OsStr = OsStrExt::from_bytes(name); path.push(name); @@ -575,13 +649,7 @@ impl CatalogReader { mtime_string = s; } - println!( - "{} {:?} {} {}", - etype, - path, - size, - mtime_string, - ); + println!("{} {:?} {} {}", etype, path, size, mtime_string,); } _ => { println!("{} {:?}", etype, path); @@ -602,7 +670,7 @@ impl CatalogReader { callback: &mut dyn FnMut(&[u8]) -> Result<(), Error>, ) -> Result<(), Error> { let file_len = file_path.len(); - for e in self.read_dir(parent)? { + for e in self.read_dir(parent)? { let is_dir = e.is_directory(); file_path.truncate(file_len); if !e.name.starts_with(b"/") { @@ -688,11 +756,11 @@ pub fn catalog_encode_i64(writer: &mut W, v: i64) -> Result<(), Error> /// value encoded is <= 2^63 (values > 2^63 cannot be represented in an i64) #[allow(clippy::neg_multiply)] pub fn catalog_decode_i64(reader: &mut R) -> Result { - let mut v: u64 = 0; let mut buf = [0u8]; - for i in 0..11 { // only allow 11 bytes (70 bits + sign marker) + for i in 0..11 { + // only allow 11 bytes (70 bits + sign marker) if buf.is_empty() { bail!("decode_i64 failed - unexpected EOB"); } @@ -706,10 +774,10 @@ pub fn catalog_decode_i64(reader: &mut R) -> Result { } return Ok(((v - 1) as i64 * -1) - 1); // also handles i64::MIN } else if t < 128 { - v |= (t as u64) << (i*7); + v |= (t as u64) << (i * 7); return Ok(v as i64); } else { - v |= ((t & 127) as u64) << (i*7); + v |= ((t & 127) as u64) << (i * 7); } } @@ -741,21 +809,21 @@ pub fn catalog_encode_u64(writer: &mut W, v: u64) -> Result<(), Error> /// We currently read maximal 10 bytes, which give a maximum of 70 bits, /// but we currently only encode up to 64 bits pub fn catalog_decode_u64(reader: &mut R) -> Result { - let mut v: u64 = 0; let mut buf = [0u8]; - for i in 0..10 { // only allow 10 bytes (70 bits) + for i in 0..10 { + // only allow 10 bytes (70 bits) if buf.is_empty() { bail!("decode_u64 failed - unexpected EOB"); } reader.read_exact(&mut buf)?; let t = buf[0]; if t < 128 { - v |= (t as u64) << (i*7); + v |= (t as u64) << (i * 7); return Ok(v); } else { - v |= ((t & 127) as u64) << (i*7); + v |= ((t & 127) as u64) << (i * 7); } } @@ -764,9 +832,7 @@ pub fn catalog_decode_u64(reader: &mut R) -> Result { #[test] fn test_catalog_u64_encoder() { - fn test_encode_decode(value: u64) { - let mut data = Vec::new(); catalog_encode_u64(&mut data, value).unwrap(); @@ -782,17 +848,15 @@ fn test_catalog_u64_encoder() { test_encode_decode(u64::MIN); test_encode_decode(126); - test_encode_decode((1<<12)-1); - test_encode_decode((1<<20)-1); - test_encode_decode((1<<50)-1); + test_encode_decode((1 << 12) - 1); + test_encode_decode((1 << 20) - 1); + test_encode_decode((1 << 50) - 1); test_encode_decode(u64::MAX); } #[test] fn test_catalog_i64_encoder() { - fn test_encode_decode(value: i64) { - let mut data = Vec::new(); catalog_encode_i64(&mut data, value).unwrap(); @@ -806,19 +870,17 @@ fn test_catalog_i64_encoder() { test_encode_decode(-0); test_encode_decode(126); test_encode_decode(-126); - test_encode_decode((1<<12)-1); - test_encode_decode(-(1<<12)-1); - test_encode_decode((1<<20)-1); - test_encode_decode(-(1<<20)-1); + test_encode_decode((1 << 12) - 1); + test_encode_decode(-(1 << 12) - 1); + test_encode_decode((1 << 20) - 1); + test_encode_decode(-(1 << 20) - 1); test_encode_decode(i64::MIN); test_encode_decode(i64::MAX); } #[test] fn test_catalog_i64_compatibility() { - fn test_encode_decode(value: u64) { - let mut data = Vec::new(); catalog_encode_u64(&mut data, value).unwrap(); @@ -830,9 +892,9 @@ fn test_catalog_i64_compatibility() { test_encode_decode(u64::MIN); test_encode_decode(126); - test_encode_decode((1<<12)-1); - test_encode_decode((1<<20)-1); - test_encode_decode((1<<50)-1); + test_encode_decode((1 << 12) - 1); + test_encode_decode((1 << 20) - 1); + test_encode_decode((1 << 50) - 1); test_encode_decode(u64::MAX); } @@ -850,10 +912,10 @@ pub struct ArchiveEntry { /// Is this entry a leaf node, or does it have children (i.e. a directory)? pub leaf: bool, /// The file size, if entry_type is 'f' (file) - #[serde(skip_serializing_if="Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none")] pub size: Option, /// The file "last modified" time stamp, if entry_type is 'f' (file) - #[serde(skip_serializing_if="Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none")] pub mtime: Option, } diff --git a/pbs-datastore/src/checksum_reader.rs b/pbs-datastore/src/checksum_reader.rs index a1a508bd..9f298563 100644 --- a/pbs-datastore/src/checksum_reader.rs +++ b/pbs-datastore/src/checksum_reader.rs @@ -1,6 +1,6 @@ -use anyhow::{Error}; -use std::sync::Arc; +use anyhow::Error; use std::io::Read; +use std::sync::Arc; use proxmox_borrow::Tied; @@ -12,8 +12,7 @@ pub struct ChecksumReader { signer: Option, openssl::sign::Signer<'static>>>, } -impl ChecksumReader { - +impl ChecksumReader { pub fn new(reader: R, config: Option>) -> Self { let hasher = crc32fast::Hasher::new(); let signer = match config { @@ -26,7 +25,11 @@ impl ChecksumReader { None => None, }; - Self { reader, hasher, signer } + Self { + reader, + hasher, + signer, + } } pub fn finish(mut self) -> Result<(R, u32, Option<[u8; 32]>), Error> { @@ -42,19 +45,18 @@ impl ChecksumReader { } } -impl Read for ChecksumReader { - +impl Read for ChecksumReader { fn read(&mut self, buf: &mut [u8]) -> Result { let count = self.reader.read(buf)?; if count > 0 { self.hasher.update(&buf[..count]); if let Some(ref mut signer) = self.signer { - signer.update(&buf[..count]) - .map_err(|err| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("hmac update failed - {}", err)) - })?; + signer.update(&buf[..count]).map_err(|err| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("hmac update failed - {}", err), + ) + })?; } } Ok(count) diff --git a/pbs-datastore/src/checksum_writer.rs b/pbs-datastore/src/checksum_writer.rs index ea9e1b9e..038593c8 100644 --- a/pbs-datastore/src/checksum_writer.rs +++ b/pbs-datastore/src/checksum_writer.rs @@ -1,7 +1,7 @@ -use std::sync::Arc; use std::io::Write; +use std::sync::Arc; -use anyhow::{Error}; +use anyhow::Error; use proxmox_borrow::Tied; @@ -13,8 +13,7 @@ pub struct ChecksumWriter { signer: Option, openssl::sign::Signer<'static>>>, } -impl ChecksumWriter { - +impl ChecksumWriter { pub fn new(writer: W, config: Option>) -> Self { let hasher = crc32fast::Hasher::new(); let signer = match config { @@ -26,7 +25,11 @@ impl ChecksumWriter { } None => None, }; - Self { writer, hasher, signer } + Self { + writer, + hasher, + signer, + } } pub fn finish(mut self) -> Result<(W, u32, Option<[u8; 32]>), Error> { @@ -42,17 +45,16 @@ impl ChecksumWriter { } } -impl Write for ChecksumWriter { - +impl Write for ChecksumWriter { fn write(&mut self, buf: &[u8]) -> Result { self.hasher.update(buf); if let Some(ref mut signer) = self.signer { - signer.update(buf) - .map_err(|err| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("hmac update failed - {}", err)) - })?; + signer.update(buf).map_err(|err| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("hmac update failed - {}", err), + ) + })?; } self.writer.write(buf) } diff --git a/pbs-datastore/src/chunk_stat.rs b/pbs-datastore/src/chunk_stat.rs index 7ae66896..eb7d55d5 100644 --- a/pbs-datastore/src/chunk_stat.rs +++ b/pbs-datastore/src/chunk_stat.rs @@ -10,7 +10,6 @@ pub struct ChunkStat { } impl ChunkStat { - pub fn new(size: u64) -> Self { ChunkStat { size, @@ -27,15 +26,14 @@ impl ChunkStat { impl std::fmt::Debug for ChunkStat { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let avg = ((self.size as f64)/(self.chunk_count as f64)) as usize; - let compression = (self.compressed_size*100)/(self.size as u64); - let rate = (self.disk_size*100)/(self.size as u64); + let avg = ((self.size as f64) / (self.chunk_count as f64)) as usize; + let compression = (self.compressed_size * 100) / (self.size as u64); + let rate = (self.disk_size * 100) / (self.size as u64); let elapsed = self.start_time.elapsed().unwrap(); - let elapsed = (elapsed.as_secs() as f64) + - (elapsed.subsec_millis() as f64)/1000.0; + let elapsed = (elapsed.as_secs() as f64) + (elapsed.subsec_millis() as f64) / 1000.0; - let write_speed = ((self.size as f64)/(1024.0*1024.0))/elapsed; + let write_speed = ((self.size as f64) / (1024.0 * 1024.0)) / elapsed; write!(f, "Size: {}, average chunk size: {}, compression rate: {}%, disk_size: {} ({}%), speed: {:.2} MB/s", self.size, avg, compression, self.disk_size, rate, write_speed) diff --git a/pbs-datastore/src/chunk_store.rs b/pbs-datastore/src/chunk_store.rs index b2f3ca32..739e349d 100644 --- a/pbs-datastore/src/chunk_store.rs +++ b/pbs-datastore/src/chunk_store.rs @@ -5,19 +5,20 @@ use std::sync::{Arc, Mutex}; use anyhow::{bail, format_err, Error}; -use proxmox_sys::fs::{CreateOptions, create_path, create_dir}; -use proxmox_sys::process_locker::{ProcessLocker, ProcessLockSharedGuard, ProcessLockExclusiveGuard}; -use proxmox_sys::WorkerTaskContext; -use proxmox_sys::task_log; use pbs_api_types::GarbageCollectionStatus; - +use proxmox_sys::fs::{create_dir, create_path, CreateOptions}; +use proxmox_sys::process_locker::{ + ProcessLockExclusiveGuard, ProcessLockSharedGuard, ProcessLocker, +}; +use proxmox_sys::task_log; +use proxmox_sys::WorkerTaskContext; use crate::DataBlob; /// File system based chunk store pub struct ChunkStore { name: String, // used for error reporting - pub (crate) base: PathBuf, + pub(crate) base: PathBuf, chunk_dir: PathBuf, mutex: Mutex<()>, locker: Arc>, @@ -26,8 +27,15 @@ pub struct ChunkStore { // TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ? pub fn verify_chunk_size(size: usize) -> Result<(), Error> { - - static SIZES: [usize; 7] = [64*1024, 128*1024, 256*1024, 512*1024, 1024*1024, 2048*1024, 4096*1024]; + static SIZES: [usize; 7] = [ + 64 * 1024, + 128 * 1024, + 256 * 1024, + 512 * 1024, + 1024 * 1024, + 2048 * 1024, + 4096 * 1024, + ]; if !SIZES.contains(&size) { bail!("Got unsupported chunk size '{}'", size); @@ -36,26 +44,23 @@ pub fn verify_chunk_size(size: usize) -> Result<(), Error> { } fn digest_to_prefix(digest: &[u8]) -> PathBuf { - - let mut buf = Vec::::with_capacity(2+1+2+1); + let mut buf = Vec::::with_capacity(2 + 1 + 2 + 1); const HEX_CHARS: &[u8; 16] = b"0123456789abcdef"; buf.push(HEX_CHARS[(digest[0] as usize) >> 4]); - buf.push(HEX_CHARS[(digest[0] as usize) &0xf]); + buf.push(HEX_CHARS[(digest[0] as usize) & 0xf]); buf.push(HEX_CHARS[(digest[1] as usize) >> 4]); buf.push(HEX_CHARS[(digest[1] as usize) & 0xf]); buf.push(b'/'); - let path = unsafe { String::from_utf8_unchecked(buf)}; + let path = unsafe { String::from_utf8_unchecked(buf) }; path.into() } impl ChunkStore { - fn chunk_dir>(path: P) -> PathBuf { - let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref()); chunk_dir.push(".chunks"); @@ -66,11 +71,16 @@ impl ChunkStore { &self.base } - pub fn create

(name: &str, path: P, uid: nix::unistd::Uid, gid: nix::unistd::Gid, worker: Option<&dyn WorkerTaskContext>) -> Result + pub fn create

( + name: &str, + path: P, + uid: nix::unistd::Uid, + gid: nix::unistd::Gid, + worker: Option<&dyn WorkerTaskContext>, + ) -> Result where P: Into, { - let base: PathBuf = path.into(); if !base.is_absolute() { @@ -79,19 +89,31 @@ impl ChunkStore { let chunk_dir = Self::chunk_dir(&base); - let options = CreateOptions::new() - .owner(uid) - .group(gid); + let options = CreateOptions::new().owner(uid).group(gid); let default_options = CreateOptions::new(); match create_path(&base, Some(default_options), Some(options.clone())) { - Err(err) => bail!("unable to create chunk store '{}' at {:?} - {}", name, base, err), - Ok(res) => if ! res { nix::unistd::chown(&base, Some(uid), Some(gid))? }, + Err(err) => bail!( + "unable to create chunk store '{}' at {:?} - {}", + name, + base, + err + ), + Ok(res) => { + if !res { + nix::unistd::chown(&base, Some(uid), Some(gid))? + } + } } if let Err(err) = create_dir(&chunk_dir, options.clone()) { - bail!("unable to create chunk store '{}' subdir {:?} - {}", name, chunk_dir, err); + bail!( + "unable to create chunk store '{}' subdir {:?} - {}", + name, + chunk_dir, + err + ); } // create lock file with correct owner/group @@ -101,13 +123,18 @@ impl ChunkStore { // create 64*1024 subdirs let mut last_percentage = 0; - for i in 0..64*1024 { + for i in 0..64 * 1024 { let mut l1path = chunk_dir.clone(); l1path.push(format!("{:04x}", i)); if let Err(err) = create_dir(&l1path, options.clone()) { - bail!("unable to create chunk store '{}' subdir {:?} - {}", name, l1path, err); + bail!( + "unable to create chunk store '{}' subdir {:?} - {}", + name, + l1path, + err + ); } - let percentage = (i*100)/(64*1024); + let percentage = (i * 100) / (64 * 1024); if percentage != last_percentage { if let Some(worker) = worker { task_log!(worker, "Chunkstore create: {}%", percentage) @@ -128,7 +155,6 @@ impl ChunkStore { } pub fn open>(name: &str, base: P) -> Result { - let base: PathBuf = base.into(); if !base.is_absolute() { @@ -138,7 +164,12 @@ impl ChunkStore { let chunk_dir = Self::chunk_dir(&base); if let Err(err) = std::fs::metadata(&chunk_dir) { - bail!("unable to open chunk store '{}' at {:?} - {}", name, chunk_dir, err); + bail!( + "unable to open chunk store '{}' at {:?} - {}", + name, + chunk_dir, + err + ); } let lockfile_path = Self::lockfile_path(&base); @@ -150,7 +181,7 @@ impl ChunkStore { base, chunk_dir, locker, - mutex: Mutex::new(()) + mutex: Mutex::new(()), }) } @@ -159,7 +190,11 @@ impl ChunkStore { Ok(()) } - pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result { + pub fn cond_touch_chunk( + &self, + digest: &[u8; 32], + fail_if_not_exist: bool, + ) -> Result { let (chunk_path, _digest_str) = self.chunk_path(digest); self.cond_touch_path(&chunk_path, fail_if_not_exist) } @@ -169,8 +204,14 @@ impl ChunkStore { const UTIME_OMIT: i64 = (1 << 30) - 2; let times: [libc::timespec; 2] = [ - libc::timespec { tv_sec: 0, tv_nsec: UTIME_NOW }, - libc::timespec { tv_sec: 0, tv_nsec: UTIME_OMIT } + libc::timespec { + tv_sec: 0, + tv_nsec: UTIME_NOW, + }, + libc::timespec { + tv_sec: 0, + tv_nsec: UTIME_OMIT, + }, ]; use nix::NixPath; @@ -194,15 +235,16 @@ impl ChunkStore { pub fn get_chunk_iterator( &self, ) -> Result< - impl Iterator, usize, bool)> + std::iter::FusedIterator, - Error + impl Iterator, usize, bool)> + + std::iter::FusedIterator, + Error, > { use nix::dir::Dir; use nix::fcntl::OFlag; use nix::sys::stat::Mode; - let base_handle = Dir::open(&self.chunk_dir, OFlag::O_RDONLY, Mode::empty()) - .map_err(|err| { + let base_handle = + Dir::open(&self.chunk_dir, OFlag::O_RDONLY, Mode::empty()).map_err(|err| { format_err!( "unable to open store '{}' chunk dir {:?} - {}", self.name, @@ -270,11 +312,16 @@ impl ChunkStore { // other errors are fatal, so end our iteration done = true; // and pass the error through: - return Some((Err(format_err!("unable to read subdir '{}' - {}", subdir, err)), percentage, false)); + return Some(( + Err(format_err!("unable to read subdir '{}' - {}", subdir, err)), + percentage, + false, + )); } } } - }).fuse()) + }) + .fuse()) } pub fn oldest_writer(&self) -> Option { @@ -291,7 +338,7 @@ impl ChunkStore { use nix::sys::stat::fstatat; use nix::unistd::{unlinkat, UnlinkatFlags}; - let mut min_atime = phase1_start_time - 3600*24; // at least 24h (see mount option relatime) + let mut min_atime = phase1_start_time - 3600 * 24; // at least 24h (see mount option relatime) if oldest_writer < min_atime { min_atime = oldest_writer; @@ -305,12 +352,7 @@ impl ChunkStore { for (entry, percentage, bad) in self.get_chunk_iterator()? { if last_percentage != percentage { last_percentage = percentage; - task_log!( - worker, - "processed {}% ({} chunks)", - percentage, - chunk_count, - ); + task_log!(worker, "processed {}% ({} chunks)", percentage, chunk_count,); } worker.check_abort()?; @@ -318,12 +360,19 @@ impl ChunkStore { let (dirfd, entry) = match entry { Ok(entry) => (entry.parent_fd(), entry), - Err(err) => bail!("chunk iterator on chunk store '{}' failed - {}", self.name, err), + Err(err) => bail!( + "chunk iterator on chunk store '{}' failed - {}", + self.name, + err + ), }; let file_type = match entry.file_type() { Some(file_type) => file_type, - None => bail!("unsupported file system type on chunk store '{}'", self.name), + None => bail!( + "unsupported file system type on chunk store '{}'", + self.name + ), }; if file_type != nix::dir::Type::File { continue; @@ -376,12 +425,7 @@ impl ChunkStore { Ok(()) } - pub fn insert_chunk( - &self, - chunk: &DataBlob, - digest: &[u8; 32], - ) -> Result<(bool, u64), Error> { - + pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> { //println!("DIGEST {}", hex::encode(digest)); let (chunk_path, digest_str) = self.chunk_path(digest); @@ -393,7 +437,11 @@ impl ChunkStore { self.touch_chunk(digest)?; return Ok((true, metadata.len())); } else { - bail!("Got unexpected file type on store '{}' for chunk {}", self.name, digest_str); + bail!( + "Got unexpected file type on store '{}' for chunk {}", + self.name, + digest_str + ); } } @@ -422,7 +470,7 @@ impl ChunkStore { })?; if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) { - if std::fs::remove_file(&tmp_path).is_err() { /* ignore */ } + if std::fs::remove_file(&tmp_path).is_err() { /* ignore */ } bail!( "Atomic rename on store '{}' failed for chunk {} - {}", self.name, @@ -436,7 +484,7 @@ impl ChunkStore { Ok((false, encoded_size)) } - pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) { + pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) { let mut chunk_path = self.chunk_dir.clone(); let prefix = digest_to_prefix(digest); chunk_path.push(&prefix); @@ -446,7 +494,6 @@ impl ChunkStore { } pub fn relative_path(&self, path: &Path) -> PathBuf { - let mut full_path = self.base.clone(); full_path.push(path); full_path @@ -469,10 +516,8 @@ impl ChunkStore { } } - #[test] fn test_chunk_store1() { - let mut path = std::fs::canonicalize(".").unwrap(); // we need absolute path path.push(".testdir"); @@ -481,10 +526,14 @@ fn test_chunk_store1() { let chunk_store = ChunkStore::open("test", &path); assert!(chunk_store.is_err()); - let user = nix::unistd::User::from_uid(nix::unistd::Uid::current()).unwrap().unwrap(); + let user = nix::unistd::User::from_uid(nix::unistd::Uid::current()) + .unwrap() + .unwrap(); let chunk_store = ChunkStore::create("test", &path, user.uid, user.gid, None).unwrap(); - let (chunk, digest) = crate::data_blob::DataChunkBuilder::new(&[0u8, 1u8]).build().unwrap(); + let (chunk, digest) = crate::data_blob::DataChunkBuilder::new(&[0u8, 1u8]) + .build() + .unwrap(); let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap(); assert!(!exists); @@ -492,7 +541,6 @@ fn test_chunk_store1() { let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap(); assert!(exists); - let chunk_store = ChunkStore::create("test", &path, user.uid, user.gid, None); assert!(chunk_store.is_err()); diff --git a/pbs-datastore/src/crypt_reader.rs b/pbs-datastore/src/crypt_reader.rs index a2d74427..d02f6b50 100644 --- a/pbs-datastore/src/crypt_reader.rs +++ b/pbs-datastore/src/crypt_reader.rs @@ -1,5 +1,5 @@ +use std::io::{BufRead, Read}; use std::sync::Arc; -use std::io::{Read, BufRead}; use anyhow::{bail, Error}; @@ -13,9 +13,13 @@ pub struct CryptReader { finalized: bool, } -impl CryptReader { - - pub fn new(reader: R, iv: [u8; 16], tag: [u8; 16], config: Arc) -> Result { +impl CryptReader { + pub fn new( + reader: R, + iv: [u8; 16], + tag: [u8; 16], + config: Arc, + ) -> Result { let block_size = config.cipher().block_size(); // Note: block size is normally 1 byte for stream ciphers if block_size.count_ones() != 1 || block_size > 512 { bail!("unexpected Cipher block size {}", block_size); @@ -23,7 +27,13 @@ impl CryptReader { let mut crypter = config.data_crypter(&iv, openssl::symm::Mode::Decrypt)?; crypter.set_tag(&tag)?; - Ok(Self { reader, crypter, block_size, finalized: false, small_read_buf: Vec::new() }) + Ok(Self { + reader, + crypter, + block_size, + finalized: false, + small_read_buf: Vec::new(), + }) } pub fn finish(self) -> Result { @@ -34,11 +44,14 @@ impl CryptReader { } } -impl Read for CryptReader { - +impl Read for CryptReader { fn read(&mut self, buf: &mut [u8]) -> Result { if !self.small_read_buf.is_empty() { - let max = if self.small_read_buf.len() > buf.len() { buf.len() } else { self.small_read_buf.len() }; + let max = if self.small_read_buf.len() > buf.len() { + buf.len() + } else { + self.small_read_buf.len() + }; let rest = self.small_read_buf.split_off(max); buf[..max].copy_from_slice(&self.small_read_buf); self.small_read_buf = rest; @@ -48,10 +61,11 @@ impl Read for CryptReader { let data = self.reader.fill_buf()?; // handle small read buffers - if buf.len() <= 2*self.block_size { + if buf.len() <= 2 * self.block_size { let mut outbuf = [0u8; 1024]; - let count = if data.is_empty() { // EOF + let count = if data.is_empty() { + // EOF let written = self.crypter.finalize(&mut outbuf)?; self.finalized = true; written @@ -73,7 +87,8 @@ impl Read for CryptReader { buf[..count].copy_from_slice(&outbuf[..count]); Ok(count) } - } else if data.is_empty() { // EOF + } else if data.is_empty() { + // EOF let rest = self.crypter.finalize(buf)?; self.finalized = true; Ok(rest) diff --git a/pbs-datastore/src/crypt_writer.rs b/pbs-datastore/src/crypt_writer.rs index 36e27b13..f7f4b65b 100644 --- a/pbs-datastore/src/crypt_writer.rs +++ b/pbs-datastore/src/crypt_writer.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use std::io::Write; +use std::sync::Arc; use anyhow::Error; @@ -8,13 +8,12 @@ use pbs_tools::crypt_config::CryptConfig; pub struct CryptWriter { writer: W, block_size: usize, - encr_buf: Box<[u8; 64*1024]>, + encr_buf: Box<[u8; 64 * 1024]>, iv: [u8; 16], crypter: openssl::symm::Crypter, } -impl CryptWriter { - +impl CryptWriter { pub fn new(writer: W, config: Arc) -> Result { let mut iv = [0u8; 16]; proxmox_sys::linux::fill_with_random_data(&mut iv)?; @@ -22,10 +21,16 @@ impl CryptWriter { let crypter = config.data_crypter(&iv, openssl::symm::Mode::Encrypt)?; - Ok(Self { writer, iv, crypter, block_size, encr_buf: Box::new([0u8; 64*1024]) }) + Ok(Self { + writer, + iv, + crypter, + block_size, + encr_buf: Box::new([0u8; 64 * 1024]), + }) } - pub fn finish(mut self) -> Result<(W, [u8; 16], [u8; 16]), Error> { + pub fn finish(mut self) -> Result<(W, [u8; 16], [u8; 16]), Error> { let rest = self.crypter.finalize(self.encr_buf.as_mut())?; if rest > 0 { self.writer.write_all(&self.encr_buf[..rest])?; @@ -40,18 +45,20 @@ impl CryptWriter { } } -impl Write for CryptWriter { - +impl Write for CryptWriter { fn write(&mut self, buf: &[u8]) -> Result { let mut write_size = buf.len(); if write_size > (self.encr_buf.len() - self.block_size) { write_size = self.encr_buf.len() - self.block_size; } - let count = self.crypter.update(&buf[..write_size], self.encr_buf.as_mut()) + let count = self + .crypter + .update(&buf[..write_size], self.encr_buf.as_mut()) .map_err(|err| { std::io::Error::new( std::io::ErrorKind::Other, - format!("crypter update failed - {}", err)) + format!("crypter update failed - {}", err), + ) })?; self.writer.write_all(&self.encr_buf[..count])?; diff --git a/pbs-datastore/src/data_blob.rs b/pbs-datastore/src/data_blob.rs index 5f02abe7..1b0abd5f 100644 --- a/pbs-datastore/src/data_blob.rs +++ b/pbs-datastore/src/data_blob.rs @@ -6,12 +6,12 @@ use openssl::symm::{decrypt_aead, Mode}; use proxmox_io::{ReadExt, WriteExt}; -use pbs_tools::crypt_config::CryptConfig; use pbs_api_types::CryptMode; +use pbs_tools::crypt_config::CryptConfig; use super::file_formats::*; -const MAX_BLOB_SIZE: usize = 128*1024*1024; +const MAX_BLOB_SIZE: usize = 128 * 1024 * 1024; /// Encoded data chunk with digest and positional information pub struct ChunkInfo { @@ -35,9 +35,8 @@ pub struct DataBlob { } impl DataBlob { - /// accessor to raw_data field - pub fn raw_data(&self) -> &[u8] { + pub fn raw_data(&self) -> &[u8] { &self.raw_data } @@ -59,13 +58,13 @@ impl DataBlob { /// accessor to crc32 checksum pub fn crc(&self) -> u32 { let crc_o = proxmox_lang::offsetof!(DataBlobHeader, crc); - u32::from_le_bytes(self.raw_data[crc_o..crc_o+4].try_into().unwrap()) + u32::from_le_bytes(self.raw_data[crc_o..crc_o + 4].try_into().unwrap()) } // set the CRC checksum field pub fn set_crc(&mut self, crc: u32) { let crc_o = proxmox_lang::offsetof!(DataBlobHeader, crc); - self.raw_data[crc_o..crc_o+4].copy_from_slice(&crc.to_le_bytes()); + self.raw_data[crc_o..crc_o + 4].copy_from_slice(&crc.to_le_bytes()); } /// compute the CRC32 checksum @@ -91,13 +90,11 @@ impl DataBlob { config: Option<&CryptConfig>, compress: bool, ) -> Result { - if data.len() > MAX_BLOB_SIZE { bail!("data blob too large ({} bytes).", data.len()); } let mut blob = if let Some(config) = config { - let compr_data; let (_compress, data, magic) = if compress { compr_data = zstd::block::compress(data, 1)?; @@ -115,7 +112,10 @@ impl DataBlob { let mut raw_data = Vec::with_capacity(data.len() + header_len); let dummy_head = EncryptedDataBlobHeader { - head: DataBlobHeader { magic: [0u8; 8], crc: [0; 4] }, + head: DataBlobHeader { + magic: [0u8; 8], + crc: [0; 4], + }, iv: [0u8; 16], tag: [0u8; 16], }; @@ -126,7 +126,9 @@ impl DataBlob { let (iv, tag) = Self::encrypt_to(config, data, &mut raw_data)?; let head = EncryptedDataBlobHeader { - head: DataBlobHeader { magic, crc: [0; 4] }, iv, tag, + head: DataBlobHeader { magic, crc: [0; 4] }, + iv, + tag, }; unsafe { @@ -135,12 +137,11 @@ impl DataBlob { DataBlob { raw_data } } else { - let max_data_len = data.len() + std::mem::size_of::(); if compress { let mut comp_data = Vec::with_capacity(max_data_len); - let head = DataBlobHeader { + let head = DataBlobHeader { magic: COMPRESSED_BLOB_MAGIC_1_0, crc: [0; 4], }; @@ -151,7 +152,9 @@ impl DataBlob { zstd::stream::copy_encode(data, &mut comp_data, 1)?; if comp_data.len() < max_data_len { - let mut blob = DataBlob { raw_data: comp_data }; + let mut blob = DataBlob { + raw_data: comp_data, + }; blob.set_crc(blob.compute_crc()); return Ok(blob); } @@ -159,7 +162,7 @@ impl DataBlob { let mut raw_data = Vec::with_capacity(max_data_len); - let head = DataBlobHeader { + let head = DataBlobHeader { magic: UNCOMPRESSED_BLOB_MAGIC_1_0, crc: [0; 4], }; @@ -180,18 +183,23 @@ impl DataBlob { pub fn crypt_mode(&self) -> Result { let magic = self.magic(); - Ok(if magic == &UNCOMPRESSED_BLOB_MAGIC_1_0 || magic == &COMPRESSED_BLOB_MAGIC_1_0 { - CryptMode::None - } else if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 { - CryptMode::Encrypt - } else { - bail!("Invalid blob magic number."); - }) + Ok( + if magic == &UNCOMPRESSED_BLOB_MAGIC_1_0 || magic == &COMPRESSED_BLOB_MAGIC_1_0 { + CryptMode::None + } else if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 { + CryptMode::Encrypt + } else { + bail!("Invalid blob magic number."); + }, + ) } /// Decode blob data - pub fn decode(&self, config: Option<&CryptConfig>, digest: Option<&[u8; 32]>) -> Result, Error> { - + pub fn decode( + &self, + config: Option<&CryptConfig>, + digest: Option<&[u8; 32]>, + ) -> Result, Error> { let magic = self.magic(); if magic == &UNCOMPRESSED_BLOB_MAGIC_1_0 { @@ -217,11 +225,21 @@ impl DataBlob { (&self.raw_data[..header_len]).read_le_value::()? }; - if let Some(config) = config { + if let Some(config) = config { let data = if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 { - Self::decode_compressed_chunk(config, &self.raw_data[header_len..], &head.iv, &head.tag)? + Self::decode_compressed_chunk( + config, + &self.raw_data[header_len..], + &head.iv, + &head.tag, + )? } else { - Self::decode_uncompressed_chunk(config, &self.raw_data[header_len..], &head.iv, &head.tag)? + Self::decode_uncompressed_chunk( + config, + &self.raw_data[header_len..], + &head.iv, + &head.tag, + )? }; if let Some(digest) = digest { Self::verify_digest(&data, Some(config), digest)?; @@ -237,8 +255,7 @@ impl DataBlob { /// Load blob from ``reader``, verify CRC pub fn load_from_reader(reader: &mut dyn std::io::Read) -> Result { - - let mut data = Vec::with_capacity(1024*1024); + let mut data = Vec::with_capacity(1024 * 1024); reader.read_to_end(&mut data)?; let blob = Self::from_raw(data)?; @@ -250,7 +267,6 @@ impl DataBlob { /// Create Instance from raw data pub fn from_raw(data: Vec) -> Result { - if data.len() < std::mem::size_of::() { bail!("blob too small ({} bytes).", data.len()); } @@ -258,7 +274,6 @@ impl DataBlob { let magic = &data[0..8]; if magic == ENCR_COMPR_BLOB_MAGIC_1_0 || magic == ENCRYPTED_BLOB_MAGIC_1_0 { - if data.len() < std::mem::size_of::() { bail!("encrypted blob too small ({} bytes).", data.len()); } @@ -267,7 +282,6 @@ impl DataBlob { Ok(blob) } else if magic == COMPRESSED_BLOB_MAGIC_1_0 || magic == UNCOMPRESSED_BLOB_MAGIC_1_0 { - let blob = DataBlob { raw_data: data }; Ok(blob) @@ -293,7 +307,6 @@ impl DataBlob { expected_chunk_size: usize, expected_digest: &[u8; 32], ) -> Result<(), Error> { - let magic = self.magic(); if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 { @@ -304,7 +317,11 @@ impl DataBlob { let data = self.decode(None, Some(expected_digest))?; if expected_chunk_size != data.len() { - bail!("detected chunk with wrong length ({} != {})", expected_chunk_size, data.len()); + bail!( + "detected chunk with wrong length ({} != {})", + expected_chunk_size, + data.len() + ); } Ok(()) @@ -315,7 +332,6 @@ impl DataBlob { config: Option<&CryptConfig>, expected_digest: &[u8; 32], ) -> Result<(), Error> { - let digest = match config { Some(config) => config.compute_digest(data), None => openssl::sha::sha256(data), @@ -344,8 +360,7 @@ impl DataBlob { config: &CryptConfig, data: &[u8], mut output: W, - ) -> Result<([u8;16], [u8;16]), Error> { - + ) -> Result<([u8; 16], [u8; 16]), Error> { let mut iv = [0u8; 16]; proxmox_sys::linux::fill_with_random_data(&mut iv)?; @@ -353,7 +368,7 @@ impl DataBlob { let mut c = config.data_crypter(&iv, Mode::Encrypt)?; - const BUFFER_SIZE: usize = 32*1024; + const BUFFER_SIZE: usize = 32 * 1024; let mut encr_buf = [0u8; BUFFER_SIZE]; let max_encoder_input = BUFFER_SIZE - config.cipher().block_size(); @@ -361,7 +376,9 @@ impl DataBlob { let mut start = 0; loop { let mut end = start + max_encoder_input; - if end > data.len() { end = data.len(); } + if end > data.len() { + end = data.len(); + } if end > start { let count = c.update(&data[start..end], &mut encr_buf)?; output.write_all(&encr_buf[..count])?; @@ -372,7 +389,9 @@ impl DataBlob { } let rest = c.finalize(&mut encr_buf)?; - if rest > 0 { output.write_all(&encr_buf[..rest])?; } + if rest > 0 { + output.write_all(&encr_buf[..rest])?; + } output.flush()?; @@ -388,14 +407,13 @@ impl DataBlob { iv: &[u8; 16], tag: &[u8; 16], ) -> Result, Error> { - - let dec = Vec::with_capacity(1024*1024); + let dec = Vec::with_capacity(1024 * 1024); let mut decompressor = zstd::stream::write::Decoder::new(dec)?; let mut c = config.data_crypter(iv, Mode::Decrypt)?; - const BUFFER_SIZE: usize = 32*1024; + const BUFFER_SIZE: usize = 32 * 1024; let mut decr_buf = [0u8; BUFFER_SIZE]; let max_decoder_input = BUFFER_SIZE - config.cipher().block_size(); @@ -403,7 +421,9 @@ impl DataBlob { let mut start = 0; loop { let mut end = start + max_decoder_input; - if end > data.len() { end = data.len(); } + if end > data.len() { + end = data.len(); + } if end > start { let count = c.update(&data[start..end], &mut decr_buf)?; decompressor.write_all(&decr_buf[0..count])?; @@ -415,7 +435,9 @@ impl DataBlob { c.set_tag(tag)?; let rest = c.finalize(&mut decr_buf)?; - if rest > 0 { decompressor.write_all(&decr_buf[..rest])?; } + if rest > 0 { + decompressor.write_all(&decr_buf[..rest])?; + } decompressor.flush()?; @@ -429,7 +451,6 @@ impl DataBlob { iv: &[u8; 16], tag: &[u8; 16], ) -> Result, Error> { - let decr_data = decrypt_aead( *config.cipher(), config.enc_key(), @@ -441,7 +462,6 @@ impl DataBlob { Ok(decr_data) } - } /// Builder for chunk DataBlobs @@ -457,8 +477,7 @@ pub struct DataChunkBuilder<'a, 'b> { compress: bool, } -impl <'a, 'b> DataChunkBuilder<'a, 'b> { - +impl<'a, 'b> DataChunkBuilder<'a, 'b> { /// Create a new builder instance. pub fn new(orig_data: &'a [u8]) -> Self { Self { @@ -537,5 +556,4 @@ impl <'a, 'b> DataChunkBuilder<'a, 'b> { chunk_builder.build() } - } diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs index d062d1cf..1d389d58 100644 --- a/pbs-datastore/src/datastore.rs +++ b/pbs-datastore/src/datastore.rs @@ -1,9 +1,9 @@ -use std::collections::{HashSet, HashMap}; +use std::collections::{HashMap, HashSet}; +use std::convert::TryFrom; use std::io::{self, Write}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; -use std::convert::TryFrom; use std::str::FromStr; +use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::{bail, format_err, Error}; @@ -11,43 +11,40 @@ use lazy_static::lazy_static; use proxmox_schema::ApiType; -use proxmox_sys::fs::{replace_file, file_read_optional_string, CreateOptions}; +use proxmox_sys::fs::{file_read_optional_string, replace_file, CreateOptions}; +use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard}; use proxmox_sys::process_locker::ProcessLockSharedGuard; use proxmox_sys::WorkerTaskContext; use proxmox_sys::{task_log, task_warn}; -use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard}; use pbs_api_types::{ - UPID, DataStoreConfig, Authid, GarbageCollectionStatus, HumanByte, - ChunkOrder, DatastoreTuning, Operation, + Authid, ChunkOrder, DataStoreConfig, DatastoreTuning, GarbageCollectionStatus, HumanByte, + Operation, UPID, }; use pbs_config::{open_backup_lockfile, BackupLockGuard, ConfigVersionCache}; -use crate::DataBlob; -use crate::backup_info::{BackupGroup, BackupDir}; +use crate::backup_info::{BackupDir, BackupGroup}; use crate::chunk_store::ChunkStore; use crate::dynamic_index::{DynamicIndexReader, DynamicIndexWriter}; use crate::fixed_index::{FixedIndexReader, FixedIndexWriter}; use crate::index::IndexFile; use crate::manifest::{ - MANIFEST_BLOB_NAME, MANIFEST_LOCK_NAME, CLIENT_LOG_BLOB_NAME, - ArchiveType, BackupManifest, - archive_type, + archive_type, ArchiveType, BackupManifest, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, + MANIFEST_LOCK_NAME, }; use crate::task_tracking::update_active_operations; +use crate::DataBlob; lazy_static! { - static ref DATASTORE_MAP: Mutex>> = Mutex::new(HashMap::new()); + static ref DATASTORE_MAP: Mutex>> = + Mutex::new(HashMap::new()); } /// checks if auth_id is owner, or, if owner is a token, if /// auth_id is the user of the token -pub fn check_backup_owner( - owner: &Authid, - auth_id: &Authid, -) -> Result<(), Error> { - let correct_owner = owner == auth_id - || (owner.is_token() && &Authid::from(owner.user().clone()) == auth_id); +pub fn check_backup_owner(owner: &Authid, auth_id: &Authid) -> Result<(), Error> { + let correct_owner = + owner == auth_id || (owner.is_token() && &Authid::from(owner.user().clone()) == auth_id); if !correct_owner { bail!("backup owner check failed ({} != {})", auth_id, owner); } @@ -147,14 +144,12 @@ impl DataStore { } /// removes all datastores that are not configured anymore - pub fn remove_unused_datastores() -> Result<(), Error>{ + pub fn remove_unused_datastores() -> Result<(), Error> { let (config, _digest) = pbs_config::datastore::config()?; let mut map = DATASTORE_MAP.lock().unwrap(); // removes all elements that are not in the config - map.retain(|key, _| { - config.sections.contains_key(key) - }); + map.retain(|key, _| config.sections.contains_key(key)); Ok(()) } @@ -183,7 +178,8 @@ impl DataStore { }; let tuning: DatastoreTuning = serde_json::from_value( - DatastoreTuning::API_SCHEMA.parse_property_string(config.tuning.as_deref().unwrap_or(""))? + DatastoreTuning::API_SCHEMA + .parse_property_string(config.tuning.as_deref().unwrap_or(""))?, )?; let chunk_order = tuning.chunk_order.unwrap_or(ChunkOrder::Inode); @@ -202,21 +198,32 @@ impl DataStore { &self, ) -> Result< impl Iterator, usize, bool)>, - Error + Error, > { self.inner.chunk_store.get_chunk_iterator() } - pub fn create_fixed_writer>(&self, filename: P, size: usize, chunk_size: usize) -> Result { - - let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?; + pub fn create_fixed_writer>( + &self, + filename: P, + size: usize, + chunk_size: usize, + ) -> Result { + let index = FixedIndexWriter::create( + self.inner.chunk_store.clone(), + filename.as_ref(), + size, + chunk_size, + )?; Ok(index) } - pub fn open_fixed_reader>(&self, filename: P) -> Result { - - let full_path = self.inner.chunk_store.relative_path(filename.as_ref()); + pub fn open_fixed_reader>( + &self, + filename: P, + ) -> Result { + let full_path = self.inner.chunk_store.relative_path(filename.as_ref()); let index = FixedIndexReader::open(&full_path)?; @@ -224,18 +231,19 @@ impl DataStore { } pub fn create_dynamic_writer>( - &self, filename: P, + &self, + filename: P, ) -> Result { - - let index = DynamicIndexWriter::create( - self.inner.chunk_store.clone(), filename.as_ref())?; + let index = DynamicIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref())?; Ok(index) } - pub fn open_dynamic_reader>(&self, filename: P) -> Result { - - let full_path = self.inner.chunk_store.relative_path(filename.as_ref()); + pub fn open_dynamic_reader>( + &self, + filename: P, + ) -> Result { + let full_path = self.inner.chunk_store.relative_path(filename.as_ref()); let index = DynamicIndexReader::open(&full_path)?; @@ -247,12 +255,11 @@ impl DataStore { P: AsRef, { let filename = filename.as_ref(); - let out: Box = - match archive_type(filename)? { - ArchiveType::DynamicIndex => Box::new(self.open_dynamic_reader(filename)?), - ArchiveType::FixedIndex => Box::new(self.open_fixed_reader(filename)?), - _ => bail!("cannot open index file of unknown type: {:?}", filename), - }; + let out: Box = match archive_type(filename)? { + ArchiveType::DynamicIndex => Box::new(self.open_dynamic_reader(filename)?), + ArchiveType::FixedIndex => Box::new(self.open_fixed_reader(filename)?), + _ => bail!("cannot open index file of unknown type: {:?}", filename), + }; Ok(out) } @@ -260,23 +267,21 @@ impl DataStore { pub fn fast_index_verification( &self, index: &dyn IndexFile, - checked: &mut HashSet<[u8;32]>, + checked: &mut HashSet<[u8; 32]>, ) -> Result<(), Error> { - for pos in 0..index.index_count() { let info = index.chunk_info(pos).unwrap(); if checked.contains(&info.digest) { continue; } - self.stat_chunk(&info.digest). - map_err(|err| { - format_err!( - "fast_index_verification error, stat_chunk {} failed - {}", - hex::encode(&info.digest), - err, - ) - })?; + self.stat_chunk(&info.digest).map_err(|err| { + format_err!( + "fast_index_verification error, stat_chunk {} failed - {}", + hex::encode(&info.digest), + err, + ) + })?; checked.insert(info.digest); } @@ -295,25 +300,35 @@ impl DataStore { /// Cleanup a backup directory /// /// Removes all files not mentioned in the manifest. - pub fn cleanup_backup_dir(&self, backup_dir: &BackupDir, manifest: &BackupManifest - ) -> Result<(), Error> { - + pub fn cleanup_backup_dir( + &self, + backup_dir: &BackupDir, + manifest: &BackupManifest, + ) -> Result<(), Error> { let mut full_path = self.base_path(); full_path.push(backup_dir.relative_path()); let mut wanted_files = HashSet::new(); wanted_files.insert(MANIFEST_BLOB_NAME.to_string()); wanted_files.insert(CLIENT_LOG_BLOB_NAME.to_string()); - manifest.files().iter().for_each(|item| { wanted_files.insert(item.filename.clone()); }); + manifest.files().iter().for_each(|item| { + wanted_files.insert(item.filename.clone()); + }); for item in proxmox_sys::fs::read_subdir(libc::AT_FDCWD, &full_path)?.flatten() { if let Some(file_type) = item.file_type() { - if file_type != nix::dir::Type::File { continue; } + if file_type != nix::dir::Type::File { + continue; + } } let file_name = item.file_name().to_bytes(); - if file_name == b"." || file_name == b".." { continue; }; + if file_name == b"." || file_name == b".." { + continue; + }; if let Ok(name) = std::str::from_utf8(file_name) { - if wanted_files.contains(name) { continue; } + if wanted_files.contains(name) { + continue; + } } println!("remove unused file {:?}", item.file_name()); let dirfd = item.parent_fd(); @@ -339,11 +354,14 @@ impl DataStore { /// Remove a complete backup group including all snapshots, returns true /// if all snapshots were removed, and false if some were protected - pub fn remove_backup_group(&self, backup_group: &BackupGroup) -> Result { - + pub fn remove_backup_group(&self, backup_group: &BackupGroup) -> Result { let full_path = self.group_path(backup_group); - let _guard = proxmox_sys::fs::lock_dir_noblock(&full_path, "backup group", "possible running backup")?; + let _guard = proxmox_sys::fs::lock_dir_noblock( + &full_path, + "backup group", + "possible running backup", + )?; log::info!("removing backup group {:?}", full_path); @@ -360,22 +378,20 @@ impl DataStore { if removed_all { // no snapshots left, we can now safely remove the empty folder - std::fs::remove_dir_all(&full_path) - .map_err(|err| { - format_err!( - "removing backup group directory {:?} failed - {}", - full_path, - err, - ) - })?; + std::fs::remove_dir_all(&full_path).map_err(|err| { + format_err!( + "removing backup group directory {:?} failed - {}", + full_path, + err, + ) + })?; } Ok(removed_all) } /// Remove a backup directory including all content - pub fn remove_backup_dir(&self, backup_dir: &BackupDir, force: bool) -> Result<(), Error> { - + pub fn remove_backup_dir(&self, backup_dir: &BackupDir, force: bool) -> Result<(), Error> { let full_path = self.snapshot_path(backup_dir); let (_guard, _manifest_guard); @@ -389,14 +405,9 @@ impl DataStore { } log::info!("removing backup snapshot {:?}", full_path); - std::fs::remove_dir_all(&full_path) - .map_err(|err| { - format_err!( - "removing backup snapshot {:?} failed - {}", - full_path, - err, - ) - })?; + std::fs::remove_dir_all(&full_path).map_err(|err| { + format_err!("removing backup snapshot {:?} failed - {}", full_path, err,) + })?; // the manifest does not exists anymore, we do not need to keep the lock if let Ok(path) = self.manifest_lock_path(backup_dir) { @@ -460,7 +471,8 @@ impl DataStore { open_options.create_new(true); } - let mut file = open_options.open(&path) + let mut file = open_options + .open(&path) .map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?; writeln!(file, "{}", auth_id) @@ -490,13 +502,21 @@ impl DataStore { // create the last component now match std::fs::create_dir(&full_path) { Ok(_) => { - let guard = lock_dir_noblock(&full_path, "backup group", "another backup is already running")?; + let guard = lock_dir_noblock( + &full_path, + "backup group", + "another backup is already running", + )?; self.set_owner(backup_group, auth_id, false)?; let owner = self.get_owner(backup_group)?; // just to be sure Ok((owner, guard)) } Err(ref err) if err.kind() == io::ErrorKind::AlreadyExists => { - let guard = lock_dir_noblock(&full_path, "backup group", "another backup is already running")?; + let guard = lock_dir_noblock( + &full_path, + "backup group", + "another backup is already running", + )?; let owner = self.get_owner(backup_group)?; // just to be sure Ok((owner, guard)) } @@ -507,20 +527,28 @@ impl DataStore { /// Creates a new backup snapshot inside a BackupGroup /// /// The BackupGroup directory needs to exist. - pub fn create_locked_backup_dir(&self, backup_dir: &BackupDir) - -> Result<(PathBuf, bool, DirLockGuard), Error> - { + pub fn create_locked_backup_dir( + &self, + backup_dir: &BackupDir, + ) -> Result<(PathBuf, bool, DirLockGuard), Error> { let relative_path = backup_dir.relative_path(); let mut full_path = self.base_path(); full_path.push(&relative_path); - let lock = || - lock_dir_noblock(&full_path, "snapshot", "internal error - tried creating snapshot that's already in use"); + let lock = || { + lock_dir_noblock( + &full_path, + "snapshot", + "internal error - tried creating snapshot that's already in use", + ) + }; match std::fs::create_dir(&full_path) { Ok(_) => Ok((relative_path, true, lock()?)), - Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok((relative_path, false, lock()?)), - Err(e) => Err(e.into()) + Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => { + Ok((relative_path, false, lock()?)) + } + Err(e) => Err(e.into()), } } @@ -535,7 +563,8 @@ impl DataStore { // make sure we skip .chunks (and other hidden files to keep it simple) fn is_hidden(entry: &walkdir::DirEntry) -> bool { - entry.file_name() + entry + .file_name() .to_str() .map(|s| s.starts_with('.')) .unwrap_or(false) @@ -550,7 +579,11 @@ impl DataStore { bail!("cannot continue garbage-collection safely, permission denied on: {:?}", path) } } else { - bail!("unexpected error on datastore traversal: {} - {:?}", inner, path) + bail!( + "unexpected error on datastore traversal: {} - {:?}", + inner, + path + ) } } else { bail!("unexpected error on datastore traversal: {}", inner) @@ -563,11 +596,13 @@ impl DataStore { Ok(entry) => entry.into_path(), Err(err) => { handle_entry_err(err)?; - continue - }, + continue; + } }; if let Ok(archive_type) = archive_type(&path) { - if archive_type == ArchiveType::FixedIndex || archive_type == ArchiveType::DynamicIndex { + if archive_type == ArchiveType::FixedIndex + || archive_type == ArchiveType::DynamicIndex + { list.push(path); } } @@ -584,7 +619,6 @@ impl DataStore { status: &mut GarbageCollectionStatus, worker: &dyn WorkerTaskContext, ) -> Result<(), Error> { - status.index_file_count += 1; status.index_data_bytes += index.index_bytes(); @@ -620,7 +654,6 @@ impl DataStore { status: &mut GarbageCollectionStatus, worker: &dyn WorkerTaskContext, ) -> Result<(), Error> { - let image_list = self.list_images()?; let image_count = image_list.len(); @@ -629,7 +662,6 @@ impl DataStore { let mut strange_paths_count: u64 = 0; for (i, img) in image_list.into_iter().enumerate() { - worker.check_abort()?; worker.fail_on_shutdown()?; @@ -683,7 +715,6 @@ impl DataStore { ); } - Ok(()) } @@ -695,17 +726,23 @@ impl DataStore { !matches!(self.inner.gc_mutex.try_lock(), Ok(_)) } - pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> { - + pub fn garbage_collection( + &self, + worker: &dyn WorkerTaskContext, + upid: &UPID, + ) -> Result<(), Error> { if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() { - // avoids that we run GC if an old daemon process has still a // running backup writer, which is not save as we have no "oldest // writer" information and thus no safe atime cutoff - let _exclusive_lock = self.inner.chunk_store.try_exclusive_lock()?; + let _exclusive_lock = self.inner.chunk_store.try_exclusive_lock()?; let phase1_start_time = proxmox_time::epoch_i64(); - let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time); + let oldest_writer = self + .inner + .chunk_store + .oldest_writer() + .unwrap_or(phase1_start_time); let mut gc_status = GarbageCollectionStatus::default(); gc_status.upid = Some(upid.to_string()); @@ -751,7 +788,8 @@ impl DataStore { ); if gc_status.index_data_bytes > 0 { - let comp_per = (gc_status.disk_bytes as f64 * 100.)/gc_status.index_data_bytes as f64; + let comp_per = + (gc_status.disk_bytes as f64 * 100.) / gc_status.index_data_bytes as f64; task_log!( worker, "On-Disk usage: {} ({:.2}%)", @@ -763,7 +801,7 @@ impl DataStore { task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks); let deduplication_factor = if gc_status.disk_bytes > 0 { - (gc_status.index_data_bytes as f64)/(gc_status.disk_bytes as f64) + (gc_status.index_data_bytes as f64) / (gc_status.disk_bytes as f64) } else { 1.0 }; @@ -771,7 +809,7 @@ impl DataStore { task_log!(worker, "Deduplication factor: {:.2}", deduplication_factor); if gc_status.disk_chunks > 0 { - let avg_chunk = gc_status.disk_bytes/(gc_status.disk_chunks as u64); + let avg_chunk = gc_status.disk_bytes / (gc_status.disk_chunks as u64); task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk)); } @@ -793,7 +831,6 @@ impl DataStore { } *self.inner.last_gc_status.lock().unwrap() = gc_status; - } else { bail!("Start GC failed - (already running/locked)"); } @@ -805,19 +842,21 @@ impl DataStore { self.inner.chunk_store.try_shared_lock() } - pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) { + pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) { self.inner.chunk_store.chunk_path(digest) } - pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result { - self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist) + pub fn cond_touch_chunk( + &self, + digest: &[u8; 32], + fail_if_not_exist: bool, + ) -> Result { + self.inner + .chunk_store + .cond_touch_chunk(digest, fail_if_not_exist) } - pub fn insert_chunk( - &self, - chunk: &DataBlob, - digest: &[u8; 32], - ) -> Result<(bool, u64), Error> { + pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> { self.inner.chunk_store.insert_chunk(chunk, digest) } @@ -829,38 +868,37 @@ impl DataStore { proxmox_lang::try_block!({ let mut file = std::fs::File::open(&path)?; DataBlob::load_from_reader(&mut file) - }).map_err(|err| format_err!("unable to load blob '{:?}' - {}", path, err)) + }) + .map_err(|err| format_err!("unable to load blob '{:?}' - {}", path, err)) } - pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result { let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest); std::fs::metadata(chunk_path).map_err(Error::from) } pub fn load_chunk(&self, digest: &[u8; 32]) -> Result { - let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest); proxmox_lang::try_block!({ let mut file = std::fs::File::open(&chunk_path)?; DataBlob::load_from_reader(&mut file) - }).map_err(|err| format_err!( - "store '{}', unable to load chunk '{}' - {}", - self.name(), - digest_str, - err, - )) + }) + .map_err(|err| { + format_err!( + "store '{}', unable to load chunk '{}' - {}", + self.name(), + digest_str, + err, + ) + }) } /// Returns the filename to lock a manifest /// /// Also creates the basedir. The lockfile is located in /// '/run/proxmox-backup/locks/{datastore}/{type}/{id}/{timestamp}.index.json.lck' - fn manifest_lock_path( - &self, - backup_dir: &BackupDir, - ) -> Result { + fn manifest_lock_path(&self, backup_dir: &BackupDir) -> Result { let mut path = format!( "/run/proxmox-backup/locks/{}/{}/{}", self.name(), @@ -869,32 +907,27 @@ impl DataStore { ); std::fs::create_dir_all(&path)?; use std::fmt::Write; - write!(path, "/{}{}", backup_dir.backup_time_string(), &MANIFEST_LOCK_NAME)?; + write!( + path, + "/{}{}", + backup_dir.backup_time_string(), + &MANIFEST_LOCK_NAME + )?; Ok(path) } - fn lock_manifest( - &self, - backup_dir: &BackupDir, - ) -> Result { + fn lock_manifest(&self, backup_dir: &BackupDir) -> Result { let path = self.manifest_lock_path(backup_dir)?; // update_manifest should never take a long time, so if someone else has // the lock we can simply block a bit and should get it soon open_backup_lockfile(&path, Some(Duration::from_secs(5)), true) - .map_err(|err| { - format_err!( - "unable to acquire manifest lock {:?} - {}", &path, err - ) - }) + .map_err(|err| format_err!("unable to acquire manifest lock {:?} - {}", &path, err)) } /// Load the manifest without a lock. Must not be written back. - pub fn load_manifest( - &self, - backup_dir: &BackupDir, - ) -> Result<(BackupManifest, u64), Error> { + pub fn load_manifest(&self, backup_dir: &BackupDir) -> Result<(BackupManifest, u64), Error> { let blob = self.load_blob(backup_dir, MANIFEST_BLOB_NAME)?; let raw_size = blob.raw_size(); let manifest = BackupManifest::try_from(blob)?; @@ -908,7 +941,6 @@ impl DataStore { backup_dir: &BackupDir, update_fn: impl FnOnce(&mut BackupManifest), ) -> Result<(), Error> { - let _guard = self.lock_manifest(backup_dir)?; let (mut manifest, _) = self.load_manifest(backup_dir)?; @@ -930,11 +962,7 @@ impl DataStore { } /// Updates the protection status of the specified snapshot. - pub fn update_protection( - &self, - backup_dir: &BackupDir, - protection: bool - ) -> Result<(), Error> { + pub fn update_protection(&self, backup_dir: &BackupDir, protection: bool) -> Result<(), Error> { let full_path = self.snapshot_path(backup_dir); let _guard = lock_dir_noblock(&full_path, "snapshot", "possibly running or in use")?; diff --git a/pbs-datastore/src/dynamic_index.rs b/pbs-datastore/src/dynamic_index.rs index 7a1a39d3..9eb035d1 100644 --- a/pbs-datastore/src/dynamic_index.rs +++ b/pbs-datastore/src/dynamic_index.rs @@ -9,21 +9,21 @@ use std::task::Context; use anyhow::{bail, format_err, Error}; -use proxmox_sys::mmap::Mmap; use proxmox_io::ReadExt; -use proxmox_uuid::Uuid; +use proxmox_sys::mmap::Mmap; use proxmox_sys::process_locker::ProcessLockSharedGuard; +use proxmox_uuid::Uuid; use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation}; use pbs_tools::lru_cache::LruCache; -use crate::Chunker; use crate::chunk_stat::ChunkStat; use crate::chunk_store::ChunkStore; use crate::data_blob::{DataBlob, DataChunkBuilder}; use crate::file_formats; -use crate::index::{IndexFile, ChunkReadInfo}; +use crate::index::{ChunkReadInfo, IndexFile}; use crate::read_chunk::ReadChunk; +use crate::Chunker; /// Header format definition for dynamic index files (`.dixd`) #[repr(C)] @@ -228,7 +228,11 @@ impl IndexFile for DynamicIndexReader { if pos >= self.index.len() { return None; } - let start = if pos == 0 { 0 } else { self.index[pos - 1].end() }; + let start = if pos == 0 { + 0 + } else { + self.index[pos - 1].end() + }; let end = self.index[pos].end(); @@ -252,7 +256,7 @@ impl IndexFile for DynamicIndexReader { let found_idx = self.binary_search(0, 0, end_idx, end, offset); let found_idx = match found_idx { Ok(i) => i, - Err(_) => return None + Err(_) => return None, }; let found_start = if found_idx == 0 { @@ -581,13 +585,16 @@ impl BufferedDynamicReader { fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { //let (start, end, data) = self.lru_cache.access( - let cached_chunk = self.lru_cache.access( - idx, - &mut ChunkCacher { - store: &mut self.store, - index: &self.index, - }, - )?.ok_or_else(|| format_err!("chunk not found by cacher"))?; + let cached_chunk = self + .lru_cache + .access( + idx, + &mut ChunkCacher { + store: &mut self.store, + index: &self.index, + }, + )? + .ok_or_else(|| format_err!("chunk not found by cacher"))?; // fixme: avoid copy self.read_buffer.clear(); diff --git a/pbs-datastore/src/file_formats.rs b/pbs-datastore/src/file_formats.rs index a25263f2..7e08e530 100644 --- a/pbs-datastore/src/file_formats.rs +++ b/pbs-datastore/src/file_formats.rs @@ -38,7 +38,7 @@ pub const DYNAMIC_SIZED_CHUNK_INDEX_1_0: [u8; 8] = [28, 145, 78, 165, 25, 186, 1 /// This is basically the same format we use for chunks, but /// with other magic numbers so that we can distinguish them. #[derive(Endian)] -#[repr(C,packed)] +#[repr(C, packed)] pub struct DataBlobHeader { pub magic: [u8; 8], pub crc: [u8; 4], @@ -52,7 +52,7 @@ pub struct DataBlobHeader { /// /// (MAGIC || CRC32 || IV || TAG || EncryptedData). #[derive(Endian)] -#[repr(C,packed)] +#[repr(C, packed)] pub struct EncryptedDataBlobHeader { pub head: DataBlobHeader, pub iv: [u8; 16], diff --git a/pbs-datastore/src/fixed_index.rs b/pbs-datastore/src/fixed_index.rs index 21404eed..5fc3d2ab 100644 --- a/pbs-datastore/src/fixed_index.rs +++ b/pbs-datastore/src/fixed_index.rs @@ -1,14 +1,14 @@ use std::fs::File; use std::io::Write; +use std::io::{Seek, SeekFrom}; use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::io::{Seek, SeekFrom}; use anyhow::{bail, format_err, Error}; -use proxmox_sys::process_locker::ProcessLockSharedGuard; use proxmox_io::ReadExt; +use proxmox_sys::process_locker::ProcessLockSharedGuard; use proxmox_uuid::Uuid; use crate::chunk_stat::ChunkStat; diff --git a/pbs-datastore/src/index.rs b/pbs-datastore/src/index.rs index 69788f80..b5d20ad9 100644 --- a/pbs-datastore/src/index.rs +++ b/pbs-datastore/src/index.rs @@ -45,13 +45,17 @@ pub trait IndexFile { let mut most_used = Vec::new(); for (digest, count) in map { - if count <= 1 { continue; } + if count <= 1 { + continue; + } match most_used.binary_search_by_key(&count, |&(_digest, count)| count) { Ok(p) => most_used.insert(p, (digest, count)), Err(p) => most_used.insert(p, (digest, count)), } - if most_used.len() > max { let _ = most_used.pop(); } + if most_used.len() > max { + let _ = most_used.pop(); + } } let mut map = HashMap::new(); diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs index 131040c6..0b10ae0e 100644 --- a/pbs-datastore/src/lib.rs +++ b/pbs-datastore/src/lib.rs @@ -146,7 +146,10 @@ pub const CATALOG_NAME: &str = "catalog.pcat1.didx"; /// Directory path where active operations counters are saved. -pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations"); +pub const ACTIVE_OPERATIONS_DIR: &str = concat!( + pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), + "/active-operations" +); #[macro_export] macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 { @@ -163,8 +166,8 @@ macro_rules! PROXMOX_BACKUP_READER_PROTOCOL_ID_V1 { } pub mod backup_info; -pub mod catalog; pub mod cached_chunk_reader; +pub mod catalog; pub mod checksum_reader; pub mod checksum_writer; pub mod chunk_stat; diff --git a/pbs-datastore/src/local_chunk_reader.rs b/pbs-datastore/src/local_chunk_reader.rs index 0f818c03..05a70c06 100644 --- a/pbs-datastore/src/local_chunk_reader.rs +++ b/pbs-datastore/src/local_chunk_reader.rs @@ -4,11 +4,11 @@ use std::sync::Arc; use anyhow::{bail, Error}; -use pbs_tools::crypt_config::CryptConfig; use pbs_api_types::CryptMode; +use pbs_tools::crypt_config::CryptConfig; use crate::data_blob::DataBlob; -use crate::read_chunk::{ReadChunk, AsyncReadChunk}; +use crate::read_chunk::{AsyncReadChunk, ReadChunk}; use crate::DataStore; #[derive(Clone)] @@ -19,7 +19,11 @@ pub struct LocalChunkReader { } impl LocalChunkReader { - pub fn new(store: Arc, crypt_config: Option>, crypt_mode: CryptMode) -> Self { + pub fn new( + store: Arc, + crypt_config: Option>, + crypt_mode: CryptMode, + ) -> Self { Self { store, crypt_config, @@ -29,17 +33,15 @@ impl LocalChunkReader { fn ensure_crypt_mode(&self, chunk_mode: CryptMode) -> Result<(), Error> { match self.crypt_mode { - CryptMode::Encrypt => { - match chunk_mode { - CryptMode::Encrypt => Ok(()), - CryptMode::SignOnly | CryptMode::None => bail!("Index and chunk CryptMode don't match."), + CryptMode::Encrypt => match chunk_mode { + CryptMode::Encrypt => Ok(()), + CryptMode::SignOnly | CryptMode::None => { + bail!("Index and chunk CryptMode don't match.") } }, - CryptMode::SignOnly | CryptMode::None => { - match chunk_mode { - CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."), - CryptMode::SignOnly | CryptMode::None => Ok(()), - } + CryptMode::SignOnly | CryptMode::None => match chunk_mode { + CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."), + CryptMode::SignOnly | CryptMode::None => Ok(()), }, } } @@ -66,7 +68,7 @@ impl AsyncReadChunk for LocalChunkReader { &'a self, digest: &'a [u8; 32], ) -> Pin> + Send + 'a>> { - Box::pin(async move{ + Box::pin(async move { let (path, _) = self.store.chunk_path(digest); let raw_data = tokio::fs::read(&path).await?; @@ -85,7 +87,8 @@ impl AsyncReadChunk for LocalChunkReader { Box::pin(async move { let chunk = AsyncReadChunk::read_raw_chunk(self, digest).await?; - let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?; + let raw_data = + chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?; // fixme: verify digest? diff --git a/pbs-datastore/src/manifest.rs b/pbs-datastore/src/manifest.rs index d55ce792..e05925e4 100644 --- a/pbs-datastore/src/manifest.rs +++ b/pbs-datastore/src/manifest.rs @@ -3,11 +3,11 @@ use std::path::Path; use anyhow::{bail, format_err, Error}; -use serde_json::{json, Value}; use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; -use pbs_tools::crypt_config::CryptConfig; use pbs_api_types::{CryptMode, Fingerprint}; +use pbs_tools::crypt_config::CryptConfig; use crate::BackupDir; @@ -16,15 +16,18 @@ pub const MANIFEST_LOCK_NAME: &str = ".index.json.lck"; pub const CLIENT_LOG_BLOB_NAME: &str = "client.log.blob"; pub const ENCRYPTED_KEY_BLOB_NAME: &str = "rsa-encrypted.key.blob"; - -fn crypt_mode_none() -> CryptMode { CryptMode::None } -fn empty_value() -> Value { json!({}) } +fn crypt_mode_none() -> CryptMode { + CryptMode::None +} +fn empty_value() -> Value { + json!({}) +} #[derive(Serialize, Deserialize)] -#[serde(rename_all="kebab-case")] +#[serde(rename_all = "kebab-case")] pub struct FileInfo { pub filename: String, - #[serde(default="crypt_mode_none")] // to be compatible with < 0.8.0 backups + #[serde(default = "crypt_mode_none")] // to be compatible with < 0.8.0 backups pub crypt_mode: CryptMode, pub size: u64, #[serde(with = "hex::serde")] @@ -32,12 +35,11 @@ pub struct FileInfo { } impl FileInfo { - /// Return expected CryptMode of referenced chunks /// /// Encrypted Indices should only reference encrypted chunks, while signed or plain indices /// should only reference plain chunks. - pub fn chunk_crypt_mode (&self) -> CryptMode { + pub fn chunk_crypt_mode(&self) -> CryptMode { match self.crypt_mode { CryptMode::Encrypt => CryptMode::Encrypt, CryptMode::SignOnly | CryptMode::None => CryptMode::None, @@ -46,13 +48,13 @@ impl FileInfo { } #[derive(Serialize, Deserialize)] -#[serde(rename_all="kebab-case")] +#[serde(rename_all = "kebab-case")] pub struct BackupManifest { backup_type: String, backup_id: String, backup_time: i64, files: Vec, - #[serde(default="empty_value")] // to be compatible with < 0.8.0 backups + #[serde(default = "empty_value")] // to be compatible with < 0.8.0 backups pub unprotected: Value, pub signature: Option, } @@ -78,14 +80,11 @@ impl ArchiveType { } //#[deprecated(note = "use ArchivType::from_path instead")] later... -pub fn archive_type>( - archive_name: P, -) -> Result { +pub fn archive_type>(archive_name: P) -> Result { ArchiveType::from_path(archive_name) } impl BackupManifest { - pub fn new(snapshot: BackupDir) -> Self { Self { backup_type: snapshot.group().backup_type().into(), @@ -97,9 +96,20 @@ impl BackupManifest { } } - pub fn add_file(&mut self, filename: String, size: u64, csum: [u8; 32], crypt_mode: CryptMode) -> Result<(), Error> { + pub fn add_file( + &mut self, + filename: String, + size: u64, + csum: [u8; 32], + crypt_mode: CryptMode, + ) -> Result<(), Error> { let _archive_type = ArchiveType::from_path(&filename)?; // check type - self.files.push(FileInfo { filename, size, csum, crypt_mode }); + self.files.push(FileInfo { + filename, + size, + csum, + crypt_mode, + }); Ok(()) } @@ -108,7 +118,6 @@ impl BackupManifest { } pub fn lookup_file_info(&self, name: &str) -> Result<&FileInfo, Error> { - let info = self.files.iter().find(|item| item.filename == name); match info { @@ -118,7 +127,6 @@ impl BackupManifest { } pub fn verify_file(&self, name: &str, csum: &[u8; 32], size: u64) -> Result<(), Error> { - let info = self.lookup_file_info(name)?; if size != info.size { @@ -146,7 +154,6 @@ impl BackupManifest { } fn json_signature(data: &Value, crypt_config: &CryptConfig) -> Result<[u8; 32], Error> { - let mut signed_data = data.clone(); signed_data.as_object_mut().unwrap().remove("unprotected"); // exclude @@ -161,7 +168,6 @@ impl BackupManifest { /// Converts the Manifest into json string, and add a signature if there is a crypt_config. pub fn to_string(&self, crypt_config: Option<&CryptConfig>) -> Result { - let mut manifest = serde_json::to_value(&self)?; if let Some(crypt_config) = crypt_config { @@ -178,7 +184,7 @@ impl BackupManifest { pub fn fingerprint(&self) -> Result, Error> { match &self.unprotected["key-fingerprint"] { Value::Null => Ok(None), - value => Ok(Some(Deserialize::deserialize(value)?)) + value => Ok(Some(Deserialize::deserialize(value)?)), } } @@ -210,7 +216,10 @@ impl BackupManifest { } /// Try to read the manifest. This verifies the signature if there is a crypt_config. - pub fn from_data(data: &[u8], crypt_config: Option<&CryptConfig>) -> Result { + pub fn from_data( + data: &[u8], + crypt_config: Option<&CryptConfig>, + ) -> Result { let json: Value = serde_json::from_slice(data)?; let signature = json["signature"].as_str().map(String::from); @@ -243,13 +252,13 @@ impl BackupManifest { } } - impl TryFrom for BackupManifest { type Error = Error; fn try_from(blob: super::DataBlob) -> Result { // no expected digest available - let data = blob.decode(None, None) + let data = blob + .decode(None, None) .map_err(|err| format_err!("decode backup manifest blob failed - {}", err))?; let json: Value = serde_json::from_slice(&data[..]) .map_err(|err| format_err!("unable to parse backup manifest json - {}", err))?; @@ -258,10 +267,8 @@ impl TryFrom for BackupManifest { } } - #[test] fn test_manifest_signature() -> Result<(), Error> { - use pbs_config::key_config::KeyDerivationConfig; let pw = b"test"; @@ -291,7 +298,10 @@ fn test_manifest_signature() -> Result<(), Error> { let manifest: Value = serde_json::from_str(&text)?; let signature = manifest["signature"].as_str().unwrap().to_string(); - assert_eq!(signature, "d7b446fb7db081662081d4b40fedd858a1d6307a5aff4ecff7d5bf4fd35679e9"); + assert_eq!( + signature, + "d7b446fb7db081662081d4b40fedd858a1d6307a5aff4ecff7d5bf4fd35679e9" + ); let manifest: BackupManifest = serde_json::from_value(manifest)?; let expected_signature = hex::encode(&manifest.signature(&crypt_config)?); diff --git a/pbs-datastore/src/prune.rs b/pbs-datastore/src/prune.rs index 09448f6f..87e667a9 100644 --- a/pbs-datastore/src/prune.rs +++ b/pbs-datastore/src/prune.rs @@ -1,14 +1,19 @@ use std::collections::{HashMap, HashSet}; use std::path::PathBuf; -use anyhow::{Error}; +use anyhow::Error; use pbs_api_types::PruneOptions; use super::BackupInfo; #[derive(Clone, Copy, PartialEq, Eq)] -pub enum PruneMark { Protected, Keep, KeepPartial, Remove } +pub enum PruneMark { + Protected, + Keep, + KeepPartial, + Remove, +} impl PruneMark { pub fn keep(self) -> bool { @@ -31,13 +36,12 @@ impl std::fmt::Display for PruneMark { } } -fn mark_selections Result> ( +fn mark_selections Result>( mark: &mut HashMap, list: &[BackupInfo], keep: usize, select_id: F, ) -> Result<(), Error> { - let mut include_hash = HashSet::new(); let mut already_included = HashSet::new(); @@ -51,17 +55,23 @@ fn mark_selections Result> ( for info in list { let backup_id = info.backup_dir.relative_path(); - if mark.get(&backup_id).is_some() { continue; } + if mark.get(&backup_id).is_some() { + continue; + } if info.protected { mark.insert(backup_id, PruneMark::Protected); continue; } let sel_id: String = select_id(info)?; - if already_included.contains(&sel_id) { continue; } + if already_included.contains(&sel_id) { + continue; + } if !include_hash.contains(&sel_id) { - if include_hash.len() >= keep { break; } + if include_hash.len() >= keep { + break; + } include_hash.insert(sel_id); mark.insert(backup_id, PruneMark::Keep); } else { @@ -72,11 +82,7 @@ fn mark_selections Result> ( Ok(()) } -fn remove_incomplete_snapshots( - mark: &mut HashMap, - list: &[BackupInfo], -) { - +fn remove_incomplete_snapshots(mark: &mut HashMap, list: &[BackupInfo]) { let mut keep_unfinished = true; for info in list.iter() { // backup is considered unfinished if there is no manifest @@ -86,7 +92,8 @@ fn remove_incomplete_snapshots( keep_unfinished = false; } else { let backup_id = info.backup_dir.relative_path(); - if keep_unfinished { // keep first unfinished + if keep_unfinished { + // keep first unfinished mark.insert(backup_id, PruneMark::KeepPartial); } else { mark.insert(backup_id, PruneMark::Remove); @@ -98,12 +105,36 @@ fn remove_incomplete_snapshots( pub fn keeps_something(options: &PruneOptions) -> bool { let mut keep_something = false; - if let Some(count) = options.keep_last { if count > 0 { keep_something = true; } } - if let Some(count) = options.keep_hourly { if count > 0 { keep_something = true; } } - if let Some(count) = options.keep_daily { if count > 0 { keep_something = true; } } - if let Some(count) = options.keep_weekly { if count > 0 { keep_something = true; } } - if let Some(count) = options.keep_monthly { if count > 0 { keep_something = true; } } - if let Some(count) = options.keep_yearly { if count > 0 { keep_something = true; } } + if let Some(count) = options.keep_last { + if count > 0 { + keep_something = true; + } + } + if let Some(count) = options.keep_hourly { + if count > 0 { + keep_something = true; + } + } + if let Some(count) = options.keep_daily { + if count > 0 { + keep_something = true; + } + } + if let Some(count) = options.keep_weekly { + if count > 0 { + keep_something = true; + } + } + if let Some(count) = options.keep_monthly { + if count > 0 { + keep_something = true; + } + } + if let Some(count) = options.keep_yearly { + if count > 0 { + keep_something = true; + } + } keep_something } @@ -148,7 +179,6 @@ pub fn compute_prune_info( mut list: Vec, options: &PruneOptions, ) -> Result, Error> { - let mut mark = HashMap::new(); BackupInfo::sort_list(&mut list, false); @@ -195,7 +225,8 @@ pub fn compute_prune_info( })?; } - let prune_info: Vec<(BackupInfo, PruneMark)> = list.into_iter() + let prune_info: Vec<(BackupInfo, PruneMark)> = list + .into_iter() .map(|info| { let backup_id = info.backup_dir.relative_path(); let mark = if info.protected { diff --git a/pbs-datastore/src/snapshot_reader.rs b/pbs-datastore/src/snapshot_reader.rs index d7df3f23..0e262481 100644 --- a/pbs-datastore/src/snapshot_reader.rs +++ b/pbs-datastore/src/snapshot_reader.rs @@ -1,7 +1,7 @@ +use std::fs::File; +use std::os::unix::io::{AsRawFd, FromRawFd}; use std::path::Path; use std::sync::Arc; -use std::os::unix::io::{AsRawFd, FromRawFd}; -use std::fs::File; use anyhow::{bail, Error}; use nix::dir::Dir; @@ -9,9 +9,9 @@ use nix::dir::Dir; use proxmox_sys::fs::lock_dir_noblock_shared; use crate::backup_info::BackupDir; -use crate::index::IndexFile; -use crate::fixed_index::FixedIndexReader; use crate::dynamic_index::DynamicIndexReader; +use crate::fixed_index::FixedIndexReader; +use crate::index::IndexFile; use crate::manifest::{archive_type, ArchiveType, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME}; use crate::DataStore; use pbs_api_types::Operation; @@ -27,24 +27,24 @@ pub struct SnapshotReader { } impl SnapshotReader { - /// Lock snapshot, reads the manifest and returns a new instance pub fn new(datastore: Arc, snapshot: BackupDir) -> Result { - let snapshot_path = datastore.snapshot_path(&snapshot); - let locked_dir = lock_dir_noblock_shared( - &snapshot_path, - "snapshot", - "locked by another operation")?; + let locked_dir = + lock_dir_noblock_shared(&snapshot_path, "snapshot", "locked by another operation")?; let datastore_name = datastore.name().to_string(); let manifest = match datastore.load_manifest(&snapshot) { Ok((manifest, _)) => manifest, Err(err) => { - bail!("manifest load error on datastore '{}' snapshot '{}' - {}", - datastore_name, snapshot, err); + bail!( + "manifest load error on datastore '{}' snapshot '{}' - {}", + datastore_name, + snapshot, + err + ); } }; @@ -53,12 +53,19 @@ impl SnapshotReader { let mut file_list = Vec::new(); file_list.push(MANIFEST_BLOB_NAME.to_string()); - for item in manifest.files() { file_list.push(item.filename.clone()); } + for item in manifest.files() { + file_list.push(item.filename.clone()); + } if client_log_path.exists() { file_list.push(CLIENT_LOG_BLOB_NAME.to_string()); } - Ok(Self { snapshot, datastore_name, file_list, locked_dir }) + Ok(Self { + snapshot, + datastore_name, + file_list, + locked_dir, + }) } /// Return the snapshot directory @@ -89,7 +96,10 @@ impl SnapshotReader { } /// Returns an iterator for all chunks not skipped by `skip_fn`. - pub fn chunk_iterator bool>(&self, skip_fn: F) -> Result, Error> { + pub fn chunk_iterator bool>( + &self, + skip_fn: F, + ) -> Result, Error> { SnapshotChunkIterator::new(self, skip_fn) } } @@ -99,14 +109,14 @@ impl SnapshotReader { /// Note: The iterator returns a `Result`, and the iterator state is /// undefined after the first error. So it make no sense to continue /// iteration after the first error. -pub struct SnapshotChunkIterator<'a, F: Fn(&[u8;32]) -> bool> { +pub struct SnapshotChunkIterator<'a, F: Fn(&[u8; 32]) -> bool> { snapshot_reader: &'a SnapshotReader, todo_list: Vec, skip_fn: F, current_index: Option<(Arc>, usize, Vec<(usize, u64)>)>, } -impl <'a, F: Fn(&[u8;32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> { +impl<'a, F: Fn(&[u8; 32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> { type Item = Result<[u8; 32], Error>; fn next(&mut self) -> Option { @@ -118,15 +128,17 @@ impl <'a, F: Fn(&[u8;32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> { let index: Box = match archive_type(&filename)? { ArchiveType::FixedIndex => Box::new(FixedIndexReader::new(file)?), ArchiveType::DynamicIndex => Box::new(DynamicIndexReader::new(file)?), - _ => bail!("SnapshotChunkIterator: got unknown file type - internal error"), + _ => bail!( + "SnapshotChunkIterator: got unknown file type - internal error" + ), }; - let datastore = - DataStore::lookup_datastore( - self.snapshot_reader.datastore_name(), - Some(Operation::Read) - )?; - let order = datastore.get_chunks_in_order(&index, &self.skip_fn, |_| Ok(()))?; + let datastore = DataStore::lookup_datastore( + self.snapshot_reader.datastore_name(), + Some(Operation::Read), + )?; + let order = + datastore.get_chunks_in_order(&index, &self.skip_fn, |_| Ok(()))?; self.current_index = Some((Arc::new(index), 0, order)); } else { @@ -143,25 +155,29 @@ impl <'a, F: Fn(&[u8;32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> { // pop next index } } - }).transpose() + }) + .transpose() } } -impl <'a, F: Fn(&[u8;32]) -> bool> SnapshotChunkIterator<'a, F> { - +impl<'a, F: Fn(&[u8; 32]) -> bool> SnapshotChunkIterator<'a, F> { pub fn new(snapshot_reader: &'a SnapshotReader, skip_fn: F) -> Result { - let mut todo_list = Vec::new(); for filename in snapshot_reader.file_list() { match archive_type(filename)? { ArchiveType::FixedIndex | ArchiveType::DynamicIndex => { todo_list.push(filename.to_owned()); - }, - ArchiveType::Blob => { /* no chunks, do nothing */ }, + } + ArchiveType::Blob => { /* no chunks, do nothing */ } } } - Ok(Self { snapshot_reader, todo_list, current_index: None, skip_fn }) + Ok(Self { + snapshot_reader, + todo_list, + current_index: None, + skip_fn, + }) } } diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs index 754727fc..a32bb9a9 100644 --- a/pbs-datastore/src/store_progress.rs +++ b/pbs-datastore/src/store_progress.rs @@ -15,7 +15,7 @@ impl StoreProgress { pub fn new(total_groups: u64) -> Self { StoreProgress { total_groups, - .. Default::default() + ..Default::default() } }