From 7c66701366057eda6490598e89eb538e285125ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20Gr=C3=BCnbichler?= Date: Mon, 11 Jan 2021 09:50:04 +0100 Subject: [PATCH] tokio 1.0: use ReceiverStream from tokio-stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit to wrap a Receiver in a Stream. this will likely move back into tokio proper once we have a std Stream.. Signed-off-by: Fabian Grünbichler --- src/api2/admin/datastore.rs | 3 ++- src/bin/proxmox-backup-client.rs | 3 ++- src/bin/proxmox-backup-proxy.rs | 3 ++- src/client/backup_writer.rs | 5 +++-- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index 32352e5c..5b9a1e84 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -10,6 +10,7 @@ use futures::*; use hyper::http::request::Parts; use hyper::{header, Body, Response, StatusCode}; use serde_json::{json, Value}; +use tokio_stream::wrappers::ReceiverStream; use proxmox::api::{ api, ApiResponseFuture, ApiHandler, ApiMethod, Router, @@ -1562,7 +1563,7 @@ fn pxar_file_download( .map_err(|err| eprintln!("error during finishing of zip: {}", err)) }); - Body::wrap_stream(receiver.map_err(move |err| { + Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| { eprintln!("error during streaming of zip '{:?}' - {}", filepath, err); err })) diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index b8f09a4a..d91f04cc 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -12,6 +12,7 @@ use futures::future::FutureExt; use futures::stream::{StreamExt, TryStreamExt}; use serde_json::{json, Value}; use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use xdg::BaseDirectories; use pathpatterns::{MatchEntry, MatchType, PatternFlag}; @@ -306,7 +307,7 @@ async fn backup_directory>( let (mut tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks - let stream = rx + let stream = ReceiverStream::new(rx) .map_err(Error::from); // spawn chunker inside a separate task so that it can run parallel diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 2228253d..16450244 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -6,6 +6,7 @@ use anyhow::{bail, format_err, Error}; use futures::*; use hyper; use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype}; +use tokio_stream::wrappers::ReceiverStream; use proxmox::try_block; use proxmox::api::RpcEnvironmentType; @@ -122,7 +123,7 @@ async fn run() -> Result<(), Error> { |listener, ready| { let connections = accept_connections(listener, acceptor, debug); - let connections = hyper::server::accept::from_stream(connections); + let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections)); Ok(ready .and_then(|_| hyper::Server::builder(connections) diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs index 39cd574d..bcbd6f28 100644 --- a/src/client/backup_writer.rs +++ b/src/client/backup_writer.rs @@ -10,6 +10,7 @@ use futures::future::AbortHandle; use serde_json::{json, Value}; use tokio::io::AsyncReadExt; use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::ReceiverStream; use proxmox::tools::digest_to_hex; @@ -321,7 +322,7 @@ impl BackupWriter { // }); // old code for reference? tokio::spawn( - verify_queue_rx + ReceiverStream::new(verify_queue_rx) .map(Ok::<_, Error>) .try_for_each(move |response: h2::client::ResponseFuture| { response @@ -349,7 +350,7 @@ impl BackupWriter { // FIXME: async-block-ify this code! tokio::spawn( - verify_queue_rx + ReceiverStream::new(verify_queue_rx) .map(Ok::<_, Error>) .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option)| { match (response, merged_chunk_info) {