src/bin/proxmox-backup-client.rs: use a std channel to write the catalog

This commit is contained in:
Dietmar Maurer 2020-01-22 12:49:08 +01:00
parent 02141b4d9b
commit f1d99e3f6a
4 changed files with 27 additions and 8 deletions

View File

@ -10,8 +10,7 @@ use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironm
use proxmox::api::router::SubdirMap; use proxmox::api::router::SubdirMap;
use proxmox::api::schema::*; use proxmox::api::schema::*;
use crate::tools; use crate::tools::{self, WrappedReaderStream};
use crate::tools::wrapped_reader_stream::*;
use crate::server::{WorkerTask, H2Service}; use crate::server::{WorkerTask, H2Service};
use crate::backup::*; use crate::backup::*;
use crate::api2::types::*; use crate::api2::types::*;

View File

@ -230,7 +230,7 @@ async fn backup_directory<P: AsRef<Path>>(
verbose: bool, verbose: bool,
skip_lost_and_found: bool, skip_lost_and_found: bool,
crypt_config: Option<Arc<CryptConfig>>, crypt_config: Option<Arc<CryptConfig>>,
catalog: Arc<Mutex<CatalogWriter<SenderWriter>>>, catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
entries_max: usize, entries_max: usize,
) -> Result<BackupStats, Error> { ) -> Result<BackupStats, Error> {
@ -708,16 +708,16 @@ fn spawn_catalog_upload(
crypt_config: Option<Arc<CryptConfig>>, crypt_config: Option<Arc<CryptConfig>>,
) -> Result< ) -> Result<
( (
Arc<Mutex<CatalogWriter<SenderWriter>>>, Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
tokio::sync::oneshot::Receiver<Result<BackupStats, Error>> tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>
), Error> ), Error>
{ {
let (catalog_tx, catalog_rx) = mpsc::channel(10); // allow to buffer 10 writes let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
let catalog_stream = catalog_rx.map_err(Error::from); let catalog_stream = crate::tools::StdChannelStream(catalog_rx);
let catalog_chunk_size = 512*1024; let catalog_chunk_size = 512*1024;
let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size)); let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
let catalog = Arc::new(Mutex::new(CatalogWriter::new(SenderWriter::new(catalog_tx))?)); let catalog = Arc::new(Mutex::new(CatalogWriter::new(crate::tools::StdChannelWriter::new(catalog_tx))?));
let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel(); let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();

View File

@ -27,7 +27,9 @@ pub mod runtime;
pub mod ticket; pub mod ticket;
pub mod timer; pub mod timer;
pub mod tty; pub mod tty;
pub mod wrapped_reader_stream;
mod wrapped_reader_stream;
pub use wrapped_reader_stream::*;
mod std_channel_writer; mod std_channel_writer;
pub use std_channel_writer::*; pub use std_channel_writer::*;

View File

@ -1,11 +1,14 @@
use std::io::{self, Read}; use std::io::{self, Read};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::sync::mpsc::Receiver;
use futures::stream::Stream; use futures::stream::Stream;
use crate::tools::runtime::block_in_place; use crate::tools::runtime::block_in_place;
/// Wrapper struct to convert a Reader into a Stream
pub struct WrappedReaderStream<R: Read + Unpin> { pub struct WrappedReaderStream<R: Read + Unpin> {
reader: R, reader: R,
buffer: Vec<u8>, buffer: Vec<u8>,
@ -39,6 +42,21 @@ impl<R: Read + Unpin> Stream for WrappedReaderStream<R> {
} }
} }
/// Wrapper struct to convert a channel Receiver into a Stream
pub struct StdChannelStream<T>(pub Receiver<T>);
impl<T> Stream for StdChannelStream<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
match block_in_place(|| self.0.recv()) {
Ok(data) => Poll::Ready(Some(data)),
Err(_) => Poll::Ready(None),// channel closed
}
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::io; use std::io;