asyncify pxar create_archive

...to take advantage of the aio::Encoder from the pxar crate.

Rather straightforward conversion, but does require getting rid of
references in the Archiver struct, and thus has to be given the Mutex
for the catalog directly. The callback is boxed.

archive_dir_contents can call itself recursively, and thus needs to
return a boxed future.

Users are adjusted, namely PxarBackupStream is converted to use an
Abortable future instead of a thread so it supports async in its handler
function, and the pxar bin create_archive is converted to an async API
function. One test case is made to just use 'block_on'.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Stefan Reiter 2021-02-09 13:03:48 +01:00 committed by Wolfgang Bumiller
parent a42212fc1e
commit 6afb60abf5
4 changed files with 143 additions and 140 deletions

View File

@ -295,7 +295,7 @@ fn extract_archive(
)] )]
/// Create a new .pxar archive. /// Create a new .pxar archive.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn create_archive( async fn create_archive(
archive: String, archive: String,
source: String, source: String,
verbose: bool, verbose: bool,
@ -376,7 +376,7 @@ fn create_archive(
dir, dir,
writer, writer,
feature_flags, feature_flags,
|path| { move |path| {
if verbose { if verbose {
println!("{:?}", path); println!("{:?}", path);
} }
@ -384,7 +384,7 @@ fn create_archive(
}, },
None, None,
options, options,
)?; ).await?;
Ok(()) Ok(())
} }

View File

@ -4,10 +4,10 @@ use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::thread;
use anyhow::{format_err, Error}; use anyhow::{format_err, Error};
use futures::stream::Stream; use futures::stream::Stream;
use futures::future::{Abortable, AbortHandle};
use nix::dir::Dir; use nix::dir::Dir;
use nix::fcntl::OFlag; use nix::fcntl::OFlag;
use nix::sys::stat::Mode; use nix::sys::stat::Mode;
@ -21,14 +21,14 @@ use crate::backup::CatalogWriter;
/// consumer. /// consumer.
pub struct PxarBackupStream { pub struct PxarBackupStream {
rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>, rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
child: Option<thread::JoinHandle<()>>, handle: Option<AbortHandle>,
error: Arc<Mutex<Option<String>>>, error: Arc<Mutex<Option<String>>>,
} }
impl Drop for PxarBackupStream { impl Drop for PxarBackupStream {
fn drop(&mut self) { fn drop(&mut self) {
self.rx = None; self.rx = None;
self.child.take().unwrap().join().unwrap(); self.handle.take().unwrap().abort();
} }
} }
@ -43,12 +43,8 @@ impl PxarBackupStream {
let buffer_size = 256 * 1024; let buffer_size = 256 * 1024;
let error = Arc::new(Mutex::new(None)); let error = Arc::new(Mutex::new(None));
let child = std::thread::Builder::new() let error2 = Arc::clone(&error);
.name("PxarBackupStream".to_string()) let handler = async move {
.spawn({
let error = Arc::clone(&error);
move || {
let mut catalog_guard = catalog.lock().unwrap();
let writer = std::io::BufWriter::with_capacity( let writer = std::io::BufWriter::with_capacity(
buffer_size, buffer_size,
crate::tools::StdChannelWriter::new(tx), crate::tools::StdChannelWriter::new(tx),
@ -61,24 +57,27 @@ impl PxarBackupStream {
dir, dir,
writer, writer,
crate::pxar::Flags::DEFAULT, crate::pxar::Flags::DEFAULT,
|path| { move |path| {
if verbose { if verbose {
println!("{:?}", path); println!("{:?}", path);
} }
Ok(()) Ok(())
}, },
Some(&mut *catalog_guard), Some(catalog),
options, options,
) { ).await {
let mut error = error.lock().unwrap(); let mut error = error2.lock().unwrap();
*error = Some(err.to_string()); *error = Some(err.to_string());
} }
} };
})?;
let (handle, registration) = AbortHandle::new_pair();
let future = Abortable::new(handler, registration);
tokio::spawn(future);
Ok(Self { Ok(Self {
rx: Some(rx), rx: Some(rx),
child: Some(child), handle: Some(handle),
error, error,
}) })
} }

View File

