src/client/pipe_to_stream.rs: switch to async

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-08-23 13:43:17 +02:00
parent 5b3911995b
commit 2107bb40c1

View File

@ -2,9 +2,12 @@
// //
// See also: hyper/src/proto/h2/mod.rs // See also: hyper/src/proto/h2/mod.rs
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::Bytes; use bytes::Bytes;
use failure::*; use failure::*;
use futures::{try_ready, Async, Future, Poll}; use futures::{ready, Future};
use h2::SendStream; use h2::SendStream;
pub struct PipeToSendStream { pub struct PipeToSendStream {
@ -22,41 +25,49 @@ impl PipeToSendStream {
} }
impl Future for PipeToSendStream { impl Future for PipeToSendStream {
type Item = (); type Output = Result<(), Error>;
type Error = Error;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop { loop {
if self.data != None { if this.data != None {
// just reserve 1 byte to make sure there's some // just reserve 1 byte to make sure there's some
// capacity available. h2 will handle the capacity // capacity available. h2 will handle the capacity
// management for the actual body chunk. // management for the actual body chunk.
self.body_tx.reserve_capacity(1); this.body_tx.reserve_capacity(1);
if self.body_tx.capacity() == 0 { if this.body_tx.capacity() == 0 {
loop { loop {
match try_ready!(self.body_tx.poll_capacity().map_err(Error::from)) { match ready!(this.body_tx.poll_capacity(cx)) {
Some(0) => {} Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
Some(_) => break, Some(Ok(0)) => {}
None => return Err(format_err!("protocol canceled")), Some(Ok(_)) => break,
None => return Poll::Ready(Err(format_err!("protocol canceled"))),
} }
} }
} else { } else {
if let Async::Ready(reason) = self.body_tx.poll_reset().map_err(Error::from)? { if let Poll::Ready(reset) = this.body_tx.poll_reset(cx) {
return Err(format_err!("stream received RST_STREAM: {:?}", reason)); return Poll::Ready(Err(match reset {
Ok(reason) => format_err!("stream received RST_STREAM: {:?}", reason),
Err(err) => Error::from(err),
}));
} }
} }
self.body_tx this.body_tx
.send_data(self.data.take().unwrap(), true) .send_data(this.data.take().unwrap(), true)
.map_err(Error::from)?; .map_err(Error::from)?;
return Ok(Async::Ready(())); return Poll::Ready(Ok(()));
} else { } else {
if let Async::Ready(reason) = self.body_tx.poll_reset().map_err(Error::from)? { if let Poll::Ready(reset) = this.body_tx.poll_reset(cx) {
return Err(format_err!("stream received RST_STREAM: {:?}", reason)); return Poll::Ready(Err(match reset {
Ok(reason) => format_err!("stream received RST_STREAM: {:?}", reason),
Err(err) => Error::from(err),
}));
} }
return Ok(Async::Ready(())); return Poll::Ready(Ok(()));
} }
} }
} }