fix #3359: fix blocking writes in async code during pxar create

in commit `asyncify pxar create_archive`, we changed from a
separate thread for creating a pxar to using async code, but the
StdChannelWriter used for both pxar and catalog can block, which
may block the tokio runtime for single (and probably dual) core
environments

this patch adds a wrapper struct for any writer that implements
'std::io::Write' and wraps the write calls with 'block_in_place'
so that if called in a tokio runtime, it knows that this code
potentially blocks

Fixes: 6afb60abf5 ("asyncify pxar create_archive")

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
Dominik Csapak 2021-03-23 11:12:20 +01:00 committed by Dietmar Maurer
parent 074503f288
commit f1d76ecf6c
4 changed files with 44 additions and 7 deletions

View File

@ -32,7 +32,11 @@ use proxmox::{
}; };
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation}; use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
use proxmox_backup::tools; use proxmox_backup::tools::{
self,
StdChannelWriter,
TokioWriterAdapter,
};
use proxmox_backup::api2::types::*; use proxmox_backup::api2::types::*;
use proxmox_backup::api2::version; use proxmox_backup::api2::version;
use proxmox_backup::client::*; use proxmox_backup::client::*;
@ -162,7 +166,7 @@ async fn backup_directory<P: AsRef<Path>>(
dir_path: P, dir_path: P,
archive_name: &str, archive_name: &str,
chunk_size: Option<usize>, chunk_size: Option<usize>,
catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>, catalog: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter>>>>,
pxar_create_options: proxmox_backup::pxar::PxarCreateOptions, pxar_create_options: proxmox_backup::pxar::PxarCreateOptions,
upload_options: UploadOptions, upload_options: UploadOptions,
) -> Result<BackupStats, Error> { ) -> Result<BackupStats, Error> {
@ -460,7 +464,7 @@ async fn start_garbage_collection(param: Value) -> Result<Value, Error> {
} }
struct CatalogUploadResult { struct CatalogUploadResult {
catalog_writer: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>, catalog_writer: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter>>>>,
result: tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>, result: tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>,
} }
@ -473,7 +477,7 @@ fn spawn_catalog_upload(
let catalog_chunk_size = 512*1024; let catalog_chunk_size = 512*1024;
let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size)); let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(crate::tools::StdChannelWriter::new(catalog_tx))?)); let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(StdChannelWriter::new(catalog_tx)))?));
let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel(); let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();

View File

@ -13,6 +13,10 @@ use nix::fcntl::OFlag;
use nix::sys::stat::Mode; use nix::sys::stat::Mode;
use crate::backup::CatalogWriter; use crate::backup::CatalogWriter;
use crate::tools::{
StdChannelWriter,
TokioWriterAdapter,
};
/// Stream implementation to encode and upload .pxar archives. /// Stream implementation to encode and upload .pxar archives.
/// ///
@ -45,10 +49,10 @@ impl PxarBackupStream {
let error = Arc::new(Mutex::new(None)); let error = Arc::new(Mutex::new(None));
let error2 = Arc::clone(&error); let error2 = Arc::clone(&error);
let handler = async move { let handler = async move {
let writer = std::io::BufWriter::with_capacity( let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
buffer_size, buffer_size,
crate::tools::StdChannelWriter::new(tx), StdChannelWriter::new(tx),
); ));
let verbose = options.verbose; let verbose = options.verbose;

View File

@ -57,6 +57,9 @@ pub use async_channel_writer::AsyncChannelWriter;
mod std_channel_writer; mod std_channel_writer;
pub use std_channel_writer::StdChannelWriter; pub use std_channel_writer::StdChannelWriter;
mod tokio_writer_adapter;
pub use tokio_writer_adapter::TokioWriterAdapter;
mod process_locker; mod process_locker;
pub use process_locker::{ProcessLocker, ProcessLockExclusiveGuard, ProcessLockSharedGuard}; pub use process_locker::{ProcessLocker, ProcessLockExclusiveGuard, ProcessLockSharedGuard};

View File

@ -0,0 +1,26 @@
use std::io::Write;
use tokio::task::block_in_place;
/// Wrapper around a writer which implements Write
///
/// wraps each write with a 'block_in_place' so that
/// any (blocking) writer can be safely used in async context in a
/// tokio runtime
pub struct TokioWriterAdapter<W: Write>(W);
impl<W: Write> TokioWriterAdapter<W> {
pub fn new(writer: W) -> Self {
Self(writer)
}
}
impl<W: Write> Write for TokioWriterAdapter<W> {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
block_in_place(|| self.0.write(buf))
}
fn flush(&mut self) -> Result<(), std::io::Error> {
block_in_place(|| self.0.flush())
}
}