@ -5,16 +5,19 @@ use std::io::{self, Read, Write};
use std::os::unix::ffi::OsStrExt; use std::os::unix::ffi::OsStrExt;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use nix::dir::Dir; use nix::dir::Dir;
use nix::errno::Errno; use nix::errno::Errno;
use nix::fcntl::OFlag; use nix::fcntl::OFlag;
use nix::sys::stat::{FileStat, Mode}; use nix::sys::stat::{FileStat, Mode};
use futures::future::BoxFuture;
use futures::FutureExt;
use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag}; use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
use pxar::Metadata; use pxar::Metadata;
use pxar::encoder::LinkOffset; use pxar::encoder::{SeqWrite, LinkOffset};
use proxmox::c_str; use proxmox::c_str;
use proxmox::sys::error::SysError; use proxmox::sys::error::SysError;
@ -129,13 +132,13 @@ impl std::io::Write for ErrorReporter {
} }
} }
struct Archiver<'a, 'b> { struct Archiver {
feature_flags: Flags, feature_flags: Flags,
fs_feature_flags: Flags, fs_feature_flags: Flags,
fs_magic: i64, fs_magic: i64,
patterns: Vec<MatchEntry>, patterns: Vec<MatchEntry>,
callback: &'a mut dyn FnMut(&Path) -> Result<(), Error>, callback: Box<dyn FnMut(&Path) -> Result<(), Error> + Send>,
catalog: Option<&'b mut dyn BackupCatalogWriter>, catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
path: PathBuf, path: PathBuf,
entry_counter: usize, entry_counter: usize,
entry_limit: usize, entry_limit: usize,
@ -147,19 +150,19 @@ struct Archiver<'a, 'b> {
file_copy_buffer: Vec<u8>, file_copy_buffer: Vec<u8>,
} }
type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>; type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
pub fn create_archive<T, F>( pub async fn create_archive<T, F>(
source_dir: Dir, source_dir: Dir,
mut writer: T, mut writer: T,
feature_flags: Flags, feature_flags: Flags,
mut callback: F, callback: F,
catalog: Option<&mut dyn BackupCatalogWriter>, catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
options: PxarCreateOptions, options: PxarCreateOptions,
) -> Result<(), Error> ) -> Result<(), Error>
where where
T: pxar::encoder::SeqWrite, T: SeqWrite + Send,
F: FnMut(&Path) -> Result<(), Error>, F: FnMut(&Path) -> Result<(), Error> + Send + 'static,
{ {
let fs_magic = detect_fs_type(source_dir.as_raw_fd())?; let fs_magic = detect_fs_type(source_dir.as_raw_fd())?;
if is_virtual_file_system(fs_magic) { if is_virtual_file_system(fs_magic) {
@ -182,8 +185,7 @@ where
set.insert(stat.st_dev); set.insert(stat.st_dev);
} }
let writer = &mut writer as &mut dyn pxar::encoder::SeqWrite; let mut encoder = Encoder::new(&mut writer, &metadata).await?;
let mut encoder = Encoder::new(writer, &metadata)?;
let mut patterns = options.patterns; let mut patterns = options.patterns;
@ -199,7 +201,7 @@ where
feature_flags, feature_flags,
fs_feature_flags, fs_feature_flags,
fs_magic, fs_magic,
callback: &mut callback, callback: Box::new(callback),
patterns, patterns,
catalog, catalog,
path: PathBuf::new(), path: PathBuf::new(),
@ -213,8 +215,8 @@ where
file_copy_buffer: vec::undefined(4 * 1024 * 1024), file_copy_buffer: vec::undefined(4 * 1024 * 1024),
}; };
archiver.archive_dir_contents(&mut encoder, source_dir, true)?; archiver.archive_dir_contents(&mut encoder, source_dir, true).await?;
encoder.finish()?; encoder.finish().await?;
Ok(()) Ok(())
} }
@ -224,7 +226,7 @@ struct FileListEntry {
stat: FileStat, stat: FileStat,
} }
impl<'a, 'b> Archiver<'a, 'b> { impl Archiver {
/// Get the currently effective feature flags. (Requested flags masked by the file system /// Get the currently effective feature flags. (Requested flags masked by the file system
/// feature flags). /// feature flags).
fn flags(&self) -> Flags { fn flags(&self) -> Flags {
@ -239,12 +241,13 @@ impl<'a, 'b> Archiver<'a, 'b> {
} }
} }
fn archive_dir_contents( fn archive_dir_contents<'a, 'b, T: SeqWrite + Send>(
&mut self, &'a mut self,
encoder: &mut Encoder, encoder: &'a mut Encoder<'b, T>,
mut dir: Dir, mut dir: Dir,
is_root: bool, is_root: bool,
) -> Result<(), Error> { ) -> BoxFuture<'a, Result<(), Error>> {
async move {
let entry_counter = self.entry_counter; let entry_counter = self.entry_counter;
let old_patterns_count = self.patterns.len(); let old_patterns_count = self.patterns.len();
@ -268,13 +271,13 @@ impl<'a, 'b> Archiver<'a, 'b> {
let file_name = file_entry.name.to_bytes(); let file_name = file_entry.name.to_bytes();
if is_root && file_name == b".pxarexclude-cli" { if is_root && file_name == b".pxarexclude-cli" {
self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)?; self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count).await?;
continue; continue;
} }
(self.callback)(&file_entry.path)?; (self.callback)(&file_entry.path)?;
self.path = file_entry.path; self.path = file_entry.path;
self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat) self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat).await
.map_err(|err| self.wrap_err(err))?; .map_err(|err| self.wrap_err(err))?;
} }
self.path = old_path; self.path = old_path;
@ -282,6 +285,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
self.patterns.truncate(old_patterns_count); self.patterns.truncate(old_patterns_count);
Ok(()) Ok(())
}.boxed()
} }
/// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`. /// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`.
@ -396,23 +400,22 @@ impl<'a, 'b> Archiver<'a, 'b> {
Ok(()) Ok(())
} }
fn encode_pxarexclude_cli( async fn encode_pxarexclude_cli<T: SeqWrite + Send>(
&mut self, &mut self,
encoder: &mut Encoder, encoder: &mut Encoder<'_, T>,
file_name: &CStr, file_name: &CStr,
patterns_count: usize, patterns_count: usize,
) -> Result<(), Error> { ) -> Result<(), Error> {
let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]); let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]);
if let Some(ref catalog) = self.catalog {
if let Some(ref mut catalog) = self.catalog { catalog.lock().unwrap().add_file(file_name, content.len() as u64, 0)?;
catalog.add_file(file_name, content.len() as u64, 0)?;
} }
let mut metadata = Metadata::default(); let mut metadata = Metadata::default();
metadata.stat.mode = pxar::format::mode::IFREG | 0o600; metadata.stat.mode = pxar::format::mode::IFREG | 0o600;
let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64)?; let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64).await?;
file.write_all(&content)?; file.write_all(&content).await?;
Ok(()) Ok(())
} }
@ -502,9 +505,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
Ok(()) Ok(())
} }
fn add_entry( async fn add_entry<T: SeqWrite + Send>(
&mut self, &mut self,
encoder: &mut Encoder, encoder: &mut Encoder<'_, T>,
parent: RawFd, parent: RawFd,
c_file_name: &CStr, c_file_name: &CStr,
stat: &FileStat, stat: &FileStat,
@ -550,23 +553,23 @@ impl<'a, 'b> Archiver<'a, 'b> {
if stat.st_nlink > 1 { if stat.st_nlink > 1 {
if let Some((path, offset)) = self.hardlinks.get(&link_info) { if let Some((path, offset)) = self.hardlinks.get(&link_info) {
if let Some(ref mut catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.add_hardlink(c_file_name)?; catalog.lock().unwrap().add_hardlink(c_file_name)?;
} }
encoder.add_hardlink(file_name, path, *offset)?; encoder.add_hardlink(file_name, path, *offset).await?;
return Ok(()); return Ok(());
} }
} }
let file_size = stat.st_size as u64; let file_size = stat.st_size as u64;
if let Some(ref mut catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.add_file(c_file_name, file_size, stat.st_mtime)?; catalog.lock().unwrap().add_file(c_file_name, file_size, stat.st_mtime)?;
} }
let offset: LinkOffset = let offset: LinkOffset =
self.add_regular_file(encoder, fd, file_name, &metadata, file_size)?; self.add_regular_file(encoder, fd, file_name, &metadata, file_size).await?;
if stat.st_nlink > 1 { if stat.st_nlink > 1 {
self.hardlinks.insert(link_info, (self.path.clone(), offset)); self.hardlinks.insert(link_info, (self.path.clone(), offset));
@ -577,49 +580,49 @@ impl<'a, 'b> Archiver<'a, 'b> {
mode::IFDIR => { mode::IFDIR => {
let dir = Dir::from_fd(fd.into_raw_fd())?; let dir = Dir::from_fd(fd.into_raw_fd())?;
if let Some(ref mut catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.start_directory(c_file_name)?; catalog.lock().unwrap().start_directory(c_file_name)?;
} }
let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat); let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat).await;
if let Some(ref mut catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.end_directory()?; catalog.lock().unwrap().end_directory()?;
} }
result result
} }
mode::IFSOCK => { mode::IFSOCK => {
if let Some(ref mut catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.add_socket(c_file_name)?; catalog.lock().unwrap().add_socket(c_file_name)?;
} }
Ok(encoder.add_socket(&metadata, file_name)?) Ok(encoder.add_socket(&metadata, file_name).await?)
} }
mode::IFIFO => { mode::IFIFO => {
if let Some(ref mut catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.add_fifo(c_file_name)?; catalog.lock().unwrap().add_fifo(c_file_name)?;
} }
Ok(encoder.add_fifo(&metadata, file_name)?) Ok(encoder.add_fifo(&metadata, file_name).await?)
} }
mode::IFLNK => { mode::IFLNK => {
if let Some(ref mut catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.add_symlink(c_file_name)?; catalog.lock().unwrap().add_symlink(c_file_name)?;
} }
self.add_symlink(encoder, fd, file_name, &metadata) self.add_symlink(encoder, fd, file_name, &metadata).await
} }
mode::IFBLK => { mode::IFBLK => {
if let Some(ref mut catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.add_block_device(c_file_name)?; catalog.lock().unwrap().add_block_device(c_file_name)?;
} }
self.add_device(encoder, file_name, &metadata, &stat) self.add_device(encoder, file_name, &metadata, &stat).await
} }
mode::IFCHR => { mode::IFCHR => {
if let Some(ref mut catalog) = self.catalog { if let Some(ref catalog) = self.catalog {
catalog.add_char_device(c_file_name)?; catalog.lock().unwrap().add_char_device(c_file_name)?;
} }
self.add_device(encoder, file_name, &metadata, &stat) self.add_device(encoder, file_name, &metadata, &stat).await
} }
other => bail!( other => bail!(
"encountered unknown file type: 0x{:x} (0o{:o})", "encountered unknown file type: 0x{:x} (0o{:o})",
@ -629,9 +632,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
} }
} }
fn add_directory( async fn add_directory<T: SeqWrite + Send>(
&mut self, &mut self,
encoder: &mut Encoder, encoder: &mut Encoder<'_, T>,
dir: Dir, dir: Dir,
dir_name: &CStr, dir_name: &CStr,
metadata: &Metadata, metadata: &Metadata,
@ -639,7 +642,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
) -> Result<(), Error> { ) -> Result<(), Error> {
let dir_name = OsStr::from_bytes(dir_name.to_bytes()); let dir_name = OsStr::from_bytes(dir_name.to_bytes());
let mut encoder = encoder.create_directory(dir_name, &metadata)?; let mut encoder = encoder.create_directory(dir_name, &metadata).await?;
let old_fs_magic = self.fs_magic; let old_fs_magic = self.fs_magic;
let old_fs_feature_flags = self.fs_feature_flags; let old_fs_feature_flags = self.fs_feature_flags;
@ -662,20 +665,20 @@ impl<'a, 'b> Archiver<'a, 'b> {
writeln!(self.logger, "skipping mount point: {:?}", self.path)?; writeln!(self.logger, "skipping mount point: {:?}", self.path)?;
Ok(()) Ok(())
} else { } else {
self.archive_dir_contents(&mut encoder, dir, false) self.archive_dir_contents(&mut encoder, dir, false).await
}; };
self.fs_magic = old_fs_magic; self.fs_magic = old_fs_magic;
self.fs_feature_flags = old_fs_feature_flags; self.fs_feature_flags = old_fs_feature_flags;
self.current_st_dev = old_st_dev; self.current_st_dev = old_st_dev;
encoder.finish()?; encoder.finish().await?;
result result
} }
fn add_regular_file( async fn add_regular_file<T: SeqWrite + Send>(
&mut self, &mut self,
encoder: &mut Encoder, encoder: &mut Encoder<'_, T>,
fd: Fd, fd: Fd,
file_name: &Path, file_name: &Path,
metadata: &Metadata, metadata: &Metadata,
@ -683,7 +686,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
) -> Result<LinkOffset, Error> { ) -> Result<LinkOffset, Error> {
let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) }; let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
let mut remaining = file_size; let mut remaining = file_size;
let mut out = encoder.create_file(metadata, file_name, file_size)?; let mut out = encoder.create_file(metadata, file_name, file_size).await?;
while remaining != 0 { while remaining != 0 {
let mut got = match file.read(&mut self.file_copy_buffer[..]) { let mut got = match file.read(&mut self.file_copy_buffer[..]) {
Ok(0) => break, Ok(0) => break,
@ -695,7 +698,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
self.report_file_grew_while_reading()?; self.report_file_grew_while_reading()?;
got = remaining as usize; got = remaining as usize;
} }
out.write_all(&self.file_copy_buffer[..got])?; out.write_all(&self.file_copy_buffer[..got]).await?;
remaining -= got as u64; remaining -= got as u64;
} }
if remaining > 0 { if remaining > 0 {
@ -704,7 +707,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
vec::clear(&mut self.file_copy_buffer[..to_zero]); vec::clear(&mut self.file_copy_buffer[..to_zero]);
while remaining != 0 { while remaining != 0 {
let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize; let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize;
out.write_all(&self.file_copy_buffer[..fill])?; out.write_all(&self.file_copy_buffer[..fill]).await?;
remaining -= fill as u64; remaining -= fill as u64;
} }
} }
@ -712,21 +715,21 @@ impl<'a, 'b> Archiver<'a, 'b> {
Ok(out.file_offset()) Ok(out.file_offset())
} }
fn add_symlink( async fn add_symlink<T: SeqWrite + Send>(
&mut self, &mut self,
encoder: &mut Encoder, encoder: &mut Encoder<'_, T>,
fd: Fd, fd: Fd,
file_name: &Path, file_name: &Path,
metadata: &Metadata, metadata: &Metadata,
) -> Result<(), Error> { ) -> Result<(), Error> {
let dest = nix::fcntl::readlinkat(fd.as_raw_fd(), &b""[..])?; let dest = nix::fcntl::readlinkat(fd.as_raw_fd(), &b""[..])?;
encoder.add_symlink(metadata, file_name, dest)?; encoder.add_symlink(metadata, file_name, dest).await?;
Ok(()) Ok(())
} }
fn add_device( async fn add_device<T: SeqWrite + Send>(
&mut self, &mut self,
encoder: &mut Encoder, encoder: &mut Encoder<'_, T>,
file_name: &Path, file_name: &Path,
metadata: &Metadata, metadata: &Metadata,
stat: &FileStat, stat: &FileStat,
@ -735,7 +738,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
metadata, metadata,
file_name, file_name,
pxar::format::Device::from_dev_t(stat.st_rdev), pxar::format::Device::from_dev_t(stat.st_rdev),
)?) ).await?)
} }
} }

View File

@ -30,14 +30,15 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
..PxarCreateOptions::default() ..PxarCreateOptions::default()
}; };
create_archive( let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(create_archive(
dir, dir,
writer, writer,
Flags::DEFAULT, Flags::DEFAULT,
|_| Ok(()), |_| Ok(()),
None, None,
options, options,
)?; ))?;
Command::new("cmp") Command::new("cmp")
.arg("--verbose") .arg("--verbose")