datastore: rustfmt whole package

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
Thomas Lamprecht 2022-04-14 13:27:53 +02:00
parent fb3c007f8a
commit 42c2b5bec9
21 changed files with 843 additions and 588 deletions

View File

@ -5,13 +5,8 @@ use std::str::FromStr;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use pbs_api_types::{ use pbs_api_types::{
BACKUP_ID_REGEX, GroupFilter, BACKUP_DATE_REGEX, BACKUP_FILE_REGEX, BACKUP_ID_REGEX, BACKUP_TYPE_REGEX,
BACKUP_TYPE_REGEX, GROUP_PATH_REGEX, SNAPSHOT_PATH_REGEX,
BACKUP_DATE_REGEX,
GROUP_PATH_REGEX,
SNAPSHOT_PATH_REGEX,
BACKUP_FILE_REGEX,
GroupFilter,
}; };
use super::manifest::MANIFEST_BLOB_NAME; use super::manifest::MANIFEST_BLOB_NAME;
@ -96,7 +91,11 @@ impl BackupGroup {
let protected = backup_dir.is_protected(base_path.to_owned()); let protected = backup_dir.is_protected(base_path.to_owned());
list.push(BackupInfo { backup_dir, files, protected }); list.push(BackupInfo {
backup_dir,
files,
protected,
});
Ok(()) Ok(())
}, },
@ -331,7 +330,11 @@ impl BackupInfo {
let files = list_backup_files(libc::AT_FDCWD, &path)?; let files = list_backup_files(libc::AT_FDCWD, &path)?;
let protected = backup_dir.is_protected(base_path.to_owned()); let protected = backup_dir.is_protected(base_path.to_owned());
Ok(BackupInfo { backup_dir, files, protected }) Ok(BackupInfo {
backup_dir,
files,
protected,
})
} }
/// Finds the latest backup inside a backup group /// Finds the latest backup inside a backup group
@ -399,9 +402,7 @@ impl BackupInfo {
pub fn is_finished(&self) -> bool { pub fn is_finished(&self) -> bool {
// backup is considered unfinished if there is no manifest // backup is considered unfinished if there is no manifest
self.files self.files.iter().any(|name| name == MANIFEST_BLOB_NAME)
.iter()
.any(|name| name == MANIFEST_BLOB_NAME)
} }
} }

View File

@ -10,8 +10,8 @@ use anyhow::Error;
use futures::ready; use futures::ready;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
use proxmox_lang::io_format_err;
use proxmox_lang::error::io_err_other; use proxmox_lang::error::io_err_other;
use proxmox_lang::io_format_err;
use pbs_tools::async_lru_cache::{AsyncCacher, AsyncLruCache}; use pbs_tools::async_lru_cache::{AsyncCacher, AsyncLruCache};

View File

@ -1,7 +1,7 @@
use std::convert::TryFrom; use std::convert::TryFrom;
use std::ffi::{CStr, CString, OsStr}; use std::ffi::{CStr, CString, OsStr};
use std::fmt; use std::fmt;
use std::io::{Read, Write, Seek, SeekFrom}; use std::io::{Read, Seek, SeekFrom, Write};
use std::os::unix::ffi::OsStrExt; use std::os::unix::ffi::OsStrExt;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
@ -31,7 +31,7 @@ pub trait BackupCatalogWriter {
} }
#[repr(u8)] #[repr(u8)]
#[derive(Copy,Clone,PartialEq)] #[derive(Copy, Clone, PartialEq)]
pub enum CatalogEntryType { pub enum CatalogEntryType {
Directory = b'd', Directory = b'd',
File = b'f', File = b'f',
@ -44,7 +44,7 @@ pub enum CatalogEntryType {
} }
impl TryFrom<u8> for CatalogEntryType { impl TryFrom<u8> for CatalogEntryType {
type Error=Error; type Error = Error;
fn try_from(value: u8) -> Result<Self, Error> { fn try_from(value: u8) -> Result<Self, Error> {
Ok(match value { Ok(match value {
@ -106,51 +106,55 @@ pub enum DirEntryAttribute {
} }
impl DirEntry { impl DirEntry {
fn new(etype: CatalogEntryType, name: Vec<u8>, start: u64, size: u64, mtime: i64) -> Self { fn new(etype: CatalogEntryType, name: Vec<u8>, start: u64, size: u64, mtime: i64) -> Self {
match etype { match etype {
CatalogEntryType::Directory => { CatalogEntryType::Directory => DirEntry {
DirEntry { name, attr: DirEntryAttribute::Directory { start } } name,
} attr: DirEntryAttribute::Directory { start },
CatalogEntryType::File => { },
DirEntry { name, attr: DirEntryAttribute::File { size, mtime } } CatalogEntryType::File => DirEntry {
} name,
CatalogEntryType::Symlink => { attr: DirEntryAttribute::File { size, mtime },
DirEntry { name, attr: DirEntryAttribute::Symlink } },
} CatalogEntryType::Symlink => DirEntry {
CatalogEntryType::Hardlink => { name,
DirEntry { name, attr: DirEntryAttribute::Hardlink } attr: DirEntryAttribute::Symlink,
} },
CatalogEntryType::BlockDevice => { CatalogEntryType::Hardlink => DirEntry {
DirEntry { name, attr: DirEntryAttribute::BlockDevice } name,
} attr: DirEntryAttribute::Hardlink,
CatalogEntryType::CharDevice => { },
DirEntry { name, attr: DirEntryAttribute::CharDevice } CatalogEntryType::BlockDevice => DirEntry {
} name,
CatalogEntryType::Fifo => { attr: DirEntryAttribute::BlockDevice,
DirEntry { name, attr: DirEntryAttribute::Fifo } },
} CatalogEntryType::CharDevice => DirEntry {
CatalogEntryType::Socket => { name,
DirEntry { name, attr: DirEntryAttribute::Socket } attr: DirEntryAttribute::CharDevice,
} },
CatalogEntryType::Fifo => DirEntry {
name,
attr: DirEntryAttribute::Fifo,
},
CatalogEntryType::Socket => DirEntry {
name,
attr: DirEntryAttribute::Socket,
},
} }
} }
/// Get file mode bits for this entry to be used with the `MatchList` api. /// Get file mode bits for this entry to be used with the `MatchList` api.
pub fn get_file_mode(&self) -> Option<u32> { pub fn get_file_mode(&self) -> Option<u32> {
Some( Some(match self.attr {
match self.attr { DirEntryAttribute::Directory { .. } => pxar::mode::IFDIR,
DirEntryAttribute::Directory { .. } => pxar::mode::IFDIR, DirEntryAttribute::File { .. } => pxar::mode::IFREG,
DirEntryAttribute::File { .. } => pxar::mode::IFREG, DirEntryAttribute::Symlink => pxar::mode::IFLNK,
DirEntryAttribute::Symlink => pxar::mode::IFLNK, DirEntryAttribute::Hardlink => return None,
DirEntryAttribute::Hardlink => return None, DirEntryAttribute::BlockDevice => pxar::mode::IFBLK,
DirEntryAttribute::BlockDevice => pxar::mode::IFBLK, DirEntryAttribute::CharDevice => pxar::mode::IFCHR,
DirEntryAttribute::CharDevice => pxar::mode::IFCHR, DirEntryAttribute::Fifo => pxar::mode::IFIFO,
DirEntryAttribute::Fifo => pxar::mode::IFIFO, DirEntryAttribute::Socket => pxar::mode::IFSOCK,
DirEntryAttribute::Socket => pxar::mode::IFSOCK, } as u32)
}
as u32
)
} }
/// Check if DirEntry is a directory /// Check if DirEntry is a directory
@ -170,60 +174,82 @@ struct DirInfo {
} }
impl DirInfo { impl DirInfo {
fn new(name: CString) -> Self { fn new(name: CString) -> Self {
DirInfo { name, entries: Vec::new() } DirInfo {
name,
entries: Vec::new(),
}
} }
fn new_rootdir() -> Self { fn new_rootdir() -> Self {
DirInfo::new(CString::new(b"/".to_vec()).unwrap()) DirInfo::new(CString::new(b"/".to_vec()).unwrap())
} }
fn encode_entry<W: Write>( fn encode_entry<W: Write>(writer: &mut W, entry: &DirEntry, pos: u64) -> Result<(), Error> {
writer: &mut W,
entry: &DirEntry,
pos: u64,
) -> Result<(), Error> {
match entry { match entry {
DirEntry { name, attr: DirEntryAttribute::Directory { start } } => { DirEntry {
name,
attr: DirEntryAttribute::Directory { start },
} => {
writer.write_all(&[CatalogEntryType::Directory as u8])?; writer.write_all(&[CatalogEntryType::Directory as u8])?;
catalog_encode_u64(writer, name.len() as u64)?; catalog_encode_u64(writer, name.len() as u64)?;
writer.write_all(name)?; writer.write_all(name)?;
catalog_encode_u64(writer, pos - start)?; catalog_encode_u64(writer, pos - start)?;
} }
DirEntry { name, attr: DirEntryAttribute::File { size, mtime } } => { DirEntry {
name,
attr: DirEntryAttribute::File { size, mtime },
} => {
writer.write_all(&[CatalogEntryType::File as u8])?; writer.write_all(&[CatalogEntryType::File as u8])?;
catalog_encode_u64(writer, name.len() as u64)?; catalog_encode_u64(writer, name.len() as u64)?;
writer.write_all(name)?; writer.write_all(name)?;
catalog_encode_u64(writer, *size)?; catalog_encode_u64(writer, *size)?;
catalog_encode_i64(writer, *mtime)?; catalog_encode_i64(writer, *mtime)?;
} }
DirEntry { name, attr: DirEntryAttribute::Symlink } => { DirEntry {
name,
attr: DirEntryAttribute::Symlink,
} => {
writer.write_all(&[CatalogEntryType::Symlink as u8])?; writer.write_all(&[CatalogEntryType::Symlink as u8])?;
catalog_encode_u64(writer, name.len() as u64)?; catalog_encode_u64(writer, name.len() as u64)?;
writer.write_all(name)?; writer.write_all(name)?;
} }
DirEntry { name, attr: DirEntryAttribute::Hardlink } => { DirEntry {
name,
attr: DirEntryAttribute::Hardlink,
} => {
writer.write_all(&[CatalogEntryType::Hardlink as u8])?; writer.write_all(&[CatalogEntryType::Hardlink as u8])?;
catalog_encode_u64(writer, name.len() as u64)?; catalog_encode_u64(writer, name.len() as u64)?;
writer.write_all(name)?; writer.write_all(name)?;
} }
DirEntry { name, attr: DirEntryAttribute::BlockDevice } => { DirEntry {
name,
attr: DirEntryAttribute::BlockDevice,
} => {
writer.write_all(&[CatalogEntryType::BlockDevice as u8])?; writer.write_all(&[CatalogEntryType::BlockDevice as u8])?;
catalog_encode_u64(writer, name.len() as u64)?; catalog_encode_u64(writer, name.len() as u64)?;
writer.write_all(name)?; writer.write_all(name)?;
} }
DirEntry { name, attr: DirEntryAttribute::CharDevice } => { DirEntry {
name,
attr: DirEntryAttribute::CharDevice,
} => {
writer.write_all(&[CatalogEntryType::CharDevice as u8])?; writer.write_all(&[CatalogEntryType::CharDevice as u8])?;
catalog_encode_u64(writer, name.len() as u64)?; catalog_encode_u64(writer, name.len() as u64)?;
writer.write_all(name)?; writer.write_all(name)?;
} }
DirEntry { name, attr: DirEntryAttribute::Fifo } => { DirEntry {
name,
attr: DirEntryAttribute::Fifo,
} => {
writer.write_all(&[CatalogEntryType::Fifo as u8])?; writer.write_all(&[CatalogEntryType::Fifo as u8])?;
catalog_encode_u64(writer, name.len() as u64)?; catalog_encode_u64(writer, name.len() as u64)?;
writer.write_all(name)?; writer.write_all(name)?;
} }
DirEntry { name, attr: DirEntryAttribute::Socket } => { DirEntry {
name,
attr: DirEntryAttribute::Socket,
} => {
writer.write_all(&[CatalogEntryType::Socket as u8])?; writer.write_all(&[CatalogEntryType::Socket as u8])?;
catalog_encode_u64(writer, name.len() as u64)?; catalog_encode_u64(writer, name.len() as u64)?;
writer.write_all(name)?; writer.write_all(name)?;
@ -250,7 +276,6 @@ impl DirInfo {
data: &[u8], data: &[u8],
mut callback: C, mut callback: C,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut cursor = data; let mut cursor = data;
let entries = catalog_decode_u64(&mut cursor)?; let entries = catalog_decode_u64(&mut cursor)?;
@ -258,14 +283,17 @@ impl DirInfo {
let mut name_buf = vec![0u8; 4096]; let mut name_buf = vec![0u8; 4096];
for _ in 0..entries { for _ in 0..entries {
let mut buf = [0u8];
let mut buf = [ 0u8 ];
cursor.read_exact(&mut buf)?; cursor.read_exact(&mut buf)?;
let etype = CatalogEntryType::try_from(buf[0])?; let etype = CatalogEntryType::try_from(buf[0])?;
let name_len = catalog_decode_u64(&mut cursor)? as usize; let name_len = catalog_decode_u64(&mut cursor)? as usize;
if name_len >= name_buf.len() { if name_len >= name_buf.len() {
bail!("directory entry name too long ({} >= {})", name_len, name_buf.len()); bail!(
"directory entry name too long ({} >= {})",
name_len,
name_buf.len()
);
} }
let name = &mut name_buf[0..name_len]; let name = &mut name_buf[0..name_len];
cursor.read_exact(name)?; cursor.read_exact(name)?;
@ -280,9 +308,7 @@ impl DirInfo {
let mtime = catalog_decode_i64(&mut cursor)?; let mtime = catalog_decode_i64(&mut cursor)?;
callback(etype, name, 0, size, mtime)? callback(etype, name, 0, size, mtime)?
} }
_ => { _ => callback(etype, name, 0, 0, 0)?,
callback(etype, name, 0, 0, 0)?
}
}; };
if !cont { if !cont {
return Ok(()); return Ok(());
@ -309,11 +335,14 @@ pub struct CatalogWriter<W> {
pos: u64, pos: u64,
} }
impl <W: Write> CatalogWriter<W> { impl<W: Write> CatalogWriter<W> {
/// Create a new CatalogWriter instance /// Create a new CatalogWriter instance
pub fn new(writer: W) -> Result<Self, Error> { pub fn new(writer: W) -> Result<Self, Error> {
let mut me = Self { writer, dirstack: vec![ DirInfo::new_rootdir() ], pos: 0 }; let mut me = Self {
writer,
dirstack: vec![DirInfo::new_rootdir()],
pos: 0,
};
me.write_all(&PROXMOX_CATALOG_FILE_MAGIC_1_0)?; me.write_all(&PROXMOX_CATALOG_FILE_MAGIC_1_0)?;
Ok(me) Ok(me)
} }
@ -346,8 +375,7 @@ impl <W: Write> CatalogWriter<W> {
} }
} }
impl <W: Write> BackupCatalogWriter for CatalogWriter<W> { impl<W: Write> BackupCatalogWriter for CatalogWriter<W> {
fn start_directory(&mut self, name: &CStr) -> Result<(), Error> { fn start_directory(&mut self, name: &CStr) -> Result<(), Error> {
let new = DirInfo::new(name.to_owned()); let new = DirInfo::new(name.to_owned());
self.dirstack.push(new); self.dirstack.push(new);
@ -367,59 +395,107 @@ impl <W: Write> BackupCatalogWriter for CatalogWriter<W> {
} }
}; };
let current = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; let current = self
.dirstack
.last_mut()
.ok_or_else(|| format_err!("outside root"))?;
let name = name.to_bytes().to_vec(); let name = name.to_bytes().to_vec();
current.entries.push(DirEntry { name, attr: DirEntryAttribute::Directory { start } }); current.entries.push(DirEntry {
name,
attr: DirEntryAttribute::Directory { start },
});
Ok(()) Ok(())
} }
fn add_file(&mut self, name: &CStr, size: u64, mtime: i64) -> Result<(), Error> { fn add_file(&mut self, name: &CStr, size: u64, mtime: i64) -> Result<(), Error> {
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; let dir = self
.dirstack
.last_mut()
.ok_or_else(|| format_err!("outside root"))?;
let name = name.to_bytes().to_vec(); let name = name.to_bytes().to_vec();
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::File { size, mtime } }); dir.entries.push(DirEntry {
name,
attr: DirEntryAttribute::File { size, mtime },
});
Ok(()) Ok(())
} }
fn add_symlink(&mut self, name: &CStr) -> Result<(), Error> { fn add_symlink(&mut self, name: &CStr) -> Result<(), Error> {
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; let dir = self
.dirstack
.last_mut()
.ok_or_else(|| format_err!("outside root"))?;
let name = name.to_bytes().to_vec(); let name = name.to_bytes().to_vec();
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Symlink }); dir.entries.push(DirEntry {
name,
attr: DirEntryAttribute::Symlink,
});
Ok(()) Ok(())
} }
fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error> { fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error> {
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; let dir = self
.dirstack
.last_mut()
.ok_or_else(|| format_err!("outside root"))?;
let name = name.to_bytes().to_vec(); let name = name.to_bytes().to_vec();
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Hardlink }); dir.entries.push(DirEntry {
name,
attr: DirEntryAttribute::Hardlink,
});
Ok(()) Ok(())
} }
fn add_block_device(&mut self, name: &CStr) -> Result<(), Error> { fn add_block_device(&mut self, name: &CStr) -> Result<(), Error> {
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; let dir = self
.dirstack
.last_mut()
.ok_or_else(|| format_err!("outside root"))?;
let name = name.to_bytes().to_vec(); let name = name.to_bytes().to_vec();
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::BlockDevice }); dir.entries.push(DirEntry {
name,
attr: DirEntryAttribute::BlockDevice,
});
Ok(()) Ok(())
} }
fn add_char_device(&mut self, name: &CStr) -> Result<(), Error> { fn add_char_device(&mut self, name: &CStr) -> Result<(), Error> {
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; let dir = self
.dirstack
.last_mut()
.ok_or_else(|| format_err!("outside root"))?;
let name = name.to_bytes().to_vec(); let name = name.to_bytes().to_vec();
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::CharDevice }); dir.entries.push(DirEntry {
name,
attr: DirEntryAttribute::CharDevice,
});
Ok(()) Ok(())
} }
fn add_fifo(&mut self, name: &CStr) -> Result<(), Error> { fn add_fifo(&mut self, name: &CStr) -> Result<(), Error> {
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; let dir = self
.dirstack
.last_mut()
.ok_or_else(|| format_err!("outside root"))?;
let name = name.to_bytes().to_vec(); let name = name.to_bytes().to_vec();
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Fifo }); dir.entries.push(DirEntry {
name,
attr: DirEntryAttribute::Fifo,
});
Ok(()) Ok(())
} }
fn add_socket(&mut self, name: &CStr) -> Result<(), Error> { fn add_socket(&mut self, name: &CStr) -> Result<(), Error> {
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; let dir = self
.dirstack
.last_mut()
.ok_or_else(|| format_err!("outside root"))?;
let name = name.to_bytes().to_vec(); let name = name.to_bytes().to_vec();
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Socket }); dir.entries.push(DirEntry {
name,
attr: DirEntryAttribute::Socket,
});
Ok(()) Ok(())
} }
} }
@ -429,8 +505,7 @@ pub struct CatalogReader<R> {
reader: R, reader: R,
} }
impl <R: Read + Seek> CatalogReader<R> { impl<R: Read + Seek> CatalogReader<R> {
/// Create a new CatalogReader instance /// Create a new CatalogReader instance
pub fn new(reader: R) -> Self { pub fn new(reader: R) -> Self {
Self { reader } Self { reader }
@ -438,36 +513,35 @@ impl <R: Read + Seek> CatalogReader<R> {
/// Print whole catalog to stdout /// Print whole catalog to stdout
pub fn dump(&mut self) -> Result<(), Error> { pub fn dump(&mut self) -> Result<(), Error> {
let root = self.root()?; let root = self.root()?;
match root { match root {
DirEntry { attr: DirEntryAttribute::Directory { start }, .. }=> { DirEntry {
self.dump_dir(std::path::Path::new("./"), start) attr: DirEntryAttribute::Directory { start },
} ..
} => self.dump_dir(std::path::Path::new("./"), start),
_ => unreachable!(), _ => unreachable!(),
} }
} }
/// Get the root DirEntry /// Get the root DirEntry
pub fn root(&mut self) -> Result<DirEntry, Error> { pub fn root(&mut self) -> Result<DirEntry, Error> {
// Root dir is special // Root dir is special
self.reader.seek(SeekFrom::Start(0))?; self.reader.seek(SeekFrom::Start(0))?;
let mut magic = [ 0u8; 8]; let mut magic = [0u8; 8];
self.reader.read_exact(&mut magic)?; self.reader.read_exact(&mut magic)?;
if magic != PROXMOX_CATALOG_FILE_MAGIC_1_0 { if magic != PROXMOX_CATALOG_FILE_MAGIC_1_0 {
bail!("got unexpected magic number for catalog"); bail!("got unexpected magic number for catalog");
} }
self.reader.seek(SeekFrom::End(-8))?; self.reader.seek(SeekFrom::End(-8))?;
let start = unsafe { self.reader.read_le_value::<u64>()? }; let start = unsafe { self.reader.read_le_value::<u64>()? };
Ok(DirEntry { name: b"".to_vec(), attr: DirEntryAttribute::Directory { start } }) Ok(DirEntry {
name: b"".to_vec(),
attr: DirEntryAttribute::Directory { start },
})
} }
/// Read all directory entries /// Read all directory entries
pub fn read_dir( pub fn read_dir(&mut self, parent: &DirEntry) -> Result<Vec<DirEntry>, Error> {
&mut self,
parent: &DirEntry,
) -> Result<Vec<DirEntry>, Error> {
let start = match parent.attr { let start = match parent.attr {
DirEntryAttribute::Directory { start } => start, DirEntryAttribute::Directory { start } => start,
_ => bail!("parent is not a directory - internal error"), _ => bail!("parent is not a directory - internal error"),
@ -487,10 +561,7 @@ impl <R: Read + Seek> CatalogReader<R> {
} }
/// Lookup a DirEntry from an absolute path /// Lookup a DirEntry from an absolute path
pub fn lookup_recursive( pub fn lookup_recursive(&mut self, path: &[u8]) -> Result<DirEntry, Error> {
&mut self,
path: &[u8],
) -> Result<DirEntry, Error> {
let mut current = self.root()?; let mut current = self.root()?;
if path == b"/" { if path == b"/" {
return Ok(current); return Ok(current);
@ -500,13 +571,17 @@ impl <R: Read + Seek> CatalogReader<R> {
&path[1..] &path[1..]
} else { } else {
path path
}.split(|c| *c == b'/'); }
.split(|c| *c == b'/');
for comp in components { for comp in components {
if let Some(entry) = self.lookup(&current, comp)? { if let Some(entry) = self.lookup(&current, comp)? {
current = entry; current = entry;
} else { } else {
bail!("path {:?} not found in catalog", String::from_utf8_lossy(path)); bail!(
"path {:?} not found in catalog",
String::from_utf8_lossy(path)
);
} }
} }
Ok(current) Ok(current)
@ -517,8 +592,7 @@ impl <R: Read + Seek> CatalogReader<R> {
&mut self, &mut self,
parent: &DirEntry, parent: &DirEntry,
filename: &[u8], filename: &[u8],
) -> Result<Option<DirEntry>, Error> { ) -> Result<Option<DirEntry>, Error> {
let start = match parent.attr { let start = match parent.attr {
DirEntryAttribute::Directory { start } => start, DirEntryAttribute::Directory { start } => start,
_ => bail!("parent is not a directory - internal error"), _ => bail!("parent is not a directory - internal error"),
@ -541,21 +615,21 @@ impl <R: Read + Seek> CatalogReader<R> {
} }
/// Read the raw directory info block from current reader position. /// Read the raw directory info block from current reader position.
fn read_raw_dirinfo_block(&mut self, start: u64) -> Result<Vec<u8>, Error> { fn read_raw_dirinfo_block(&mut self, start: u64) -> Result<Vec<u8>, Error> {
self.reader.seek(SeekFrom::Start(start))?; self.reader.seek(SeekFrom::Start(start))?;
let size = catalog_decode_u64(&mut self.reader)?; let size = catalog_decode_u64(&mut self.reader)?;
if size < 1 { bail!("got small directory size {}", size) }; if size < 1 {
bail!("got small directory size {}", size)
};
let data = self.reader.read_exact_allocated(size as usize)?; let data = self.reader.read_exact_allocated(size as usize)?;
Ok(data) Ok(data)
} }
/// Print the content of a directory to stdout /// Print the content of a directory to stdout
pub fn dump_dir(&mut self, prefix: &std::path::Path, start: u64) -> Result<(), Error> { pub fn dump_dir(&mut self, prefix: &std::path::Path, start: u64) -> Result<(), Error> {
let data = self.read_raw_dirinfo_block(start)?; let data = self.read_raw_dirinfo_block(start)?;
DirInfo::parse(&data, |etype, name, offset, size, mtime| { DirInfo::parse(&data, |etype, name, offset, size, mtime| {
let mut path = std::path::PathBuf::from(prefix); let mut path = std::path::PathBuf::from(prefix);
let name: &OsStr = OsStrExt::from_bytes(name); let name: &OsStr = OsStrExt::from_bytes(name);
path.push(name); path.push(name);
@ -575,13 +649,7 @@ impl <R: Read + Seek> CatalogReader<R> {
mtime_string = s; mtime_string = s;
} }
println!( println!("{} {:?} {} {}", etype, path, size, mtime_string,);
"{} {:?} {} {}",
etype,
path,
size,
mtime_string,
);
} }
_ => { _ => {
println!("{} {:?}", etype, path); println!("{} {:?}", etype, path);
@ -602,7 +670,7 @@ impl <R: Read + Seek> CatalogReader<R> {
callback: &mut dyn FnMut(&[u8]) -> Result<(), Error>, callback: &mut dyn FnMut(&[u8]) -> Result<(), Error>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let file_len = file_path.len(); let file_len = file_path.len();
for e in self.read_dir(parent)? { for e in self.read_dir(parent)? {
let is_dir = e.is_directory(); let is_dir = e.is_directory();
file_path.truncate(file_len); file_path.truncate(file_len);
if !e.name.starts_with(b"/") { if !e.name.starts_with(b"/") {
@ -688,11 +756,11 @@ pub fn catalog_encode_i64<W: Write>(writer: &mut W, v: i64) -> Result<(), Error>
/// value encoded is <= 2^63 (values > 2^63 cannot be represented in an i64) /// value encoded is <= 2^63 (values > 2^63 cannot be represented in an i64)
#[allow(clippy::neg_multiply)] #[allow(clippy::neg_multiply)]
pub fn catalog_decode_i64<R: Read>(reader: &mut R) -> Result<i64, Error> { pub fn catalog_decode_i64<R: Read>(reader: &mut R) -> Result<i64, Error> {
let mut v: u64 = 0; let mut v: u64 = 0;
let mut buf = [0u8]; let mut buf = [0u8];
for i in 0..11 { // only allow 11 bytes (70 bits + sign marker) for i in 0..11 {
// only allow 11 bytes (70 bits + sign marker)
if buf.is_empty() { if buf.is_empty() {
bail!("decode_i64 failed - unexpected EOB"); bail!("decode_i64 failed - unexpected EOB");
} }
@ -706,10 +774,10 @@ pub fn catalog_decode_i64<R: Read>(reader: &mut R) -> Result<i64, Error> {
} }
return Ok(((v - 1) as i64 * -1) - 1); // also handles i64::MIN return Ok(((v - 1) as i64 * -1) - 1); // also handles i64::MIN
} else if t < 128 { } else if t < 128 {
v |= (t as u64) << (i*7); v |= (t as u64) << (i * 7);
return Ok(v as i64); return Ok(v as i64);
} else { } else {
v |= ((t & 127) as u64) << (i*7); v |= ((t & 127) as u64) << (i * 7);
} }
} }
@ -741,21 +809,21 @@ pub fn catalog_encode_u64<W: Write>(writer: &mut W, v: u64) -> Result<(), Error>
/// We currently read maximal 10 bytes, which give a maximum of 70 bits, /// We currently read maximal 10 bytes, which give a maximum of 70 bits,
/// but we currently only encode up to 64 bits /// but we currently only encode up to 64 bits
pub fn catalog_decode_u64<R: Read>(reader: &mut R) -> Result<u64, Error> { pub fn catalog_decode_u64<R: Read>(reader: &mut R) -> Result<u64, Error> {
let mut v: u64 = 0; let mut v: u64 = 0;
let mut buf = [0u8]; let mut buf = [0u8];
for i in 0..10 { // only allow 10 bytes (70 bits) for i in 0..10 {
// only allow 10 bytes (70 bits)
if buf.is_empty() { if buf.is_empty() {
bail!("decode_u64 failed - unexpected EOB"); bail!("decode_u64 failed - unexpected EOB");
} }
reader.read_exact(&mut buf)?; reader.read_exact(&mut buf)?;
let t = buf[0]; let t = buf[0];
if t < 128 { if t < 128 {
v |= (t as u64) << (i*7); v |= (t as u64) << (i * 7);
return Ok(v); return Ok(v);
} else { } else {
v |= ((t & 127) as u64) << (i*7); v |= ((t & 127) as u64) << (i * 7);
} }
} }
@ -764,9 +832,7 @@ pub fn catalog_decode_u64<R: Read>(reader: &mut R) -> Result<u64, Error> {
#[test] #[test]
fn test_catalog_u64_encoder() { fn test_catalog_u64_encoder() {
fn test_encode_decode(value: u64) { fn test_encode_decode(value: u64) {
let mut data = Vec::new(); let mut data = Vec::new();
catalog_encode_u64(&mut data, value).unwrap(); catalog_encode_u64(&mut data, value).unwrap();
@ -782,17 +848,15 @@ fn test_catalog_u64_encoder() {
test_encode_decode(u64::MIN); test_encode_decode(u64::MIN);
test_encode_decode(126); test_encode_decode(126);
test_encode_decode((1<<12)-1); test_encode_decode((1 << 12) - 1);
test_encode_decode((1<<20)-1); test_encode_decode((1 << 20) - 1);
test_encode_decode((1<<50)-1); test_encode_decode((1 << 50) - 1);
test_encode_decode(u64::MAX); test_encode_decode(u64::MAX);
} }
#[test] #[test]
fn test_catalog_i64_encoder() { fn test_catalog_i64_encoder() {
fn test_encode_decode(value: i64) { fn test_encode_decode(value: i64) {
let mut data = Vec::new(); let mut data = Vec::new();
catalog_encode_i64(&mut data, value).unwrap(); catalog_encode_i64(&mut data, value).unwrap();
@ -806,19 +870,17 @@ fn test_catalog_i64_encoder() {
test_encode_decode(-0); test_encode_decode(-0);
test_encode_decode(126); test_encode_decode(126);
test_encode_decode(-126); test_encode_decode(-126);
test_encode_decode((1<<12)-1); test_encode_decode((1 << 12) - 1);
test_encode_decode(-(1<<12)-1); test_encode_decode(-(1 << 12) - 1);
test_encode_decode((1<<20)-1); test_encode_decode((1 << 20) - 1);
test_encode_decode(-(1<<20)-1); test_encode_decode(-(1 << 20) - 1);
test_encode_decode(i64::MIN); test_encode_decode(i64::MIN);
test_encode_decode(i64::MAX); test_encode_decode(i64::MAX);
} }
#[test] #[test]
fn test_catalog_i64_compatibility() { fn test_catalog_i64_compatibility() {
fn test_encode_decode(value: u64) { fn test_encode_decode(value: u64) {
let mut data = Vec::new(); let mut data = Vec::new();
catalog_encode_u64(&mut data, value).unwrap(); catalog_encode_u64(&mut data, value).unwrap();
@ -830,9 +892,9 @@ fn test_catalog_i64_compatibility() {
test_encode_decode(u64::MIN); test_encode_decode(u64::MIN);
test_encode_decode(126); test_encode_decode(126);
test_encode_decode((1<<12)-1); test_encode_decode((1 << 12) - 1);
test_encode_decode((1<<20)-1); test_encode_decode((1 << 20) - 1);
test_encode_decode((1<<50)-1); test_encode_decode((1 << 50) - 1);
test_encode_decode(u64::MAX); test_encode_decode(u64::MAX);
} }
@ -850,10 +912,10 @@ pub struct ArchiveEntry {
/// Is this entry a leaf node, or does it have children (i.e. a directory)? /// Is this entry a leaf node, or does it have children (i.e. a directory)?
pub leaf: bool, pub leaf: bool,
/// The file size, if entry_type is 'f' (file) /// The file size, if entry_type is 'f' (file)
#[serde(skip_serializing_if="Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub size: Option<u64>, pub size: Option<u64>,
/// The file "last modified" time stamp, if entry_type is 'f' (file) /// The file "last modified" time stamp, if entry_type is 'f' (file)
#[serde(skip_serializing_if="Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub mtime: Option<i64>, pub mtime: Option<i64>,
} }

View File

@ -1,6 +1,6 @@
use anyhow::{Error}; use anyhow::Error;
use std::sync::Arc;
use std::io::Read; use std::io::Read;
use std::sync::Arc;
use proxmox_borrow::Tied; use proxmox_borrow::Tied;
@ -12,8 +12,7 @@ pub struct ChecksumReader<R> {
signer: Option<Tied<Arc<CryptConfig>, openssl::sign::Signer<'static>>>, signer: Option<Tied<Arc<CryptConfig>, openssl::sign::Signer<'static>>>,
} }
impl <R: Read> ChecksumReader<R> { impl<R: Read> ChecksumReader<R> {
pub fn new(reader: R, config: Option<Arc<CryptConfig>>) -> Self { pub fn new(reader: R, config: Option<Arc<CryptConfig>>) -> Self {
let hasher = crc32fast::Hasher::new(); let hasher = crc32fast::Hasher::new();
let signer = match config { let signer = match config {
@ -26,7 +25,11 @@ impl <R: Read> ChecksumReader<R> {
None => None, None => None,
}; };
Self { reader, hasher, signer } Self {
reader,
hasher,
signer,
}
} }
pub fn finish(mut self) -> Result<(R, u32, Option<[u8; 32]>), Error> { pub fn finish(mut self) -> Result<(R, u32, Option<[u8; 32]>), Error> {
@ -42,19 +45,18 @@ impl <R: Read> ChecksumReader<R> {
} }
} }
impl <R: Read> Read for ChecksumReader<R> { impl<R: Read> Read for ChecksumReader<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
let count = self.reader.read(buf)?; let count = self.reader.read(buf)?;
if count > 0 { if count > 0 {
self.hasher.update(&buf[..count]); self.hasher.update(&buf[..count]);
if let Some(ref mut signer) = self.signer { if let Some(ref mut signer) = self.signer {
signer.update(&buf[..count]) signer.update(&buf[..count]).map_err(|err| {
.map_err(|err| { std::io::Error::new(
std::io::Error::new( std::io::ErrorKind::Other,
std::io::ErrorKind::Other, format!("hmac update failed - {}", err),
format!("hmac update failed - {}", err)) )
})?; })?;
} }
} }
Ok(count) Ok(count)

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::io::Write; use std::io::Write;
use std::sync::Arc;
use anyhow::{Error}; use anyhow::Error;
use proxmox_borrow::Tied; use proxmox_borrow::Tied;
@ -13,8 +13,7 @@ pub struct ChecksumWriter<W> {
signer: Option<Tied<Arc<CryptConfig>, openssl::sign::Signer<'static>>>, signer: Option<Tied<Arc<CryptConfig>, openssl::sign::Signer<'static>>>,
} }
impl <W: Write> ChecksumWriter<W> { impl<W: Write> ChecksumWriter<W> {
pub fn new(writer: W, config: Option<Arc<CryptConfig>>) -> Self { pub fn new(writer: W, config: Option<Arc<CryptConfig>>) -> Self {
let hasher = crc32fast::Hasher::new(); let hasher = crc32fast::Hasher::new();
let signer = match config { let signer = match config {
@ -26,7 +25,11 @@ impl <W: Write> ChecksumWriter<W> {
} }
None => None, None => None,
}; };
Self { writer, hasher, signer } Self {
writer,
hasher,
signer,
}
} }
pub fn finish(mut self) -> Result<(W, u32, Option<[u8; 32]>), Error> { pub fn finish(mut self) -> Result<(W, u32, Option<[u8; 32]>), Error> {
@ -42,17 +45,16 @@ impl <W: Write> ChecksumWriter<W> {
} }
} }
impl <W: Write> Write for ChecksumWriter<W> { impl<W: Write> Write for ChecksumWriter<W> {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> { fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
self.hasher.update(buf); self.hasher.update(buf);
if let Some(ref mut signer) = self.signer { if let Some(ref mut signer) = self.signer {
signer.update(buf) signer.update(buf).map_err(|err| {
.map_err(|err| { std::io::Error::new(
std::io::Error::new( std::io::ErrorKind::Other,
std::io::ErrorKind::Other, format!("hmac update failed - {}", err),
format!("hmac update failed - {}", err)) )
})?; })?;
} }
self.writer.write(buf) self.writer.write(buf)
} }

View File

@ -10,7 +10,6 @@ pub struct ChunkStat {
} }
impl ChunkStat { impl ChunkStat {
pub fn new(size: u64) -> Self { pub fn new(size: u64) -> Self {
ChunkStat { ChunkStat {
size, size,
@ -27,15 +26,14 @@ impl ChunkStat {
impl std::fmt::Debug for ChunkStat { impl std::fmt::Debug for ChunkStat {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let avg = ((self.size as f64)/(self.chunk_count as f64)) as usize; let avg = ((self.size as f64) / (self.chunk_count as f64)) as usize;
let compression = (self.compressed_size*100)/(self.size as u64); let compression = (self.compressed_size * 100) / (self.size as u64);
let rate = (self.disk_size*100)/(self.size as u64); let rate = (self.disk_size * 100) / (self.size as u64);
let elapsed = self.start_time.elapsed().unwrap(); let elapsed = self.start_time.elapsed().unwrap();
let elapsed = (elapsed.as_secs() as f64) + let elapsed = (elapsed.as_secs() as f64) + (elapsed.subsec_millis() as f64) / 1000.0;
(elapsed.subsec_millis() as f64)/1000.0;
let write_speed = ((self.size as f64)/(1024.0*1024.0))/elapsed; let write_speed = ((self.size as f64) / (1024.0 * 1024.0)) / elapsed;
write!(f, "Size: {}, average chunk size: {}, compression rate: {}%, disk_size: {} ({}%), speed: {:.2} MB/s", write!(f, "Size: {}, average chunk size: {}, compression rate: {}%, disk_size: {} ({}%), speed: {:.2} MB/s",
self.size, avg, compression, self.disk_size, rate, write_speed) self.size, avg, compression, self.disk_size, rate, write_speed)

View File

@ -5,19 +5,20 @@ use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use proxmox_sys::fs::{CreateOptions, create_path, create_dir};
use proxmox_sys::process_locker::{ProcessLocker, ProcessLockSharedGuard, ProcessLockExclusiveGuard};
use proxmox_sys::WorkerTaskContext;
use proxmox_sys::task_log;
use pbs_api_types::GarbageCollectionStatus; use pbs_api_types::GarbageCollectionStatus;
use proxmox_sys::fs::{create_dir, create_path, CreateOptions};
use proxmox_sys::process_locker::{
ProcessLockExclusiveGuard, ProcessLockSharedGuard, ProcessLocker,
};
use proxmox_sys::task_log;
use proxmox_sys::WorkerTaskContext;
use crate::DataBlob; use crate::DataBlob;
/// File system based chunk store /// File system based chunk store
pub struct ChunkStore { pub struct ChunkStore {
name: String, // used for error reporting name: String, // used for error reporting
pub (crate) base: PathBuf, pub(crate) base: PathBuf,
chunk_dir: PathBuf, chunk_dir: PathBuf,
mutex: Mutex<()>, mutex: Mutex<()>,
locker: Arc<Mutex<ProcessLocker>>, locker: Arc<Mutex<ProcessLocker>>,
@ -26,8 +27,15 @@ pub struct ChunkStore {
// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ? // TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
pub fn verify_chunk_size(size: usize) -> Result<(), Error> { pub fn verify_chunk_size(size: usize) -> Result<(), Error> {
static SIZES: [usize; 7] = [
static SIZES: [usize; 7] = [64*1024, 128*1024, 256*1024, 512*1024, 1024*1024, 2048*1024, 4096*1024]; 64 * 1024,
128 * 1024,
256 * 1024,
512 * 1024,
1024 * 1024,
2048 * 1024,
4096 * 1024,
];
if !SIZES.contains(&size) { if !SIZES.contains(&size) {
bail!("Got unsupported chunk size '{}'", size); bail!("Got unsupported chunk size '{}'", size);
@ -36,26 +44,23 @@ pub fn verify_chunk_size(size: usize) -> Result<(), Error> {
} }
fn digest_to_prefix(digest: &[u8]) -> PathBuf { fn digest_to_prefix(digest: &[u8]) -> PathBuf {
let mut buf = Vec::<u8>::with_capacity(2 + 1 + 2 + 1);
let mut buf = Vec::<u8>::with_capacity(2+1+2+1);
const HEX_CHARS: &[u8; 16] = b"0123456789abcdef"; const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";
buf.push(HEX_CHARS[(digest[0] as usize) >> 4]); buf.push(HEX_CHARS[(digest[0] as usize) >> 4]);
buf.push(HEX_CHARS[(digest[0] as usize) &0xf]); buf.push(HEX_CHARS[(digest[0] as usize) & 0xf]);
buf.push(HEX_CHARS[(digest[1] as usize) >> 4]); buf.push(HEX_CHARS[(digest[1] as usize) >> 4]);
buf.push(HEX_CHARS[(digest[1] as usize) & 0xf]); buf.push(HEX_CHARS[(digest[1] as usize) & 0xf]);
buf.push(b'/'); buf.push(b'/');
let path = unsafe { String::from_utf8_unchecked(buf)}; let path = unsafe { String::from_utf8_unchecked(buf) };
path.into() path.into()
} }
impl ChunkStore { impl ChunkStore {
fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf { fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf {
let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref()); let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref());
chunk_dir.push(".chunks"); chunk_dir.push(".chunks");
@ -66,11 +71,16 @@ impl ChunkStore {
&self.base &self.base
} }
pub fn create<P>(name: &str, path: P, uid: nix::unistd::Uid, gid: nix::unistd::Gid, worker: Option<&dyn WorkerTaskContext>) -> Result<Self, Error> pub fn create<P>(
name: &str,
path: P,
uid: nix::unistd::Uid,
gid: nix::unistd::Gid,
worker: Option<&dyn WorkerTaskContext>,
) -> Result<Self, Error>
where where
P: Into<PathBuf>, P: Into<PathBuf>,
{ {
let base: PathBuf = path.into(); let base: PathBuf = path.into();
if !base.is_absolute() { if !base.is_absolute() {
@ -79,19 +89,31 @@ impl ChunkStore {
let chunk_dir = Self::chunk_dir(&base); let chunk_dir = Self::chunk_dir(&base);
let options = CreateOptions::new() let options = CreateOptions::new().owner(uid).group(gid);
.owner(uid)
.group(gid);
let default_options = CreateOptions::new(); let default_options = CreateOptions::new();
match create_path(&base, Some(default_options), Some(options.clone())) { match create_path(&base, Some(default_options), Some(options.clone())) {
Err(err) => bail!("unable to create chunk store '{}' at {:?} - {}", name, base, err), Err(err) => bail!(
Ok(res) => if ! res { nix::unistd::chown(&base, Some(uid), Some(gid))? }, "unable to create chunk store '{}' at {:?} - {}",
name,
base,
err
),
Ok(res) => {
if !res {
nix::unistd::chown(&base, Some(uid), Some(gid))?
}
}
} }
if let Err(err) = create_dir(&chunk_dir, options.clone()) { if let Err(err) = create_dir(&chunk_dir, options.clone()) {
bail!("unable to create chunk store '{}' subdir {:?} - {}", name, chunk_dir, err); bail!(
"unable to create chunk store '{}' subdir {:?} - {}",
name,
chunk_dir,
err
);
} }
// create lock file with correct owner/group // create lock file with correct owner/group
@ -101,13 +123,18 @@ impl ChunkStore {
// create 64*1024 subdirs // create 64*1024 subdirs
let mut last_percentage = 0; let mut last_percentage = 0;
for i in 0..64*1024 { for i in 0..64 * 1024 {
let mut l1path = chunk_dir.clone(); let mut l1path = chunk_dir.clone();
l1path.push(format!("{:04x}", i)); l1path.push(format!("{:04x}", i));
if let Err(err) = create_dir(&l1path, options.clone()) { if let Err(err) = create_dir(&l1path, options.clone()) {
bail!("unable to create chunk store '{}' subdir {:?} - {}", name, l1path, err); bail!(
"unable to create chunk store '{}' subdir {:?} - {}",
name,
l1path,
err
);
} }
let percentage = (i*100)/(64*1024); let percentage = (i * 100) / (64 * 1024);
if percentage != last_percentage { if percentage != last_percentage {
if let Some(worker) = worker { if let Some(worker) = worker {
task_log!(worker, "Chunkstore create: {}%", percentage) task_log!(worker, "Chunkstore create: {}%", percentage)
@ -128,7 +155,6 @@ impl ChunkStore {
} }
pub fn open<P: Into<PathBuf>>(name: &str, base: P) -> Result<Self, Error> { pub fn open<P: Into<PathBuf>>(name: &str, base: P) -> Result<Self, Error> {
let base: PathBuf = base.into(); let base: PathBuf = base.into();
if !base.is_absolute() { if !base.is_absolute() {
@ -138,7 +164,12 @@ impl ChunkStore {
let chunk_dir = Self::chunk_dir(&base); let chunk_dir = Self::chunk_dir(&base);
if let Err(err) = std::fs::metadata(&chunk_dir) { if let Err(err) = std::fs::metadata(&chunk_dir) {
bail!("unable to open chunk store '{}' at {:?} - {}", name, chunk_dir, err); bail!(
"unable to open chunk store '{}' at {:?} - {}",
name,
chunk_dir,
err
);
} }
let lockfile_path = Self::lockfile_path(&base); let lockfile_path = Self::lockfile_path(&base);
@ -150,7 +181,7 @@ impl ChunkStore {
base, base,
chunk_dir, chunk_dir,
locker, locker,
mutex: Mutex::new(()) mutex: Mutex::new(()),
}) })
} }
@ -159,7 +190,11 @@ impl ChunkStore {
Ok(()) Ok(())
} }
pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> { pub fn cond_touch_chunk(
&self,
digest: &[u8; 32],
fail_if_not_exist: bool,
) -> Result<bool, Error> {
let (chunk_path, _digest_str) = self.chunk_path(digest); let (chunk_path, _digest_str) = self.chunk_path(digest);
self.cond_touch_path(&chunk_path, fail_if_not_exist) self.cond_touch_path(&chunk_path, fail_if_not_exist)
} }
@ -169,8 +204,14 @@ impl ChunkStore {
const UTIME_OMIT: i64 = (1 << 30) - 2; const UTIME_OMIT: i64 = (1 << 30) - 2;
let times: [libc::timespec; 2] = [ let times: [libc::timespec; 2] = [
libc::timespec { tv_sec: 0, tv_nsec: UTIME_NOW }, libc::timespec {
libc::timespec { tv_sec: 0, tv_nsec: UTIME_OMIT } tv_sec: 0,
tv_nsec: UTIME_NOW,
},
libc::timespec {
tv_sec: 0,
tv_nsec: UTIME_OMIT,
},
]; ];
use nix::NixPath; use nix::NixPath;
@ -194,15 +235,16 @@ impl ChunkStore {
pub fn get_chunk_iterator( pub fn get_chunk_iterator(
&self, &self,
) -> Result< ) -> Result<
impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)> + std::iter::FusedIterator, impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>
Error + std::iter::FusedIterator,
Error,
> { > {
use nix::dir::Dir; use nix::dir::Dir;
use nix::fcntl::OFlag; use nix::fcntl::OFlag;
use nix::sys::stat::Mode; use nix::sys::stat::Mode;
let base_handle = Dir::open(&self.chunk_dir, OFlag::O_RDONLY, Mode::empty()) let base_handle =
.map_err(|err| { Dir::open(&self.chunk_dir, OFlag::O_RDONLY, Mode::empty()).map_err(|err| {
format_err!( format_err!(
"unable to open store '{}' chunk dir {:?} - {}", "unable to open store '{}' chunk dir {:?} - {}",
self.name, self.name,
@ -270,11 +312,16 @@ impl ChunkStore {
// other errors are fatal, so end our iteration // other errors are fatal, so end our iteration
done = true; done = true;
// and pass the error through: // and pass the error through:
return Some((Err(format_err!("unable to read subdir '{}' - {}", subdir, err)), percentage, false)); return Some((
Err(format_err!("unable to read subdir '{}' - {}", subdir, err)),
percentage,
false,
));
} }
} }
} }
}).fuse()) })
.fuse())
} }
pub fn oldest_writer(&self) -> Option<i64> { pub fn oldest_writer(&self) -> Option<i64> {
@ -291,7 +338,7 @@ impl ChunkStore {
use nix::sys::stat::fstatat; use nix::sys::stat::fstatat;
use nix::unistd::{unlinkat, UnlinkatFlags}; use nix::unistd::{unlinkat, UnlinkatFlags};
let mut min_atime = phase1_start_time - 3600*24; // at least 24h (see mount option relatime) let mut min_atime = phase1_start_time - 3600 * 24; // at least 24h (see mount option relatime)
if oldest_writer < min_atime { if oldest_writer < min_atime {
min_atime = oldest_writer; min_atime = oldest_writer;
@ -305,12 +352,7 @@ impl ChunkStore {
for (entry, percentage, bad) in self.get_chunk_iterator()? { for (entry, percentage, bad) in self.get_chunk_iterator()? {
if last_percentage != percentage { if last_percentage != percentage {
last_percentage = percentage; last_percentage = percentage;
task_log!( task_log!(worker, "processed {}% ({} chunks)", percentage, chunk_count,);
worker,
"processed {}% ({} chunks)",
percentage,
chunk_count,
);
} }
worker.check_abort()?; worker.check_abort()?;
@ -318,12 +360,19 @@ impl ChunkStore {
let (dirfd, entry) = match entry { let (dirfd, entry) = match entry {
Ok(entry) => (entry.parent_fd(), entry), Ok(entry) => (entry.parent_fd(), entry),
Err(err) => bail!("chunk iterator on chunk store '{}' failed - {}", self.name, err), Err(err) => bail!(
"chunk iterator on chunk store '{}' failed - {}",
self.name,
err
),
}; };
let file_type = match entry.file_type() { let file_type = match entry.file_type() {
Some(file_type) => file_type, Some(file_type) => file_type,
None => bail!("unsupported file system type on chunk store '{}'", self.name), None => bail!(
"unsupported file system type on chunk store '{}'",
self.name
),
}; };
if file_type != nix::dir::Type::File { if file_type != nix::dir::Type::File {
continue; continue;
@ -376,12 +425,7 @@ impl ChunkStore {
Ok(()) Ok(())
} }
pub fn insert_chunk( pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
&self,
chunk: &DataBlob,
digest: &[u8; 32],
) -> Result<(bool, u64), Error> {
//println!("DIGEST {}", hex::encode(digest)); //println!("DIGEST {}", hex::encode(digest));
let (chunk_path, digest_str) = self.chunk_path(digest); let (chunk_path, digest_str) = self.chunk_path(digest);
@ -393,7 +437,11 @@ impl ChunkStore {
self.touch_chunk(digest)?; self.touch_chunk(digest)?;
return Ok((true, metadata.len())); return Ok((true, metadata.len()));
} else { } else {
bail!("Got unexpected file type on store '{}' for chunk {}", self.name, digest_str); bail!(
"Got unexpected file type on store '{}' for chunk {}",
self.name,
digest_str
);
} }
} }
@ -422,7 +470,7 @@ impl ChunkStore {
})?; })?;
if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) { if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) {
if std::fs::remove_file(&tmp_path).is_err() { /* ignore */ } if std::fs::remove_file(&tmp_path).is_err() { /* ignore */ }
bail!( bail!(
"Atomic rename on store '{}' failed for chunk {} - {}", "Atomic rename on store '{}' failed for chunk {} - {}",
self.name, self.name,
@ -436,7 +484,7 @@ impl ChunkStore {
Ok((false, encoded_size)) Ok((false, encoded_size))
} }
pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) { pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
let mut chunk_path = self.chunk_dir.clone(); let mut chunk_path = self.chunk_dir.clone();
let prefix = digest_to_prefix(digest); let prefix = digest_to_prefix(digest);
chunk_path.push(&prefix); chunk_path.push(&prefix);
@ -446,7 +494,6 @@ impl ChunkStore {
} }
pub fn relative_path(&self, path: &Path) -> PathBuf { pub fn relative_path(&self, path: &Path) -> PathBuf {
let mut full_path = self.base.clone(); let mut full_path = self.base.clone();
full_path.push(path); full_path.push(path);
full_path full_path
@ -469,10 +516,8 @@ impl ChunkStore {
} }
} }
#[test] #[test]
fn test_chunk_store1() { fn test_chunk_store1() {
let mut path = std::fs::canonicalize(".").unwrap(); // we need absolute path let mut path = std::fs::canonicalize(".").unwrap(); // we need absolute path
path.push(".testdir"); path.push(".testdir");
@ -481,10 +526,14 @@ fn test_chunk_store1() {
let chunk_store = ChunkStore::open("test", &path); let chunk_store = ChunkStore::open("test", &path);
assert!(chunk_store.is_err()); assert!(chunk_store.is_err());
let user = nix::unistd::User::from_uid(nix::unistd::Uid::current()).unwrap().unwrap(); let user = nix::unistd::User::from_uid(nix::unistd::Uid::current())
.unwrap()
.unwrap();
let chunk_store = ChunkStore::create("test", &path, user.uid, user.gid, None).unwrap(); let chunk_store = ChunkStore::create("test", &path, user.uid, user.gid, None).unwrap();
let (chunk, digest) = crate::data_blob::DataChunkBuilder::new(&[0u8, 1u8]).build().unwrap(); let (chunk, digest) = crate::data_blob::DataChunkBuilder::new(&[0u8, 1u8])
.build()
.unwrap();
let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap(); let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap();
assert!(!exists); assert!(!exists);
@ -492,7 +541,6 @@ fn test_chunk_store1() {
let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap(); let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap();
assert!(exists); assert!(exists);
let chunk_store = ChunkStore::create("test", &path, user.uid, user.gid, None); let chunk_store = ChunkStore::create("test", &path, user.uid, user.gid, None);
assert!(chunk_store.is_err()); assert!(chunk_store.is_err());

View File

@ -1,5 +1,5 @@
use std::io::{BufRead, Read};
use std::sync::Arc; use std::sync::Arc;
use std::io::{Read, BufRead};
use anyhow::{bail, Error}; use anyhow::{bail, Error};
@ -13,9 +13,13 @@ pub struct CryptReader<R> {
finalized: bool, finalized: bool,
} }
impl <R: BufRead> CryptReader<R> { impl<R: BufRead> CryptReader<R> {
pub fn new(
pub fn new(reader: R, iv: [u8; 16], tag: [u8; 16], config: Arc<CryptConfig>) -> Result<Self, Error> { reader: R,
iv: [u8; 16],
tag: [u8; 16],
config: Arc<CryptConfig>,
) -> Result<Self, Error> {
let block_size = config.cipher().block_size(); // Note: block size is normally 1 byte for stream ciphers let block_size = config.cipher().block_size(); // Note: block size is normally 1 byte for stream ciphers
if block_size.count_ones() != 1 || block_size > 512 { if block_size.count_ones() != 1 || block_size > 512 {
bail!("unexpected Cipher block size {}", block_size); bail!("unexpected Cipher block size {}", block_size);
@ -23,7 +27,13 @@ impl <R: BufRead> CryptReader<R> {
let mut crypter = config.data_crypter(&iv, openssl::symm::Mode::Decrypt)?; let mut crypter = config.data_crypter(&iv, openssl::symm::Mode::Decrypt)?;
crypter.set_tag(&tag)?; crypter.set_tag(&tag)?;
Ok(Self { reader, crypter, block_size, finalized: false, small_read_buf: Vec::new() }) Ok(Self {
reader,
crypter,
block_size,
finalized: false,
small_read_buf: Vec::new(),
})
} }
pub fn finish(self) -> Result<R, Error> { pub fn finish(self) -> Result<R, Error> {
@ -34,11 +44,14 @@ impl <R: BufRead> CryptReader<R> {
} }
} }
impl <R: BufRead> Read for CryptReader<R> { impl<R: BufRead> Read for CryptReader<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
if !self.small_read_buf.is_empty() { if !self.small_read_buf.is_empty() {
let max = if self.small_read_buf.len() > buf.len() { buf.len() } else { self.small_read_buf.len() }; let max = if self.small_read_buf.len() > buf.len() {
buf.len()
} else {
self.small_read_buf.len()
};
let rest = self.small_read_buf.split_off(max); let rest = self.small_read_buf.split_off(max);
buf[..max].copy_from_slice(&self.small_read_buf); buf[..max].copy_from_slice(&self.small_read_buf);
self.small_read_buf = rest; self.small_read_buf = rest;
@ -48,10 +61,11 @@ impl <R: BufRead> Read for CryptReader<R> {
let data = self.reader.fill_buf()?; let data = self.reader.fill_buf()?;
// handle small read buffers // handle small read buffers
if buf.len() <= 2*self.block_size { if buf.len() <= 2 * self.block_size {
let mut outbuf = [0u8; 1024]; let mut outbuf = [0u8; 1024];
let count = if data.is_empty() { // EOF let count = if data.is_empty() {
// EOF
let written = self.crypter.finalize(&mut outbuf)?; let written = self.crypter.finalize(&mut outbuf)?;
self.finalized = true; self.finalized = true;
written written
@ -73,7 +87,8 @@ impl <R: BufRead> Read for CryptReader<R> {
buf[..count].copy_from_slice(&outbuf[..count]); buf[..count].copy_from_slice(&outbuf[..count]);
Ok(count) Ok(count)
} }
} else if data.is_empty() { // EOF } else if data.is_empty() {
// EOF
let rest = self.crypter.finalize(buf)?; let rest = self.crypter.finalize(buf)?;
self.finalized = true; self.finalized = true;
Ok(rest) Ok(rest)

View File

@ -1,5 +1,5 @@
use std::sync::Arc;
use std::io::Write; use std::io::Write;
use std::sync::Arc;
use anyhow::Error; use anyhow::Error;
@ -8,13 +8,12 @@ use pbs_tools::crypt_config::CryptConfig;
pub struct CryptWriter<W> { pub struct CryptWriter<W> {
writer: W, writer: W,
block_size: usize, block_size: usize,
encr_buf: Box<[u8; 64*1024]>, encr_buf: Box<[u8; 64 * 1024]>,
iv: [u8; 16], iv: [u8; 16],
crypter: openssl::symm::Crypter, crypter: openssl::symm::Crypter,
} }
impl <W: Write> CryptWriter<W> { impl<W: Write> CryptWriter<W> {
pub fn new(writer: W, config: Arc<CryptConfig>) -> Result<Self, Error> { pub fn new(writer: W, config: Arc<CryptConfig>) -> Result<Self, Error> {
let mut iv = [0u8; 16]; let mut iv = [0u8; 16];
proxmox_sys::linux::fill_with_random_data(&mut iv)?; proxmox_sys::linux::fill_with_random_data(&mut iv)?;
@ -22,10 +21,16 @@ impl <W: Write> CryptWriter<W> {
let crypter = config.data_crypter(&iv, openssl::symm::Mode::Encrypt)?; let crypter = config.data_crypter(&iv, openssl::symm::Mode::Encrypt)?;
Ok(Self { writer, iv, crypter, block_size, encr_buf: Box::new([0u8; 64*1024]) }) Ok(Self {
writer,
iv,
crypter,
block_size,
encr_buf: Box::new([0u8; 64 * 1024]),
})
} }
pub fn finish(mut self) -> Result<(W, [u8; 16], [u8; 16]), Error> { pub fn finish(mut self) -> Result<(W, [u8; 16], [u8; 16]), Error> {
let rest = self.crypter.finalize(self.encr_buf.as_mut())?; let rest = self.crypter.finalize(self.encr_buf.as_mut())?;
if rest > 0 { if rest > 0 {
self.writer.write_all(&self.encr_buf[..rest])?; self.writer.write_all(&self.encr_buf[..rest])?;
@ -40,18 +45,20 @@ impl <W: Write> CryptWriter<W> {
} }
} }
impl <W: Write> Write for CryptWriter<W> { impl<W: Write> Write for CryptWriter<W> {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> { fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let mut write_size = buf.len(); let mut write_size = buf.len();
if write_size > (self.encr_buf.len() - self.block_size) { if write_size > (self.encr_buf.len() - self.block_size) {
write_size = self.encr_buf.len() - self.block_size; write_size = self.encr_buf.len() - self.block_size;
} }
let count = self.crypter.update(&buf[..write_size], self.encr_buf.as_mut()) let count = self
.crypter
.update(&buf[..write_size], self.encr_buf.as_mut())
.map_err(|err| { .map_err(|err| {
std::io::Error::new( std::io::Error::new(
std::io::ErrorKind::Other, std::io::ErrorKind::Other,
format!("crypter update failed - {}", err)) format!("crypter update failed - {}", err),
)
})?; })?;
self.writer.write_all(&self.encr_buf[..count])?; self.writer.write_all(&self.encr_buf[..count])?;

View File

@ -6,12 +6,12 @@ use openssl::symm::{decrypt_aead, Mode};
use proxmox_io::{ReadExt, WriteExt}; use proxmox_io::{ReadExt, WriteExt};
use pbs_tools::crypt_config::CryptConfig;
use pbs_api_types::CryptMode; use pbs_api_types::CryptMode;
use pbs_tools::crypt_config::CryptConfig;
use super::file_formats::*; use super::file_formats::*;
const MAX_BLOB_SIZE: usize = 128*1024*1024; const MAX_BLOB_SIZE: usize = 128 * 1024 * 1024;
/// Encoded data chunk with digest and positional information /// Encoded data chunk with digest and positional information
pub struct ChunkInfo { pub struct ChunkInfo {
@ -35,9 +35,8 @@ pub struct DataBlob {
} }
impl DataBlob { impl DataBlob {
/// accessor to raw_data field /// accessor to raw_data field
pub fn raw_data(&self) -> &[u8] { pub fn raw_data(&self) -> &[u8] {
&self.raw_data &self.raw_data
} }
@ -59,13 +58,13 @@ impl DataBlob {
/// accessor to crc32 checksum /// accessor to crc32 checksum
pub fn crc(&self) -> u32 { pub fn crc(&self) -> u32 {
let crc_o = proxmox_lang::offsetof!(DataBlobHeader, crc); let crc_o = proxmox_lang::offsetof!(DataBlobHeader, crc);
u32::from_le_bytes(self.raw_data[crc_o..crc_o+4].try_into().unwrap()) u32::from_le_bytes(self.raw_data[crc_o..crc_o + 4].try_into().unwrap())
} }
// set the CRC checksum field // set the CRC checksum field
pub fn set_crc(&mut self, crc: u32) { pub fn set_crc(&mut self, crc: u32) {
let crc_o = proxmox_lang::offsetof!(DataBlobHeader, crc); let crc_o = proxmox_lang::offsetof!(DataBlobHeader, crc);
self.raw_data[crc_o..crc_o+4].copy_from_slice(&crc.to_le_bytes()); self.raw_data[crc_o..crc_o + 4].copy_from_slice(&crc.to_le_bytes());
} }
/// compute the CRC32 checksum /// compute the CRC32 checksum
@ -91,13 +90,11 @@ impl DataBlob {
config: Option<&CryptConfig>, config: Option<&CryptConfig>,
compress: bool, compress: bool,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
if data.len() > MAX_BLOB_SIZE { if data.len() > MAX_BLOB_SIZE {
bail!("data blob too large ({} bytes).", data.len()); bail!("data blob too large ({} bytes).", data.len());
} }
let mut blob = if let Some(config) = config { let mut blob = if let Some(config) = config {
let compr_data; let compr_data;
let (_compress, data, magic) = if compress { let (_compress, data, magic) = if compress {
compr_data = zstd::block::compress(data, 1)?; compr_data = zstd::block::compress(data, 1)?;
@ -115,7 +112,10 @@ impl DataBlob {
let mut raw_data = Vec::with_capacity(data.len() + header_len); let mut raw_data = Vec::with_capacity(data.len() + header_len);
let dummy_head = EncryptedDataBlobHeader { let dummy_head = EncryptedDataBlobHeader {
head: DataBlobHeader { magic: [0u8; 8], crc: [0; 4] }, head: DataBlobHeader {
magic: [0u8; 8],
crc: [0; 4],
},
iv: [0u8; 16], iv: [0u8; 16],
tag: [0u8; 16], tag: [0u8; 16],
}; };
@ -126,7 +126,9 @@ impl DataBlob {
let (iv, tag) = Self::encrypt_to(config, data, &mut raw_data)?; let (iv, tag) = Self::encrypt_to(config, data, &mut raw_data)?;
let head = EncryptedDataBlobHeader { let head = EncryptedDataBlobHeader {
head: DataBlobHeader { magic, crc: [0; 4] }, iv, tag, head: DataBlobHeader { magic, crc: [0; 4] },
iv,
tag,
}; };
unsafe { unsafe {
@ -135,12 +137,11 @@ impl DataBlob {
DataBlob { raw_data } DataBlob { raw_data }
} else { } else {
let max_data_len = data.len() + std::mem::size_of::<DataBlobHeader>(); let max_data_len = data.len() + std::mem::size_of::<DataBlobHeader>();
if compress { if compress {
let mut comp_data = Vec::with_capacity(max_data_len); let mut comp_data = Vec::with_capacity(max_data_len);
let head = DataBlobHeader { let head = DataBlobHeader {
magic: COMPRESSED_BLOB_MAGIC_1_0, magic: COMPRESSED_BLOB_MAGIC_1_0,
crc: [0; 4], crc: [0; 4],
}; };
@ -151,7 +152,9 @@ impl DataBlob {
zstd::stream::copy_encode(data, &mut comp_data, 1)?; zstd::stream::copy_encode(data, &mut comp_data, 1)?;
if comp_data.len() < max_data_len { if comp_data.len() < max_data_len {
let mut blob = DataBlob { raw_data: comp_data }; let mut blob = DataBlob {
raw_data: comp_data,
};
blob.set_crc(blob.compute_crc()); blob.set_crc(blob.compute_crc());
return Ok(blob); return Ok(blob);
} }
@ -159,7 +162,7 @@ impl DataBlob {
let mut raw_data = Vec::with_capacity(max_data_len); let mut raw_data = Vec::with_capacity(max_data_len);
let head = DataBlobHeader { let head = DataBlobHeader {
magic: UNCOMPRESSED_BLOB_MAGIC_1_0, magic: UNCOMPRESSED_BLOB_MAGIC_1_0,
crc: [0; 4], crc: [0; 4],
}; };
@ -180,18 +183,23 @@ impl DataBlob {
pub fn crypt_mode(&self) -> Result<CryptMode, Error> { pub fn crypt_mode(&self) -> Result<CryptMode, Error> {
let magic = self.magic(); let magic = self.magic();
Ok(if magic == &UNCOMPRESSED_BLOB_MAGIC_1_0 || magic == &COMPRESSED_BLOB_MAGIC_1_0 { Ok(
CryptMode::None if magic == &UNCOMPRESSED_BLOB_MAGIC_1_0 || magic == &COMPRESSED_BLOB_MAGIC_1_0 {
} else if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 { CryptMode::None
CryptMode::Encrypt } else if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 {
} else { CryptMode::Encrypt
bail!("Invalid blob magic number."); } else {
}) bail!("Invalid blob magic number.");
},
)
} }
/// Decode blob data /// Decode blob data
pub fn decode(&self, config: Option<&CryptConfig>, digest: Option<&[u8; 32]>) -> Result<Vec<u8>, Error> { pub fn decode(
&self,
config: Option<&CryptConfig>,
digest: Option<&[u8; 32]>,
) -> Result<Vec<u8>, Error> {
let magic = self.magic(); let magic = self.magic();
if magic == &UNCOMPRESSED_BLOB_MAGIC_1_0 { if magic == &UNCOMPRESSED_BLOB_MAGIC_1_0 {
@ -217,11 +225,21 @@ impl DataBlob {
(&self.raw_data[..header_len]).read_le_value::<EncryptedDataBlobHeader>()? (&self.raw_data[..header_len]).read_le_value::<EncryptedDataBlobHeader>()?
}; };
if let Some(config) = config { if let Some(config) = config {
let data = if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 { let data = if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 {
Self::decode_compressed_chunk(config, &self.raw_data[header_len..], &head.iv, &head.tag)? Self::decode_compressed_chunk(
config,
&self.raw_data[header_len..],
&head.iv,
&head.tag,
)?
} else { } else {
Self::decode_uncompressed_chunk(config, &self.raw_data[header_len..], &head.iv, &head.tag)? Self::decode_uncompressed_chunk(
config,
&self.raw_data[header_len..],
&head.iv,
&head.tag,
)?
}; };
if let Some(digest) = digest { if let Some(digest) = digest {
Self::verify_digest(&data, Some(config), digest)?; Self::verify_digest(&data, Some(config), digest)?;
@ -237,8 +255,7 @@ impl DataBlob {
/// Load blob from ``reader``, verify CRC /// Load blob from ``reader``, verify CRC
pub fn load_from_reader(reader: &mut dyn std::io::Read) -> Result<Self, Error> { pub fn load_from_reader(reader: &mut dyn std::io::Read) -> Result<Self, Error> {
let mut data = Vec::with_capacity(1024 * 1024);
let mut data = Vec::with_capacity(1024*1024);
reader.read_to_end(&mut data)?; reader.read_to_end(&mut data)?;
let blob = Self::from_raw(data)?; let blob = Self::from_raw(data)?;
@ -250,7 +267,6 @@ impl DataBlob {
/// Create Instance from raw data /// Create Instance from raw data
pub fn from_raw(data: Vec<u8>) -> Result<Self, Error> { pub fn from_raw(data: Vec<u8>) -> Result<Self, Error> {
if data.len() < std::mem::size_of::<DataBlobHeader>() { if data.len() < std::mem::size_of::<DataBlobHeader>() {
bail!("blob too small ({} bytes).", data.len()); bail!("blob too small ({} bytes).", data.len());
} }
@ -258,7 +274,6 @@ impl DataBlob {
let magic = &data[0..8]; let magic = &data[0..8];
if magic == ENCR_COMPR_BLOB_MAGIC_1_0 || magic == ENCRYPTED_BLOB_MAGIC_1_0 { if magic == ENCR_COMPR_BLOB_MAGIC_1_0 || magic == ENCRYPTED_BLOB_MAGIC_1_0 {
if data.len() < std::mem::size_of::<EncryptedDataBlobHeader>() { if data.len() < std::mem::size_of::<EncryptedDataBlobHeader>() {
bail!("encrypted blob too small ({} bytes).", data.len()); bail!("encrypted blob too small ({} bytes).", data.len());
} }
@ -267,7 +282,6 @@ impl DataBlob {
Ok(blob) Ok(blob)
} else if magic == COMPRESSED_BLOB_MAGIC_1_0 || magic == UNCOMPRESSED_BLOB_MAGIC_1_0 { } else if magic == COMPRESSED_BLOB_MAGIC_1_0 || magic == UNCOMPRESSED_BLOB_MAGIC_1_0 {
let blob = DataBlob { raw_data: data }; let blob = DataBlob { raw_data: data };
Ok(blob) Ok(blob)
@ -293,7 +307,6 @@ impl DataBlob {
expected_chunk_size: usize, expected_chunk_size: usize,
expected_digest: &[u8; 32], expected_digest: &[u8; 32],
) -> Result<(), Error> { ) -> Result<(), Error> {
let magic = self.magic(); let magic = self.magic();
if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 { if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 {
@ -304,7 +317,11 @@ impl DataBlob {
let data = self.decode(None, Some(expected_digest))?; let data = self.decode(None, Some(expected_digest))?;
if expected_chunk_size != data.len() { if expected_chunk_size != data.len() {
bail!("detected chunk with wrong length ({} != {})", expected_chunk_size, data.len()); bail!(
"detected chunk with wrong length ({} != {})",
expected_chunk_size,
data.len()
);
} }
Ok(()) Ok(())
@ -315,7 +332,6 @@ impl DataBlob {
config: Option<&CryptConfig>, config: Option<&CryptConfig>,
expected_digest: &[u8; 32], expected_digest: &[u8; 32],
) -> Result<(), Error> { ) -> Result<(), Error> {
let digest = match config { let digest = match config {
Some(config) => config.compute_digest(data), Some(config) => config.compute_digest(data),
None => openssl::sha::sha256(data), None => openssl::sha::sha256(data),
@ -344,8 +360,7 @@ impl DataBlob {
config: &CryptConfig, config: &CryptConfig,
data: &[u8], data: &[u8],
mut output: W, mut output: W,
) -> Result<([u8;16], [u8;16]), Error> { ) -> Result<([u8; 16], [u8; 16]), Error> {
let mut iv = [0u8; 16]; let mut iv = [0u8; 16];
proxmox_sys::linux::fill_with_random_data(&mut iv)?; proxmox_sys::linux::fill_with_random_data(&mut iv)?;
@ -353,7 +368,7 @@ impl DataBlob {
let mut c = config.data_crypter(&iv, Mode::Encrypt)?; let mut c = config.data_crypter(&iv, Mode::Encrypt)?;
const BUFFER_SIZE: usize = 32*1024; const BUFFER_SIZE: usize = 32 * 1024;
let mut encr_buf = [0u8; BUFFER_SIZE]; let mut encr_buf = [0u8; BUFFER_SIZE];
let max_encoder_input = BUFFER_SIZE - config.cipher().block_size(); let max_encoder_input = BUFFER_SIZE - config.cipher().block_size();
@ -361,7 +376,9 @@ impl DataBlob {
let mut start = 0; let mut start = 0;
loop { loop {
let mut end = start + max_encoder_input; let mut end = start + max_encoder_input;
if end > data.len() { end = data.len(); } if end > data.len() {
end = data.len();
}
if end > start { if end > start {
let count = c.update(&data[start..end], &mut encr_buf)?; let count = c.update(&data[start..end], &mut encr_buf)?;
output.write_all(&encr_buf[..count])?; output.write_all(&encr_buf[..count])?;
@ -372,7 +389,9 @@ impl DataBlob {
} }
let rest = c.finalize(&mut encr_buf)?; let rest = c.finalize(&mut encr_buf)?;
if rest > 0 { output.write_all(&encr_buf[..rest])?; } if rest > 0 {
output.write_all(&encr_buf[..rest])?;
}
output.flush()?; output.flush()?;
@ -388,14 +407,13 @@ impl DataBlob {
iv: &[u8; 16], iv: &[u8; 16],
tag: &[u8; 16], tag: &[u8; 16],
) -> Result<Vec<u8>, Error> { ) -> Result<Vec<u8>, Error> {
let dec = Vec::with_capacity(1024 * 1024);
let dec = Vec::with_capacity(1024*1024);
let mut decompressor = zstd::stream::write::Decoder::new(dec)?; let mut decompressor = zstd::stream::write::Decoder::new(dec)?;
let mut c = config.data_crypter(iv, Mode::Decrypt)?; let mut c = config.data_crypter(iv, Mode::Decrypt)?;
const BUFFER_SIZE: usize = 32*1024; const BUFFER_SIZE: usize = 32 * 1024;
let mut decr_buf = [0u8; BUFFER_SIZE]; let mut decr_buf = [0u8; BUFFER_SIZE];
let max_decoder_input = BUFFER_SIZE - config.cipher().block_size(); let max_decoder_input = BUFFER_SIZE - config.cipher().block_size();
@ -403,7 +421,9 @@ impl DataBlob {
let mut start = 0; let mut start = 0;
loop { loop {
let mut end = start + max_decoder_input; let mut end = start + max_decoder_input;
if end > data.len() { end = data.len(); } if end > data.len() {
end = data.len();
}
if end > start { if end > start {
let count = c.update(&data[start..end], &mut decr_buf)?; let count = c.update(&data[start..end], &mut decr_buf)?;
decompressor.write_all(&decr_buf[0..count])?; decompressor.write_all(&decr_buf[0..count])?;
@ -415,7 +435,9 @@ impl DataBlob {
c.set_tag(tag)?; c.set_tag(tag)?;
let rest = c.finalize(&mut decr_buf)?; let rest = c.finalize(&mut decr_buf)?;
if rest > 0 { decompressor.write_all(&decr_buf[..rest])?; } if rest > 0 {
decompressor.write_all(&decr_buf[..rest])?;
}
decompressor.flush()?; decompressor.flush()?;
@ -429,7 +451,6 @@ impl DataBlob {
iv: &[u8; 16], iv: &[u8; 16],
tag: &[u8; 16], tag: &[u8; 16],
) -> Result<Vec<u8>, Error> { ) -> Result<Vec<u8>, Error> {
let decr_data = decrypt_aead( let decr_data = decrypt_aead(
*config.cipher(), *config.cipher(),
config.enc_key(), config.enc_key(),
@ -441,7 +462,6 @@ impl DataBlob {
Ok(decr_data) Ok(decr_data)
} }
} }
/// Builder for chunk DataBlobs /// Builder for chunk DataBlobs
@ -457,8 +477,7 @@ pub struct DataChunkBuilder<'a, 'b> {
compress: bool, compress: bool,
} }
impl <'a, 'b> DataChunkBuilder<'a, 'b> { impl<'a, 'b> DataChunkBuilder<'a, 'b> {
/// Create a new builder instance. /// Create a new builder instance.
pub fn new(orig_data: &'a [u8]) -> Self { pub fn new(orig_data: &'a [u8]) -> Self {
Self { Self {
@ -537,5 +556,4 @@ impl <'a, 'b> DataChunkBuilder<'a, 'b> {
chunk_builder.build() chunk_builder.build()
} }
} }

View File

@ -1,9 +1,9 @@
use std::collections::{HashSet, HashMap}; use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::io::{self, Write}; use std::io::{self, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::convert::TryFrom;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
@ -11,43 +11,40 @@ use lazy_static::lazy_static;
use proxmox_schema::ApiType; use proxmox_schema::ApiType;
use proxmox_sys::fs::{replace_file, file_read_optional_string, CreateOptions}; use proxmox_sys::fs::{file_read_optional_string, replace_file, CreateOptions};
use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard};
use proxmox_sys::process_locker::ProcessLockSharedGuard; use proxmox_sys::process_locker::ProcessLockSharedGuard;
use proxmox_sys::WorkerTaskContext; use proxmox_sys::WorkerTaskContext;
use proxmox_sys::{task_log, task_warn}; use proxmox_sys::{task_log, task_warn};
use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard};
use pbs_api_types::{ use pbs_api_types::{
UPID, DataStoreConfig, Authid, GarbageCollectionStatus, HumanByte, Authid, ChunkOrder, DataStoreConfig, DatastoreTuning, GarbageCollectionStatus, HumanByte,
ChunkOrder, DatastoreTuning, Operation, Operation, UPID,
}; };
use pbs_config::{open_backup_lockfile, BackupLockGuard, ConfigVersionCache}; use pbs_config::{open_backup_lockfile, BackupLockGuard, ConfigVersionCache};
use crate::DataBlob; use crate::backup_info::{BackupDir, BackupGroup};
use crate::backup_info::{BackupGroup, BackupDir};
use crate::chunk_store::ChunkStore; use crate::chunk_store::ChunkStore;
use crate::dynamic_index::{DynamicIndexReader, DynamicIndexWriter}; use crate::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
use crate::fixed_index::{FixedIndexReader, FixedIndexWriter}; use crate::fixed_index::{FixedIndexReader, FixedIndexWriter};
use crate::index::IndexFile; use crate::index::IndexFile;
use crate::manifest::{ use crate::manifest::{
MANIFEST_BLOB_NAME, MANIFEST_LOCK_NAME, CLIENT_LOG_BLOB_NAME, archive_type, ArchiveType, BackupManifest, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
ArchiveType, BackupManifest, MANIFEST_LOCK_NAME,
archive_type,
}; };
use crate::task_tracking::update_active_operations; use crate::task_tracking::update_active_operations;
use crate::DataBlob;
lazy_static! { lazy_static! {
static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> = Mutex::new(HashMap::new()); static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> =
Mutex::new(HashMap::new());
} }
/// checks if auth_id is owner, or, if owner is a token, if /// checks if auth_id is owner, or, if owner is a token, if
/// auth_id is the user of the token /// auth_id is the user of the token
pub fn check_backup_owner( pub fn check_backup_owner(owner: &Authid, auth_id: &Authid) -> Result<(), Error> {
owner: &Authid, let correct_owner =
auth_id: &Authid, owner == auth_id || (owner.is_token() && &Authid::from(owner.user().clone()) == auth_id);
) -> Result<(), Error> {
let correct_owner = owner == auth_id
|| (owner.is_token() && &Authid::from(owner.user().clone()) == auth_id);
if !correct_owner { if !correct_owner {
bail!("backup owner check failed ({} != {})", auth_id, owner); bail!("backup owner check failed ({} != {})", auth_id, owner);
} }
@ -147,14 +144,12 @@ impl DataStore {
} }
/// removes all datastores that are not configured anymore /// removes all datastores that are not configured anymore
pub fn remove_unused_datastores() -> Result<(), Error>{ pub fn remove_unused_datastores() -> Result<(), Error> {
let (config, _digest) = pbs_config::datastore::config()?; let (config, _digest) = pbs_config::datastore::config()?;
let mut map = DATASTORE_MAP.lock().unwrap(); let mut map = DATASTORE_MAP.lock().unwrap();
// removes all elements that are not in the config // removes all elements that are not in the config
map.retain(|key, _| { map.retain(|key, _| config.sections.contains_key(key));
config.sections.contains_key(key)
});
Ok(()) Ok(())
} }
@ -183,7 +178,8 @@ impl DataStore {
}; };
let tuning: DatastoreTuning = serde_json::from_value( let tuning: DatastoreTuning = serde_json::from_value(
DatastoreTuning::API_SCHEMA.parse_property_string(config.tuning.as_deref().unwrap_or(""))? DatastoreTuning::API_SCHEMA
.parse_property_string(config.tuning.as_deref().unwrap_or(""))?,
)?; )?;
let chunk_order = tuning.chunk_order.unwrap_or(ChunkOrder::Inode); let chunk_order = tuning.chunk_order.unwrap_or(ChunkOrder::Inode);
@ -202,21 +198,32 @@ impl DataStore {
&self, &self,
) -> Result< ) -> Result<
impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>, impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
Error Error,
> { > {
self.inner.chunk_store.get_chunk_iterator() self.inner.chunk_store.get_chunk_iterator()
} }
pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> { pub fn create_fixed_writer<P: AsRef<Path>>(
&self,
let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?; filename: P,
size: usize,
chunk_size: usize,
) -> Result<FixedIndexWriter, Error> {
let index = FixedIndexWriter::create(
self.inner.chunk_store.clone(),
filename.as_ref(),
size,
chunk_size,
)?;
Ok(index) Ok(index)
} }
pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> { pub fn open_fixed_reader<P: AsRef<Path>>(
&self,
let full_path = self.inner.chunk_store.relative_path(filename.as_ref()); filename: P,
) -> Result<FixedIndexReader, Error> {
let full_path = self.inner.chunk_store.relative_path(filename.as_ref());
let index = FixedIndexReader::open(&full_path)?; let index = FixedIndexReader::open(&full_path)?;
@ -224,18 +231,19 @@ impl DataStore {
} }
pub fn create_dynamic_writer<P: AsRef<Path>>( pub fn create_dynamic_writer<P: AsRef<Path>>(
&self, filename: P, &self,
filename: P,
) -> Result<DynamicIndexWriter, Error> { ) -> Result<DynamicIndexWriter, Error> {
let index = DynamicIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref())?;
let index = DynamicIndexWriter::create(
self.inner.chunk_store.clone(), filename.as_ref())?;
Ok(index) Ok(index)
} }
pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> { pub fn open_dynamic_reader<P: AsRef<Path>>(
&self,
let full_path = self.inner.chunk_store.relative_path(filename.as_ref()); filename: P,
) -> Result<DynamicIndexReader, Error> {
let full_path = self.inner.chunk_store.relative_path(filename.as_ref());
let index = DynamicIndexReader::open(&full_path)?; let index = DynamicIndexReader::open(&full_path)?;
@ -247,12 +255,11 @@ impl DataStore {
P: AsRef<Path>, P: AsRef<Path>,
{ {
let filename = filename.as_ref(); let filename = filename.as_ref();
let out: Box<dyn IndexFile + Send> = let out: Box<dyn IndexFile + Send> = match archive_type(filename)? {
match archive_type(filename)? { ArchiveType::DynamicIndex => Box::new(self.open_dynamic_reader(filename)?),
ArchiveType::DynamicIndex => Box::new(self.open_dynamic_reader(filename)?), ArchiveType::FixedIndex => Box::new(self.open_fixed_reader(filename)?),
ArchiveType::FixedIndex => Box::new(self.open_fixed_reader(filename)?), _ => bail!("cannot open index file of unknown type: {:?}", filename),
_ => bail!("cannot open index file of unknown type: {:?}", filename), };
};
Ok(out) Ok(out)
} }
@ -260,23 +267,21 @@ impl DataStore {
pub fn fast_index_verification( pub fn fast_index_verification(
&self, &self,
index: &dyn IndexFile, index: &dyn IndexFile,
checked: &mut HashSet<[u8;32]>, checked: &mut HashSet<[u8; 32]>,
) -> Result<(), Error> { ) -> Result<(), Error> {
for pos in 0..index.index_count() { for pos in 0..index.index_count() {
let info = index.chunk_info(pos).unwrap(); let info = index.chunk_info(pos).unwrap();
if checked.contains(&info.digest) { if checked.contains(&info.digest) {
continue; continue;
} }
self.stat_chunk(&info.digest). self.stat_chunk(&info.digest).map_err(|err| {
map_err(|err| { format_err!(
format_err!( "fast_index_verification error, stat_chunk {} failed - {}",
"fast_index_verification error, stat_chunk {} failed - {}", hex::encode(&info.digest),
hex::encode(&info.digest), err,
err, )
) })?;
})?;
checked.insert(info.digest); checked.insert(info.digest);
} }
@ -295,25 +300,35 @@ impl DataStore {
/// Cleanup a backup directory /// Cleanup a backup directory
/// ///
/// Removes all files not mentioned in the manifest. /// Removes all files not mentioned in the manifest.
pub fn cleanup_backup_dir(&self, backup_dir: &BackupDir, manifest: &BackupManifest pub fn cleanup_backup_dir(
) -> Result<(), Error> { &self,
backup_dir: &BackupDir,
manifest: &BackupManifest,
) -> Result<(), Error> {
let mut full_path = self.base_path(); let mut full_path = self.base_path();
full_path.push(backup_dir.relative_path()); full_path.push(backup_dir.relative_path());
let mut wanted_files = HashSet::new(); let mut wanted_files = HashSet::new();
wanted_files.insert(MANIFEST_BLOB_NAME.to_string()); wanted_files.insert(MANIFEST_BLOB_NAME.to_string());
wanted_files.insert(CLIENT_LOG_BLOB_NAME.to_string()); wanted_files.insert(CLIENT_LOG_BLOB_NAME.to_string());
manifest.files().iter().for_each(|item| { wanted_files.insert(item.filename.clone()); }); manifest.files().iter().for_each(|item| {
wanted_files.insert(item.filename.clone());
});
for item in proxmox_sys::fs::read_subdir(libc::AT_FDCWD, &full_path)?.flatten() { for item in proxmox_sys::fs::read_subdir(libc::AT_FDCWD, &full_path)?.flatten() {
if let Some(file_type) = item.file_type() { if let Some(file_type) = item.file_type() {
if file_type != nix::dir::Type::File { continue; } if file_type != nix::dir::Type::File {
continue;
}
} }
let file_name = item.file_name().to_bytes(); let file_name = item.file_name().to_bytes();
if file_name == b"." || file_name == b".." { continue; }; if file_name == b"." || file_name == b".." {
continue;
};
if let Ok(name) = std::str::from_utf8(file_name) { if let Ok(name) = std::str::from_utf8(file_name) {
if wanted_files.contains(name) { continue; } if wanted_files.contains(name) {
continue;
}
} }
println!("remove unused file {:?}", item.file_name()); println!("remove unused file {:?}", item.file_name());
let dirfd = item.parent_fd(); let dirfd = item.parent_fd();
@ -339,11 +354,14 @@ impl DataStore {
/// Remove a complete backup group including all snapshots, returns true /// Remove a complete backup group including all snapshots, returns true
/// if all snapshots were removed, and false if some were protected /// if all snapshots were removed, and false if some were protected
pub fn remove_backup_group(&self, backup_group: &BackupGroup) -> Result<bool, Error> { pub fn remove_backup_group(&self, backup_group: &BackupGroup) -> Result<bool, Error> {
let full_path = self.group_path(backup_group); let full_path = self.group_path(backup_group);
let _guard = proxmox_sys::fs::lock_dir_noblock(&full_path, "backup group", "possible running backup")?; let _guard = proxmox_sys::fs::lock_dir_noblock(
&full_path,
"backup group",
"possible running backup",
)?;
log::info!("removing backup group {:?}", full_path); log::info!("removing backup group {:?}", full_path);
@ -360,22 +378,20 @@ impl DataStore {
if removed_all { if removed_all {
// no snapshots left, we can now safely remove the empty folder // no snapshots left, we can now safely remove the empty folder
std::fs::remove_dir_all(&full_path) std::fs::remove_dir_all(&full_path).map_err(|err| {
.map_err(|err| { format_err!(
format_err!( "removing backup group directory {:?} failed - {}",
"removing backup group directory {:?} failed - {}", full_path,
full_path, err,
err, )
) })?;
})?;
} }
Ok(removed_all) Ok(removed_all)
} }
/// Remove a backup directory including all content /// Remove a backup directory including all content
pub fn remove_backup_dir(&self, backup_dir: &BackupDir, force: bool) -> Result<(), Error> { pub fn remove_backup_dir(&self, backup_dir: &BackupDir, force: bool) -> Result<(), Error> {
let full_path = self.snapshot_path(backup_dir); let full_path = self.snapshot_path(backup_dir);
let (_guard, _manifest_guard); let (_guard, _manifest_guard);
@ -389,14 +405,9 @@ impl DataStore {
} }
log::info!("removing backup snapshot {:?}", full_path); log::info!("removing backup snapshot {:?}", full_path);
std::fs::remove_dir_all(&full_path) std::fs::remove_dir_all(&full_path).map_err(|err| {
.map_err(|err| { format_err!("removing backup snapshot {:?} failed - {}", full_path, err,)
format_err!( })?;
"removing backup snapshot {:?} failed - {}",
full_path,
err,
)
})?;
// the manifest does not exists anymore, we do not need to keep the lock // the manifest does not exists anymore, we do not need to keep the lock
if let Ok(path) = self.manifest_lock_path(backup_dir) { if let Ok(path) = self.manifest_lock_path(backup_dir) {
@ -460,7 +471,8 @@ impl DataStore {
open_options.create_new(true); open_options.create_new(true);
} }
let mut file = open_options.open(&path) let mut file = open_options
.open(&path)
.map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?; .map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?;
writeln!(file, "{}", auth_id) writeln!(file, "{}", auth_id)
@ -490,13 +502,21 @@ impl DataStore {
// create the last component now // create the last component now
match std::fs::create_dir(&full_path) { match std::fs::create_dir(&full_path) {
Ok(_) => { Ok(_) => {
let guard = lock_dir_noblock(&full_path, "backup group", "another backup is already running")?; let guard = lock_dir_noblock(
&full_path,
"backup group",
"another backup is already running",
)?;
self.set_owner(backup_group, auth_id, false)?; self.set_owner(backup_group, auth_id, false)?;
let owner = self.get_owner(backup_group)?; // just to be sure let owner = self.get_owner(backup_group)?; // just to be sure
Ok((owner, guard)) Ok((owner, guard))
} }
Err(ref err) if err.kind() == io::ErrorKind::AlreadyExists => { Err(ref err) if err.kind() == io::ErrorKind::AlreadyExists => {
let guard = lock_dir_noblock(&full_path, "backup group", "another backup is already running")?; let guard = lock_dir_noblock(
&full_path,
"backup group",
"another backup is already running",
)?;
let owner = self.get_owner(backup_group)?; // just to be sure let owner = self.get_owner(backup_group)?; // just to be sure
Ok((owner, guard)) Ok((owner, guard))
} }
@ -507,20 +527,28 @@ impl DataStore {
/// Creates a new backup snapshot inside a BackupGroup /// Creates a new backup snapshot inside a BackupGroup
/// ///
/// The BackupGroup directory needs to exist. /// The BackupGroup directory needs to exist.
pub fn create_locked_backup_dir(&self, backup_dir: &BackupDir) pub fn create_locked_backup_dir(
-> Result<(PathBuf, bool, DirLockGuard), Error> &self,
{ backup_dir: &BackupDir,
) -> Result<(PathBuf, bool, DirLockGuard), Error> {
let relative_path = backup_dir.relative_path(); let relative_path = backup_dir.relative_path();
let mut full_path = self.base_path(); let mut full_path = self.base_path();
full_path.push(&relative_path); full_path.push(&relative_path);
let lock = || let lock = || {
lock_dir_noblock(&full_path, "snapshot", "internal error - tried creating snapshot that's already in use"); lock_dir_noblock(
&full_path,
"snapshot",
"internal error - tried creating snapshot that's already in use",
)
};
match std::fs::create_dir(&full_path) { match std::fs::create_dir(&full_path) {
Ok(_) => Ok((relative_path, true, lock()?)), Ok(_) => Ok((relative_path, true, lock()?)),
Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok((relative_path, false, lock()?)), Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => {
Err(e) => Err(e.into()) Ok((relative_path, false, lock()?))
}
Err(e) => Err(e.into()),
} }
} }
@ -535,7 +563,8 @@ impl DataStore {
// make sure we skip .chunks (and other hidden files to keep it simple) // make sure we skip .chunks (and other hidden files to keep it simple)
fn is_hidden(entry: &walkdir::DirEntry) -> bool { fn is_hidden(entry: &walkdir::DirEntry) -> bool {
entry.file_name() entry
.file_name()
.to_str() .to_str()
.map(|s| s.starts_with('.')) .map(|s| s.starts_with('.'))
.unwrap_or(false) .unwrap_or(false)
@ -550,7 +579,11 @@ impl DataStore {
bail!("cannot continue garbage-collection safely, permission denied on: {:?}", path) bail!("cannot continue garbage-collection safely, permission denied on: {:?}", path)
} }
} else { } else {
bail!("unexpected error on datastore traversal: {} - {:?}", inner, path) bail!(
"unexpected error on datastore traversal: {} - {:?}",
inner,
path
)
} }
} else { } else {
bail!("unexpected error on datastore traversal: {}", inner) bail!("unexpected error on datastore traversal: {}", inner)
@ -563,11 +596,13 @@ impl DataStore {
Ok(entry) => entry.into_path(), Ok(entry) => entry.into_path(),
Err(err) => { Err(err) => {
handle_entry_err(err)?; handle_entry_err(err)?;
continue continue;
}, }
}; };
if let Ok(archive_type) = archive_type(&path) { if let Ok(archive_type) = archive_type(&path) {
if archive_type == ArchiveType::FixedIndex || archive_type == ArchiveType::DynamicIndex { if archive_type == ArchiveType::FixedIndex
|| archive_type == ArchiveType::DynamicIndex
{
list.push(path); list.push(path);
} }
} }
@ -584,7 +619,6 @@ impl DataStore {
status: &mut GarbageCollectionStatus, status: &mut GarbageCollectionStatus,
worker: &dyn WorkerTaskContext, worker: &dyn WorkerTaskContext,
) -> Result<(), Error> { ) -> Result<(), Error> {
status.index_file_count += 1; status.index_file_count += 1;
status.index_data_bytes += index.index_bytes(); status.index_data_bytes += index.index_bytes();
@ -620,7 +654,6 @@ impl DataStore {
status: &mut GarbageCollectionStatus, status: &mut GarbageCollectionStatus,
worker: &dyn WorkerTaskContext, worker: &dyn WorkerTaskContext,
) -> Result<(), Error> { ) -> Result<(), Error> {
let image_list = self.list_images()?; let image_list = self.list_images()?;
let image_count = image_list.len(); let image_count = image_list.len();
@ -629,7 +662,6 @@ impl DataStore {
let mut strange_paths_count: u64 = 0; let mut strange_paths_count: u64 = 0;
for (i, img) in image_list.into_iter().enumerate() { for (i, img) in image_list.into_iter().enumerate() {
worker.check_abort()?; worker.check_abort()?;
worker.fail_on_shutdown()?; worker.fail_on_shutdown()?;
@ -683,7 +715,6 @@ impl DataStore {
); );
} }
Ok(()) Ok(())
} }
@ -695,17 +726,23 @@ impl DataStore {
!matches!(self.inner.gc_mutex.try_lock(), Ok(_)) !matches!(self.inner.gc_mutex.try_lock(), Ok(_))
} }
pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> { pub fn garbage_collection(
&self,
worker: &dyn WorkerTaskContext,
upid: &UPID,
) -> Result<(), Error> {
if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() { if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() {
// avoids that we run GC if an old daemon process has still a // avoids that we run GC if an old daemon process has still a
// running backup writer, which is not save as we have no "oldest // running backup writer, which is not save as we have no "oldest
// writer" information and thus no safe atime cutoff // writer" information and thus no safe atime cutoff
let _exclusive_lock = self.inner.chunk_store.try_exclusive_lock()?; let _exclusive_lock = self.inner.chunk_store.try_exclusive_lock()?;
let phase1_start_time = proxmox_time::epoch_i64(); let phase1_start_time = proxmox_time::epoch_i64();
let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time); let oldest_writer = self
.inner
.chunk_store
.oldest_writer()
.unwrap_or(phase1_start_time);
let mut gc_status = GarbageCollectionStatus::default(); let mut gc_status = GarbageCollectionStatus::default();
gc_status.upid = Some(upid.to_string()); gc_status.upid = Some(upid.to_string());
@ -751,7 +788,8 @@ impl DataStore {
); );
if gc_status.index_data_bytes > 0 { if gc_status.index_data_bytes > 0 {
let comp_per = (gc_status.disk_bytes as f64 * 100.)/gc_status.index_data_bytes as f64; let comp_per =
(gc_status.disk_bytes as f64 * 100.) / gc_status.index_data_bytes as f64;
task_log!( task_log!(
worker, worker,
"On-Disk usage: {} ({:.2}%)", "On-Disk usage: {} ({:.2}%)",
@ -763,7 +801,7 @@ impl DataStore {
task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks); task_log!(worker, "On-Disk chunks: {}", gc_status.disk_chunks);
let deduplication_factor = if gc_status.disk_bytes > 0 { let deduplication_factor = if gc_status.disk_bytes > 0 {
(gc_status.index_data_bytes as f64)/(gc_status.disk_bytes as f64) (gc_status.index_data_bytes as f64) / (gc_status.disk_bytes as f64)
} else { } else {
1.0 1.0
}; };
@ -771,7 +809,7 @@ impl DataStore {
task_log!(worker, "Deduplication factor: {:.2}", deduplication_factor); task_log!(worker, "Deduplication factor: {:.2}", deduplication_factor);
if gc_status.disk_chunks > 0 { if gc_status.disk_chunks > 0 {
let avg_chunk = gc_status.disk_bytes/(gc_status.disk_chunks as u64); let avg_chunk = gc_status.disk_bytes / (gc_status.disk_chunks as u64);
task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk)); task_log!(worker, "Average chunk size: {}", HumanByte::from(avg_chunk));
} }
@ -793,7 +831,6 @@ impl DataStore {
} }
*self.inner.last_gc_status.lock().unwrap() = gc_status; *self.inner.last_gc_status.lock().unwrap() = gc_status;
} else { } else {
bail!("Start GC failed - (already running/locked)"); bail!("Start GC failed - (already running/locked)");
} }
@ -805,19 +842,21 @@ impl DataStore {
self.inner.chunk_store.try_shared_lock() self.inner.chunk_store.try_shared_lock()
} }
pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) { pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
self.inner.chunk_store.chunk_path(digest) self.inner.chunk_store.chunk_path(digest)
} }
pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> { pub fn cond_touch_chunk(
self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist) &self,
digest: &[u8; 32],
fail_if_not_exist: bool,
) -> Result<bool, Error> {
self.inner
.chunk_store
.cond_touch_chunk(digest, fail_if_not_exist)
} }
pub fn insert_chunk( pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
&self,
chunk: &DataBlob,
digest: &[u8; 32],
) -> Result<(bool, u64), Error> {
self.inner.chunk_store.insert_chunk(chunk, digest) self.inner.chunk_store.insert_chunk(chunk, digest)
} }
@ -829,38 +868,37 @@ impl DataStore {
proxmox_lang::try_block!({ proxmox_lang::try_block!({
let mut file = std::fs::File::open(&path)?; let mut file = std::fs::File::open(&path)?;
DataBlob::load_from_reader(&mut file) DataBlob::load_from_reader(&mut file)
}).map_err(|err| format_err!("unable to load blob '{:?}' - {}", path, err)) })
.map_err(|err| format_err!("unable to load blob '{:?}' - {}", path, err))
} }
pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> { pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest); let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
std::fs::metadata(chunk_path).map_err(Error::from) std::fs::metadata(chunk_path).map_err(Error::from)
} }
pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> { pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest); let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
proxmox_lang::try_block!({ proxmox_lang::try_block!({
let mut file = std::fs::File::open(&chunk_path)?; let mut file = std::fs::File::open(&chunk_path)?;
DataBlob::load_from_reader(&mut file) DataBlob::load_from_reader(&mut file)
}).map_err(|err| format_err!( })
"store '{}', unable to load chunk '{}' - {}", .map_err(|err| {
self.name(), format_err!(
digest_str, "store '{}', unable to load chunk '{}' - {}",
err, self.name(),
)) digest_str,
err,
)
})
} }
/// Returns the filename to lock a manifest /// Returns the filename to lock a manifest
/// ///
/// Also creates the basedir. The lockfile is located in /// Also creates the basedir. The lockfile is located in
/// '/run/proxmox-backup/locks/{datastore}/{type}/{id}/{timestamp}.index.json.lck' /// '/run/proxmox-backup/locks/{datastore}/{type}/{id}/{timestamp}.index.json.lck'
fn manifest_lock_path( fn manifest_lock_path(&self, backup_dir: &BackupDir) -> Result<String, Error> {
&self,
backup_dir: &BackupDir,
) -> Result<String, Error> {
let mut path = format!( let mut path = format!(
"/run/proxmox-backup/locks/{}/{}/{}", "/run/proxmox-backup/locks/{}/{}/{}",
self.name(), self.name(),
@ -869,32 +907,27 @@ impl DataStore {
); );
std::fs::create_dir_all(&path)?; std::fs::create_dir_all(&path)?;
use std::fmt::Write; use std::fmt::Write;
write!(path, "/{}{}", backup_dir.backup_time_string(), &MANIFEST_LOCK_NAME)?; write!(
path,
"/{}{}",
backup_dir.backup_time_string(),
&MANIFEST_LOCK_NAME
)?;
Ok(path) Ok(path)
} }
fn lock_manifest( fn lock_manifest(&self, backup_dir: &BackupDir) -> Result<BackupLockGuard, Error> {
&self,
backup_dir: &BackupDir,
) -> Result<BackupLockGuard, Error> {
let path = self.manifest_lock_path(backup_dir)?; let path = self.manifest_lock_path(backup_dir)?;
// update_manifest should never take a long time, so if someone else has // update_manifest should never take a long time, so if someone else has
// the lock we can simply block a bit and should get it soon // the lock we can simply block a bit and should get it soon
open_backup_lockfile(&path, Some(Duration::from_secs(5)), true) open_backup_lockfile(&path, Some(Duration::from_secs(5)), true)
.map_err(|err| { .map_err(|err| format_err!("unable to acquire manifest lock {:?} - {}", &path, err))
format_err!(
"unable to acquire manifest lock {:?} - {}", &path, err
)
})
} }
/// Load the manifest without a lock. Must not be written back. /// Load the manifest without a lock. Must not be written back.
pub fn load_manifest( pub fn load_manifest(&self, backup_dir: &BackupDir) -> Result<(BackupManifest, u64), Error> {
&self,
backup_dir: &BackupDir,
) -> Result<(BackupManifest, u64), Error> {
let blob = self.load_blob(backup_dir, MANIFEST_BLOB_NAME)?; let blob = self.load_blob(backup_dir, MANIFEST_BLOB_NAME)?;
let raw_size = blob.raw_size(); let raw_size = blob.raw_size();
let manifest = BackupManifest::try_from(blob)?; let manifest = BackupManifest::try_from(blob)?;
@ -908,7 +941,6 @@ impl DataStore {
backup_dir: &BackupDir, backup_dir: &BackupDir,
update_fn: impl FnOnce(&mut BackupManifest), update_fn: impl FnOnce(&mut BackupManifest),
) -> Result<(), Error> { ) -> Result<(), Error> {
let _guard = self.lock_manifest(backup_dir)?; let _guard = self.lock_manifest(backup_dir)?;
let (mut manifest, _) = self.load_manifest(backup_dir)?; let (mut manifest, _) = self.load_manifest(backup_dir)?;
@ -930,11 +962,7 @@ impl DataStore {
} }
/// Updates the protection status of the specified snapshot. /// Updates the protection status of the specified snapshot.
pub fn update_protection( pub fn update_protection(&self, backup_dir: &BackupDir, protection: bool) -> Result<(), Error> {
&self,
backup_dir: &BackupDir,
protection: bool
) -> Result<(), Error> {
let full_path = self.snapshot_path(backup_dir); let full_path = self.snapshot_path(backup_dir);
let _guard = lock_dir_noblock(&full_path, "snapshot", "possibly running or in use")?; let _guard = lock_dir_noblock(&full_path, "snapshot", "possibly running or in use")?;

View File

@ -9,21 +9,21 @@ use std::task::Context;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use proxmox_sys::mmap::Mmap;
use proxmox_io::ReadExt; use proxmox_io::ReadExt;
use proxmox_uuid::Uuid; use proxmox_sys::mmap::Mmap;
use proxmox_sys::process_locker::ProcessLockSharedGuard; use proxmox_sys::process_locker::ProcessLockSharedGuard;
use proxmox_uuid::Uuid;
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation}; use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
use pbs_tools::lru_cache::LruCache; use pbs_tools::lru_cache::LruCache;
use crate::Chunker;
use crate::chunk_stat::ChunkStat; use crate::chunk_stat::ChunkStat;
use crate::chunk_store::ChunkStore; use crate::chunk_store::ChunkStore;
use crate::data_blob::{DataBlob, DataChunkBuilder}; use crate::data_blob::{DataBlob, DataChunkBuilder};
use crate::file_formats; use crate::file_formats;
use crate::index::{IndexFile, ChunkReadInfo}; use crate::index::{ChunkReadInfo, IndexFile};
use crate::read_chunk::ReadChunk; use crate::read_chunk::ReadChunk;
use crate::Chunker;
/// Header format definition for dynamic index files (`.dixd`) /// Header format definition for dynamic index files (`.dixd`)
#[repr(C)] #[repr(C)]
@ -228,7 +228,11 @@ impl IndexFile for DynamicIndexReader {
if pos >= self.index.len() { if pos >= self.index.len() {
return None; return None;
} }
let start = if pos == 0 { 0 } else { self.index[pos - 1].end() }; let start = if pos == 0 {
0
} else {
self.index[pos - 1].end()
};
let end = self.index[pos].end(); let end = self.index[pos].end();
@ -252,7 +256,7 @@ impl IndexFile for DynamicIndexReader {
let found_idx = self.binary_search(0, 0, end_idx, end, offset); let found_idx = self.binary_search(0, 0, end_idx, end, offset);
let found_idx = match found_idx { let found_idx = match found_idx {
Ok(i) => i, Ok(i) => i,
Err(_) => return None Err(_) => return None,
}; };
let found_start = if found_idx == 0 { let found_start = if found_idx == 0 {
@ -581,13 +585,16 @@ impl<S: ReadChunk> BufferedDynamicReader<S> {
fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
//let (start, end, data) = self.lru_cache.access( //let (start, end, data) = self.lru_cache.access(
let cached_chunk = self.lru_cache.access( let cached_chunk = self
idx, .lru_cache
&mut ChunkCacher { .access(
store: &mut self.store, idx,
index: &self.index, &mut ChunkCacher {
}, store: &mut self.store,
)?.ok_or_else(|| format_err!("chunk not found by cacher"))?; index: &self.index,
},
)?
.ok_or_else(|| format_err!("chunk not found by cacher"))?;
// fixme: avoid copy // fixme: avoid copy
self.read_buffer.clear(); self.read_buffer.clear();

View File

@ -38,7 +38,7 @@ pub const DYNAMIC_SIZED_CHUNK_INDEX_1_0: [u8; 8] = [28, 145, 78, 165, 25, 186, 1
/// This is basically the same format we use for chunks, but /// This is basically the same format we use for chunks, but
/// with other magic numbers so that we can distinguish them. /// with other magic numbers so that we can distinguish them.
#[derive(Endian)] #[derive(Endian)]
#[repr(C,packed)] #[repr(C, packed)]
pub struct DataBlobHeader { pub struct DataBlobHeader {
pub magic: [u8; 8], pub magic: [u8; 8],
pub crc: [u8; 4], pub crc: [u8; 4],
@ -52,7 +52,7 @@ pub struct DataBlobHeader {
/// ///
/// (MAGIC || CRC32 || IV || TAG || EncryptedData). /// (MAGIC || CRC32 || IV || TAG || EncryptedData).
#[derive(Endian)] #[derive(Endian)]
#[repr(C,packed)] #[repr(C, packed)]
pub struct EncryptedDataBlobHeader { pub struct EncryptedDataBlobHeader {
pub head: DataBlobHeader, pub head: DataBlobHeader,
pub iv: [u8; 16], pub iv: [u8; 16],

View File

@ -1,14 +1,14 @@
use std::fs::File; use std::fs::File;
use std::io::Write; use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::io::{Seek, SeekFrom};
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use proxmox_sys::process_locker::ProcessLockSharedGuard;
use proxmox_io::ReadExt; use proxmox_io::ReadExt;
use proxmox_sys::process_locker::ProcessLockSharedGuard;
use proxmox_uuid::Uuid; use proxmox_uuid::Uuid;
use crate::chunk_stat::ChunkStat; use crate::chunk_stat::ChunkStat;

View File

@ -45,13 +45,17 @@ pub trait IndexFile {
let mut most_used = Vec::new(); let mut most_used = Vec::new();
for (digest, count) in map { for (digest, count) in map {
if count <= 1 { continue; } if count <= 1 {
continue;
}
match most_used.binary_search_by_key(&count, |&(_digest, count)| count) { match most_used.binary_search_by_key(&count, |&(_digest, count)| count) {
Ok(p) => most_used.insert(p, (digest, count)), Ok(p) => most_used.insert(p, (digest, count)),
Err(p) => most_used.insert(p, (digest, count)), Err(p) => most_used.insert(p, (digest, count)),
} }
if most_used.len() > max { let _ = most_used.pop(); } if most_used.len() > max {
let _ = most_used.pop();
}
} }
let mut map = HashMap::new(); let mut map = HashMap::new();

View File

@ -146,7 +146,10 @@
pub const CATALOG_NAME: &str = "catalog.pcat1.didx"; pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
/// Directory path where active operations counters are saved. /// Directory path where active operations counters are saved.
pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations"); pub const ACTIVE_OPERATIONS_DIR: &str = concat!(
pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(),
"/active-operations"
);
#[macro_export] #[macro_export]
macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 { macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
@ -163,8 +166,8 @@ macro_rules! PROXMOX_BACKUP_READER_PROTOCOL_ID_V1 {
} }
pub mod backup_info; pub mod backup_info;
pub mod catalog;
pub mod cached_chunk_reader; pub mod cached_chunk_reader;
pub mod catalog;
pub mod checksum_reader; pub mod checksum_reader;
pub mod checksum_writer; pub mod checksum_writer;
pub mod chunk_stat; pub mod chunk_stat;

View File

@ -4,11 +4,11 @@ use std::sync::Arc;
use anyhow::{bail, Error}; use anyhow::{bail, Error};
use pbs_tools::crypt_config::CryptConfig;
use pbs_api_types::CryptMode; use pbs_api_types::CryptMode;
use pbs_tools::crypt_config::CryptConfig;
use crate::data_blob::DataBlob; use crate::data_blob::DataBlob;
use crate::read_chunk::{ReadChunk, AsyncReadChunk}; use crate::read_chunk::{AsyncReadChunk, ReadChunk};
use crate::DataStore; use crate::DataStore;
#[derive(Clone)] #[derive(Clone)]
@ -19,7 +19,11 @@ pub struct LocalChunkReader {
} }
impl LocalChunkReader { impl LocalChunkReader {
pub fn new(store: Arc<DataStore>, crypt_config: Option<Arc<CryptConfig>>, crypt_mode: CryptMode) -> Self { pub fn new(
store: Arc<DataStore>,
crypt_config: Option<Arc<CryptConfig>>,
crypt_mode: CryptMode,
) -> Self {
Self { Self {
store, store,
crypt_config, crypt_config,
@ -29,17 +33,15 @@ impl LocalChunkReader {
fn ensure_crypt_mode(&self, chunk_mode: CryptMode) -> Result<(), Error> { fn ensure_crypt_mode(&self, chunk_mode: CryptMode) -> Result<(), Error> {
match self.crypt_mode { match self.crypt_mode {
CryptMode::Encrypt => { CryptMode::Encrypt => match chunk_mode {
match chunk_mode { CryptMode::Encrypt => Ok(()),
CryptMode::Encrypt => Ok(()), CryptMode::SignOnly | CryptMode::None => {
CryptMode::SignOnly | CryptMode::None => bail!("Index and chunk CryptMode don't match."), bail!("Index and chunk CryptMode don't match.")
} }
}, },
CryptMode::SignOnly | CryptMode::None => { CryptMode::SignOnly | CryptMode::None => match chunk_mode {
match chunk_mode { CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."),
CryptMode::Encrypt => bail!("Index and chunk CryptMode don't match."), CryptMode::SignOnly | CryptMode::None => Ok(()),
CryptMode::SignOnly | CryptMode::None => Ok(()),
}
}, },
} }
} }
@ -66,7 +68,7 @@ impl AsyncReadChunk for LocalChunkReader {
&'a self, &'a self,
digest: &'a [u8; 32], digest: &'a [u8; 32],
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> { ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
Box::pin(async move{ Box::pin(async move {
let (path, _) = self.store.chunk_path(digest); let (path, _) = self.store.chunk_path(digest);
let raw_data = tokio::fs::read(&path).await?; let raw_data = tokio::fs::read(&path).await?;
@ -85,7 +87,8 @@ impl AsyncReadChunk for LocalChunkReader {
Box::pin(async move { Box::pin(async move {
let chunk = AsyncReadChunk::read_raw_chunk(self, digest).await?; let chunk = AsyncReadChunk::read_raw_chunk(self, digest).await?;
let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?; let raw_data =
chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
// fixme: verify digest? // fixme: verify digest?

View File

@ -3,11 +3,11 @@ use std::path::Path;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use serde_json::{json, Value};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use pbs_tools::crypt_config::CryptConfig;
use pbs_api_types::{CryptMode, Fingerprint}; use pbs_api_types::{CryptMode, Fingerprint};
use pbs_tools::crypt_config::CryptConfig;
use crate::BackupDir; use crate::BackupDir;
@ -16,15 +16,18 @@ pub const MANIFEST_LOCK_NAME: &str = ".index.json.lck";
pub const CLIENT_LOG_BLOB_NAME: &str = "client.log.blob"; pub const CLIENT_LOG_BLOB_NAME: &str = "client.log.blob";
pub const ENCRYPTED_KEY_BLOB_NAME: &str = "rsa-encrypted.key.blob"; pub const ENCRYPTED_KEY_BLOB_NAME: &str = "rsa-encrypted.key.blob";
fn crypt_mode_none() -> CryptMode {
fn crypt_mode_none() -> CryptMode { CryptMode::None } CryptMode::None
fn empty_value() -> Value { json!({}) } }
fn empty_value() -> Value {
json!({})
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
#[serde(rename_all="kebab-case")] #[serde(rename_all = "kebab-case")]
pub struct FileInfo { pub struct FileInfo {
pub filename: String, pub filename: String,
#[serde(default="crypt_mode_none")] // to be compatible with < 0.8.0 backups #[serde(default = "crypt_mode_none")] // to be compatible with < 0.8.0 backups
pub crypt_mode: CryptMode, pub crypt_mode: CryptMode,
pub size: u64, pub size: u64,
#[serde(with = "hex::serde")] #[serde(with = "hex::serde")]
@ -32,12 +35,11 @@ pub struct FileInfo {
} }
impl FileInfo { impl FileInfo {
/// Return expected CryptMode of referenced chunks /// Return expected CryptMode of referenced chunks
/// ///
/// Encrypted Indices should only reference encrypted chunks, while signed or plain indices /// Encrypted Indices should only reference encrypted chunks, while signed or plain indices
/// should only reference plain chunks. /// should only reference plain chunks.
pub fn chunk_crypt_mode (&self) -> CryptMode { pub fn chunk_crypt_mode(&self) -> CryptMode {
match self.crypt_mode { match self.crypt_mode {
CryptMode::Encrypt => CryptMode::Encrypt, CryptMode::Encrypt => CryptMode::Encrypt,
CryptMode::SignOnly | CryptMode::None => CryptMode::None, CryptMode::SignOnly | CryptMode::None => CryptMode::None,
@ -46,13 +48,13 @@ impl FileInfo {
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
#[serde(rename_all="kebab-case")] #[serde(rename_all = "kebab-case")]
pub struct BackupManifest { pub struct BackupManifest {
backup_type: String, backup_type: String,
backup_id: String, backup_id: String,
backup_time: i64, backup_time: i64,
files: Vec<FileInfo>, files: Vec<FileInfo>,
#[serde(default="empty_value")] // to be compatible with < 0.8.0 backups #[serde(default = "empty_value")] // to be compatible with < 0.8.0 backups
pub unprotected: Value, pub unprotected: Value,
pub signature: Option<String>, pub signature: Option<String>,
} }
@ -78,14 +80,11 @@ impl ArchiveType {
} }
//#[deprecated(note = "use ArchivType::from_path instead")] later... //#[deprecated(note = "use ArchivType::from_path instead")] later...
pub fn archive_type<P: AsRef<Path>>( pub fn archive_type<P: AsRef<Path>>(archive_name: P) -> Result<ArchiveType, Error> {
archive_name: P,
) -> Result<ArchiveType, Error> {
ArchiveType::from_path(archive_name) ArchiveType::from_path(archive_name)
} }
impl BackupManifest { impl BackupManifest {
pub fn new(snapshot: BackupDir) -> Self { pub fn new(snapshot: BackupDir) -> Self {
Self { Self {
backup_type: snapshot.group().backup_type().into(), backup_type: snapshot.group().backup_type().into(),
@ -97,9 +96,20 @@ impl BackupManifest {
} }
} }
pub fn add_file(&mut self, filename: String, size: u64, csum: [u8; 32], crypt_mode: CryptMode) -> Result<(), Error> { pub fn add_file(
&mut self,
filename: String,
size: u64,
csum: [u8; 32],
crypt_mode: CryptMode,
) -> Result<(), Error> {
let _archive_type = ArchiveType::from_path(&filename)?; // check type let _archive_type = ArchiveType::from_path(&filename)?; // check type
self.files.push(FileInfo { filename, size, csum, crypt_mode }); self.files.push(FileInfo {
filename,
size,
csum,
crypt_mode,
});
Ok(()) Ok(())
} }
@ -108,7 +118,6 @@ impl BackupManifest {
} }
pub fn lookup_file_info(&self, name: &str) -> Result<&FileInfo, Error> { pub fn lookup_file_info(&self, name: &str) -> Result<&FileInfo, Error> {
let info = self.files.iter().find(|item| item.filename == name); let info = self.files.iter().find(|item| item.filename == name);
match info { match info {
@ -118,7 +127,6 @@ impl BackupManifest {
} }
pub fn verify_file(&self, name: &str, csum: &[u8; 32], size: u64) -> Result<(), Error> { pub fn verify_file(&self, name: &str, csum: &[u8; 32], size: u64) -> Result<(), Error> {
let info = self.lookup_file_info(name)?; let info = self.lookup_file_info(name)?;
if size != info.size { if size != info.size {
@ -146,7 +154,6 @@ impl BackupManifest {
} }
fn json_signature(data: &Value, crypt_config: &CryptConfig) -> Result<[u8; 32], Error> { fn json_signature(data: &Value, crypt_config: &CryptConfig) -> Result<[u8; 32], Error> {
let mut signed_data = data.clone(); let mut signed_data = data.clone();
signed_data.as_object_mut().unwrap().remove("unprotected"); // exclude signed_data.as_object_mut().unwrap().remove("unprotected"); // exclude
@ -161,7 +168,6 @@ impl BackupManifest {
/// Converts the Manifest into json string, and add a signature if there is a crypt_config. /// Converts the Manifest into json string, and add a signature if there is a crypt_config.
pub fn to_string(&self, crypt_config: Option<&CryptConfig>) -> Result<String, Error> { pub fn to_string(&self, crypt_config: Option<&CryptConfig>) -> Result<String, Error> {
let mut manifest = serde_json::to_value(&self)?; let mut manifest = serde_json::to_value(&self)?;
if let Some(crypt_config) = crypt_config { if let Some(crypt_config) = crypt_config {
@ -178,7 +184,7 @@ impl BackupManifest {
pub fn fingerprint(&self) -> Result<Option<Fingerprint>, Error> { pub fn fingerprint(&self) -> Result<Option<Fingerprint>, Error> {
match &self.unprotected["key-fingerprint"] { match &self.unprotected["key-fingerprint"] {
Value::Null => Ok(None), Value::Null => Ok(None),
value => Ok(Some(Deserialize::deserialize(value)?)) value => Ok(Some(Deserialize::deserialize(value)?)),
} }
} }
@ -210,7 +216,10 @@ impl BackupManifest {
} }
/// Try to read the manifest. This verifies the signature if there is a crypt_config. /// Try to read the manifest. This verifies the signature if there is a crypt_config.
pub fn from_data(data: &[u8], crypt_config: Option<&CryptConfig>) -> Result<BackupManifest, Error> { pub fn from_data(
data: &[u8],
crypt_config: Option<&CryptConfig>,
) -> Result<BackupManifest, Error> {
let json: Value = serde_json::from_slice(data)?; let json: Value = serde_json::from_slice(data)?;
let signature = json["signature"].as_str().map(String::from); let signature = json["signature"].as_str().map(String::from);
@ -243,13 +252,13 @@ impl BackupManifest {
} }
} }
impl TryFrom<super::DataBlob> for BackupManifest { impl TryFrom<super::DataBlob> for BackupManifest {
type Error = Error; type Error = Error;
fn try_from(blob: super::DataBlob) -> Result<Self, Error> { fn try_from(blob: super::DataBlob) -> Result<Self, Error> {
// no expected digest available // no expected digest available
let data = blob.decode(None, None) let data = blob
.decode(None, None)
.map_err(|err| format_err!("decode backup manifest blob failed - {}", err))?; .map_err(|err| format_err!("decode backup manifest blob failed - {}", err))?;
let json: Value = serde_json::from_slice(&data[..]) let json: Value = serde_json::from_slice(&data[..])
.map_err(|err| format_err!("unable to parse backup manifest json - {}", err))?; .map_err(|err| format_err!("unable to parse backup manifest json - {}", err))?;
@ -258,10 +267,8 @@ impl TryFrom<super::DataBlob> for BackupManifest {
} }
} }
#[test] #[test]
fn test_manifest_signature() -> Result<(), Error> { fn test_manifest_signature() -> Result<(), Error> {
use pbs_config::key_config::KeyDerivationConfig; use pbs_config::key_config::KeyDerivationConfig;
let pw = b"test"; let pw = b"test";
@ -291,7 +298,10 @@ fn test_manifest_signature() -> Result<(), Error> {
let manifest: Value = serde_json::from_str(&text)?; let manifest: Value = serde_json::from_str(&text)?;
let signature = manifest["signature"].as_str().unwrap().to_string(); let signature = manifest["signature"].as_str().unwrap().to_string();
assert_eq!(signature, "d7b446fb7db081662081d4b40fedd858a1d6307a5aff4ecff7d5bf4fd35679e9"); assert_eq!(
signature,
"d7b446fb7db081662081d4b40fedd858a1d6307a5aff4ecff7d5bf4fd35679e9"
);
let manifest: BackupManifest = serde_json::from_value(manifest)?; let manifest: BackupManifest = serde_json::from_value(manifest)?;
let expected_signature = hex::encode(&manifest.signature(&crypt_config)?); let expected_signature = hex::encode(&manifest.signature(&crypt_config)?);

View File

@ -1,14 +1,19 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::path::PathBuf; use std::path::PathBuf;
use anyhow::{Error}; use anyhow::Error;
use pbs_api_types::PruneOptions; use pbs_api_types::PruneOptions;
use super::BackupInfo; use super::BackupInfo;
#[derive(Clone, Copy, PartialEq, Eq)] #[derive(Clone, Copy, PartialEq, Eq)]
pub enum PruneMark { Protected, Keep, KeepPartial, Remove } pub enum PruneMark {
Protected,
Keep,
KeepPartial,
Remove,
}
impl PruneMark { impl PruneMark {
pub fn keep(self) -> bool { pub fn keep(self) -> bool {
@ -31,13 +36,12 @@ impl std::fmt::Display for PruneMark {
} }
} }
fn mark_selections<F: Fn(&BackupInfo) -> Result<String, Error>> ( fn mark_selections<F: Fn(&BackupInfo) -> Result<String, Error>>(
mark: &mut HashMap<PathBuf, PruneMark>, mark: &mut HashMap<PathBuf, PruneMark>,
list: &[BackupInfo], list: &[BackupInfo],
keep: usize, keep: usize,
select_id: F, select_id: F,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut include_hash = HashSet::new(); let mut include_hash = HashSet::new();
let mut already_included = HashSet::new(); let mut already_included = HashSet::new();
@ -51,17 +55,23 @@ fn mark_selections<F: Fn(&BackupInfo) -> Result<String, Error>> (
for info in list { for info in list {
let backup_id = info.backup_dir.relative_path(); let backup_id = info.backup_dir.relative_path();
if mark.get(&backup_id).is_some() { continue; } if mark.get(&backup_id).is_some() {
continue;
}
if info.protected { if info.protected {
mark.insert(backup_id, PruneMark::Protected); mark.insert(backup_id, PruneMark::Protected);
continue; continue;
} }
let sel_id: String = select_id(info)?; let sel_id: String = select_id(info)?;
if already_included.contains(&sel_id) { continue; } if already_included.contains(&sel_id) {
continue;
}
if !include_hash.contains(&sel_id) { if !include_hash.contains(&sel_id) {
if include_hash.len() >= keep { break; } if include_hash.len() >= keep {
break;
}
include_hash.insert(sel_id); include_hash.insert(sel_id);
mark.insert(backup_id, PruneMark::Keep); mark.insert(backup_id, PruneMark::Keep);
} else { } else {
@ -72,11 +82,7 @@ fn mark_selections<F: Fn(&BackupInfo) -> Result<String, Error>> (
Ok(()) Ok(())
} }
fn remove_incomplete_snapshots( fn remove_incomplete_snapshots(mark: &mut HashMap<PathBuf, PruneMark>, list: &[BackupInfo]) {
mark: &mut HashMap<PathBuf, PruneMark>,
list: &[BackupInfo],
) {
let mut keep_unfinished = true; let mut keep_unfinished = true;
for info in list.iter() { for info in list.iter() {
// backup is considered unfinished if there is no manifest // backup is considered unfinished if there is no manifest
@ -86,7 +92,8 @@ fn remove_incomplete_snapshots(
keep_unfinished = false; keep_unfinished = false;
} else { } else {
let backup_id = info.backup_dir.relative_path(); let backup_id = info.backup_dir.relative_path();
if keep_unfinished { // keep first unfinished if keep_unfinished {
// keep first unfinished
mark.insert(backup_id, PruneMark::KeepPartial); mark.insert(backup_id, PruneMark::KeepPartial);
} else { } else {
mark.insert(backup_id, PruneMark::Remove); mark.insert(backup_id, PruneMark::Remove);
@ -98,12 +105,36 @@ fn remove_incomplete_snapshots(
pub fn keeps_something(options: &PruneOptions) -> bool { pub fn keeps_something(options: &PruneOptions) -> bool {
let mut keep_something = false; let mut keep_something = false;
if let Some(count) = options.keep_last { if count > 0 { keep_something = true; } } if let Some(count) = options.keep_last {
if let Some(count) = options.keep_hourly { if count > 0 { keep_something = true; } } if count > 0 {
if let Some(count) = options.keep_daily { if count > 0 { keep_something = true; } } keep_something = true;
if let Some(count) = options.keep_weekly { if count > 0 { keep_something = true; } } }
if let Some(count) = options.keep_monthly { if count > 0 { keep_something = true; } } }
if let Some(count) = options.keep_yearly { if count > 0 { keep_something = true; } } if let Some(count) = options.keep_hourly {
if count > 0 {
keep_something = true;
}
}
if let Some(count) = options.keep_daily {
if count > 0 {
keep_something = true;
}
}
if let Some(count) = options.keep_weekly {
if count > 0 {
keep_something = true;
}
}
if let Some(count) = options.keep_monthly {
if count > 0 {
keep_something = true;
}
}
if let Some(count) = options.keep_yearly {
if count > 0 {
keep_something = true;
}
}
keep_something keep_something
} }
@ -148,7 +179,6 @@ pub fn compute_prune_info(
mut list: Vec<BackupInfo>, mut list: Vec<BackupInfo>,
options: &PruneOptions, options: &PruneOptions,
) -> Result<Vec<(BackupInfo, PruneMark)>, Error> { ) -> Result<Vec<(BackupInfo, PruneMark)>, Error> {
let mut mark = HashMap::new(); let mut mark = HashMap::new();
BackupInfo::sort_list(&mut list, false); BackupInfo::sort_list(&mut list, false);
@ -195,7 +225,8 @@ pub fn compute_prune_info(
})?; })?;
} }
let prune_info: Vec<(BackupInfo, PruneMark)> = list.into_iter() let prune_info: Vec<(BackupInfo, PruneMark)> = list
.into_iter()
.map(|info| { .map(|info| {
let backup_id = info.backup_dir.relative_path(); let backup_id = info.backup_dir.relative_path();
let mark = if info.protected { let mark = if info.protected {

View File

@ -1,7 +1,7 @@
use std::fs::File;
use std::os::unix::io::{AsRawFd, FromRawFd};
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::os::unix::io::{AsRawFd, FromRawFd};
use std::fs::File;
use anyhow::{bail, Error}; use anyhow::{bail, Error};
use nix::dir::Dir; use nix::dir::Dir;
@ -9,9 +9,9 @@ use nix::dir::Dir;
use proxmox_sys::fs::lock_dir_noblock_shared; use proxmox_sys::fs::lock_dir_noblock_shared;
use crate::backup_info::BackupDir; use crate::backup_info::BackupDir;
use crate::index::IndexFile;
use crate::fixed_index::FixedIndexReader;
use crate::dynamic_index::DynamicIndexReader; use crate::dynamic_index::DynamicIndexReader;
use crate::fixed_index::FixedIndexReader;
use crate::index::IndexFile;
use crate::manifest::{archive_type, ArchiveType, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME}; use crate::manifest::{archive_type, ArchiveType, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME};
use crate::DataStore; use crate::DataStore;
use pbs_api_types::Operation; use pbs_api_types::Operation;
@ -27,24 +27,24 @@ pub struct SnapshotReader {
} }
impl SnapshotReader { impl SnapshotReader {
/// Lock snapshot, reads the manifest and returns a new instance /// Lock snapshot, reads the manifest and returns a new instance
pub fn new(datastore: Arc<DataStore>, snapshot: BackupDir) -> Result<Self, Error> { pub fn new(datastore: Arc<DataStore>, snapshot: BackupDir) -> Result<Self, Error> {
let snapshot_path = datastore.snapshot_path(&snapshot); let snapshot_path = datastore.snapshot_path(&snapshot);
let locked_dir = lock_dir_noblock_shared( let locked_dir =
&snapshot_path, lock_dir_noblock_shared(&snapshot_path, "snapshot", "locked by another operation")?;
"snapshot",
"locked by another operation")?;
let datastore_name = datastore.name().to_string(); let datastore_name = datastore.name().to_string();
let manifest = match datastore.load_manifest(&snapshot) { let manifest = match datastore.load_manifest(&snapshot) {
Ok((manifest, _)) => manifest, Ok((manifest, _)) => manifest,
Err(err) => { Err(err) => {
bail!("manifest load error on datastore '{}' snapshot '{}' - {}", bail!(
datastore_name, snapshot, err); "manifest load error on datastore '{}' snapshot '{}' - {}",
datastore_name,
snapshot,
err
);
} }
}; };
@ -53,12 +53,19 @@ impl SnapshotReader {
let mut file_list = Vec::new(); let mut file_list = Vec::new();
file_list.push(MANIFEST_BLOB_NAME.to_string()); file_list.push(MANIFEST_BLOB_NAME.to_string());
for item in manifest.files() { file_list.push(item.filename.clone()); } for item in manifest.files() {
file_list.push(item.filename.clone());
}
if client_log_path.exists() { if client_log_path.exists() {
file_list.push(CLIENT_LOG_BLOB_NAME.to_string()); file_list.push(CLIENT_LOG_BLOB_NAME.to_string());
} }
Ok(Self { snapshot, datastore_name, file_list, locked_dir }) Ok(Self {
snapshot,
datastore_name,
file_list,
locked_dir,
})
} }
/// Return the snapshot directory /// Return the snapshot directory
@ -89,7 +96,10 @@ impl SnapshotReader {
} }
/// Returns an iterator for all chunks not skipped by `skip_fn`. /// Returns an iterator for all chunks not skipped by `skip_fn`.
pub fn chunk_iterator<F: Fn(&[u8;32]) -> bool>(&self, skip_fn: F) -> Result<SnapshotChunkIterator<F>, Error> { pub fn chunk_iterator<F: Fn(&[u8; 32]) -> bool>(
&self,
skip_fn: F,
) -> Result<SnapshotChunkIterator<F>, Error> {
SnapshotChunkIterator::new(self, skip_fn) SnapshotChunkIterator::new(self, skip_fn)
} }
} }
@ -99,14 +109,14 @@ impl SnapshotReader {
/// Note: The iterator returns a `Result`, and the iterator state is /// Note: The iterator returns a `Result`, and the iterator state is
/// undefined after the first error. So it make no sense to continue /// undefined after the first error. So it make no sense to continue
/// iteration after the first error. /// iteration after the first error.
pub struct SnapshotChunkIterator<'a, F: Fn(&[u8;32]) -> bool> { pub struct SnapshotChunkIterator<'a, F: Fn(&[u8; 32]) -> bool> {
snapshot_reader: &'a SnapshotReader, snapshot_reader: &'a SnapshotReader,
todo_list: Vec<String>, todo_list: Vec<String>,
skip_fn: F, skip_fn: F,
current_index: Option<(Arc<Box<dyn IndexFile + Send>>, usize, Vec<(usize, u64)>)>, current_index: Option<(Arc<Box<dyn IndexFile + Send>>, usize, Vec<(usize, u64)>)>,
} }
impl <'a, F: Fn(&[u8;32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> { impl<'a, F: Fn(&[u8; 32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> {
type Item = Result<[u8; 32], Error>; type Item = Result<[u8; 32], Error>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
@ -118,15 +128,17 @@ impl <'a, F: Fn(&[u8;32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> {
let index: Box<dyn IndexFile + Send> = match archive_type(&filename)? { let index: Box<dyn IndexFile + Send> = match archive_type(&filename)? {
ArchiveType::FixedIndex => Box::new(FixedIndexReader::new(file)?), ArchiveType::FixedIndex => Box::new(FixedIndexReader::new(file)?),
ArchiveType::DynamicIndex => Box::new(DynamicIndexReader::new(file)?), ArchiveType::DynamicIndex => Box::new(DynamicIndexReader::new(file)?),
_ => bail!("SnapshotChunkIterator: got unknown file type - internal error"), _ => bail!(
"SnapshotChunkIterator: got unknown file type - internal error"
),
}; };
let datastore = let datastore = DataStore::lookup_datastore(
DataStore::lookup_datastore( self.snapshot_reader.datastore_name(),
self.snapshot_reader.datastore_name(), Some(Operation::Read),
Some(Operation::Read) )?;
)?; let order =
let order = datastore.get_chunks_in_order(&index, &self.skip_fn, |_| Ok(()))?; datastore.get_chunks_in_order(&index, &self.skip_fn, |_| Ok(()))?;
self.current_index = Some((Arc::new(index), 0, order)); self.current_index = Some((Arc::new(index), 0, order));
} else { } else {
@ -143,25 +155,29 @@ impl <'a, F: Fn(&[u8;32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> {
// pop next index // pop next index
} }
} }
}).transpose() })
.transpose()
} }
} }
impl <'a, F: Fn(&[u8;32]) -> bool> SnapshotChunkIterator<'a, F> { impl<'a, F: Fn(&[u8; 32]) -> bool> SnapshotChunkIterator<'a, F> {
pub fn new(snapshot_reader: &'a SnapshotReader, skip_fn: F) -> Result<Self, Error> { pub fn new(snapshot_reader: &'a SnapshotReader, skip_fn: F) -> Result<Self, Error> {
let mut todo_list = Vec::new(); let mut todo_list = Vec::new();
for filename in snapshot_reader.file_list() { for filename in snapshot_reader.file_list() {
match archive_type(filename)? { match archive_type(filename)? {
ArchiveType::FixedIndex | ArchiveType::DynamicIndex => { ArchiveType::FixedIndex | ArchiveType::DynamicIndex => {
todo_list.push(filename.to_owned()); todo_list.push(filename.to_owned());
}, }
ArchiveType::Blob => { /* no chunks, do nothing */ }, ArchiveType::Blob => { /* no chunks, do nothing */ }
} }
} }
Ok(Self { snapshot_reader, todo_list, current_index: None, skip_fn }) Ok(Self {
snapshot_reader,
todo_list,
current_index: None,
skip_fn,
})
} }
} }

View File

@ -15,7 +15,7 @@ impl StoreProgress {
pub fn new(total_groups: u64) -> Self { pub fn new(total_groups: u64) -> Self {
StoreProgress { StoreProgress {
total_groups, total_groups,
.. Default::default() ..Default::default()
} }
} }