From 2107bb40c1f50ac7c7a474933242ec165cf384f0 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Fri, 23 Aug 2019 13:43:17 +0200 Subject: [PATCH] src/client/pipe_to_stream.rs: switch to async Signed-off-by: Wolfgang Bumiller --- src/client/pipe_to_stream.rs | 49 ++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/src/client/pipe_to_stream.rs b/src/client/pipe_to_stream.rs index 5968f02f..d67a3833 100644 --- a/src/client/pipe_to_stream.rs +++ b/src/client/pipe_to_stream.rs @@ -2,9 +2,12 @@ // // See also: hyper/src/proto/h2/mod.rs +use std::pin::Pin; +use std::task::{Context, Poll}; + use bytes::Bytes; use failure::*; -use futures::{try_ready, Async, Future, Poll}; +use futures::{ready, Future}; use h2::SendStream; pub struct PipeToSendStream { @@ -22,41 +25,49 @@ impl PipeToSendStream { } impl Future for PipeToSendStream { - type Item = (); - type Error = Error; + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); - fn poll(&mut self) -> Poll { loop { - if self.data != None { + if this.data != None { // just reserve 1 byte to make sure there's some // capacity available. h2 will handle the capacity // management for the actual body chunk. - self.body_tx.reserve_capacity(1); + this.body_tx.reserve_capacity(1); - if self.body_tx.capacity() == 0 { + if this.body_tx.capacity() == 0 { loop { - match try_ready!(self.body_tx.poll_capacity().map_err(Error::from)) { - Some(0) => {} - Some(_) => break, - None => return Err(format_err!("protocol canceled")), + match ready!(this.body_tx.poll_capacity(cx)) { + Some(Err(err)) => return Poll::Ready(Err(Error::from(err))), + Some(Ok(0)) => {} + Some(Ok(_)) => break, + None => return Poll::Ready(Err(format_err!("protocol canceled"))), } } } else { - if let Async::Ready(reason) = self.body_tx.poll_reset().map_err(Error::from)? { - return Err(format_err!("stream received RST_STREAM: {:?}", reason)); + if let Poll::Ready(reset) = this.body_tx.poll_reset(cx) { + return Poll::Ready(Err(match reset { + Ok(reason) => format_err!("stream received RST_STREAM: {:?}", reason), + Err(err) => Error::from(err), + })); } } - self.body_tx - .send_data(self.data.take().unwrap(), true) + this.body_tx + .send_data(this.data.take().unwrap(), true) .map_err(Error::from)?; - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } else { - if let Async::Ready(reason) = self.body_tx.poll_reset().map_err(Error::from)? { - return Err(format_err!("stream received RST_STREAM: {:?}", reason)); + if let Poll::Ready(reset) = this.body_tx.poll_reset(cx) { + return Poll::Ready(Err(match reset { + Ok(reason) => format_err!("stream received RST_STREAM: {:?}", reason), + Err(err) => Error::from(err), + })); } - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } } }