From 55d8a631fc00d19114ccd6b120bad48c26e2a908 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Wed, 28 Aug 2019 15:05:45 +0200 Subject: [PATCH] src/bin/h2s-client.rs: switch to async Signed-off-by: Wolfgang Bumiller --- src/bin/h2s-client.rs | 140 +++++++++++++++++++++--------------------- 1 file changed, 71 insertions(+), 69 deletions(-) diff --git a/src/bin/h2s-client.rs b/src/bin/h2s-client.rs index a26191e2..3eec67fe 100644 --- a/src/bin/h2s-client.rs +++ b/src/bin/h2s-client.rs @@ -1,5 +1,10 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + use failure::*; -use futures::*; +use futures::future::TryFutureExt; +use futures::stream::Stream; // Simple H2 client to test H2 download speed using h2s-server.rs @@ -10,28 +15,32 @@ struct Process { } impl Future for Process { - type Item = usize; - type Error = Error; + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); - fn poll(&mut self) -> Poll { loop { - if self.trailers { - let trailers = try_ready!(self.body.poll_trailers()); - if let Some(trailers) = trailers { - println!("trailers: {:?}", trailers); + if this.trailers { + match futures::ready!(this.body.poll_trailers(cx)) { + Ok(Some(trailers)) => println!("trailers: {:?}", trailers), + Ok(None) => (), + Err(err) => return Poll::Ready(Err(Error::from(err))), } - println!("Received {} bytes", self.bytes); - return Ok(Async::Ready(self.bytes)); + println!("Received {} bytes", this.bytes); + + return Poll::Ready(Ok(this.bytes)); } else { - match try_ready!(self.body.poll()) { - Some(chunk) => { - self.body.release_capacity().release_capacity(chunk.len())?; - self.bytes += chunk.len(); + match futures::ready!(Pin::new(&mut this.body).poll_next(cx)) { + Some(Ok(chunk)) => { + this.body.release_capacity().release_capacity(chunk.len())?; + this.bytes += chunk.len(); // println!("GOT FRAME {}", chunk.len()); }, + Some(Err(err)) => return Poll::Ready(Err(Error::from(err))), None => { - self.trailers = true; + this.trailers = true; }, } } @@ -39,8 +48,9 @@ impl Future for Process { } } -fn send_request(mut client: h2::client::SendRequest) -> impl Future { - +fn send_request( + mut client: h2::client::SendRequest, +) -> impl Future> { println!("sending request"); let request = http::Request::builder() @@ -57,68 +67,60 @@ fn send_request(mut client: h2::client::SendRequest) -> impl Futur }) } -pub fn main() -> Result<(), Error> { - - let tcp_stream = tokio::net::TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()); - +#[tokio::main] +async fn main() -> Result<(), Error> { let start = std::time::SystemTime::now(); - let tcp = tcp_stream - .map_err(Error::from) - .and_then(|c| { - c.set_nodelay(true).unwrap(); - c.set_recv_buffer_size(1024*1024).unwrap(); + let conn = tokio::net::TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()).await?; - use openssl::ssl::*; - use tokio_openssl::SslConnectorExt; + conn.set_nodelay(true).unwrap(); + conn.set_recv_buffer_size(1024*1024).unwrap(); - let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap(); - ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); + use openssl::ssl::{SslConnector, SslMethod}; - let connector = ssl_connector_builder.build(); + let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap(); + ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); + let conn = + tokio_openssl::connect( + ssl_connector_builder.build().configure()?, + "localhost", + conn, + ) + .await + .map_err(|err| format_err!("connect failed - {}", err))?; - connector.connect_async("localhost", c) - .map_err(|err| format_err!("connect failed - {}", err)) - }) - .map_err(Error::from) - .and_then(|c| { - h2::client::Builder::new() - .initial_connection_window_size(1024*1024*1024) - .initial_window_size(1024*1024*1024) - .max_frame_size(4*1024*1024) - .handshake(c) - .map_err(Error::from) - }) - .and_then(|(client, h2)| { + let (client, h2) = h2::client::Builder::new() + .initial_connection_window_size(1024*1024*1024) + .initial_window_size(1024*1024*1024) + .max_frame_size(4*1024*1024) + .handshake(conn) + .await?; - // Spawn a task to run the conn... - tokio::spawn(h2.map_err(|e| println!("GOT ERR={:?}", e))); + // Spawn a task to run the conn... + tokio::spawn(async move { + if let Err(e) = h2.await { + println!("GOT ERR={:?}", e); + } + }); - futures::stream::repeat(()) - .take(100) - .and_then(move |_| send_request(client.clone())) - .fold(0, move |mut acc, size| { - acc += size; - Ok::<_, Error>(acc) - }) - }) - .then(move |result| { - match result { - Err(err) => { - println!("ERROR {}", err); - } - Ok(bytes) => { - let elapsed = start.elapsed().unwrap(); - let elapsed = (elapsed.as_secs() as f64) + - (elapsed.subsec_millis() as f64)/1000.0; - - println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0)); - } + let mut bytes = 0; + for _ in 0..100 { + match send_request(client.clone()).await { + Ok(b) => { + bytes += b; } - Ok(()) - }); + Err(e) => { + println!("ERROR {}", e); + return Ok(()); + } + } + } - tokio::run(tcp); + let elapsed = start.elapsed().unwrap(); + let elapsed = (elapsed.as_secs() as f64) + + (elapsed.subsec_millis() as f64)/1000.0; + + println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0)); Ok(()) }