add pbs-datastore module
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
22
pbs-datastore/Cargo.toml
Normal file
@@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "pbs-datastore"
|
||||
version = "0.1.0"
|
||||
authors = ["Proxmox Support Team <support@proxmox.com>"]
|
||||
edition = "2018"
|
||||
description = "low level pbs data storage access"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
crc32fast = "1"
|
||||
endian_trait = { version = "0.6", features = [ "arrays" ] }
|
||||
nix = "0.19.1"
|
||||
openssl = "0.10"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
zstd = { version = "0.6", features = [ "bindgen" ] }
|
||||
|
||||
pathpatterns = "0.1.2"
|
||||
pxar = { version = "0.10.1", features = [ "tokio-io" ] }
|
||||
|
||||
proxmox = { version = "0.11.5", default-features = false, features = [ "api-macro" ] }
|
||||
|
||||
pbs-tools = { path = "../pbs-tools" }
|
810
pbs-datastore/src/catalog.rs
Normal file
@@ -0,0 +1,810 @@
|
||||
use std::convert::TryFrom;
|
||||
use std::ffi::{CStr, CString, OsStr};
|
||||
use std::fmt;
|
||||
use std::io::{Read, Write, Seek, SeekFrom};
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
|
||||
use pathpatterns::{MatchList, MatchType};
|
||||
use proxmox::tools::io::ReadExt;
|
||||
|
||||
use crate::file_formats::PROXMOX_CATALOG_FILE_MAGIC_1_0;
|
||||
|
||||
/// Trait for writing file list catalogs.
|
||||
///
|
||||
/// A file list catalog simply stores a directory tree. Such catalogs may be used as an index to do a
|
||||
/// fast search for files.
|
||||
pub trait BackupCatalogWriter {
|
||||
fn start_directory(&mut self, name: &CStr) -> Result<(), Error>;
|
||||
fn end_directory(&mut self) -> Result<(), Error>;
|
||||
fn add_file(&mut self, name: &CStr, size: u64, mtime: i64) -> Result<(), Error>;
|
||||
fn add_symlink(&mut self, name: &CStr) -> Result<(), Error>;
|
||||
fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error>;
|
||||
fn add_block_device(&mut self, name: &CStr) -> Result<(), Error>;
|
||||
fn add_char_device(&mut self, name: &CStr) -> Result<(), Error>;
|
||||
fn add_fifo(&mut self, name: &CStr) -> Result<(), Error>;
|
||||
fn add_socket(&mut self, name: &CStr) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
#[repr(u8)]
|
||||
#[derive(Copy,Clone,PartialEq)]
|
||||
pub enum CatalogEntryType {
|
||||
Directory = b'd',
|
||||
File = b'f',
|
||||
Symlink = b'l',
|
||||
Hardlink = b'h',
|
||||
BlockDevice = b'b',
|
||||
CharDevice = b'c',
|
||||
Fifo = b'p', // Fifo,Pipe
|
||||
Socket = b's',
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for CatalogEntryType {
|
||||
type Error=Error;
|
||||
|
||||
fn try_from(value: u8) -> Result<Self, Error> {
|
||||
Ok(match value {
|
||||
b'd' => CatalogEntryType::Directory,
|
||||
b'f' => CatalogEntryType::File,
|
||||
b'l' => CatalogEntryType::Symlink,
|
||||
b'h' => CatalogEntryType::Hardlink,
|
||||
b'b' => CatalogEntryType::BlockDevice,
|
||||
b'c' => CatalogEntryType::CharDevice,
|
||||
b'p' => CatalogEntryType::Fifo,
|
||||
b's' => CatalogEntryType::Socket,
|
||||
_ => bail!("invalid CatalogEntryType value '{}'", char::from(value)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&DirEntryAttribute> for CatalogEntryType {
|
||||
fn from(value: &DirEntryAttribute) -> Self {
|
||||
match value {
|
||||
DirEntryAttribute::Directory { .. } => CatalogEntryType::Directory,
|
||||
DirEntryAttribute::File { .. } => CatalogEntryType::File,
|
||||
DirEntryAttribute::Symlink => CatalogEntryType::Symlink,
|
||||
DirEntryAttribute::Hardlink => CatalogEntryType::Hardlink,
|
||||
DirEntryAttribute::BlockDevice => CatalogEntryType::BlockDevice,
|
||||
DirEntryAttribute::CharDevice => CatalogEntryType::CharDevice,
|
||||
DirEntryAttribute::Fifo => CatalogEntryType::Fifo,
|
||||
DirEntryAttribute::Socket => CatalogEntryType::Socket,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for CatalogEntryType {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", char::from(*self as u8))
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a named directory entry
|
||||
///
|
||||
/// The ``attr`` property contains the exact type with type-specific
|
||||
/// attributes.
|
||||
#[derive(Clone, PartialEq)]
|
||||
pub struct DirEntry {
|
||||
pub name: Vec<u8>,
|
||||
pub attr: DirEntryAttribute,
|
||||
}
|
||||
|
||||
/// Used to specify additional attributes inside DirEntry
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum DirEntryAttribute {
|
||||
Directory { start: u64 },
|
||||
File { size: u64, mtime: i64 },
|
||||
Symlink,
|
||||
Hardlink,
|
||||
BlockDevice,
|
||||
CharDevice,
|
||||
Fifo,
|
||||
Socket,
|
||||
}
|
||||
|
||||
impl DirEntry {
|
||||
|
||||
fn new(etype: CatalogEntryType, name: Vec<u8>, start: u64, size: u64, mtime: i64) -> Self {
|
||||
match etype {
|
||||
CatalogEntryType::Directory => {
|
||||
DirEntry { name, attr: DirEntryAttribute::Directory { start } }
|
||||
}
|
||||
CatalogEntryType::File => {
|
||||
DirEntry { name, attr: DirEntryAttribute::File { size, mtime } }
|
||||
}
|
||||
CatalogEntryType::Symlink => {
|
||||
DirEntry { name, attr: DirEntryAttribute::Symlink }
|
||||
}
|
||||
CatalogEntryType::Hardlink => {
|
||||
DirEntry { name, attr: DirEntryAttribute::Hardlink }
|
||||
}
|
||||
CatalogEntryType::BlockDevice => {
|
||||
DirEntry { name, attr: DirEntryAttribute::BlockDevice }
|
||||
}
|
||||
CatalogEntryType::CharDevice => {
|
||||
DirEntry { name, attr: DirEntryAttribute::CharDevice }
|
||||
}
|
||||
CatalogEntryType::Fifo => {
|
||||
DirEntry { name, attr: DirEntryAttribute::Fifo }
|
||||
}
|
||||
CatalogEntryType::Socket => {
|
||||
DirEntry { name, attr: DirEntryAttribute::Socket }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get file mode bits for this entry to be used with the `MatchList` api.
|
||||
pub fn get_file_mode(&self) -> Option<u32> {
|
||||
Some(
|
||||
match self.attr {
|
||||
DirEntryAttribute::Directory { .. } => pxar::mode::IFDIR,
|
||||
DirEntryAttribute::File { .. } => pxar::mode::IFREG,
|
||||
DirEntryAttribute::Symlink => pxar::mode::IFLNK,
|
||||
DirEntryAttribute::Hardlink => return None,
|
||||
DirEntryAttribute::BlockDevice => pxar::mode::IFBLK,
|
||||
DirEntryAttribute::CharDevice => pxar::mode::IFCHR,
|
||||
DirEntryAttribute::Fifo => pxar::mode::IFIFO,
|
||||
DirEntryAttribute::Socket => pxar::mode::IFSOCK,
|
||||
}
|
||||
as u32
|
||||
)
|
||||
}
|
||||
|
||||
/// Check if DirEntry is a directory
|
||||
pub fn is_directory(&self) -> bool {
|
||||
matches!(self.attr, DirEntryAttribute::Directory { .. })
|
||||
}
|
||||
|
||||
/// Check if DirEntry is a symlink
|
||||
pub fn is_symlink(&self) -> bool {
|
||||
matches!(self.attr, DirEntryAttribute::Symlink { .. })
|
||||
}
|
||||
}
|
||||
|
||||
struct DirInfo {
|
||||
name: CString,
|
||||
entries: Vec<DirEntry>,
|
||||
}
|
||||
|
||||
impl DirInfo {
|
||||
|
||||
fn new(name: CString) -> Self {
|
||||
DirInfo { name, entries: Vec::new() }
|
||||
}
|
||||
|
||||
fn new_rootdir() -> Self {
|
||||
DirInfo::new(CString::new(b"/".to_vec()).unwrap())
|
||||
}
|
||||
|
||||
fn encode_entry<W: Write>(
|
||||
writer: &mut W,
|
||||
entry: &DirEntry,
|
||||
pos: u64,
|
||||
) -> Result<(), Error> {
|
||||
match entry {
|
||||
DirEntry { name, attr: DirEntryAttribute::Directory { start } } => {
|
||||
writer.write_all(&[CatalogEntryType::Directory as u8])?;
|
||||
catalog_encode_u64(writer, name.len() as u64)?;
|
||||
writer.write_all(name)?;
|
||||
catalog_encode_u64(writer, pos - start)?;
|
||||
}
|
||||
DirEntry { name, attr: DirEntryAttribute::File { size, mtime } } => {
|
||||
writer.write_all(&[CatalogEntryType::File as u8])?;
|
||||
catalog_encode_u64(writer, name.len() as u64)?;
|
||||
writer.write_all(name)?;
|
||||
catalog_encode_u64(writer, *size)?;
|
||||
catalog_encode_i64(writer, *mtime)?;
|
||||
}
|
||||
DirEntry { name, attr: DirEntryAttribute::Symlink } => {
|
||||
writer.write_all(&[CatalogEntryType::Symlink as u8])?;
|
||||
catalog_encode_u64(writer, name.len() as u64)?;
|
||||
writer.write_all(name)?;
|
||||
}
|
||||
DirEntry { name, attr: DirEntryAttribute::Hardlink } => {
|
||||
writer.write_all(&[CatalogEntryType::Hardlink as u8])?;
|
||||
catalog_encode_u64(writer, name.len() as u64)?;
|
||||
writer.write_all(name)?;
|
||||
}
|
||||
DirEntry { name, attr: DirEntryAttribute::BlockDevice } => {
|
||||
writer.write_all(&[CatalogEntryType::BlockDevice as u8])?;
|
||||
catalog_encode_u64(writer, name.len() as u64)?;
|
||||
writer.write_all(name)?;
|
||||
}
|
||||
DirEntry { name, attr: DirEntryAttribute::CharDevice } => {
|
||||
writer.write_all(&[CatalogEntryType::CharDevice as u8])?;
|
||||
catalog_encode_u64(writer, name.len() as u64)?;
|
||||
writer.write_all(name)?;
|
||||
}
|
||||
DirEntry { name, attr: DirEntryAttribute::Fifo } => {
|
||||
writer.write_all(&[CatalogEntryType::Fifo as u8])?;
|
||||
catalog_encode_u64(writer, name.len() as u64)?;
|
||||
writer.write_all(name)?;
|
||||
}
|
||||
DirEntry { name, attr: DirEntryAttribute::Socket } => {
|
||||
writer.write_all(&[CatalogEntryType::Socket as u8])?;
|
||||
catalog_encode_u64(writer, name.len() as u64)?;
|
||||
writer.write_all(name)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn encode(self, start: u64) -> Result<(CString, Vec<u8>), Error> {
|
||||
let mut table = Vec::new();
|
||||
catalog_encode_u64(&mut table, self.entries.len() as u64)?;
|
||||
for entry in self.entries {
|
||||
Self::encode_entry(&mut table, &entry, start)?;
|
||||
}
|
||||
|
||||
let mut data = Vec::new();
|
||||
catalog_encode_u64(&mut data, table.len() as u64)?;
|
||||
data.extend_from_slice(&table);
|
||||
|
||||
Ok((self.name, data))
|
||||
}
|
||||
|
||||
fn parse<C: FnMut(CatalogEntryType, &[u8], u64, u64, i64) -> Result<bool, Error>>(
|
||||
data: &[u8],
|
||||
mut callback: C,
|
||||
) -> Result<(), Error> {
|
||||
|
||||
let mut cursor = data;
|
||||
|
||||
let entries = catalog_decode_u64(&mut cursor)?;
|
||||
|
||||
let mut name_buf = vec![0u8; 4096];
|
||||
|
||||
for _ in 0..entries {
|
||||
|
||||
let mut buf = [ 0u8 ];
|
||||
cursor.read_exact(&mut buf)?;
|
||||
let etype = CatalogEntryType::try_from(buf[0])?;
|
||||
|
||||
let name_len = catalog_decode_u64(&mut cursor)? as usize;
|
||||
if name_len >= name_buf.len() {
|
||||
bail!("directory entry name too long ({} >= {})", name_len, name_buf.len());
|
||||
}
|
||||
let name = &mut name_buf[0..name_len];
|
||||
cursor.read_exact(name)?;
|
||||
|
||||
let cont = match etype {
|
||||
CatalogEntryType::Directory => {
|
||||
let offset = catalog_decode_u64(&mut cursor)?;
|
||||
callback(etype, name, offset, 0, 0)?
|
||||
}
|
||||
CatalogEntryType::File => {
|
||||
let size = catalog_decode_u64(&mut cursor)?;
|
||||
let mtime = catalog_decode_i64(&mut cursor)?;
|
||||
callback(etype, name, 0, size, mtime)?
|
||||
}
|
||||
_ => {
|
||||
callback(etype, name, 0, 0, 0)?
|
||||
}
|
||||
};
|
||||
if !cont {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if !cursor.is_empty() {
|
||||
bail!("unable to parse whole catalog data block");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Write small catalog files
|
||||
///
|
||||
/// A catalog simply contains a list of files and directories
|
||||
/// (a directory tree). They are used to find content without having to
|
||||
/// search the real archive (which may be large). For files, they
|
||||
/// include the last modification time and file size.
|
||||
pub struct CatalogWriter<W> {
|
||||
writer: W,
|
||||
dirstack: Vec<DirInfo>,
|
||||
pos: u64,
|
||||
}
|
||||
|
||||
impl <W: Write> CatalogWriter<W> {
|
||||
|
||||
/// Create a new CatalogWriter instance
|
||||
pub fn new(writer: W) -> Result<Self, Error> {
|
||||
let mut me = Self { writer, dirstack: vec![ DirInfo::new_rootdir() ], pos: 0 };
|
||||
me.write_all(&PROXMOX_CATALOG_FILE_MAGIC_1_0)?;
|
||||
Ok(me)
|
||||
}
|
||||
|
||||
fn write_all(&mut self, data: &[u8]) -> Result<(), Error> {
|
||||
self.writer.write_all(data)?;
|
||||
self.pos += u64::try_from(data.len())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Finish writing, flush all data
|
||||
///
|
||||
/// This needs to be called before drop.
|
||||
pub fn finish(&mut self) -> Result<(), Error> {
|
||||
if self.dirstack.len() != 1 {
|
||||
bail!("unable to finish catalog at level {}", self.dirstack.len());
|
||||
}
|
||||
|
||||
let dir = self.dirstack.pop().unwrap();
|
||||
|
||||
let start = self.pos;
|
||||
let (_, data) = dir.encode(start)?;
|
||||
self.write_all(&data)?;
|
||||
|
||||
self.write_all(&start.to_le_bytes())?;
|
||||
|
||||
self.writer.flush()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl <W: Write> BackupCatalogWriter for CatalogWriter<W> {
|
||||
|
||||
fn start_directory(&mut self, name: &CStr) -> Result<(), Error> {
|
||||
let new = DirInfo::new(name.to_owned());
|
||||
self.dirstack.push(new);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn end_directory(&mut self) -> Result<(), Error> {
|
||||
let (start, name) = match self.dirstack.pop() {
|
||||
Some(dir) => {
|
||||
let start = self.pos;
|
||||
let (name, data) = dir.encode(start)?;
|
||||
self.write_all(&data)?;
|
||||
(start, name)
|
||||
}
|
||||
None => {
|
||||
bail!("got unexpected end_directory level 0");
|
||||
}
|
||||
};
|
||||
|
||||
let current = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
|
||||
let name = name.to_bytes().to_vec();
|
||||
current.entries.push(DirEntry { name, attr: DirEntryAttribute::Directory { start } });
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_file(&mut self, name: &CStr, size: u64, mtime: i64) -> Result<(), Error> {
|
||||
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
|
||||
let name = name.to_bytes().to_vec();
|
||||
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::File { size, mtime } });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_symlink(&mut self, name: &CStr) -> Result<(), Error> {
|
||||
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
|
||||
let name = name.to_bytes().to_vec();
|
||||
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Symlink });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error> {
|
||||
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
|
||||
let name = name.to_bytes().to_vec();
|
||||
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Hardlink });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_block_device(&mut self, name: &CStr) -> Result<(), Error> {
|
||||
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
|
||||
let name = name.to_bytes().to_vec();
|
||||
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::BlockDevice });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_char_device(&mut self, name: &CStr) -> Result<(), Error> {
|
||||
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
|
||||
let name = name.to_bytes().to_vec();
|
||||
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::CharDevice });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_fifo(&mut self, name: &CStr) -> Result<(), Error> {
|
||||
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
|
||||
let name = name.to_bytes().to_vec();
|
||||
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Fifo });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_socket(&mut self, name: &CStr) -> Result<(), Error> {
|
||||
let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?;
|
||||
let name = name.to_bytes().to_vec();
|
||||
dir.entries.push(DirEntry { name, attr: DirEntryAttribute::Socket });
|
||||
Ok(())
|
||||
}
|
||||
}
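A minimal usage sketch for the writer above (not part of this commit): it assumes a helper function returning Result<(), Error> and records a catalog for one directory containing a single file into an in-memory buffer.

fn example_write_catalog() -> Result<(), Error> {
    // Any `Write` sink works; a Vec<u8> keeps the catalog in memory.
    let mut writer = CatalogWriter::new(Vec::new())?;
    writer.start_directory(&CString::new("etc")?)?;
    writer.add_file(&CString::new("hostname")?, 11, 1_600_000_000)?;
    writer.end_directory()?;
    writer.finish()?; // flushes the root table and the trailing start offset
    Ok(())
}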
|
||||
|
||||
/// Read Catalog files
|
||||
pub struct CatalogReader<R> {
|
||||
reader: R,
|
||||
}
|
||||
|
||||
impl <R: Read + Seek> CatalogReader<R> {
|
||||
|
||||
/// Create a new CatalogReader instance
|
||||
pub fn new(reader: R) -> Self {
|
||||
Self { reader }
|
||||
}
|
||||
|
||||
/// Print whole catalog to stdout
|
||||
pub fn dump(&mut self) -> Result<(), Error> {
|
||||
|
||||
let root = self.root()?;
|
||||
match root {
|
||||
DirEntry { attr: DirEntryAttribute::Directory { start }, .. }=> {
|
||||
self.dump_dir(std::path::Path::new("./"), start)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the root DirEntry
|
||||
pub fn root(&mut self) -> Result<DirEntry, Error> {
|
||||
// Root dir is special
|
||||
self.reader.seek(SeekFrom::Start(0))?;
|
||||
let mut magic = [ 0u8; 8];
|
||||
self.reader.read_exact(&mut magic)?;
|
||||
if magic != PROXMOX_CATALOG_FILE_MAGIC_1_0 {
|
||||
bail!("got unexpected magic number for catalog");
|
||||
}
|
||||
self.reader.seek(SeekFrom::End(-8))?;
|
||||
let start = unsafe { self.reader.read_le_value::<u64>()? };
|
||||
Ok(DirEntry { name: b"".to_vec(), attr: DirEntryAttribute::Directory { start } })
|
||||
}
|
||||
|
||||
/// Read all directory entries
|
||||
pub fn read_dir(
|
||||
&mut self,
|
||||
parent: &DirEntry,
|
||||
) -> Result<Vec<DirEntry>, Error> {
|
||||
|
||||
let start = match parent.attr {
|
||||
DirEntryAttribute::Directory { start } => start,
|
||||
_ => bail!("parent is not a directory - internal error"),
|
||||
};
|
||||
|
||||
let data = self.read_raw_dirinfo_block(start)?;
|
||||
|
||||
let mut entry_list = Vec::new();
|
||||
|
||||
DirInfo::parse(&data, |etype, name, offset, size, mtime| {
|
||||
let entry = DirEntry::new(etype, name.to_vec(), start - offset, size, mtime);
|
||||
entry_list.push(entry);
|
||||
Ok(true)
|
||||
})?;
|
||||
|
||||
Ok(entry_list)
|
||||
}
|
||||
|
||||
/// Lookup a DirEntry from an absolute path
|
||||
pub fn lookup_recursive(
|
||||
&mut self,
|
||||
path: &[u8],
|
||||
) -> Result<DirEntry, Error> {
|
||||
let mut current = self.root()?;
|
||||
if path == b"/" {
|
||||
return Ok(current);
|
||||
}
|
||||
|
||||
let components = if !path.is_empty() && path[0] == b'/' {
|
||||
&path[1..]
|
||||
} else {
|
||||
path
|
||||
}.split(|c| *c == b'/');
|
||||
|
||||
for comp in components {
|
||||
if let Some(entry) = self.lookup(&current, comp)? {
|
||||
current = entry;
|
||||
} else {
|
||||
bail!("path {:?} not found in catalog", String::from_utf8_lossy(&path));
|
||||
}
|
||||
}
|
||||
Ok(current)
|
||||
}
|
||||
|
||||
/// Look up a DirEntry inside a parent directory
|
||||
pub fn lookup(
|
||||
&mut self,
|
||||
parent: &DirEntry,
|
||||
filename: &[u8],
|
||||
) -> Result<Option<DirEntry>, Error> {
|
||||
|
||||
let start = match parent.attr {
|
||||
DirEntryAttribute::Directory { start } => start,
|
||||
_ => bail!("parent is not a directory - internal error"),
|
||||
};
|
||||
|
||||
let data = self.read_raw_dirinfo_block(start)?;
|
||||
|
||||
let mut item = None;
|
||||
DirInfo::parse(&data, |etype, name, offset, size, mtime| {
|
||||
if name != filename {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let entry = DirEntry::new(etype, name.to_vec(), start - offset, size, mtime);
|
||||
item = Some(entry);
|
||||
Ok(false) // stop parsing
|
||||
})?;
|
||||
|
||||
Ok(item)
|
||||
}
|
||||
|
||||
/// Read the raw directory info block at the given start position.
|
||||
fn read_raw_dirinfo_block(&mut self, start: u64) -> Result<Vec<u8>, Error> {
|
||||
self.reader.seek(SeekFrom::Start(start))?;
|
||||
let size = catalog_decode_u64(&mut self.reader)?;
|
||||
if size < 1 { bail!("got small directory size {}", size) };
|
||||
let data = self.reader.read_exact_allocated(size as usize)?;
|
||||
Ok(data)
|
||||
}
|
||||
|
||||
/// Print the content of a directory to stdout
|
||||
pub fn dump_dir(&mut self, prefix: &std::path::Path, start: u64) -> Result<(), Error> {
|
||||
|
||||
let data = self.read_raw_dirinfo_block(start)?;
|
||||
|
||||
DirInfo::parse(&data, |etype, name, offset, size, mtime| {
|
||||
|
||||
let mut path = std::path::PathBuf::from(prefix);
|
||||
let name: &OsStr = OsStrExt::from_bytes(name);
|
||||
path.push(name);
|
||||
|
||||
match etype {
|
||||
CatalogEntryType::Directory => {
|
||||
println!("{} {:?}", etype, path);
|
||||
if offset > start {
|
||||
bail!("got wrong directory offset ({} > {})", offset, start);
|
||||
}
|
||||
let pos = start - offset;
|
||||
self.dump_dir(&path, pos)?;
|
||||
}
|
||||
CatalogEntryType::File => {
|
||||
let mut mtime_string = mtime.to_string();
|
||||
if let Ok(s) = proxmox::tools::time::strftime_local("%FT%TZ", mtime as i64) {
|
||||
mtime_string = s;
|
||||
}
|
||||
|
||||
println!(
|
||||
"{} {:?} {} {}",
|
||||
etype,
|
||||
path,
|
||||
size,
|
||||
mtime_string,
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
println!("{} {:?}", etype, path);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
})
|
||||
}
|
||||
|
||||
/// Finds all entries matching the given match patterns and calls the
|
||||
/// provided callback on them.
|
||||
pub fn find(
|
||||
&mut self,
|
||||
parent: &DirEntry,
|
||||
file_path: &mut Vec<u8>,
|
||||
match_list: &impl MatchList, //&[MatchEntry],
|
||||
callback: &mut dyn FnMut(&[u8]) -> Result<(), Error>,
|
||||
) -> Result<(), Error> {
|
||||
let file_len = file_path.len();
|
||||
for e in self.read_dir(parent)? {
|
||||
let is_dir = e.is_directory();
|
||||
file_path.truncate(file_len);
|
||||
if !e.name.starts_with(b"/") {
|
||||
file_path.reserve(e.name.len() + 1);
|
||||
file_path.push(b'/');
|
||||
}
|
||||
file_path.extend(&e.name);
|
||||
match match_list.matches(&file_path, e.get_file_mode()) {
|
||||
Some(MatchType::Exclude) => continue,
|
||||
Some(MatchType::Include) => callback(&file_path)?,
|
||||
None => (),
|
||||
}
|
||||
if is_dir {
|
||||
self.find(&e, file_path, match_list, callback)?;
|
||||
}
|
||||
}
|
||||
file_path.truncate(file_len);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
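A minimal read-side sketch (not part of this commit; the catalog path is passed in by the caller): open a catalog file, fetch the root entry and list its direct children.

fn example_list_root(path: &std::path::Path) -> Result<(), Error> {
    let file = std::fs::File::open(path)?;
    let mut catalog = CatalogReader::new(std::io::BufReader::new(file));

    let root = catalog.root()?;
    for entry in catalog.read_dir(&root)? {
        // entry names are raw bytes, not necessarily valid UTF-8
        println!("{} {}", CatalogEntryType::from(&entry.attr), String::from_utf8_lossy(&entry.name));
    }
    Ok(())
}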
|
||||
|
||||
/// Serialize i64 as short, variable length byte sequence
|
||||
///
|
||||
/// Stores 7 bits per byte; bit 8 indicates the end of the sequence (when not set).
|
||||
/// If the value is negative, we end with a zero byte (0x00).
|
||||
#[allow(clippy::neg_multiply)]
|
||||
pub fn catalog_encode_i64<W: Write>(writer: &mut W, v: i64) -> Result<(), Error> {
|
||||
let mut enc = Vec::new();
|
||||
|
||||
let mut d = if v < 0 {
|
||||
(-1 * (v + 1)) as u64 + 1 // also handles i64::MIN
|
||||
} else {
|
||||
v as u64
|
||||
};
|
||||
|
||||
loop {
|
||||
if d < 128 {
|
||||
if v < 0 {
|
||||
enc.push(128 | d as u8);
|
||||
enc.push(0u8);
|
||||
} else {
|
||||
enc.push(d as u8);
|
||||
}
|
||||
break;
|
||||
}
|
||||
enc.push((128 | (d & 127)) as u8);
|
||||
d >>= 7;
|
||||
}
|
||||
writer.write_all(&enc)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deserialize i64 from variable length byte sequence
|
||||
///
|
||||
/// We currently read at most 11 bytes, which gives a maximum of 70 bits + sign.
|
||||
/// This method is compatible with catalog_encode_u64 iff the
|
||||
/// value encoded is <= 2^63 (values > 2^63 cannot be represented in an i64)
|
||||
#[allow(clippy::neg_multiply)]
|
||||
pub fn catalog_decode_i64<R: Read>(reader: &mut R) -> Result<i64, Error> {
|
||||
|
||||
let mut v: u64 = 0;
|
||||
let mut buf = [0u8];
|
||||
|
||||
for i in 0..11 { // only allow 11 bytes (70 bits + sign marker)
|
||||
if buf.is_empty() {
|
||||
bail!("decode_i64 failed - unexpected EOB");
|
||||
}
|
||||
reader.read_exact(&mut buf)?;
|
||||
|
||||
let t = buf[0];
|
||||
|
||||
if t == 0 {
|
||||
if v == 0 {
|
||||
return Ok(0);
|
||||
}
|
||||
return Ok(((v - 1) as i64 * -1) - 1); // also handles i64::MIN
|
||||
} else if t < 128 {
|
||||
v |= (t as u64) << (i*7);
|
||||
return Ok(v as i64);
|
||||
} else {
|
||||
v |= ((t & 127) as u64) << (i*7);
|
||||
}
|
||||
}
|
||||
|
||||
bail!("decode_i64 failed - missing end marker");
|
||||
}
|
||||
|
||||
/// Serialize u64 as short, variable length byte sequence
|
||||
///
|
||||
/// Stores 7 bits per byte; bit 8 indicates the end of the sequence (when not set).
|
||||
pub fn catalog_encode_u64<W: Write>(writer: &mut W, v: u64) -> Result<(), Error> {
|
||||
let mut enc = Vec::new();
|
||||
|
||||
let mut d = v;
|
||||
loop {
|
||||
if d < 128 {
|
||||
enc.push(d as u8);
|
||||
break;
|
||||
}
|
||||
enc.push((128 | (d & 127)) as u8);
|
||||
d >>= 7;
|
||||
}
|
||||
writer.write_all(&enc)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deserialize u64 from variable length byte sequence
|
||||
///
|
||||
/// We currently read at most 10 bytes, which gives a maximum of 70 bits,
|
||||
/// but we currently only encode up to 64 bits
|
||||
pub fn catalog_decode_u64<R: Read>(reader: &mut R) -> Result<u64, Error> {
|
||||
|
||||
let mut v: u64 = 0;
|
||||
let mut buf = [0u8];
|
||||
|
||||
for i in 0..10 { // only allow 10 bytes (70 bits)
|
||||
if buf.is_empty() {
|
||||
bail!("decode_u64 failed - unexpected EOB");
|
||||
}
|
||||
reader.read_exact(&mut buf)?;
|
||||
let t = buf[0];
|
||||
if t < 128 {
|
||||
v |= (t as u64) << (i*7);
|
||||
return Ok(v);
|
||||
} else {
|
||||
v |= ((t & 127) as u64) << (i*7);
|
||||
}
|
||||
}
|
||||
|
||||
bail!("decode_u64 failed - missing end marker");
|
||||
}
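A worked example of the variable-length encoding above (a sketch, not part of this commit): 300 = 0b10_0101100 is split into two 7-bit groups, the low group carrying the continuation bit; -1 ends with the extra zero byte used as the negative marker.

#[test]
fn test_catalog_varint_examples() {
    let mut buf = Vec::new();
    catalog_encode_u64(&mut buf, 300).unwrap();
    assert_eq!(buf, vec![0xAC, 0x02]); // 0xAC = 0x80 | 44 (low 7 bits), 0x02 = remaining high bits

    let mut buf = Vec::new();
    catalog_encode_i64(&mut buf, -1).unwrap();
    assert_eq!(buf, vec![0x81, 0x00]); // magnitude 1 with continuation bit, then the 0x00 end marker
}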
|
||||
|
||||
#[test]
|
||||
fn test_catalog_u64_encoder() {
|
||||
|
||||
fn test_encode_decode(value: u64) {
|
||||
|
||||
let mut data = Vec::new();
|
||||
catalog_encode_u64(&mut data, value).unwrap();
|
||||
|
||||
//println!("ENCODE {} {:?}", value, data);
|
||||
|
||||
let slice = &mut &data[..];
|
||||
let decoded = catalog_decode_u64(slice).unwrap();
|
||||
|
||||
//println!("DECODE {}", decoded);
|
||||
|
||||
assert!(decoded == value);
|
||||
}
|
||||
|
||||
test_encode_decode(u64::MIN);
|
||||
test_encode_decode(126);
|
||||
test_encode_decode((1<<12)-1);
|
||||
test_encode_decode((1<<20)-1);
|
||||
test_encode_decode((1<<50)-1);
|
||||
test_encode_decode(u64::MAX);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_catalog_i64_encoder() {
|
||||
|
||||
fn test_encode_decode(value: i64) {
|
||||
|
||||
let mut data = Vec::new();
|
||||
catalog_encode_i64(&mut data, value).unwrap();
|
||||
|
||||
let slice = &mut &data[..];
|
||||
let decoded = catalog_decode_i64(slice).unwrap();
|
||||
|
||||
assert!(decoded == value);
|
||||
}
|
||||
|
||||
test_encode_decode(0);
|
||||
test_encode_decode(-0);
|
||||
test_encode_decode(126);
|
||||
test_encode_decode(-126);
|
||||
test_encode_decode((1<<12)-1);
|
||||
test_encode_decode(-(1<<12)-1);
|
||||
test_encode_decode((1<<20)-1);
|
||||
test_encode_decode(-(1<<20)-1);
|
||||
test_encode_decode(i64::MIN);
|
||||
test_encode_decode(i64::MAX);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_catalog_i64_compatibility() {
|
||||
|
||||
fn test_encode_decode(value: u64) {
|
||||
|
||||
let mut data = Vec::new();
|
||||
catalog_encode_u64(&mut data, value).unwrap();
|
||||
|
||||
let slice = &mut &data[..];
|
||||
let decoded = catalog_decode_i64(slice).unwrap() as u64;
|
||||
|
||||
assert!(decoded == value);
|
||||
}
|
||||
|
||||
test_encode_decode(u64::MIN);
|
||||
test_encode_decode(126);
|
||||
test_encode_decode((1<<12)-1);
|
||||
test_encode_decode((1<<20)-1);
|
||||
test_encode_decode((1<<50)-1);
|
||||
test_encode_decode(u64::MAX);
|
||||
}
|
62
pbs-datastore/src/checksum_reader.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
use anyhow::{Error};
|
||||
use std::sync::Arc;
|
||||
use std::io::Read;
|
||||
|
||||
use pbs_tools::borrow::Tied;
|
||||
|
||||
use super::CryptConfig;
|
||||
|
||||
pub struct ChecksumReader<R> {
|
||||
reader: R,
|
||||
hasher: crc32fast::Hasher,
|
||||
signer: Option<Tied<Arc<CryptConfig>, openssl::sign::Signer<'static>>>,
|
||||
}
|
||||
|
||||
impl <R: Read> ChecksumReader<R> {
|
||||
|
||||
pub fn new(reader: R, config: Option<Arc<CryptConfig>>) -> Self {
|
||||
let hasher = crc32fast::Hasher::new();
|
||||
let signer = match config {
|
||||
Some(config) => {
|
||||
let tied_signer = Tied::new(config, |config| {
|
||||
Box::new(unsafe { (*config).data_signer() })
|
||||
});
|
||||
Some(tied_signer)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
Self { reader, hasher, signer }
|
||||
}
|
||||
|
||||
pub fn finish(mut self) -> Result<(R, u32, Option<[u8; 32]>), Error> {
|
||||
let crc = self.hasher.finalize();
|
||||
|
||||
if let Some(ref mut signer) = self.signer {
|
||||
let mut tag = [0u8; 32];
|
||||
signer.sign(&mut tag)?;
|
||||
Ok((self.reader, crc, Some(tag)))
|
||||
} else {
|
||||
Ok((self.reader, crc, None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl <R: Read> Read for ChecksumReader<R> {
|
||||
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||
let count = self.reader.read(buf)?;
|
||||
if count > 0 {
|
||||
self.hasher.update(&buf[..count]);
|
||||
if let Some(ref mut signer) = self.signer {
|
||||
signer.update(&buf[..count])
|
||||
.map_err(|err| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("hmac update failed - {}", err))
|
||||
})?;
|
||||
}
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
}
|
63
pbs-datastore/src/checksum_writer.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
use std::sync::Arc;
|
||||
use std::io::Write;
|
||||
|
||||
use anyhow::{Error};
|
||||
|
||||
use pbs_tools::borrow::Tied;
|
||||
|
||||
use super::CryptConfig;
|
||||
|
||||
pub struct ChecksumWriter<W> {
|
||||
writer: W,
|
||||
hasher: crc32fast::Hasher,
|
||||
signer: Option<Tied<Arc<CryptConfig>, openssl::sign::Signer<'static>>>,
|
||||
}
|
||||
|
||||
impl <W: Write> ChecksumWriter<W> {
|
||||
|
||||
pub fn new(writer: W, config: Option<Arc<CryptConfig>>) -> Self {
|
||||
let hasher = crc32fast::Hasher::new();
|
||||
let signer = match config {
|
||||
Some(config) => {
|
||||
let tied_signer = Tied::new(config, |config| {
|
||||
Box::new(unsafe { (*config).data_signer() })
|
||||
});
|
||||
Some(tied_signer)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
Self { writer, hasher, signer }
|
||||
}
|
||||
|
||||
pub fn finish(mut self) -> Result<(W, u32, Option<[u8; 32]>), Error> {
|
||||
let crc = self.hasher.finalize();
|
||||
|
||||
if let Some(ref mut signer) = self.signer {
|
||||
let mut tag = [0u8; 32];
|
||||
signer.sign(&mut tag)?;
|
||||
Ok((self.writer, crc, Some(tag)))
|
||||
} else {
|
||||
Ok((self.writer, crc, None))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl <W: Write> Write for ChecksumWriter<W> {
|
||||
|
||||
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
||||
self.hasher.update(buf);
|
||||
if let Some(ref mut signer) = self.signer {
|
||||
signer.update(buf)
|
||||
.map_err(|err| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("hmac update failed - {}", err))
|
||||
})?;
|
||||
}
|
||||
self.writer.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> Result<(), std::io::Error> {
|
||||
self.writer.flush()
|
||||
}
|
||||
}
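A short usage sketch (not part of this commit): wrap any writer, stream data through it, and pick up the CRC32 afterwards. Without a CryptConfig no HMAC tag is produced.

fn example_checksum() -> Result<(), Error> {
    let mut writer = ChecksumWriter::new(Vec::new(), None);
    writer.write_all(b"some data")?;
    let (_buf, crc, tag) = writer.finish()?;
    println!("crc32: {:08x}", crc);
    assert!(tag.is_none()); // no signer was configured
    Ok(())
}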
|
271
pbs-datastore/src/chunker.rs
Normal file
@@ -0,0 +1,271 @@
|
||||
/// Note: a window size of 32 or 64 is faster because we can
/// speed up the modulo operations, but it always computes hash 0
/// for constant data streams (0,0,0,0,...), so we use a modified
/// chunk boundary test that does not use hash value 0 to detect a boundary.
|
||||
const CA_CHUNKER_WINDOW_SIZE: usize = 64;
|
||||
|
||||
/// Sliding window chunker (Buzhash)
|
||||
///
|
||||
/// This is a rewrite of *casync* chunker (cachunker.h) in rust.
|
||||
///
|
||||
/// Hashing by cyclic polynomial (also called Buzhash) has the benefit
|
||||
/// of avoiding multiplications, using barrel shifts instead. For more
|
||||
/// information please take a look at the [Rolling
|
||||
/// Hash](https://en.wikipedia.org/wiki/Rolling_hash) article from
|
||||
/// Wikipedia.
|
||||
|
||||
pub struct Chunker {
|
||||
h: u32,
|
||||
window_size: usize,
|
||||
chunk_size: usize,
|
||||
|
||||
chunk_size_min: usize,
|
||||
chunk_size_max: usize,
|
||||
_chunk_size_avg: usize,
|
||||
|
||||
_discriminator: u32,
|
||||
|
||||
break_test_mask: u32,
|
||||
break_test_minimum: u32,
|
||||
|
||||
window: [u8; CA_CHUNKER_WINDOW_SIZE],
|
||||
}
|
||||
|
||||
const BUZHASH_TABLE: [u32; 256] = [
|
||||
0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801,
|
||||
0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c, 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494,
|
||||
0xec85c4e6, 0xb7d33edc, 0xe549b544, 0xfdeda5aa, 0x882bf287, 0x3116737c, 0x05569956, 0xe8cc1f68,
|
||||
0x0806ac5e, 0x22a14443, 0x15297e10, 0x50d090e7, 0x4ba60f6f, 0xefd9f1a7, 0x5c5c885c, 0x82482f93,
|
||||
0x9bfd7c64, 0x0b3e7276, 0xf2688e77, 0x8fad8abc, 0xb0509568, 0xf1ada29f, 0xa53efdfe, 0xcb2b1d00,
|
||||
0xf2a9e986, 0x6463432b, 0x95094051, 0x5a223ad2, 0x9be8401b, 0x61e579cb, 0x1a556a14, 0x5840fdc2,
|
||||
0x9261ddf6, 0xcde002bb, 0x52432bb0, 0xbf17373e, 0x7b7c222f, 0x2955ed16, 0x9f10ca59, 0xe840c4c9,
|
||||
0xccabd806, 0x14543f34, 0x1462417a, 0x0d4a1f9c, 0x087ed925, 0xd7f8f24c, 0x7338c425, 0xcf86c8f5,
|
||||
0xb19165cd, 0x9891c393, 0x325384ac, 0x0308459d, 0x86141d7e, 0xc922116a, 0xe2ffa6b6, 0x53f52aed,
|
||||
0x2cd86197, 0xf5b9f498, 0xbf319c8f, 0xe0411fae, 0x977eb18c, 0xd8770976, 0x9833466a, 0xc674df7f,
|
||||
0x8c297d45, 0x8ca48d26, 0xc49ed8e2, 0x7344f874, 0x556f79c7, 0x6b25eaed, 0xa03e2b42, 0xf68f66a4,
|
||||
0x8e8b09a2, 0xf2e0e62a, 0x0d3a9806, 0x9729e493, 0x8c72b0fc, 0x160b94f6, 0x450e4d3d, 0x7a320e85,
|
||||
0xbef8f0e1, 0x21d73653, 0x4e3d977a, 0x1e7b3929, 0x1cc6c719, 0xbe478d53, 0x8d752809, 0xe6d8c2c6,
|
||||
0x275f0892, 0xc8acc273, 0x4cc21580, 0xecc4a617, 0xf5f7be70, 0xe795248a, 0x375a2fe9, 0x425570b6,
|
||||
0x8898dcf8, 0xdc2d97c4, 0x0106114b, 0x364dc22f, 0x1e0cad1f, 0xbe63803c, 0x5f69fac2, 0x4d5afa6f,
|
||||
0x1bc0dfb5, 0xfb273589, 0x0ea47f7b, 0x3c1c2b50, 0x21b2a932, 0x6b1223fd, 0x2fe706a8, 0xf9bd6ce2,
|
||||
0xa268e64e, 0xe987f486, 0x3eacf563, 0x1ca2018c, 0x65e18228, 0x2207360a, 0x57cf1715, 0x34c37d2b,
|
||||
0x1f8f3cde, 0x93b657cf, 0x31a019fd, 0xe69eb729, 0x8bca7b9b, 0x4c9d5bed, 0x277ebeaf, 0xe0d8f8ae,
|
||||
0xd150821c, 0x31381871, 0xafc3f1b0, 0x927db328, 0xe95effac, 0x305a47bd, 0x426ba35b, 0x1233af3f,
|
||||
0x686a5b83, 0x50e072e5, 0xd9d3bb2a, 0x8befc475, 0x487f0de6, 0xc88dff89, 0xbd664d5e, 0x971b5d18,
|
||||
0x63b14847, 0xd7d3c1ce, 0x7f583cf3, 0x72cbcb09, 0xc0d0a81c, 0x7fa3429b, 0xe9158a1b, 0x225ea19a,
|
||||
0xd8ca9ea3, 0xc763b282, 0xbb0c6341, 0x020b8293, 0xd4cd299d, 0x58cfa7f8, 0x91b4ee53, 0x37e4d140,
|
||||
0x95ec764c, 0x30f76b06, 0x5ee68d24, 0x679c8661, 0xa41979c2, 0xf2b61284, 0x4fac1475, 0x0adb49f9,
|
||||
0x19727a23, 0x15a7e374, 0xc43a18d5, 0x3fb1aa73, 0x342fc615, 0x924c0793, 0xbee2d7f0, 0x8a279de9,
|
||||
0x4aa2d70c, 0xe24dd37f, 0xbe862c0b, 0x177c22c2, 0x5388e5ee, 0xcd8a7510, 0xf901b4fd, 0xdbc13dbc,
|
||||
0x6c0bae5b, 0x64efe8c7, 0x48b02079, 0x80331a49, 0xca3d8ae6, 0xf3546190, 0xfed7108b, 0xc49b941b,
|
||||
0x32baf4a9, 0xeb833a4a, 0x88a3f1a5, 0x3a91ce0a, 0x3cc27da1, 0x7112e684, 0x4a3096b1, 0x3794574c,
|
||||
0xa3c8b6f3, 0x1d213941, 0x6e0a2e00, 0x233479f1, 0x0f4cd82f, 0x6093edd2, 0x5d7d209e, 0x464fe319,
|
||||
0xd4dcac9e, 0x0db845cb, 0xfb5e4bc3, 0xe0256ce1, 0x09fb4ed1, 0x0914be1e, 0xa5bdb2c3, 0xc6eb57bb,
|
||||
0x30320350, 0x3f397e91, 0xa67791bc, 0x86bc0e2c, 0xefa0a7e2, 0xe9ff7543, 0xe733612c, 0xd185897b,
|
||||
0x329e5388, 0x91dd236b, 0x2ecb0d93, 0xf4d82a3d, 0x35b5c03f, 0xe4e606f0, 0x05b21843, 0x37b45964,
|
||||
0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11,
|
||||
];
|
||||
|
||||
impl Chunker {
|
||||
/// Create a new Chunker instance, which produces an average
|
||||
/// chunk size of `chunk_size_avg` (needs to be a power of two). We
|
||||
/// allow variation from `chunk_size_avg/4` up to a maximum of
|
||||
/// `chunk_size_avg*4`.
|
||||
pub fn new(chunk_size_avg: usize) -> Self {
|
||||
// The chunk cut discriminator. In order to get an average
|
||||
// chunk size of avg, we cut whenever for a hash value "h" at
|
||||
// byte "i" given the descriminator "d(avg)": h(i) mod d(avg)
|
||||
// == d(avg) - 1. Note that the discriminator calculated like
|
||||
// this only yields correct results as long as the minimal
|
||||
// chunk size is picked as avg/4, and the maximum chunk size
|
||||
// as avg*4. If they are picked differently the result might
|
||||
// be skewed into either direction.
|
||||
let avg = chunk_size_avg as f64;
|
||||
let discriminator = (avg / (-1.42888852e-7 * avg + 1.33237515)) as u32;
|
||||
|
||||
if chunk_size_avg.count_ones() != 1 {
|
||||
panic!("got unexpected chunk size - not a power of two.");
|
||||
}
|
||||
|
||||
let break_test_mask = (chunk_size_avg * 2 - 1) as u32;
|
||||
let break_test_minimum = break_test_mask - 2;
|
||||
|
||||
Self {
|
||||
h: 0,
|
||||
window_size: 0,
|
||||
chunk_size: 0,
|
||||
chunk_size_min: chunk_size_avg >> 2,
|
||||
chunk_size_max: chunk_size_avg << 2,
|
||||
_chunk_size_avg: chunk_size_avg,
|
||||
_discriminator: discriminator,
|
||||
break_test_mask,
|
||||
break_test_minimum,
|
||||
window: [0u8; CA_CHUNKER_WINDOW_SIZE],
|
||||
}
|
||||
}
|
||||
|
||||
/// Scans the specified data for a chunk border. Returns 0 if none
|
||||
/// was found (and the function should be called with more data
|
||||
/// later on), or another value indicating the position of a
|
||||
/// border.
|
||||
pub fn scan(&mut self, data: &[u8]) -> usize {
|
||||
let window_len = self.window.len();
|
||||
let data_len = data.len();
|
||||
|
||||
let mut pos = 0;
|
||||
|
||||
if self.window_size < window_len {
|
||||
let need = window_len - self.window_size;
|
||||
let copy_len = if need < data_len { need } else { data_len };
|
||||
|
||||
for _i in 0..copy_len {
|
||||
let byte = data[pos];
|
||||
self.window[self.window_size] = byte;
|
||||
self.h = self.h.rotate_left(1) ^ BUZHASH_TABLE[byte as usize];
|
||||
pos += 1;
|
||||
self.window_size += 1;
|
||||
}
|
||||
|
||||
self.chunk_size += copy_len;
|
||||
|
||||
// return if window is still not full
|
||||
if self.window_size < window_len {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
//let mut idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
|
||||
let mut idx = self.chunk_size & 0x3f;
|
||||
|
||||
while pos < data_len {
|
||||
// roll window
|
||||
let enter = data[pos];
|
||||
let leave = self.window[idx];
|
||||
self.h = self.h.rotate_left(1) ^
|
||||
//BUZHASH_TABLE[leave as usize].rotate_left(CA_CHUNKER_WINDOW_SIZE as u32) ^
|
||||
BUZHASH_TABLE[leave as usize] ^
|
||||
BUZHASH_TABLE[enter as usize];
|
||||
|
||||
self.chunk_size += 1;
|
||||
pos += 1;
|
||||
|
||||
self.window[idx] = enter;
|
||||
|
||||
if self.shall_break() {
|
||||
self.h = 0;
|
||||
self.chunk_size = 0;
|
||||
self.window_size = 0;
|
||||
return pos;
|
||||
}
|
||||
|
||||
//idx = self.chunk_size % CA_CHUNKER_WINDOW_SIZE;
|
||||
idx = self.chunk_size & 0x3f;
|
||||
//idx += 1; if idx >= CA_CHUNKER_WINDOW_SIZE { idx = 0 };
|
||||
}
|
||||
|
||||
0
|
||||
}
|
||||
|
||||
// fast implementation avoiding modulo
|
||||
// #[inline(always)]
|
||||
fn shall_break(&self) -> bool {
|
||||
if self.chunk_size >= self.chunk_size_max {
|
||||
return true;
|
||||
}
|
||||
|
||||
if self.chunk_size < self.chunk_size_min {
|
||||
return false;
|
||||
}
|
||||
|
||||
//(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!!
|
||||
|
||||
//(self.h & self.break_test_mask) <= 2 // Bad on 0 streams
|
||||
|
||||
(self.h & self.break_test_mask) >= self.break_test_minimum
|
||||
}
|
||||
|
||||
// This is the original implementation from casync
|
||||
/*
|
||||
#[inline(always)]
|
||||
fn shall_break_orig(&self) -> bool {
|
||||
|
||||
if self.chunk_size >= self.chunk_size_max { return true; }
|
||||
|
||||
if self.chunk_size < self.chunk_size_min { return false; }
|
||||
|
||||
(self.h % self.discriminator) == (self.discriminator - 1)
|
||||
}
|
||||
*/
|
||||
}
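A small driving-loop sketch (not part of this commit): feed a buffer to the chunker and collect the content-defined boundaries it reports. `scan` returning 0 means more data is needed (or the input is exhausted).

fn example_chunk_boundaries(data: &[u8]) -> Vec<usize> {
    let mut chunker = Chunker::new(64 * 1024); // 64 KiB average chunk size
    let mut boundaries = Vec::new();
    let mut pos = 0;
    while pos < data.len() {
        let k = chunker.scan(&data[pos..]);
        if k == 0 {
            break; // no further boundary inside the remaining data
        }
        pos += k;
        boundaries.push(pos);
    }
    boundaries
}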
|
||||
|
||||
#[test]
|
||||
fn test_chunker1() {
|
||||
let mut buffer = Vec::new();
|
||||
|
||||
for i in 0..(256 * 1024) {
|
||||
for j in 0..4 {
|
||||
let byte = ((i >> (j << 3)) & 0xff) as u8;
|
||||
buffer.push(byte);
|
||||
}
|
||||
}
|
||||
let mut chunker = Chunker::new(64 * 1024);
|
||||
|
||||
let mut pos = 0;
|
||||
let mut last = 0;
|
||||
|
||||
let mut chunks1: Vec<(usize, usize)> = vec![];
|
||||
let mut chunks2: Vec<(usize, usize)> = vec![];
|
||||
|
||||
// test1: feed single bytes
|
||||
while pos < buffer.len() {
|
||||
let k = chunker.scan(&buffer[pos..pos + 1]);
|
||||
pos += 1;
|
||||
if k != 0 {
|
||||
let prev = last;
|
||||
last = pos;
|
||||
chunks1.push((prev, pos - prev));
|
||||
}
|
||||
}
|
||||
chunks1.push((last, buffer.len() - last));
|
||||
|
||||
let mut chunker = Chunker::new(64 * 1024);
|
||||
|
||||
let mut pos = 0;
|
||||
|
||||
// test2: feed with whole buffer
|
||||
while pos < buffer.len() {
|
||||
let k = chunker.scan(&buffer[pos..]);
|
||||
if k != 0 {
|
||||
chunks2.push((pos, k));
|
||||
pos += k;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
chunks2.push((pos, buffer.len() - pos));
|
||||
|
||||
if chunks1 != chunks2 {
|
||||
let mut size1 = 0;
|
||||
for (_offset, len) in &chunks1 {
|
||||
size1 += len;
|
||||
}
|
||||
println!("Chunks1:{}\n{:?}\n", size1, chunks1);
|
||||
|
||||
let mut size2 = 0;
|
||||
for (_offset, len) in &chunks2 {
|
||||
size2 += len;
|
||||
}
|
||||
println!("Chunks2:{}\n{:?}\n", size2, chunks2);
|
||||
|
||||
if size1 != 256 * 4 * 1024 {
|
||||
panic!("wrong size for chunks1");
|
||||
}
|
||||
if size2 != 256 * 4 * 1024 {
|
||||
panic!("wrong size for chunks2");
|
||||
}
|
||||
|
||||
panic!("got different chunks");
|
||||
}
|
||||
}
|
265
pbs-datastore/src/crypt_config.rs
Normal file
@@ -0,0 +1,265 @@
|
||||
//! Wrappers for OpenSSL crypto functions
|
||||
//!
|
||||
//! We use this to encrypt and decrypt data chunks. Cipher is
|
||||
//! AES_256_GCM, which is fast and provides authenticated encryption.
|
||||
//!
|
||||
//! See the Wikipedia article for [Authenticated
|
||||
//! encryption](https://en.wikipedia.org/wiki/Authenticated_encryption)
|
||||
//! for a short introduction.
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::io::Write;
|
||||
|
||||
use anyhow::{Error};
|
||||
use openssl::hash::MessageDigest;
|
||||
use openssl::pkcs5::pbkdf2_hmac;
|
||||
use openssl::symm::{decrypt_aead, Cipher, Crypter, Mode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use proxmox::api::api;
|
||||
|
||||
use pbs_tools::format::{as_fingerprint, bytes_as_fingerprint};
|
||||
|
||||
// openssl::sha::sha256(b"Proxmox Backup Encryption Key Fingerprint")
|
||||
/// This constant is used to compute fingerprints.
|
||||
const FINGERPRINT_INPUT: [u8; 32] = [
|
||||
110, 208, 239, 119, 71, 31, 255, 77,
|
||||
85, 199, 168, 254, 74, 157, 182, 33,
|
||||
97, 64, 127, 19, 76, 114, 93, 223,
|
||||
48, 153, 45, 37, 236, 69, 237, 38,
|
||||
];
|
||||
#[api(default: "encrypt")]
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
/// Defines whether data is encrypted (using an AEAD cipher), only signed, or neither.
|
||||
pub enum CryptMode {
|
||||
/// Don't encrypt.
|
||||
None,
|
||||
/// Encrypt.
|
||||
Encrypt,
|
||||
/// Only sign.
|
||||
SignOnly,
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Hash, Clone, Deserialize, Serialize)]
|
||||
#[serde(transparent)]
|
||||
/// 32-byte fingerprint, usually calculated with SHA256.
|
||||
pub struct Fingerprint {
|
||||
#[serde(with = "bytes_as_fingerprint")]
|
||||
bytes: [u8; 32],
|
||||
}
|
||||
|
||||
impl Fingerprint {
|
||||
pub fn new(bytes: [u8; 32]) -> Self {
|
||||
Self { bytes }
|
||||
}
|
||||
pub fn bytes(&self) -> &[u8; 32] {
|
||||
&self.bytes
|
||||
}
|
||||
}
|
||||
|
||||
/// Display as short key ID
|
||||
impl Display for Fingerprint {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", as_fingerprint(&self.bytes[0..8]))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::str::FromStr for Fingerprint {
|
||||
type Err = Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Error> {
|
||||
let mut tmp = s.to_string();
|
||||
tmp.retain(|c| c != ':');
|
||||
let bytes = proxmox::tools::hex_to_digest(&tmp)?;
|
||||
Ok(Fingerprint::new(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
/// Encryption Configuration with secret key
|
||||
///
|
||||
/// This structure stores the secret key and provides helpers for
|
||||
/// authenticated encryption.
|
||||
pub struct CryptConfig {
|
||||
// the Cipher
|
||||
cipher: Cipher,
|
||||
// A secret key used to provide the chunk digest name space.
|
||||
id_key: [u8; 32],
|
||||
// Openssl hmac PKey of id_key
|
||||
id_pkey: openssl::pkey::PKey<openssl::pkey::Private>,
|
||||
// The private key used by the cipher.
|
||||
enc_key: [u8; 32],
|
||||
}
|
||||
|
||||
impl CryptConfig {
|
||||
|
||||
/// Create a new instance.
|
||||
///
|
||||
/// We compute a derived 32 byte key using pbkdf2_hmac. This second
|
||||
/// key is used in compute_digest.
|
||||
pub fn new(enc_key: [u8; 32]) -> Result<Self, Error> {
|
||||
|
||||
let mut id_key = [0u8; 32];
|
||||
|
||||
pbkdf2_hmac(
|
||||
&enc_key,
|
||||
b"_id_key",
|
||||
10,
|
||||
MessageDigest::sha256(),
|
||||
&mut id_key)?;
|
||||
|
||||
let id_pkey = openssl::pkey::PKey::hmac(&id_key).unwrap();
|
||||
|
||||
Ok(Self { id_key, id_pkey, enc_key, cipher: Cipher::aes_256_gcm() })
|
||||
}
|
||||
|
||||
/// Expose Cipher
|
||||
pub fn cipher(&self) -> &Cipher {
|
||||
&self.cipher
|
||||
}
|
||||
|
||||
/// Compute a chunk digest using a secret name space.
|
||||
///
|
||||
/// Computes an SHA256 checksum over some secret data (derived
|
||||
/// from the secret key) and the provided data. This ensures that
|
||||
/// chunk digest values do not clash with values computed for
|
||||
/// other secret keys.
|
||||
pub fn compute_digest(&self, data: &[u8]) -> [u8; 32] {
|
||||
let mut hasher = openssl::sha::Sha256::new();
|
||||
hasher.update(data);
|
||||
hasher.update(&self.id_key); // at the end, to avoid length extensions attacks
|
||||
hasher.finish()
|
||||
}
|
||||
|
||||
pub fn data_signer(&self) -> openssl::sign::Signer {
|
||||
openssl::sign::Signer::new(MessageDigest::sha256(), &self.id_pkey).unwrap()
|
||||
}
|
||||
|
||||
/// Compute authentication tag (hmac/sha256)
|
||||
///
|
||||
/// Computes an SHA256 HMAC using some secret data (derived
|
||||
/// from the secret key) and the provided data.
|
||||
pub fn compute_auth_tag(&self, data: &[u8]) -> [u8; 32] {
|
||||
let mut signer = self.data_signer();
|
||||
signer.update(data).unwrap();
|
||||
let mut tag = [0u8; 32];
|
||||
signer.sign(&mut tag).unwrap();
|
||||
tag
|
||||
}
|
||||
|
||||
pub fn fingerprint(&self) -> Fingerprint {
|
||||
Fingerprint::new(self.compute_digest(&FINGERPRINT_INPUT))
|
||||
}
|
||||
|
||||
pub fn data_crypter(&self, iv: &[u8; 16], mode: Mode) -> Result<Crypter, Error> {
|
||||
let mut crypter = openssl::symm::Crypter::new(self.cipher, mode, &self.enc_key, Some(iv))?;
|
||||
crypter.aad_update(b"")?; //??
|
||||
Ok(crypter)
|
||||
}
|
||||
|
||||
/// Encrypt data using a random 16 byte IV.
|
||||
///
|
||||
/// Writes encrypted data to ``output``. Returns the used IV and computed MAC.
|
||||
pub fn encrypt_to<W: Write>(
|
||||
&self,
|
||||
data: &[u8],
|
||||
mut output: W,
|
||||
) -> Result<([u8;16], [u8;16]), Error> {
|
||||
|
||||
let mut iv = [0u8; 16];
|
||||
proxmox::sys::linux::fill_with_random_data(&mut iv)?;
|
||||
|
||||
let mut tag = [0u8; 16];
|
||||
|
||||
let mut c = self.data_crypter(&iv, Mode::Encrypt)?;
|
||||
|
||||
const BUFFER_SIZE: usize = 32*1024;
|
||||
|
||||
let mut encr_buf = [0u8; BUFFER_SIZE];
|
||||
let max_encoder_input = BUFFER_SIZE - self.cipher.block_size();
|
||||
|
||||
let mut start = 0;
|
||||
loop {
|
||||
let mut end = start + max_encoder_input;
|
||||
if end > data.len() { end = data.len(); }
|
||||
if end > start {
|
||||
let count = c.update(&data[start..end], &mut encr_buf)?;
|
||||
output.write_all(&encr_buf[..count])?;
|
||||
start = end;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let rest = c.finalize(&mut encr_buf)?;
|
||||
if rest > 0 { output.write_all(&encr_buf[..rest])?; }
|
||||
|
||||
output.flush()?;
|
||||
|
||||
c.get_tag(&mut tag)?;
|
||||
|
||||
Ok((iv, tag))
|
||||
}
|
||||
|
||||
/// Decompress and decrypt data, verify MAC.
|
||||
pub fn decode_compressed_chunk(
|
||||
&self,
|
||||
data: &[u8],
|
||||
iv: &[u8; 16],
|
||||
tag: &[u8; 16],
|
||||
) -> Result<Vec<u8>, Error> {
|
||||
|
||||
let dec = Vec::with_capacity(1024*1024);
|
||||
|
||||
let mut decompressor = zstd::stream::write::Decoder::new(dec)?;
|
||||
|
||||
let mut c = self.data_crypter(iv, Mode::Decrypt)?;
|
||||
|
||||
const BUFFER_SIZE: usize = 32*1024;
|
||||
|
||||
let mut decr_buf = [0u8; BUFFER_SIZE];
|
||||
let max_decoder_input = BUFFER_SIZE - self.cipher.block_size();
|
||||
|
||||
let mut start = 0;
|
||||
loop {
|
||||
let mut end = start + max_decoder_input;
|
||||
if end > data.len() { end = data.len(); }
|
||||
if end > start {
|
||||
let count = c.update(&data[start..end], &mut decr_buf)?;
|
||||
decompressor.write_all(&decr_buf[0..count])?;
|
||||
start = end;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
c.set_tag(tag)?;
|
||||
let rest = c.finalize(&mut decr_buf)?;
|
||||
if rest > 0 { decompressor.write_all(&decr_buf[..rest])?; }
|
||||
|
||||
decompressor.flush()?;
|
||||
|
||||
Ok(decompressor.into_inner())
|
||||
}
|
||||
|
||||
/// Decrypt data, verify tag.
|
||||
pub fn decode_uncompressed_chunk(
|
||||
&self,
|
||||
data: &[u8],
|
||||
iv: &[u8; 16],
|
||||
tag: &[u8; 16],
|
||||
) -> Result<Vec<u8>, Error> {
|
||||
|
||||
let decr_data = decrypt_aead(
|
||||
self.cipher,
|
||||
&self.enc_key,
|
||||
Some(iv),
|
||||
b"", //??
|
||||
data,
|
||||
tag,
|
||||
)?;
|
||||
|
||||
Ok(decr_data)
|
||||
}
|
||||
}
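A minimal round-trip sketch (not part of this commit; the all-zero key is only for illustration): encrypt a buffer with encrypt_to and decrypt it again with decode_uncompressed_chunk, which also verifies the GCM tag.

fn example_crypt_roundtrip() -> Result<(), Error> {
    let config = CryptConfig::new([0u8; 32])?; // test key, illustration only

    let mut ciphertext = Vec::new();
    let (iv, tag) = config.encrypt_to(b"hello world", &mut ciphertext)?;

    let plain = config.decode_uncompressed_chunk(&ciphertext, &iv, &tag)?;
    assert_eq!(&plain[..], &b"hello world"[..]);
    Ok(())
}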
|
90
pbs-datastore/src/crypt_reader.rs
Normal file
@@ -0,0 +1,90 @@
|
||||
use std::sync::Arc;
|
||||
use std::io::{Read, BufRead};
|
||||
|
||||
use anyhow::{bail, Error};
|
||||
|
||||
use super::CryptConfig;
|
||||
|
||||
pub struct CryptReader<R> {
|
||||
reader: R,
|
||||
small_read_buf: Vec<u8>,
|
||||
block_size: usize,
|
||||
crypter: openssl::symm::Crypter,
|
||||
finalized: bool,
|
||||
}
|
||||
|
||||
impl <R: BufRead> CryptReader<R> {
|
||||
|
||||
pub fn new(reader: R, iv: [u8; 16], tag: [u8; 16], config: Arc<CryptConfig>) -> Result<Self, Error> {
|
||||
let block_size = config.cipher().block_size(); // Note: block size is normally 1 byte for stream ciphers
|
||||
if block_size.count_ones() != 1 || block_size > 512 {
|
||||
bail!("unexpected Cipher block size {}", block_size);
|
||||
}
|
||||
let mut crypter = config.data_crypter(&iv, openssl::symm::Mode::Decrypt)?;
|
||||
crypter.set_tag(&tag)?;
|
||||
|
||||
Ok(Self { reader, crypter, block_size, finalized: false, small_read_buf: Vec::new() })
|
||||
}
|
||||
|
||||
pub fn finish(self) -> Result<R, Error> {
|
||||
if !self.finalized {
|
||||
bail!("CryptReader not successfully finalized.");
|
||||
}
|
||||
Ok(self.reader)
|
||||
}
|
||||
}
|
||||
|
||||
impl <R: BufRead> Read for CryptReader<R> {
|
||||
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||
if !self.small_read_buf.is_empty() {
|
||||
let max = if self.small_read_buf.len() > buf.len() { buf.len() } else { self.small_read_buf.len() };
|
||||
let rest = self.small_read_buf.split_off(max);
|
||||
buf[..max].copy_from_slice(&self.small_read_buf);
|
||||
self.small_read_buf = rest;
|
||||
return Ok(max);
|
||||
}
|
||||
|
||||
let data = self.reader.fill_buf()?;
|
||||
|
||||
// handle small read buffers
|
||||
if buf.len() <= 2*self.block_size {
|
||||
let mut outbuf = [0u8; 1024];
|
||||
|
||||
let count = if data.is_empty() { // EOF
|
||||
let written = self.crypter.finalize(&mut outbuf)?;
|
||||
self.finalized = true;
|
||||
written
|
||||
} else {
|
||||
let mut read_size = outbuf.len() - self.block_size;
|
||||
if read_size > data.len() {
|
||||
read_size = data.len();
|
||||
}
|
||||
let written = self.crypter.update(&data[..read_size], &mut outbuf)?;
|
||||
self.reader.consume(read_size);
|
||||
written
|
||||
};
|
||||
|
||||
if count > buf.len() {
|
||||
buf.copy_from_slice(&outbuf[..buf.len()]);
|
||||
self.small_read_buf = outbuf[buf.len()..count].to_vec();
|
||||
Ok(buf.len())
|
||||
} else {
|
||||
buf[..count].copy_from_slice(&outbuf[..count]);
|
||||
Ok(count)
|
||||
}
|
||||
} else if data.is_empty() { // EOF
|
||||
let rest = self.crypter.finalize(buf)?;
|
||||
self.finalized = true;
|
||||
Ok(rest)
|
||||
} else {
|
||||
let mut read_size = buf.len() - self.block_size;
|
||||
if read_size > data.len() {
|
||||
read_size = data.len();
|
||||
}
|
||||
let count = self.crypter.update(&data[..read_size], buf)?;
|
||||
self.reader.consume(read_size);
|
||||
Ok(count)
|
||||
}
|
||||
}
|
||||
}
|
65
pbs-datastore/src/crypt_writer.rs
Normal file
@@ -0,0 +1,65 @@
|
||||
use std::sync::Arc;
|
||||
use std::io::Write;
|
||||
|
||||
use anyhow::Error;
|
||||
|
||||
use super::CryptConfig;
|
||||
|
||||
pub struct CryptWriter<W> {
|
||||
writer: W,
|
||||
block_size: usize,
|
||||
encr_buf: Box<[u8; 64*1024]>,
|
||||
iv: [u8; 16],
|
||||
crypter: openssl::symm::Crypter,
|
||||
}
|
||||
|
||||
impl <W: Write> CryptWriter<W> {
|
||||
|
||||
pub fn new(writer: W, config: Arc<CryptConfig>) -> Result<Self, Error> {
|
||||
let mut iv = [0u8; 16];
|
||||
proxmox::sys::linux::fill_with_random_data(&mut iv)?;
|
||||
let block_size = config.cipher().block_size();
|
||||
|
||||
let crypter = config.data_crypter(&iv, openssl::symm::Mode::Encrypt)?;
|
||||
|
||||
Ok(Self { writer, iv, crypter, block_size, encr_buf: Box::new([0u8; 64*1024]) })
|
||||
}
|
||||
|
||||
pub fn finish(mut self) -> Result<(W, [u8; 16], [u8; 16]), Error> {
|
||||
let rest = self.crypter.finalize(self.encr_buf.as_mut())?;
|
||||
if rest > 0 {
|
||||
self.writer.write_all(&self.encr_buf[..rest])?;
|
||||
}
|
||||
|
||||
self.writer.flush()?;
|
||||
|
||||
let mut tag = [0u8; 16];
|
||||
self.crypter.get_tag(&mut tag)?;
|
||||
|
||||
Ok((self.writer, self.iv, tag))
|
||||
}
|
||||
}
|
||||
|
||||
impl <W: Write> Write for CryptWriter<W> {
|
||||
|
||||
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
||||
let mut write_size = buf.len();
|
||||
if write_size > (self.encr_buf.len() - self.block_size) {
|
||||
write_size = self.encr_buf.len() - self.block_size;
|
||||
}
|
||||
let count = self.crypter.update(&buf[..write_size], self.encr_buf.as_mut())
|
||||
.map_err(|err| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("crypter update failed - {}", err))
|
||||
})?;
|
||||
|
||||
self.writer.write_all(&self.encr_buf[..count])?;
|
||||
|
||||
Ok(write_size)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> Result<(), std::io::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
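A streaming counterpart to the blob-level helpers (a sketch, not part of this commit; it assumes CryptReader from the sibling module is in scope): encrypt through CryptWriter, then decrypt the resulting byte stream again with CryptReader. The IV and tag returned by finish() must be kept to construct the reader.

fn example_stream_roundtrip() -> Result<(), Error> {
    use std::io::{Cursor, Read, Write};
    use std::sync::Arc;

    let config = Arc::new(CryptConfig::new([0u8; 32])?); // test key, illustration only

    let mut writer = CryptWriter::new(Vec::new(), Arc::clone(&config))?;
    writer.write_all(b"secret payload")?;
    let (ciphertext, iv, tag) = writer.finish()?;

    let mut reader = CryptReader::new(Cursor::new(ciphertext), iv, tag, config)?;
    let mut plain = Vec::new();
    reader.read_to_end(&mut plain)?;
    assert_eq!(&plain[..], &b"secret payload"[..]);
    Ok(())
}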
|
421
pbs-datastore/src/data_blob.rs
Normal file
@@ -0,0 +1,421 @@
|
||||
use std::convert::TryInto;
|
||||
|
||||
use anyhow::{bail, Error};
|
||||
|
||||
use proxmox::tools::io::{ReadExt, WriteExt};
|
||||
|
||||
use super::file_formats::*;
|
||||
use super::{CryptConfig, CryptMode};
|
||||
|
||||
const MAX_BLOB_SIZE: usize = 128*1024*1024;
|
||||
|
||||
/// Encoded data chunk with digest and positional information
|
||||
pub struct ChunkInfo {
|
||||
pub chunk: DataBlob,
|
||||
pub digest: [u8; 32],
|
||||
pub chunk_len: u64,
|
||||
pub offset: u64,
|
||||
}
|
||||
|
||||
/// Data blob binary storage format
|
||||
///
|
||||
/// Data blobs store arbitrary binary data (< 128MB), and can be
|
||||
/// compressed and encrypted (or just signed). A simple binary format
|
||||
/// is used to store them on disk or transfer them over the network.
|
||||
///
|
||||
/// Please use index files to store large data files (".fidx" or
|
||||
/// ".didx").
|
||||
///
|
||||
pub struct DataBlob {
|
||||
raw_data: Vec<u8>, // tagged, compressed, encrypted data
|
||||
}
|
||||
|
||||
impl DataBlob {
|
||||
|
||||
/// accessor to raw_data field
|
||||
pub fn raw_data(&self) -> &[u8] {
|
||||
&self.raw_data
|
||||
}
|
||||
|
||||
/// Returns raw_data size
|
||||
pub fn raw_size(&self) -> u64 {
|
||||
self.raw_data.len() as u64
|
||||
}
|
||||
|
||||
/// Consume self and returns raw_data
|
||||
pub fn into_inner(self) -> Vec<u8> {
|
||||
self.raw_data
|
||||
}
|
||||
|
||||
/// accessor to chunk type (magic number)
|
||||
pub fn magic(&self) -> &[u8; 8] {
|
||||
self.raw_data[0..8].try_into().unwrap()
|
||||
}
|
||||
|
||||
/// accessor to crc32 checksum
|
||||
pub fn crc(&self) -> u32 {
|
||||
let crc_o = proxmox::offsetof!(DataBlobHeader, crc);
|
||||
u32::from_le_bytes(self.raw_data[crc_o..crc_o+4].try_into().unwrap())
|
||||
}
|
||||
|
||||
// set the CRC checksum field
|
||||
pub fn set_crc(&mut self, crc: u32) {
|
||||
let crc_o = proxmox::offsetof!(DataBlobHeader, crc);
|
||||
self.raw_data[crc_o..crc_o+4].copy_from_slice(&crc.to_le_bytes());
|
||||
}
|
||||
|
||||
/// compute the CRC32 checksum
|
||||
pub fn compute_crc(&self) -> u32 {
|
||||
let mut hasher = crc32fast::Hasher::new();
|
||||
let start = header_size(self.magic()); // start after HEAD
|
||||
hasher.update(&self.raw_data[start..]);
|
||||
hasher.finalize()
|
||||
}
|
||||
|
||||
// verify the CRC32 checksum
|
||||
pub fn verify_crc(&self) -> Result<(), Error> {
|
||||
let expected_crc = self.compute_crc();
|
||||
if expected_crc != self.crc() {
|
||||
bail!("Data blob has wrong CRC checksum.");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a DataBlob, optionally compressed and/or encrypted
|
||||
pub fn encode(
|
||||
data: &[u8],
|
||||
config: Option<&CryptConfig>,
|
||||
compress: bool,
|
||||
) -> Result<Self, Error> {
|
||||
|
||||
if data.len() > MAX_BLOB_SIZE {
|
||||
bail!("data blob too large ({} bytes).", data.len());
|
||||
}
|
||||
|
||||
let mut blob = if let Some(config) = config {
|
||||
|
||||
let compr_data;
|
||||
let (_compress, data, magic) = if compress {
|
||||
compr_data = zstd::block::compress(data, 1)?;
|
||||
// Note: We only use compression if result is shorter
|
||||
if compr_data.len() < data.len() {
|
||||
(true, &compr_data[..], ENCR_COMPR_BLOB_MAGIC_1_0)
|
||||
} else {
|
||||
(false, data, ENCRYPTED_BLOB_MAGIC_1_0)
|
||||
}
|
||||
} else {
|
||||
(false, data, ENCRYPTED_BLOB_MAGIC_1_0)
|
||||
};
|
||||
|
||||
let header_len = std::mem::size_of::<EncryptedDataBlobHeader>();
|
||||
let mut raw_data = Vec::with_capacity(data.len() + header_len);
|
||||
|
||||
let dummy_head = EncryptedDataBlobHeader {
|
||||
head: DataBlobHeader { magic: [0u8; 8], crc: [0; 4] },
|
||||
iv: [0u8; 16],
|
||||
tag: [0u8; 16],
|
||||
};
|
||||
unsafe {
|
||||
raw_data.write_le_value(dummy_head)?;
|
||||
}
|
||||
|
||||
let (iv, tag) = config.encrypt_to(data, &mut raw_data)?;
|
||||
|
||||
let head = EncryptedDataBlobHeader {
|
||||
head: DataBlobHeader { magic, crc: [0; 4] }, iv, tag,
|
||||
};
|
||||
|
||||
unsafe {
|
||||
(&mut raw_data[0..header_len]).write_le_value(head)?;
|
||||
}
|
||||
|
||||
DataBlob { raw_data }
|
||||
} else {
|
||||
|
||||
let max_data_len = data.len() + std::mem::size_of::<DataBlobHeader>();
|
||||
if compress {
|
||||
let mut comp_data = Vec::with_capacity(max_data_len);
|
||||
|
||||
let head = DataBlobHeader {
|
||||
magic: COMPRESSED_BLOB_MAGIC_1_0,
|
||||
crc: [0; 4],
|
||||
};
|
||||
unsafe {
|
||||
comp_data.write_le_value(head)?;
|
||||
}
|
||||
|
||||
zstd::stream::copy_encode(data, &mut comp_data, 1)?;
|
||||
|
||||
if comp_data.len() < max_data_len {
|
||||
let mut blob = DataBlob { raw_data: comp_data };
|
||||
blob.set_crc(blob.compute_crc());
|
||||
return Ok(blob);
|
||||
}
|
||||
}
|
||||
|
||||
let mut raw_data = Vec::with_capacity(max_data_len);
|
||||
|
||||
let head = DataBlobHeader {
|
||||
magic: UNCOMPRESSED_BLOB_MAGIC_1_0,
|
||||
crc: [0; 4],
|
||||
};
|
||||
unsafe {
|
||||
raw_data.write_le_value(head)?;
|
||||
}
|
||||
raw_data.extend_from_slice(data);
|
||||
|
||||
DataBlob { raw_data }
|
||||
};
|
||||
|
||||
blob.set_crc(blob.compute_crc());
|
||||
|
||||
Ok(blob)
|
||||
}
|
||||
|
||||
/// Get the encryption mode for this blob.
|
||||
pub fn crypt_mode(&self) -> Result<CryptMode, Error> {
|
||||
let magic = self.magic();
|
||||
|
||||
Ok(if magic == &UNCOMPRESSED_BLOB_MAGIC_1_0 || magic == &COMPRESSED_BLOB_MAGIC_1_0 {
|
||||
CryptMode::None
|
||||
} else if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 {
|
||||
CryptMode::Encrypt
|
||||
} else {
|
||||
bail!("Invalid blob magic number.");
|
||||
})
|
||||
}
|
||||
|
||||
/// Decode blob data
|
||||
pub fn decode(&self, config: Option<&CryptConfig>, digest: Option<&[u8; 32]>) -> Result<Vec<u8>, Error> {
|
||||
|
||||
let magic = self.magic();
|
||||
|
||||
if magic == &UNCOMPRESSED_BLOB_MAGIC_1_0 {
|
||||
let data_start = std::mem::size_of::<DataBlobHeader>();
|
||||
let data = self.raw_data[data_start..].to_vec();
|
||||
if let Some(digest) = digest {
|
||||
Self::verify_digest(&data, None, digest)?;
|
||||
}
|
||||
Ok(data)
|
||||
} else if magic == &COMPRESSED_BLOB_MAGIC_1_0 {
|
||||
let data_start = std::mem::size_of::<DataBlobHeader>();
|
||||
let mut reader = &self.raw_data[data_start..];
|
||||
let data = zstd::stream::decode_all(&mut reader)?;
|
||||
// zstd::block::decompress is abou 10% slower
|
||||
// let data = zstd::block::decompress(&self.raw_data[data_start..], MAX_BLOB_SIZE)?;
|
||||
if let Some(digest) = digest {
|
||||
Self::verify_digest(&data, None, digest)?;
|
||||
}
|
||||
Ok(data)
|
||||
} else if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 {
|
||||
let header_len = std::mem::size_of::<EncryptedDataBlobHeader>();
|
||||
let head = unsafe {
|
||||
(&self.raw_data[..header_len]).read_le_value::<EncryptedDataBlobHeader>()?
|
||||
};
|
||||
|
||||
if let Some(config) = config {
|
||||
let data = if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 {
|
||||
config.decode_compressed_chunk(&self.raw_data[header_len..], &head.iv, &head.tag)?
|
||||
} else {
|
||||
config.decode_uncompressed_chunk(&self.raw_data[header_len..], &head.iv, &head.tag)?
|
||||
};
|
||||
if let Some(digest) = digest {
|
||||
Self::verify_digest(&data, Some(config), digest)?;
|
||||
}
|
||||
Ok(data)
|
||||
} else {
|
||||
bail!("unable to decrypt blob - missing CryptConfig");
|
||||
}
|
||||
} else {
|
||||
bail!("Invalid blob magic number.");
|
||||
}
|
||||
}
|
||||
|
||||
/// Load blob from ``reader``, verify CRC
|
||||
pub fn load_from_reader(reader: &mut dyn std::io::Read) -> Result<Self, Error> {
|
||||
|
||||
let mut data = Vec::with_capacity(1024*1024);
|
||||
reader.read_to_end(&mut data)?;
|
||||
|
||||
let blob = Self::from_raw(data)?;
|
||||
|
||||
blob.verify_crc()?;
|
||||
|
||||
Ok(blob)
|
||||
}
|
||||
|
||||
/// Create Instance from raw data
|
||||
pub fn from_raw(data: Vec<u8>) -> Result<Self, Error> {
|
||||
|
||||
if data.len() < std::mem::size_of::<DataBlobHeader>() {
|
||||
bail!("blob too small ({} bytes).", data.len());
|
||||
}
|
||||
|
||||
let magic = &data[0..8];
|
||||
|
||||
if magic == ENCR_COMPR_BLOB_MAGIC_1_0 || magic == ENCRYPTED_BLOB_MAGIC_1_0 {
|
||||
|
||||
if data.len() < std::mem::size_of::<EncryptedDataBlobHeader>() {
|
||||
bail!("encrypted blob too small ({} bytes).", data.len());
|
||||
}
|
||||
|
||||
let blob = DataBlob { raw_data: data };
|
||||
|
||||
Ok(blob)
|
||||
} else if magic == COMPRESSED_BLOB_MAGIC_1_0 || magic == UNCOMPRESSED_BLOB_MAGIC_1_0 {
|
||||
|
||||
let blob = DataBlob { raw_data: data };
|
||||
|
||||
Ok(blob)
|
||||
} else {
|
||||
bail!("unable to parse raw blob - wrong magic");
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns if chunk is encrypted
|
||||
pub fn is_encrypted(&self) -> bool {
|
||||
let magic = self.magic();
|
||||
magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0
|
||||
}
|
||||
|
||||
/// Verify digest and data length for unencrypted chunks.
|
||||
///
|
||||
/// To do that, we need to decompress data first. Please note that
|
||||
/// this is not possible for encrypted chunks. This function simply return Ok
|
||||
/// for encrypted chunks.
|
||||
/// Note: This does not call verify_crc, because this is usually done in load
|
||||
pub fn verify_unencrypted(
|
||||
&self,
|
||||
expected_chunk_size: usize,
|
||||
expected_digest: &[u8; 32],
|
||||
) -> Result<(), Error> {
|
||||
|
||||
let magic = self.magic();
|
||||
|
||||
if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// verifies digest!
|
||||
let data = self.decode(None, Some(expected_digest))?;
|
||||
|
||||
if expected_chunk_size != data.len() {
|
||||
bail!("detected chunk with wrong length ({} != {})", expected_chunk_size, data.len());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn verify_digest(
|
||||
data: &[u8],
|
||||
config: Option<&CryptConfig>,
|
||||
expected_digest: &[u8; 32],
|
||||
) -> Result<(), Error> {
|
||||
|
||||
let digest = match config {
|
||||
Some(config) => config.compute_digest(data),
|
||||
None => openssl::sha::sha256(data),
|
||||
};
|
||||
if &digest != expected_digest {
|
||||
bail!("detected chunk with wrong digest.");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder for chunk DataBlobs
|
||||
///
|
||||
/// Main purpose is to centralize digest computation. Digest
|
||||
/// computation differ for encryped chunk, and this interface ensures that
|
||||
/// we always compute the correct one.
|
||||
pub struct DataChunkBuilder<'a, 'b> {
|
||||
config: Option<&'b CryptConfig>,
|
||||
orig_data: &'a [u8],
|
||||
digest_computed: bool,
|
||||
digest: [u8; 32],
|
||||
compress: bool,
|
||||
}
|
||||
|
||||
impl <'a, 'b> DataChunkBuilder<'a, 'b> {
|
||||
|
||||
/// Create a new builder instance.
|
||||
pub fn new(orig_data: &'a [u8]) -> Self {
|
||||
Self {
|
||||
orig_data,
|
||||
config: None,
|
||||
digest_computed: false,
|
||||
digest: [0u8; 32],
|
||||
compress: true,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set compression flag.
|
||||
///
|
||||
/// If true, chunk data is compressed using zstd (level 1).
|
||||
pub fn compress(mut self, value: bool) -> Self {
|
||||
self.compress = value;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set encryption Configuration
|
||||
///
|
||||
/// If set, chunks are encrypted
|
||||
pub fn crypt_config(mut self, value: &'b CryptConfig) -> Self {
|
||||
if self.digest_computed {
|
||||
panic!("unable to set crypt_config after compute_digest().");
|
||||
}
|
||||
self.config = Some(value);
|
||||
self
|
||||
}
|
||||
|
||||
fn compute_digest(&mut self) {
|
||||
if !self.digest_computed {
|
||||
if let Some(ref config) = self.config {
|
||||
self.digest = config.compute_digest(self.orig_data);
|
||||
} else {
|
||||
self.digest = openssl::sha::sha256(self.orig_data);
|
||||
}
|
||||
self.digest_computed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the chunk Digest
|
||||
///
|
||||
/// Note: For encrypted chunks, this needs to be called after
|
||||
/// ``crypt_config``.
|
||||
pub fn digest(&mut self) -> &[u8; 32] {
|
||||
if !self.digest_computed {
|
||||
self.compute_digest();
|
||||
}
|
||||
&self.digest
|
||||
}
|
||||
|
||||
/// Consume self and build the ``DataBlob``.
|
||||
///
|
||||
/// Returns the blob and the computet digest.
|
||||
pub fn build(mut self) -> Result<(DataBlob, [u8; 32]), Error> {
|
||||
if !self.digest_computed {
|
||||
self.compute_digest();
|
||||
}
|
||||
|
||||
let chunk = DataBlob::encode(self.orig_data, self.config, self.compress)?;
|
||||
Ok((chunk, self.digest))
|
||||
}
|
||||
|
||||
/// Create a chunk filled with zeroes
|
||||
pub fn build_zero_chunk(
|
||||
crypt_config: Option<&CryptConfig>,
|
||||
chunk_size: usize,
|
||||
compress: bool,
|
||||
) -> Result<(DataBlob, [u8; 32]), Error> {
|
||||
let zero_bytes = vec![0; chunk_size];
|
||||
let mut chunk_builder = DataChunkBuilder::new(&zero_bytes).compress(compress);
|
||||
if let Some(ref crypt_config) = crypt_config {
|
||||
chunk_builder = chunk_builder.crypt_config(crypt_config);
|
||||
}
|
||||
|
||||
chunk_builder.build()
|
||||
}
|
||||
|
||||
}
|
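// Illustrative sketch, not part of the original commit: an unencrypted round
// trip through DataBlob and DataChunkBuilder. The helper names are made up;
// everything used here (encode, decode, verify_crc, the builder) is defined
// above in this file.
fn blob_roundtrip(payload: &[u8]) -> Result<(), Error> {
    // encode() picks the compressed or uncompressed magic depending on
    // whether zstd actually shrinks the payload.
    let blob = DataBlob::encode(payload, None, true)?;
    blob.verify_crc()?;

    // decode() optionally verifies an expected digest on the way out.
    let digest = openssl::sha::sha256(payload);
    let restored = blob.decode(None, Some(&digest))?;
    assert_eq!(restored, payload);
    Ok(())
}

// For chunk uploads, the builder additionally returns the digest it computed
// (the keyed digest if a CryptConfig was set via crypt_config()).
fn make_chunk(data: &[u8]) -> Result<(DataBlob, [u8; 32]), Error> {
    DataChunkBuilder::new(data).compress(true).build()
}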
177
pbs-datastore/src/data_blob_reader.rs
Normal file
177
pbs-datastore/src/data_blob_reader.rs
Normal file
@ -0,0 +1,177 @@
use std::io::{BufReader, Read};
use std::sync::Arc;

use anyhow::{bail, format_err, Error};
use proxmox::tools::io::ReadExt;

use crate::checksum_reader::ChecksumReader;
use crate::crypt_config::CryptConfig;
use crate::crypt_reader::CryptReader;
use crate::file_formats::{self, DataBlobHeader};

enum BlobReaderState<'reader, R: Read> {
    Uncompressed {
        expected_crc: u32,
        csum_reader: ChecksumReader<R>,
    },
    Compressed {
        expected_crc: u32,
        decompr: zstd::stream::read::Decoder<'reader, BufReader<ChecksumReader<R>>>,
    },
    Encrypted {
        expected_crc: u32,
        decrypt_reader: CryptReader<BufReader<ChecksumReader<R>>>,
    },
    EncryptedCompressed {
        expected_crc: u32,
        decompr: zstd::stream::read::Decoder<
            'reader,
            BufReader<CryptReader<BufReader<ChecksumReader<R>>>>,
        >,
    },
}

/// Read data blobs
pub struct DataBlobReader<'reader, R: Read> {
    state: BlobReaderState<'reader, R>,
}

// zstd_safe::DCtx is not sync but we are, since
// the only public interface is on mutable reference
unsafe impl<R: Read> Sync for DataBlobReader<'_, R> {}

impl<R: Read> DataBlobReader<'_, R> {
    pub fn new(mut reader: R, config: Option<Arc<CryptConfig>>) -> Result<Self, Error> {
        let head: DataBlobHeader = unsafe { reader.read_le_value()? };
        match head.magic {
            file_formats::UNCOMPRESSED_BLOB_MAGIC_1_0 => {
                let expected_crc = u32::from_le_bytes(head.crc);
                let csum_reader = ChecksumReader::new(reader, None);
                Ok(Self {
                    state: BlobReaderState::Uncompressed {
                        expected_crc,
                        csum_reader,
                    },
                })
            }
            file_formats::COMPRESSED_BLOB_MAGIC_1_0 => {
                let expected_crc = u32::from_le_bytes(head.crc);
                let csum_reader = ChecksumReader::new(reader, None);

                let decompr = zstd::stream::read::Decoder::new(csum_reader)?;
                Ok(Self {
                    state: BlobReaderState::Compressed {
                        expected_crc,
                        decompr,
                    },
                })
            }
            file_formats::ENCRYPTED_BLOB_MAGIC_1_0 => {
                let config = config
                    .ok_or_else(|| format_err!("unable to read encrypted blob without key"))?;
                let expected_crc = u32::from_le_bytes(head.crc);
                let mut iv = [0u8; 16];
                let mut expected_tag = [0u8; 16];
                reader.read_exact(&mut iv)?;
                reader.read_exact(&mut expected_tag)?;
                let csum_reader = ChecksumReader::new(reader, None);
                let decrypt_reader = CryptReader::new(
                    BufReader::with_capacity(64 * 1024, csum_reader),
                    iv,
                    expected_tag,
                    config,
                )?;
                Ok(Self {
                    state: BlobReaderState::Encrypted {
                        expected_crc,
                        decrypt_reader,
                    },
                })
            }
            file_formats::ENCR_COMPR_BLOB_MAGIC_1_0 => {
                let config = config
                    .ok_or_else(|| format_err!("unable to read encrypted blob without key"))?;
                let expected_crc = u32::from_le_bytes(head.crc);
                let mut iv = [0u8; 16];
                let mut expected_tag = [0u8; 16];
                reader.read_exact(&mut iv)?;
                reader.read_exact(&mut expected_tag)?;
                let csum_reader = ChecksumReader::new(reader, None);
                let decrypt_reader = CryptReader::new(
                    BufReader::with_capacity(64 * 1024, csum_reader),
                    iv,
                    expected_tag,
                    config,
                )?;
                let decompr = zstd::stream::read::Decoder::new(decrypt_reader)?;
                Ok(Self {
                    state: BlobReaderState::EncryptedCompressed {
                        expected_crc,
                        decompr,
                    },
                })
            }
            _ => bail!("got wrong magic number {:?}", head.magic),
        }
    }

    pub fn finish(self) -> Result<R, Error> {
        match self.state {
            BlobReaderState::Uncompressed {
                csum_reader,
                expected_crc,
            } => {
                let (reader, crc, _) = csum_reader.finish()?;
                if crc != expected_crc {
                    bail!("blob crc check failed");
                }
                Ok(reader)
            }
            BlobReaderState::Compressed {
                expected_crc,
                decompr,
            } => {
                let csum_reader = decompr.finish().into_inner();
                let (reader, crc, _) = csum_reader.finish()?;
                if crc != expected_crc {
                    bail!("blob crc check failed");
                }
                Ok(reader)
            }
            BlobReaderState::Encrypted {
                expected_crc,
                decrypt_reader,
            } => {
                let csum_reader = decrypt_reader.finish()?.into_inner();
                let (reader, crc, _) = csum_reader.finish()?;
                if crc != expected_crc {
                    bail!("blob crc check failed");
                }
                Ok(reader)
            }
            BlobReaderState::EncryptedCompressed {
                expected_crc,
                decompr,
            } => {
                let decrypt_reader = decompr.finish().into_inner();
                let csum_reader = decrypt_reader.finish()?.into_inner();
                let (reader, crc, _) = csum_reader.finish()?;
                if crc != expected_crc {
                    bail!("blob crc check failed");
                }
                Ok(reader)
            }
        }
    }
}

impl<R: Read> Read for DataBlobReader<'_, R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
        match &mut self.state {
            BlobReaderState::Uncompressed { csum_reader, .. } => csum_reader.read(buf),
            BlobReaderState::Compressed { decompr, .. } => decompr.read(buf),
            BlobReaderState::Encrypted { decrypt_reader, .. } => decrypt_reader.read(buf),
            BlobReaderState::EncryptedCompressed { decompr, .. } => decompr.read(buf),
        }
    }
}
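// Illustrative sketch, not part of the original commit: DataBlobReader as the
// streaming counterpart to DataBlob::decode. The helper name is made up; it
// only uses items defined or imported in this file. Reading yields the
// decompressed (and, given a key, decrypted) payload; finish() returns the
// inner reader and fails if the CRC from the header does not match.
fn read_blob_payload<R: Read>(raw: R) -> Result<Vec<u8>, Error> {
    // Pass `Some(config)` here to read encrypted blobs.
    let mut reader = DataBlobReader::new(raw, None)?;
    let mut payload = Vec::new();
    reader.read_to_end(&mut payload)?;
    // Consumes the reader and performs the CRC check.
    reader.finish()?;
    Ok(payload)
}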
209
pbs-datastore/src/data_blob_writer.rs
Normal file
209
pbs-datastore/src/data_blob_writer.rs
Normal file
@ -0,0 +1,209 @@
use anyhow::Error;
use proxmox::tools::io::WriteExt;
use std::io::{Seek, SeekFrom, Write};
use std::sync::Arc;

use crate::checksum_writer::ChecksumWriter;
use crate::crypt_config::CryptConfig;
use crate::crypt_writer::CryptWriter;
use crate::file_formats::{self, DataBlobHeader, EncryptedDataBlobHeader};

enum BlobWriterState<'writer, W: Write> {
    Uncompressed {
        csum_writer: ChecksumWriter<W>,
    },
    Compressed {
        compr: zstd::stream::write::Encoder<'writer, ChecksumWriter<W>>,
    },
    Encrypted {
        crypt_writer: CryptWriter<ChecksumWriter<W>>,
    },
    EncryptedCompressed {
        compr: zstd::stream::write::Encoder<'writer, CryptWriter<ChecksumWriter<W>>>,
    },
}

/// Data blob writer
pub struct DataBlobWriter<'writer, W: Write> {
    state: BlobWriterState<'writer, W>,
}

impl<W: Write + Seek> DataBlobWriter<'_, W> {
    pub fn new_uncompressed(mut writer: W) -> Result<Self, Error> {
        writer.seek(SeekFrom::Start(0))?;
        let head = DataBlobHeader {
            magic: file_formats::UNCOMPRESSED_BLOB_MAGIC_1_0,
            crc: [0; 4],
        };
        unsafe {
            writer.write_le_value(head)?;
        }
        let csum_writer = ChecksumWriter::new(writer, None);
        Ok(Self {
            state: BlobWriterState::Uncompressed { csum_writer },
        })
    }

    pub fn new_compressed(mut writer: W) -> Result<Self, Error> {
        writer.seek(SeekFrom::Start(0))?;
        let head = DataBlobHeader {
            magic: file_formats::COMPRESSED_BLOB_MAGIC_1_0,
            crc: [0; 4],
        };
        unsafe {
            writer.write_le_value(head)?;
        }
        let csum_writer = ChecksumWriter::new(writer, None);
        let compr = zstd::stream::write::Encoder::new(csum_writer, 1)?;
        Ok(Self {
            state: BlobWriterState::Compressed { compr },
        })
    }

    pub fn new_encrypted(mut writer: W, config: Arc<CryptConfig>) -> Result<Self, Error> {
        writer.seek(SeekFrom::Start(0))?;
        let head = EncryptedDataBlobHeader {
            head: DataBlobHeader {
                magic: file_formats::ENCRYPTED_BLOB_MAGIC_1_0,
                crc: [0; 4],
            },
            iv: [0u8; 16],
            tag: [0u8; 16],
        };
        unsafe {
            writer.write_le_value(head)?;
        }

        let csum_writer = ChecksumWriter::new(writer, None);
        let crypt_writer = CryptWriter::new(csum_writer, config)?;
        Ok(Self {
            state: BlobWriterState::Encrypted { crypt_writer },
        })
    }

    pub fn new_encrypted_compressed(
        mut writer: W,
        config: Arc<CryptConfig>,
    ) -> Result<Self, Error> {
        writer.seek(SeekFrom::Start(0))?;
        let head = EncryptedDataBlobHeader {
            head: DataBlobHeader {
                magic: file_formats::ENCR_COMPR_BLOB_MAGIC_1_0,
                crc: [0; 4],
            },
            iv: [0u8; 16],
            tag: [0u8; 16],
        };
        unsafe {
            writer.write_le_value(head)?;
        }

        let csum_writer = ChecksumWriter::new(writer, None);
        let crypt_writer = CryptWriter::new(csum_writer, config)?;
        let compr = zstd::stream::write::Encoder::new(crypt_writer, 1)?;
        Ok(Self {
            state: BlobWriterState::EncryptedCompressed { compr },
        })
    }

    pub fn finish(self) -> Result<W, Error> {
        match self.state {
            BlobWriterState::Uncompressed { csum_writer } => {
                // write CRC
                let (mut writer, crc, _) = csum_writer.finish()?;
                let head = DataBlobHeader {
                    magic: file_formats::UNCOMPRESSED_BLOB_MAGIC_1_0,
                    crc: crc.to_le_bytes(),
                };

                writer.seek(SeekFrom::Start(0))?;
                unsafe {
                    writer.write_le_value(head)?;
                }

                Ok(writer)
            }
            BlobWriterState::Compressed { compr } => {
                let csum_writer = compr.finish()?;
                let (mut writer, crc, _) = csum_writer.finish()?;

                let head = DataBlobHeader {
                    magic: file_formats::COMPRESSED_BLOB_MAGIC_1_0,
                    crc: crc.to_le_bytes(),
                };

                writer.seek(SeekFrom::Start(0))?;
                unsafe {
                    writer.write_le_value(head)?;
                }

                Ok(writer)
            }
            BlobWriterState::Encrypted { crypt_writer } => {
                let (csum_writer, iv, tag) = crypt_writer.finish()?;
                let (mut writer, crc, _) = csum_writer.finish()?;

                let head = EncryptedDataBlobHeader {
                    head: DataBlobHeader {
                        magic: file_formats::ENCRYPTED_BLOB_MAGIC_1_0,
                        crc: crc.to_le_bytes(),
                    },
                    iv,
                    tag,
                };
                writer.seek(SeekFrom::Start(0))?;
                unsafe {
                    writer.write_le_value(head)?;
                }
                Ok(writer)
            }
            BlobWriterState::EncryptedCompressed { compr } => {
                let crypt_writer = compr.finish()?;
                let (csum_writer, iv, tag) = crypt_writer.finish()?;
                let (mut writer, crc, _) = csum_writer.finish()?;

                let head = EncryptedDataBlobHeader {
                    head: DataBlobHeader {
                        magic: file_formats::ENCR_COMPR_BLOB_MAGIC_1_0,
                        crc: crc.to_le_bytes(),
                    },
                    iv,
                    tag,
                };
                writer.seek(SeekFrom::Start(0))?;
                unsafe {
                    writer.write_le_value(head)?;
                }
                Ok(writer)
            }
        }
    }
}

impl<W: Write + Seek> Write for DataBlobWriter<'_, W> {
    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
        match self.state {
            BlobWriterState::Uncompressed {
                ref mut csum_writer,
            } => csum_writer.write(buf),
            BlobWriterState::Compressed { ref mut compr } => compr.write(buf),
            BlobWriterState::Encrypted {
                ref mut crypt_writer,
            } => crypt_writer.write(buf),
            BlobWriterState::EncryptedCompressed { ref mut compr } => compr.write(buf),
        }
    }

    fn flush(&mut self) -> Result<(), std::io::Error> {
        match self.state {
            BlobWriterState::Uncompressed {
                ref mut csum_writer,
            } => csum_writer.flush(),
            BlobWriterState::Compressed { ref mut compr } => compr.flush(),
            BlobWriterState::Encrypted {
                ref mut crypt_writer,
            } => crypt_writer.flush(),
            BlobWriterState::EncryptedCompressed { ref mut compr } => compr.flush(),
        }
    }
}
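// Illustrative sketch, not part of the original commit: writing a compressed
// blob into an in-memory cursor. The helper name is made up; Cursor<Vec<u8>>
// is just a convenient Write + Seek sink. The header is written first with a
// zeroed CRC, the payload is streamed through the zstd encoder, and finish()
// seeks back to patch in the final CRC (for the encrypted variants also the
// IV and tag).
fn write_compressed_blob(payload: &[u8]) -> Result<std::io::Cursor<Vec<u8>>, Error> {
    let sink = std::io::Cursor::new(Vec::new());
    let mut blob_writer = DataBlobWriter::new_compressed(sink)?;
    blob_writer.write_all(payload)?;
    blob_writer.finish()
}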
73
pbs-datastore/src/file_formats.rs
Normal file
73
pbs-datastore/src/file_formats.rs
Normal file
@ -0,0 +1,73 @@
use endian_trait::Endian;

// WARNING: PLEASE DO NOT MODIFY THOSE MAGIC VALUES

// openssl::sha::sha256(b"Proxmox Backup Catalog file v1.0")[0..8]
pub const PROXMOX_CATALOG_FILE_MAGIC_1_0: [u8; 8] = [145, 253, 96, 249, 196, 103, 88, 213];

// openssl::sha::sha256(b"Proxmox Backup uncompressed blob v1.0")[0..8]
pub const UNCOMPRESSED_BLOB_MAGIC_1_0: [u8; 8] = [66, 171, 56, 7, 190, 131, 112, 161];

// openssl::sha::sha256(b"Proxmox Backup zstd compressed blob v1.0")[0..8]
pub const COMPRESSED_BLOB_MAGIC_1_0: [u8; 8] = [49, 185, 88, 66, 111, 182, 163, 127];

// openssl::sha::sha256(b"Proxmox Backup encrypted blob v1.0")[0..8]
pub const ENCRYPTED_BLOB_MAGIC_1_0: [u8; 8] = [123, 103, 133, 190, 34, 45, 76, 240];

// openssl::sha::sha256(b"Proxmox Backup zstd compressed encrypted blob v1.0")[0..8]
pub const ENCR_COMPR_BLOB_MAGIC_1_0: [u8; 8] = [230, 89, 27, 191, 11, 191, 216, 11];

// openssl::sha::sha256(b"Proxmox Backup fixed sized chunk index v1.0")[0..8]
pub const FIXED_SIZED_CHUNK_INDEX_1_0: [u8; 8] = [47, 127, 65, 237, 145, 253, 15, 205];

// openssl::sha::sha256(b"Proxmox Backup dynamic sized chunk index v1.0")[0..8]
pub const DYNAMIC_SIZED_CHUNK_INDEX_1_0: [u8; 8] = [28, 145, 78, 165, 25, 186, 179, 205];

/// Data blob binary storage format
///
/// The format starts with an 8 byte magic number to identify the type,
/// followed by a 4 byte CRC. This CRC is used on the server side to
/// detect file corruption (computed when uploading data), so there is
/// usually no need to compute it on the client side.
///
/// Unencrypted blobs simply contain the CRC, followed by the
/// (compressed) data.
///
/// (MAGIC || CRC32 || Data)
///
/// This is basically the same format we use for chunks, but
/// with other magic numbers so that we can distinguish them.
#[derive(Endian)]
#[repr(C,packed)]
pub struct DataBlobHeader {
    pub magic: [u8; 8],
    pub crc: [u8; 4],
}

/// Encrypted data blob binary storage format
///
/// The ``DataBlobHeader`` for encrypted blobs additionally contains
/// a 16 byte IV, followed by a 16 byte Authenticated Encryption (AE)
/// tag, followed by the encrypted data:
///
/// (MAGIC || CRC32 || IV || TAG || EncryptedData).
#[derive(Endian)]
#[repr(C,packed)]
pub struct EncryptedDataBlobHeader {
    pub head: DataBlobHeader,
    pub iv: [u8; 16],
    pub tag: [u8; 16],
}

/// Header size for different file types
///
/// Panics on unknown magic numbers.
pub fn header_size(magic: &[u8; 8]) -> usize {
    match *magic {
        UNCOMPRESSED_BLOB_MAGIC_1_0 => std::mem::size_of::<DataBlobHeader>(),
        COMPRESSED_BLOB_MAGIC_1_0 => std::mem::size_of::<DataBlobHeader>(),
        ENCRYPTED_BLOB_MAGIC_1_0 => std::mem::size_of::<EncryptedDataBlobHeader>(),
        ENCR_COMPR_BLOB_MAGIC_1_0 => std::mem::size_of::<EncryptedDataBlobHeader>(),
        _ => panic!("unknown blob magic"),
    }
}
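// Illustrative sketch, not part of the original commit: both headers are
// #[repr(C, packed)], so their on-disk sizes follow directly from the field
// layout described above - 12 bytes for plain blobs (8 byte magic + 4 byte
// CRC) and 44 bytes for encrypted ones (12 + 16 byte IV + 16 byte tag).
#[cfg(test)]
mod header_layout_tests {
    use super::*;

    #[test]
    fn header_sizes_match_format_description() {
        assert_eq!(std::mem::size_of::<DataBlobHeader>(), 12);
        assert_eq!(std::mem::size_of::<EncryptedDataBlobHeader>(), 44);
        assert_eq!(header_size(&UNCOMPRESSED_BLOB_MAGIC_1_0), 12);
        assert_eq!(header_size(&ENCR_COMPR_BLOB_MAGIC_1_0), 44);
    }
}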
65
pbs-datastore/src/index.rs
Normal file
65
pbs-datastore/src/index.rs
Normal file
@ -0,0 +1,65 @@
use std::collections::HashMap;
use std::ops::Range;

#[derive(Clone)]
pub struct ChunkReadInfo {
    pub range: Range<u64>,
    pub digest: [u8; 32],
}

impl ChunkReadInfo {
    #[inline]
    pub fn size(&self) -> u64 {
        self.range.end - self.range.start
    }
}

/// Trait to get digest list from index files
///
/// To allow easy iteration over all used chunks.
pub trait IndexFile {
    fn index_count(&self) -> usize;
    fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;
    fn index_bytes(&self) -> u64;
    fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
    fn index_ctime(&self) -> i64;
    fn index_size(&self) -> usize;

    /// Get the chunk index and the relative offset within it for a byte offset
    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;

    /// Compute index checksum and size
    fn compute_csum(&self) -> ([u8; 32], u64);

    /// Returns most often used chunks
    fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> {
        let mut map = HashMap::new();

        for pos in 0..self.index_count() {
            let digest = self.index_digest(pos).unwrap();

            let count = map.entry(*digest).or_insert(0);
            *count += 1;
        }

        let mut most_used = Vec::new();

        for (digest, count) in map {
            if count <= 1 { continue; }
            match most_used.binary_search_by_key(&count, |&(_digest, count)| count) {
                Ok(p) => most_used.insert(p, (digest, count)),
                Err(p) => most_used.insert(p, (digest, count)),
            }

            if most_used.len() > max { let _ = most_used.pop(); }
        }

        let mut map = HashMap::new();

        for data in most_used {
            map.insert(data.0, data.1);
        }

        map
    }
}
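// Illustrative sketch, not part of the original commit: a small helper that
// works against any IndexFile implementation, e.g. to feed a verify or GC
// mark phase with every chunk digest referenced by an index. The function
// name is made up for illustration.
pub fn for_each_digest<I: IndexFile + ?Sized>(index: &I, mut f: impl FnMut(&[u8; 32])) {
    for pos in 0..index.index_count() {
        if let Some(digest) = index.index_digest(pos) {
            f(digest);
        }
    }
}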
199
pbs-datastore/src/lib.rs
Normal file
199
pbs-datastore/src/lib.rs
Normal file
@ -0,0 +1,199 @@
//! This module implements the data storage and access layer.
//!
//! # Data formats
//!
//! PBS splits large files into chunks, and stores them deduplicated using
//! a content addressable storage format.
//!
//! Backup snapshots are stored as folders containing a manifest file and
//! potentially one or more index or blob files.
//!
//! The manifest contains hashes of all other files and can be signed by
//! the client.
//!
//! Blob files contain data directly. They are used for config files and
//! the like.
//!
//! Index files are used to reconstruct an original file. They contain a
//! list of SHA256 checksums. The `DynamicIndex*` format is able to deal
//! with dynamic chunk sizes (CT and host backups), whereas the
//! `FixedIndex*` format is an optimization to store a list of equal sized
//! chunks (VMs, whole block devices).
//!
//! A chunk is defined as a binary blob, which is stored inside a
//! [ChunkStore](struct.ChunkStore.html) instead of the backup directory
//! directly, and can be addressed by its SHA256 digest.
//!
//!
//! # Garbage Collection (GC)
//!
//! Deleting backups is as easy as deleting the corresponding .idx files.
//! However, this does not free up any storage, because those files just
//! contain references to chunks.
//!
//! To free up some storage, we run a garbage collection process at
//! regular intervals. The collector uses a mark and sweep approach. In
//! the first phase, it scans all .idx files to mark used chunks. The
//! second phase then removes all unmarked chunks from the store.
//!
//! The locking mechanisms mentioned below make sure that we are the only
//! process running GC. We still want to be able to create backups during
//! GC, so there may be multiple backup threads/tasks running, either
//! started before GC, or while GC is running.
//!
//! ## `atime` based GC
//!
//! The idea here is to mark chunks by updating the `atime` (access
//! timestamp) on the chunk file. This is quite simple and does not need
//! additional RAM.
//!
//! One minor problem is that recent Linux versions use the `relatime`
//! mount flag by default for performance reasons (and we want that). When
//! enabled, `atime` data is written to the disk only if the file has been
//! modified since the `atime` data was last updated (`mtime`), or if the
//! file was last accessed more than a certain amount of time ago (by
//! default 24h). So we may only delete chunks with `atime` older than 24
//! hours.
//!
//! Another problem arises from running backups. The mark phase does not
//! find any chunks from those backups, because there is no .idx file for
//! them (created after the backup). Chunks created or touched by those
//! backups may have an `atime` as old as the start time of those backups.
//! Please note that the backup start time may predate the GC start time.
//! So we may only delete chunks older than the start time of those
//! running backup jobs, which might be more than 24h back (this is the
//! reason why ProcessLocker exclusive locks only have to be exclusive
//! between processes, since within one we can determine the age of the
//! oldest shared lock).
//!
//! ## Store `marks` in RAM using a HASH
//!
//! Might be better. Under investigation.
//!
//!
//! # Locking
//!
//! Since PBS allows multiple potentially interfering operations at the
//! same time (e.g. garbage collect, prune, multiple backup creations
//! (only in separate groups), forget, ...), these need to lock against
//! each other in certain scenarios. There is no overarching global lock
//! though, instead always the finest grained lock possible is used,
//! because running these operations concurrently is treated as a feature
//! on its own.
//!
//! ## Inter-process Locking
//!
//! We need to be able to restart the proxmox-backup service daemons, so
//! that we can update the software without rebooting the host. But such
//! restarts must not abort running backup jobs, so we need to keep the
//! old service running until those jobs are finished. This implies that
//! we need some kind of locking for modifying chunks and indices in the
//! ChunkStore.
//!
//! Please note that it is perfectly valid to have multiple
//! parallel ChunkStore writers, even when they write the same chunk
//! (because the chunk would have the same name and the same data, and
//! writes are completed atomically via a rename). The only problem is
//! garbage collection, because we need to avoid deleting chunks which are
//! still referenced.
//!
//! To do this we use the
//! [ProcessLocker](../tools/struct.ProcessLocker.html).
//!
//! ### ChunkStore-wide
//!
//! * Create Index Files:
//!
//!   Acquire shared lock for ChunkStore.
//!
//!   Note: When creating .idx files, we create a temporary .tmp file,
//!   then do an atomic rename.
//!
//! * Garbage Collect:
//!
//!   Acquire exclusive lock for ChunkStore. If we have
//!   already a shared lock for the ChunkStore, try to upgrade that
//!   lock.
//!
//! Exclusive locks only work _between processes_. It is valid to have an
//! exclusive and one or more shared locks held within one process. Writing
//! chunks within one process is synchronized using the gc_mutex.
//!
//! On server restart, we stop any running GC in the old process to avoid
//! having the exclusive lock held for too long.
//!
//! ## Locking table
//!
//! The table below shows all operations that play a role in locking, and which
//! mechanisms are used to make their concurrent usage safe.
//!
//! | starting ><br>v during | read index file | create index file | GC mark | GC sweep | update manifest | forget | prune | create backup | verify | reader api |
//! |-|-|-|-|-|-|-|-|-|-|-|
//! | **read index file** | / | / | / | / | / | mmap stays valid, oldest_shared_lock prevents GC | see forget column | / | / | / |
//! | **create index file** | / | / | / | / | / | / | / | /, happens at the end, after all chunks are touched | /, only happens without a manifest | / |
//! | **GC mark** | / | Datastore process-lock shared | gc_mutex, exclusive ProcessLocker | gc_mutex | /, GC only cares about index files, not manifests | tells GC about removed chunks | see forget column | /, index files don’t exist yet | / | / |
//! | **GC sweep** | / | Datastore process-lock shared | gc_mutex, exclusive ProcessLocker | gc_mutex | / | /, chunks already marked | see forget column | chunks get touched; chunk_store.mutex; oldest PL lock | / | / |
//! | **update manifest** | / | / | / | / | update_manifest lock | update_manifest lock, remove dir under lock | see forget column | /, “write manifest” happens at the end | /, can call “write manifest”, see that column | / |
//! | **forget** | / | / | removed_during_gc mutex is held during unlink | marking done, doesn’t matter if forgotten now | update_manifest lock, forget waits for lock | /, unlink is atomic | causes forget to fail, but that’s OK | running backup has snapshot flock | /, potentially detects missing folder | shared snap flock |
//! | **prune** | / | / | see forget row | see forget row | see forget row | causes warn in prune, but no error | see forget column | running and last non-running can’t be pruned | see forget row | shared snap flock |
//! | **create backup** | / | only time this happens, thus has snapshot flock | / | chunks get touched; chunk_store.mutex; oldest PL lock | no lock, but cannot exist beforehand | snapshot flock, can’t be forgotten | running and last non-running can’t be pruned | snapshot group flock, only one running per group | /, won’t be verified since manifest missing | / |
//! | **verify** | / | / | / | / | see “update manifest” row | /, potentially detects missing folder | see forget column | / | /, but useless (“update manifest” protects itself) | / |
//! | **reader api** | / | / | / | /, open snap can’t be forgotten, so ref must exist | / | prevented by shared snap flock | prevented by shared snap flock | / | / | /, lock is shared |
//! * / = no interaction
//! * shared/exclusive from POV of 'starting' process

use anyhow::{format_err, Error};

// Note: .pcat1 => Proxmox Catalog Format version 1
pub const CATALOG_NAME: &str = "catalog.pcat1.didx";

#[macro_export]
macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
    () => {
        "proxmox-backup-protocol-v1"
    };
}

#[macro_export]
macro_rules! PROXMOX_BACKUP_READER_PROTOCOL_ID_V1 {
    () => {
        "proxmox-backup-reader-protocol-v1"
    };
}

/// Unix system user used by proxmox-backup-proxy
pub const BACKUP_USER_NAME: &str = "backup";
/// Unix system group used by proxmox-backup-proxy
pub const BACKUP_GROUP_NAME: &str = "backup";

/// Return User info for the 'backup' user (``getpwnam_r(3)``)
pub fn backup_user() -> Result<nix::unistd::User, Error> {
    nix::unistd::User::from_name(BACKUP_USER_NAME)?
        .ok_or_else(|| format_err!("Unable to lookup backup user."))
}

/// Return Group info for the 'backup' group (``getgrnam(3)``)
pub fn backup_group() -> Result<nix::unistd::Group, Error> {
    nix::unistd::Group::from_name(BACKUP_GROUP_NAME)?
        .ok_or_else(|| format_err!("Unable to lookup backup group."))
}

pub mod catalog;
pub mod checksum_reader;
pub mod checksum_writer;
pub mod chunker;
pub mod crypt_config;
pub mod crypt_reader;
pub mod crypt_writer;
pub mod data_blob;
pub mod data_blob_reader;
pub mod data_blob_writer;
pub mod file_formats;
pub mod index;

pub use checksum_reader::ChecksumReader;
pub use checksum_writer::ChecksumWriter;
pub use chunker::Chunker;
pub use crypt_config::{CryptConfig, CryptMode};
pub use crypt_reader::CryptReader;
pub use crypt_writer::CryptWriter;
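// Illustrative sketch, not part of the original commit: resolving the backup
// user/group pair, e.g. before chown()ing a freshly created datastore
// directory to backup:backup. The helper name is made up; backup_user() and
// backup_group() are defined above.
fn backup_owner() -> Result<(nix::unistd::Uid, nix::unistd::Gid), Error> {
    let user = backup_user()?;
    let group = backup_group()?;
    Ok((user.uid, group.gid))
}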