diff --git a/src/api2/backup.rs b/src/api2/backup.rs index 5e4b4238..8ce6d29e 100644 --- a/src/api2/backup.rs +++ b/src/api2/backup.rs @@ -10,8 +10,7 @@ use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironm use proxmox::api::router::SubdirMap; use proxmox::api::schema::*; -use crate::tools; -use crate::tools::wrapped_reader_stream::*; +use crate::tools::{self, WrappedReaderStream}; use crate::server::{WorkerTask, H2Service}; use crate::backup::*; use crate::api2::types::*; diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 4acdb8f6..e7e8b4de 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -230,7 +230,7 @@ async fn backup_directory>( verbose: bool, skip_lost_and_found: bool, crypt_config: Option>, - catalog: Arc>>, + catalog: Arc>>, entries_max: usize, ) -> Result { @@ -708,16 +708,16 @@ fn spawn_catalog_upload( crypt_config: Option>, ) -> Result< ( - Arc>>, + Arc>>, tokio::sync::oneshot::Receiver> ), Error> { - let (catalog_tx, catalog_rx) = mpsc::channel(10); // allow to buffer 10 writes - let catalog_stream = catalog_rx.map_err(Error::from); + let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes + let catalog_stream = crate::tools::StdChannelStream(catalog_rx); let catalog_chunk_size = 512*1024; let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size)); - let catalog = Arc::new(Mutex::new(CatalogWriter::new(SenderWriter::new(catalog_tx))?)); + let catalog = Arc::new(Mutex::new(CatalogWriter::new(crate::tools::StdChannelWriter::new(catalog_tx))?)); let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel(); diff --git a/src/tools.rs b/src/tools.rs index 750364f8..cca183bd 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -27,7 +27,9 @@ pub mod runtime; pub mod ticket; pub mod timer; pub mod tty; -pub mod wrapped_reader_stream; + +mod wrapped_reader_stream; +pub use wrapped_reader_stream::*; mod std_channel_writer; pub use std_channel_writer::*; diff --git a/src/tools/wrapped_reader_stream.rs b/src/tools/wrapped_reader_stream.rs index 927132dc..267a0bad 100644 --- a/src/tools/wrapped_reader_stream.rs +++ b/src/tools/wrapped_reader_stream.rs @@ -1,11 +1,14 @@ use std::io::{self, Read}; use std::pin::Pin; use std::task::{Context, Poll}; +use std::sync::mpsc::Receiver; + use futures::stream::Stream; use crate::tools::runtime::block_in_place; +/// Wrapper struct to convert a Reader into a Stream pub struct WrappedReaderStream { reader: R, buffer: Vec, @@ -39,6 +42,21 @@ impl Stream for WrappedReaderStream { } } + +/// Wrapper struct to convert a channel Receiver into a Stream +pub struct StdChannelStream(pub Receiver); + +impl Stream for StdChannelStream { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + match block_in_place(|| self.0.recv()) { + Ok(data) => Poll::Ready(Some(data)), + Err(_) => Poll::Ready(None),// channel closed + } + } +} + #[cfg(test)] mod test { use std::io;