src/pxar/encoder.rs: use BackupCatalogWriter
commit 2761d6a4f3
parent aea0815d32
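
This commit threads a catalog writer through the pxar encoder so that, while a directory is archived, an index of the archived entries is written to a separate catalog file and uploaded alongside the archive as catalog.blob. The trait itself lives in the pxar catalog module (the diff only imports it as super::catalog::BackupCatalogWriter); the sketch below is reconstructed from the call sites in this diff, with the filename parameter and error types assumed rather than taken from the actual definition:

    use std::ffi::CStr;

    // Reconstructed interface sketch; std::io::Error is a stand-in for the
    // crate's real error type, and &CStr for the encoder's filename type.
    pub trait BackupCatalogWriter {
        fn start_directory(&mut self, name: &CStr) -> Result<(), std::io::Error>;
        fn end_directory(&mut self) -> Result<(), std::io::Error>;
        fn add_file(&mut self, name: &CStr, size: u64, mtime: u64) -> Result<(), std::io::Error>;
        fn add_symlink(&mut self, name: &CStr) -> Result<(), std::io::Error>;
        fn add_hardlink(&mut self, name: &CStr) -> Result<(), std::io::Error>;
        fn add_block_device(&mut self, name: &CStr) -> Result<(), std::io::Error>;
        fn add_char_device(&mut self, name: &CStr) -> Result<(), std::io::Error>;
        fn add_fifo(&mut self, name: &CStr) -> Result<(), std::io::Error>;
        fn add_socket(&mut self, name: &CStr) -> Result<(), std::io::Error>;
    }
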
@@ -7,6 +7,8 @@ use chrono::{Local, Utc, TimeZone};
 use std::path::{Path, PathBuf};
 use std::collections::{HashSet, HashMap};
 use std::io::Write;
+use std::os::unix::fs::OpenOptionsExt;
+
 use proxmox::tools::fs::{file_get_contents, file_get_json, file_set_contents, image_size};

 use proxmox_backup::tools;
@@ -25,7 +27,7 @@ use proxmox_backup::pxar

 use serde_json::{json, Value};
 //use hyper::Body;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use regex::Regex;
 use xdg::BaseDirectories;

@@ -157,9 +159,10 @@ fn backup_directory<P: AsRef<Path>>(
     verbose: bool,
     skip_lost_and_found: bool,
     crypt_config: Option<Arc<CryptConfig>>,
+    catalog: Arc<Mutex<pxar::catalog::SimpleCatalog>>,
 ) -> Result<BackupStats, Error> {

-    let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), device_set, verbose, skip_lost_and_found)?;
+    let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), device_set, verbose, skip_lost_and_found, catalog)?;
     let chunk_stream = ChunkStream::new(pxar_stream, chunk_size);

     let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
@@ -595,6 +598,10 @@ fn create_backup(

     let mut file_list = vec![];

+    let catalog_filename = format!("/tmp/pbs-catalog-{}.cat", std::process::id());
+    let catalog = Arc::new(Mutex::new(pxar::catalog::SimpleCatalog::new(&catalog_filename)?));
+    let mut upload_catalog = false;
+
     for (backup_type, filename, target, size) in upload_list {
         match backup_type {
             BackupType::CONFIG => {
@@ -608,6 +615,7 @@ fn create_backup(
                 file_list.push((target, stats));
             }
             BackupType::PXAR => {
+                upload_catalog = true;
                 println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);
                 let stats = backup_directory(
                     &client,
@@ -618,6 +626,7 @@ fn create_backup(
                     verbose,
                     skip_lost_and_found,
                     crypt_config.clone(),
+                    catalog.clone(),
                 )?;
                 file_list.push((target, stats));
             }
@@ -637,6 +646,19 @@ fn create_backup(
         }
     }

+    // finalize and upload catalog
+    if upload_catalog {
+        let mutex = Arc::try_unwrap(catalog)
+            .map_err(|_| format_err!("unable to get catalog (still used)"))?;
+        drop(mutex); // close catalog
+
+        let target = "catalog.blob";
+        let stats = client.upload_blob_from_file(&catalog_filename, target, crypt_config.clone(), true).wait()?;
+        file_list.push((target.to_owned(), stats));
+
+        let _ = std::fs::remove_file(&catalog_filename);
+    }
+
     if let Some(rsa_encrypted_key) = rsa_encrypted_key {
         let target = "rsa-encrypted.key";
         println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
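
The block above can only upload a complete catalog if the catalog file has been closed first: Arc::try_unwrap succeeds only when no other clone of the Arc is still alive, and dropping the Mutex drops the SimpleCatalog inside it, which closes its file handle before upload_blob_from_file reads the file back. A minimal sketch of that ownership pattern, using a plain std::fs::File as a stand-in for SimpleCatalog:

    use std::fs::File;
    use std::sync::{Arc, Mutex};

    // Sketch only: unwrap the Arc to prove exclusive ownership, then drop it so
    // the underlying file handle is closed before anything reads the file back.
    fn finalize(catalog: Arc<Mutex<File>>) -> Result<(), String> {
        // try_unwrap fails if any clone is still alive (e.g. a backup task that
        // has not finished), which would leave the file open and incomplete.
        let mutex = Arc::try_unwrap(catalog)
            .map_err(|_| "catalog still in use".to_string())?;
        drop(mutex); // dropping the Mutex drops the File and closes the handle
        Ok(())
    }
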
@@ -772,8 +794,6 @@ fn restore(

     let client = client.start_backup_reader(repo.store(), &backup_type, &backup_id, backup_time, true).wait()?;

-    use std::os::unix::fs::OpenOptionsExt;
-
     let tmpfile = std::fs::OpenOptions::new()
         .write(true)
         .read(true)
@@ -208,7 +208,8 @@ fn create_archive(
         feature_flags ^= pxar::flags::WITH_SOCKETS;
     }

-    pxar::Encoder::encode(source, &mut dir, &mut writer, devices, verbose, false, feature_flags)?;
+    let catalog = None::<&mut pxar::catalog::SimpleCatalog>;
+    pxar::Encoder::encode(source, &mut dir, &mut writer, catalog, devices, verbose, false, feature_flags)?;

     writer.flush()?;

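
Both call sites that pass no catalog (this one and the test at the end of the diff) spell the argument out as None::<&mut ...SimpleCatalog>. With a parameter of type Option<&'a mut C>, a bare None gives the compiler nothing to infer C from, so a concrete type has to be named even when no catalog is used. A self-contained illustration of that inference rule (all names here are invented for the example):

    // Illustrative only: a trait, one implementor, and a generic function whose
    // optional-catalog parameter mirrors the shape of the Encoder::encode change.
    trait CatalogWriter {
        fn add(&mut self, name: &str);
    }

    struct SimpleCatalog(Vec<String>);

    impl CatalogWriter for SimpleCatalog {
        fn add(&mut self, name: &str) {
            self.0.push(name.to_string());
        }
    }

    fn encode<C: CatalogWriter>(catalog: Option<&mut C>, name: &str) {
        if let Some(c) = catalog {
            c.add(name);
        }
    }

    fn main() {
        // encode(None, "etc");                    // error: type annotations needed for `C`
        encode(None::<&mut SimpleCatalog>, "etc"); // OK: C is pinned to SimpleCatalog
        let mut cat = SimpleCatalog(Vec::new());
        encode(Some(&mut cat), "etc");
    }
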
@@ -37,7 +37,14 @@ impl Drop for PxarBackupStream {

 impl PxarBackupStream {

-    pub fn new(mut dir: Dir, path: PathBuf, device_set: Option<HashSet<u64>>, verbose: bool, skip_lost_and_found: bool) -> Result<Self, Error> {
+    pub fn new(
+        mut dir: Dir,
+        path: PathBuf,
+        device_set: Option<HashSet<u64>>,
+        verbose: bool,
+        skip_lost_and_found: bool,
+        catalog: Arc<Mutex<crate::pxar::catalog::SimpleCatalog>>,
+    ) -> Result<Self, Error> {

         let (rx, tx) = nix::unistd::pipe()?;

@@ -47,9 +54,11 @@ impl PxarBackupStream {
         let error = Arc::new(Mutex::new(None));
         let error2 = error.clone();

-        let child = thread::spawn(move|| {
+        let catalog = catalog.clone();
+        let child = thread::spawn(move || {
+            let mut guard = catalog.lock().unwrap();
             let mut writer = unsafe { std::fs::File::from_raw_fd(tx) };
-            if let Err(err) = pxar::Encoder::encode(path, &mut dir, &mut writer, device_set, verbose, skip_lost_and_found, pxar::flags::DEFAULT) {
+            if let Err(err) = pxar::Encoder::encode(path, &mut dir, &mut writer, Some(&mut *guard), device_set, verbose, skip_lost_and_found, pxar::flags::DEFAULT) {
                 let mut error = error2.lock().unwrap();
                 *error = Some(err.to_string());
             }
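
The stream hands the catalog to the encoder thread by cloning the Arc before the move closure and holding the Mutex lock for the whole encode run, so only one writer touches the catalog at a time; the &mut *guard reborrow then turns the MutexGuard into the plain mutable reference the encoder expects. A minimal standalone sketch of that hand-off, with a Vec<String> standing in for the catalog:

    use std::sync::{Arc, Mutex};
    use std::thread;

    // Sketch only: clone the Arc so the closure owns its own reference, move the
    // clone into the thread, and keep the lock for the duration of the work.
    fn spawn_writer(catalog: &Arc<Mutex<Vec<String>>>) -> thread::JoinHandle<()> {
        let catalog = catalog.clone();
        thread::spawn(move || {
            let mut guard = catalog.lock().unwrap();
            guard.push("entry".to_string());
            // the lock is released when `guard` goes out of scope
        })
    }

    fn main() {
        let catalog = Arc::new(Mutex::new(Vec::new()));
        let handle = spawn_writer(&catalog);
        handle.join().unwrap();
        assert_eq!(catalog.lock().unwrap().len(), 1);
    }
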
@@ -65,12 +74,18 @@ impl PxarBackupStream {
         })
     }

-    pub fn open(dirname: &Path, device_set: Option<HashSet<u64>>, verbose: bool, skip_lost_and_found: bool) -> Result<Self, Error> {
+    pub fn open(
+        dirname: &Path,
+        device_set: Option<HashSet<u64>>,
+        verbose: bool,
+        skip_lost_and_found: bool,
+        catalog: Arc<Mutex<crate::pxar::catalog::SimpleCatalog>>,
+    ) -> Result<Self, Error> {

         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
         let path = std::path::PathBuf::from(dirname);

-        Self::new(dir, path, device_set, verbose, skip_lost_and_found)
+        Self::new(dir, path, device_set, verbose, skip_lost_and_found, catalog)
     }
 }

@@ -11,6 +11,8 @@ use super::format_definition::*;
 use super::binary_search_tree::*;
 use super::helper::*;
 use super::match_pattern::*;
+use super::catalog::BackupCatalogWriter;
+
 use crate::tools::fs;
 use crate::tools::acl;
 use crate::tools::xattr;
@@ -42,11 +44,12 @@ struct HardLinkInfo {
     st_ino: u64,
 }

-pub struct Encoder<'a, W: Write> {
+pub struct Encoder<'a, W: Write, C: BackupCatalogWriter> {
     base_path: PathBuf,
     relative_path: PathBuf,
     writer: &'a mut W,
     writer_pos: usize,
+    catalog: Option<&'a mut C>,
     _size: usize,
     file_copy_buffer: Vec<u8>,
     device_set: Option<HashSet<u64>>,
@@ -58,7 +61,7 @@ pub struct Encoder<'a, W: Write> {
     hardlinks: HashMap<HardLinkInfo, (PathBuf, u64)>,
 }

-impl <'a, W: Write> Encoder<'a, W> {
+impl <'a, W: Write, C: BackupCatalogWriter> Encoder<'a, W, C> {

     // used for error reporting
     fn full_path(&self) -> PathBuf {
@@ -78,6 +81,7 @@ impl <'a, W: Write> Encoder<'a, W> {
         path: PathBuf,
         dir: &mut nix::dir::Dir,
         writer: &'a mut W,
+        catalog: Option<&'a mut C>,
         device_set: Option<HashSet<u64>>,
         verbose: bool,
         skip_lost_and_found: bool, // fixme: should be a feature flag ??
@@ -118,6 +122,7 @@ impl <'a, W: Write> Encoder<'a, W> {
             relative_path: PathBuf::new(),
             writer: writer,
             writer_pos: 0,
+            catalog,
             _size: 0,
             file_copy_buffer,
             device_set,
@@ -758,7 +763,13 @@ impl <'a, W: Write> Encoder<'a, W> {
             };

             self.write_filename(&filename)?;
+            if let Some(ref mut catalog) = self.catalog {
+                catalog.start_directory(&filename)?;
+            }
             self.encode_dir(&mut dir, &stat, child_magic, exclude_list)?;
+            if let Some(ref mut catalog) = self.catalog {
+                catalog.end_directory()?;
+            }

         } else if is_reg_file(&stat) {

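
Every catalog call inside the encoder goes through `if let Some(ref mut catalog) = self.catalog`. Binding with ref mut borrows the reference stored in the field instead of moving it out, so the same Option<&mut C> can be matched again for the next entry. A small standalone illustration of that pattern (the types here are invented for the example):

    // Illustrative only: matching an Option<&mut T> field repeatedly via `ref mut`.
    struct Catalog {
        entries: usize,
    }

    struct Encoder<'a> {
        catalog: Option<&'a mut Catalog>,
    }

    impl<'a> Encoder<'a> {
        fn encode_two_entries(&mut self) {
            if let Some(ref mut catalog) = self.catalog {
                catalog.entries += 1; // field access auto-derefs the nested &mut
            }
            // `self.catalog` was only borrowed, not consumed, so it can be matched again:
            if let Some(ref mut catalog) = self.catalog {
                catalog.entries += 1;
            }
        }
    }

    fn main() {
        let mut cat = Catalog { entries: 0 };
        let mut enc = Encoder { catalog: Some(&mut cat) };
        enc.encode_two_entries();
        drop(enc);
        assert_eq!(cat.entries, 2);
    }
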
@@ -777,7 +788,9 @@ impl <'a, W: Write> Encoder<'a, W> {
             }

             if let Some((target, offset)) = hardlink_target {
+                if let Some(ref mut catalog) = self.catalog {
+                    catalog.add_hardlink(&filename)?;
+                }
                 self.write_filename(&filename)?;
                 self.encode_hardlink(target.as_bytes(), offset)?;

@@ -792,6 +805,9 @@ impl <'a, W: Write> Encoder<'a, W> {
                 Err(err) => bail!("open file {:?} failed - {}", self.full_path(), err),
             };

+            if let Some(ref mut catalog) = self.catalog {
+                catalog.add_file(&filename, stat.st_size as u64, stat.st_mtime as u64)?;
+            }
             let child_magic = if dir_stat.st_dev != stat.st_dev {
                 detect_fs_type(filefd)?
             } else {
@@ -805,6 +821,7 @@ impl <'a, W: Write> Encoder<'a, W> {
             }

         } else if is_symlink(&stat) {

             let mut buffer = vec::undefined(libc::PATH_MAX as usize);

             let res = filename.with_nix_path(|cstr| {
@@ -813,6 +830,9 @@ impl <'a, W: Write> Encoder<'a, W> {

             match Errno::result(res) {
                 Ok(len) => {
+                    if let Some(ref mut catalog) = self.catalog {
+                        catalog.add_symlink(&filename)?;
+                    }
                     buffer[len as usize] = 0u8; // add Nul byte
                     self.write_filename(&filename)?;
                     self.encode_symlink(&buffer[..((len+1) as usize)], &stat)?
@@ -825,6 +845,13 @@ impl <'a, W: Write> Encoder<'a, W> {
             }
         } else if is_block_dev(&stat) || is_char_dev(&stat) {
             if self.has_features(flags::WITH_DEVICE_NODES) {
+                if let Some(ref mut catalog) = self.catalog {
+                    if is_block_dev(&stat) {
+                        catalog.add_block_device(&filename)?;
+                    } else {
+                        catalog.add_char_device(&filename)?;
+                    }
+                }
                 self.write_filename(&filename)?;
                 self.encode_device(&stat)?;
             } else {
@@ -832,6 +859,9 @@ impl <'a, W: Write> Encoder<'a, W> {
             }
         } else if is_fifo(&stat) {
             if self.has_features(flags::WITH_FIFOS) {
+                if let Some(ref mut catalog) = self.catalog {
+                    catalog.add_fifo(&filename)?;
+                }
                 self.write_filename(&filename)?;
                 self.encode_special(&stat)?;
             } else {
@@ -839,6 +869,9 @@ impl <'a, W: Write> Encoder<'a, W> {
             }
         } else if is_socket(&stat) {
             if self.has_features(flags::WITH_SOCKETS) {
+                if let Some(ref mut catalog) = self.catalog {
+                    catalog.add_socket(&filename)?;
+                }
                 self.write_filename(&filename)?;
                 self.encode_special(&stat)?;
             } else {
@@ -26,7 +26,8 @@ fn run_test(dir_name: &str) -> Result<(), Error> {

     let path = std::path::PathBuf::from(dir_name);

-    Encoder::encode(path, &mut dir, &mut writer, None, false, false, flags::DEFAULT)?;
+    let catalog = None::<&mut catalog::SimpleCatalog>;
+    Encoder::encode(path, &mut dir, &mut writer, catalog, None, false, false, flags::DEFAULT)?;

     Command::new("cmp")
         .arg("--verbose")