proxmox-backup/src/client/pxar_backup_stream.rs
Dominik Csapak f1d76ecf6c fix #3359: fix blocking writes in async code during pxar create
In commit `asyncify pxar create_archive`, we changed from a
separate thread for creating a pxar to using async code, but the
StdChannelWriter used for both pxar and catalog can block, which
may block the tokio runtime in single-core (and probably dual-core)
environments.

this patch adds a wrapper struct for any writer that implements
'std::io::Write' and wraps the write calls with 'block_in_place'
so that if called in a tokio runtime, it knows that this code
potentially blocks

Fixes: 6afb60abf5 ("asyncify pxar create_archive")

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2021-03-24 09:00:07 +01:00

128 lines
3.6 KiB
Rust

use std::io::Write;
//use std::os::unix::io::FromRawFd;
use std::path::Path;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use anyhow::{format_err, Error};
use futures::stream::Stream;
use futures::future::{Abortable, AbortHandle};
use nix::dir::Dir;
use nix::fcntl::OFlag;
use nix::sys::stat::Mode;
use crate::backup::CatalogWriter;
use crate::tools::{
StdChannelWriter,
TokioWriterAdapter,
};
/// Stream implementation to encode and upload .pxar archives.
///
/// The hyper client needs an async Stream for file upload, so the
/// .pxar data is encoded in a spawned tokio task and piped to the
/// consumer through a bounded channel.
pub struct PxarBackupStream {
/// Receiving end of the channel carrying encoded archive chunks (or
/// errors); set to `None` when the stream is dropped.
rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
/// Handle used to abort the spawned encoder task when the stream is dropped.
handle: Option<AbortHandle>,
/// Message of the error returned by archive creation, if any; written by
/// the encoder task and read by `poll_next`.
error: Arc<Mutex<Option<String>>>,
}
impl Drop for PxarBackupStream {
    /// Tears down the stream: releases the receiving end of the channel and
    /// aborts the encoder task that was spawned in `new`.
    fn drop(&mut self) {
        // Release the receiver before aborting, keeping the original order.
        self.rx = None;

        // Never panic inside a destructor: a panic while already unwinding
        // aborts the whole process. If the handle is gone there is simply
        // nothing left to abort.
        if let Some(handle) = self.handle.take() {
            handle.abort();
        }
    }
}
impl PxarBackupStream {
pub fn new<W: Write + Send + 'static>(
dir: Dir,
catalog: Arc<Mutex<CatalogWriter<W>>>,
options: crate::pxar::PxarCreateOptions,
) -> Result<Self, Error> {
let (tx, rx) = std::sync::mpsc::sync_channel(10);
let buffer_size = 256 * 1024;
let error = Arc::new(Mutex::new(None));
let error2 = Arc::clone(&error);
let handler = async move {
let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
buffer_size,
StdChannelWriter::new(tx),
));
let verbose = options.verbose;
let writer = pxar::encoder::sync::StandardWriter::new(writer);
if let Err(err) = crate::pxar::create_archive(
dir,
writer,
crate::pxar::Flags::DEFAULT,
move |path| {
if verbose {
println!("{:?}", path);
}
Ok(())
},
Some(catalog),
options,
).await {
let mut error = error2.lock().unwrap();
*error = Some(err.to_string());
}
};
let (handle, registration) = AbortHandle::new_pair();
let future = Abortable::new(handler, registration);
tokio::spawn(future);
Ok(Self {
rx: Some(rx),
handle: Some(handle),
error,
})
}
pub fn open<W: Write + Send + 'static>(
dirname: &Path,
catalog: Arc<Mutex<CatalogWriter<W>>>,
options: crate::pxar::PxarCreateOptions,
) -> Result<Self, Error> {
let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
Self::new(
dir,
catalog,
options,
)
}
}
impl Stream for PxarBackupStream {
type Item = Result<Vec<u8>, Error>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
{
// limit lock scope
let error = self.error.lock().unwrap();
if let Some(ref msg) = *error {
return Poll::Ready(Some(Err(format_err!("{}", msg))));
}
}
match crate::tools::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
Ok(data) => Poll::Ready(Some(data)),
Err(_) => {
let error = self.error.lock().unwrap();
if let Some(ref msg) = *error {
return Poll::Ready(Some(Err(format_err!("{}", msg))));
}
Poll::Ready(None) // channel closed, no error
}
}
}
}