diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs
index 5aae0873..45b26c7a 100644
--- a/src/bin/proxmox-backup-client.rs
+++ b/src/bin/proxmox-backup-client.rs
@@ -32,7 +32,11 @@ use proxmox::{
 };
 use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
 
-use proxmox_backup::tools;
+use proxmox_backup::tools::{
+    self,
+    StdChannelWriter,
+    TokioWriterAdapter,
+};
 use proxmox_backup::api2::types::*;
 use proxmox_backup::api2::version;
 use proxmox_backup::client::*;
@@ -162,7 +166,7 @@ async fn backup_directory<P: AsRef<Path>>(
     dir_path: P,
     archive_name: &str,
     chunk_size: Option<usize>,
-    catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
+    catalog: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter>>>>,
     pxar_create_options: proxmox_backup::pxar::PxarCreateOptions,
     upload_options: UploadOptions,
 ) -> Result<BackupStats, Error> {
@@ -460,7 +464,7 @@ async fn start_garbage_collection(param: Value) -> Result<Value, Error> {
 }
 
 struct CatalogUploadResult {
-    catalog_writer: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
+    catalog_writer: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter>>>>,
     result: tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>,
 }
 
@@ -473,7 +477,7 @@ fn spawn_catalog_upload(
     let catalog_chunk_size = 512*1024;
     let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
 
-    let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(crate::tools::StdChannelWriter::new(catalog_tx))?));
+    let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(StdChannelWriter::new(catalog_tx)))?));
 
     let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
 
diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs
index b57061a3..035f735c 100644
--- a/src/client/pxar_backup_stream.rs
+++ b/src/client/pxar_backup_stream.rs
@@ -13,6 +13,10 @@ use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
 
 use crate::backup::CatalogWriter;
+use crate::tools::{
+    StdChannelWriter,
+    TokioWriterAdapter,
+};
 
 /// Stream implementation to encode and upload .pxar archives.
 ///
@@ -45,10 +49,10 @@ impl PxarBackupStream {
         let error = Arc::new(Mutex::new(None));
         let error2 = Arc::clone(&error);
         let handler = async move {
-            let writer = std::io::BufWriter::with_capacity(
+            let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
                 buffer_size,
-                crate::tools::StdChannelWriter::new(tx),
-            );
+                StdChannelWriter::new(tx),
+            ));
 
             let verbose = options.verbose;
 
diff --git a/src/tools.rs b/src/tools.rs
index cc782da2..7e3bff7b 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -57,6 +57,9 @@ pub use async_channel_writer::AsyncChannelWriter;
 mod std_channel_writer;
 pub use std_channel_writer::StdChannelWriter;
 
+mod tokio_writer_adapter;
+pub use tokio_writer_adapter::TokioWriterAdapter;
+
 mod process_locker;
 pub use process_locker::{ProcessLocker, ProcessLockExclusiveGuard, ProcessLockSharedGuard};
 
diff --git a/src/tools/tokio_writer_adapter.rs b/src/tools/tokio_writer_adapter.rs
new file mode 100644
index 00000000..7b7f5dcf
--- /dev/null
+++ b/src/tools/tokio_writer_adapter.rs
@@ -0,0 +1,26 @@
+use std::io::Write;
+
+use tokio::task::block_in_place;
+
+/// Wrapper around a writer which implements Write
+///
+/// wraps each write with a 'block_in_place' so that
+/// any (blocking) writer can be safely used in async context in a
+/// tokio runtime
+pub struct TokioWriterAdapter<W: Write>(W);
+
+impl<W: Write> TokioWriterAdapter<W> {
+    pub fn new(writer: W) -> Self {
+        Self(writer)
+    }
+}
+
+impl<W: Write> Write for TokioWriterAdapter<W> {
+    fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
+        block_in_place(|| self.0.write(buf))
+    }
+
+    fn flush(&mut self) -> Result<(), std::io::Error> {
+        block_in_place(|| self.0.flush())
+    }
+}