tokio 1.0: use ReceiverStream from tokio-stream

to wrap a Receiver in a Stream. this will likely move back into tokio
proper once we have a std Stream..

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
This commit is contained in:
Fabian Grünbichler 2021-01-11 09:50:04 +01:00
parent 585e90c0de
commit 7c66701366
4 changed files with 9 additions and 5 deletions

View File

@ -10,6 +10,7 @@ use futures::*;
use hyper::http::request::Parts; use hyper::http::request::Parts;
use hyper::{header, Body, Response, StatusCode}; use hyper::{header, Body, Response, StatusCode};
use serde_json::{json, Value}; use serde_json::{json, Value};
use tokio_stream::wrappers::ReceiverStream;
use proxmox::api::{ use proxmox::api::{
api, ApiResponseFuture, ApiHandler, ApiMethod, Router, api, ApiResponseFuture, ApiHandler, ApiMethod, Router,
@ -1562,7 +1563,7 @@ fn pxar_file_download(
.map_err(|err| eprintln!("error during finishing of zip: {}", err)) .map_err(|err| eprintln!("error during finishing of zip: {}", err))
}); });
Body::wrap_stream(receiver.map_err(move |err| { Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| {
eprintln!("error during streaming of zip '{:?}' - {}", filepath, err); eprintln!("error during streaming of zip '{:?}' - {}", filepath, err);
err err
})) }))

View File

@ -12,6 +12,7 @@ use futures::future::FutureExt;
use futures::stream::{StreamExt, TryStreamExt}; use futures::stream::{StreamExt, TryStreamExt};
use serde_json::{json, Value}; use serde_json::{json, Value};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use xdg::BaseDirectories; use xdg::BaseDirectories;
use pathpatterns::{MatchEntry, MatchType, PatternFlag}; use pathpatterns::{MatchEntry, MatchType, PatternFlag};
@ -306,7 +307,7 @@ async fn backup_directory<P: AsRef<Path>>(
let (mut tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks let (mut tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
let stream = rx let stream = ReceiverStream::new(rx)
.map_err(Error::from); .map_err(Error::from);
// spawn chunker inside a separate task so that it can run parallel // spawn chunker inside a separate task so that it can run parallel

View File

@ -6,6 +6,7 @@ use anyhow::{bail, format_err, Error};
use futures::*; use futures::*;
use hyper; use hyper;
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype}; use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
use tokio_stream::wrappers::ReceiverStream;
use proxmox::try_block; use proxmox::try_block;
use proxmox::api::RpcEnvironmentType; use proxmox::api::RpcEnvironmentType;
@ -122,7 +123,7 @@ async fn run() -> Result<(), Error> {
|listener, ready| { |listener, ready| {
let connections = accept_connections(listener, acceptor, debug); let connections = accept_connections(listener, acceptor, debug);
let connections = hyper::server::accept::from_stream(connections); let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections));
Ok(ready Ok(ready
.and_then(|_| hyper::Server::builder(connections) .and_then(|_| hyper::Server::builder(connections)

View File

@ -10,6 +10,7 @@ use futures::future::AbortHandle;
use serde_json::{json, Value}; use serde_json::{json, Value};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use proxmox::tools::digest_to_hex; use proxmox::tools::digest_to_hex;
@ -321,7 +322,7 @@ impl BackupWriter {
// }); // });
// old code for reference? // old code for reference?
tokio::spawn( tokio::spawn(
verify_queue_rx ReceiverStream::new(verify_queue_rx)
.map(Ok::<_, Error>) .map(Ok::<_, Error>)
.try_for_each(move |response: h2::client::ResponseFuture| { .try_for_each(move |response: h2::client::ResponseFuture| {
response response
@ -349,7 +350,7 @@ impl BackupWriter {
// FIXME: async-block-ify this code! // FIXME: async-block-ify this code!
tokio::spawn( tokio::spawn(
verify_queue_rx ReceiverStream::new(verify_queue_rx)
.map(Ok::<_, Error>) .map(Ok::<_, Error>)
.and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| { .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
match (response, merged_chunk_info) { match (response, merged_chunk_info) {