src/bin/h2s-client.rs: switch to async
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
		| @ -1,5 +1,10 @@ | ||||
| use std::future::Future; | ||||
| use std::pin::Pin; | ||||
| use std::task::{Context, Poll}; | ||||
|  | ||||
| use failure::*; | ||||
| use futures::*; | ||||
| use futures::future::TryFutureExt; | ||||
| use futures::stream::Stream; | ||||
|  | ||||
| // Simple H2 client to test H2 download speed using h2s-server.rs | ||||
|  | ||||
| @ -10,28 +15,32 @@ struct Process { | ||||
| } | ||||
|  | ||||
| impl Future for Process { | ||||
|     type Item = usize; | ||||
|     type Error = Error; | ||||
|     type Output = Result<usize, Error>; | ||||
|  | ||||
|     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { | ||||
|         let this = self.get_mut(); | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<usize, Error> { | ||||
|         loop { | ||||
|             if self.trailers { | ||||
|                 let trailers = try_ready!(self.body.poll_trailers()); | ||||
|                 if let Some(trailers) = trailers { | ||||
|                     println!("trailers: {:?}", trailers); | ||||
|             if this.trailers { | ||||
|                 match futures::ready!(this.body.poll_trailers(cx)) { | ||||
|                     Ok(Some(trailers)) => println!("trailers: {:?}", trailers), | ||||
|                     Ok(None) => (), | ||||
|                     Err(err) => return Poll::Ready(Err(Error::from(err))), | ||||
|                 } | ||||
|                 println!("Received {} bytes", self.bytes); | ||||
|  | ||||
|                 return Ok(Async::Ready(self.bytes)); | ||||
|                 println!("Received {} bytes", this.bytes); | ||||
|  | ||||
|                 return Poll::Ready(Ok(this.bytes)); | ||||
|             } else { | ||||
|                 match try_ready!(self.body.poll()) { | ||||
|                     Some(chunk) => { | ||||
|                         self.body.release_capacity().release_capacity(chunk.len())?; | ||||
|                         self.bytes += chunk.len(); | ||||
|                 match futures::ready!(Pin::new(&mut this.body).poll_next(cx)) { | ||||
|                     Some(Ok(chunk)) => { | ||||
|                         this.body.release_capacity().release_capacity(chunk.len())?; | ||||
|                         this.bytes += chunk.len(); | ||||
|                         // println!("GOT FRAME {}", chunk.len()); | ||||
|                     }, | ||||
|                     Some(Err(err)) => return Poll::Ready(Err(Error::from(err))), | ||||
|                     None => { | ||||
|                         self.trailers = true; | ||||
|                         this.trailers = true; | ||||
|                     }, | ||||
|                 } | ||||
|             } | ||||
| @ -39,8 +48,9 @@ impl Future for Process { | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn send_request(mut client: h2::client::SendRequest<bytes::Bytes>) -> impl Future<Item=usize, Error=Error> { | ||||
|  | ||||
| fn send_request( | ||||
|     mut client: h2::client::SendRequest<bytes::Bytes>, | ||||
| ) -> impl Future<Output = Result<usize, Error>> { | ||||
|     println!("sending request"); | ||||
|  | ||||
|     let request = http::Request::builder() | ||||
| @ -57,68 +67,60 @@ fn send_request(mut client: h2::client::SendRequest<bytes::Bytes>) -> impl Futur | ||||
|         }) | ||||
| } | ||||
|  | ||||
| pub fn main() -> Result<(), Error> { | ||||
|  | ||||
|     let tcp_stream = tokio::net::TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()); | ||||
|  | ||||
| #[tokio::main] | ||||
| async fn main() -> Result<(), Error> { | ||||
|     let start = std::time::SystemTime::now(); | ||||
|  | ||||
|     let tcp = tcp_stream | ||||
|         .map_err(Error::from) | ||||
|         .and_then(|c| { | ||||
|             c.set_nodelay(true).unwrap(); | ||||
|             c.set_recv_buffer_size(1024*1024).unwrap(); | ||||
|     let conn = tokio::net::TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()).await?; | ||||
|  | ||||
|             use openssl::ssl::*; | ||||
|             use tokio_openssl::SslConnectorExt; | ||||
|     conn.set_nodelay(true).unwrap(); | ||||
|     conn.set_recv_buffer_size(1024*1024).unwrap(); | ||||
|  | ||||
|             let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap(); | ||||
|             ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); | ||||
|     use openssl::ssl::{SslConnector, SslMethod}; | ||||
|  | ||||
|             let connector = ssl_connector_builder.build(); | ||||
|     let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap(); | ||||
|     ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); | ||||
|     let conn = | ||||
|         tokio_openssl::connect( | ||||
|             ssl_connector_builder.build().configure()?, | ||||
|             "localhost", | ||||
|             conn, | ||||
|         ) | ||||
|         .await | ||||
|         .map_err(|err| format_err!("connect failed - {}", err))?; | ||||
|  | ||||
|             connector.connect_async("localhost", c) | ||||
|                 .map_err(|err| format_err!("connect failed - {}", err)) | ||||
|         }) | ||||
|         .map_err(Error::from) | ||||
|         .and_then(|c| { | ||||
|             h2::client::Builder::new() | ||||
|                 .initial_connection_window_size(1024*1024*1024) | ||||
|                 .initial_window_size(1024*1024*1024) | ||||
|                 .max_frame_size(4*1024*1024) | ||||
|                 .handshake(c) | ||||
|                 .map_err(Error::from) | ||||
|         }) | ||||
|         .and_then(|(client, h2)| { | ||||
|     let (client, h2) = h2::client::Builder::new() | ||||
|         .initial_connection_window_size(1024*1024*1024) | ||||
|         .initial_window_size(1024*1024*1024) | ||||
|         .max_frame_size(4*1024*1024) | ||||
|         .handshake(conn) | ||||
|         .await?; | ||||
|  | ||||
|             // Spawn a task to run the conn... | ||||
|             tokio::spawn(h2.map_err(|e| println!("GOT ERR={:?}", e))); | ||||
|     // Spawn a task to run the conn... | ||||
|     tokio::spawn(async move { | ||||
|         if let Err(e) = h2.await { | ||||
|             println!("GOT ERR={:?}", e); | ||||
|         } | ||||
|     }); | ||||
|  | ||||
|             futures::stream::repeat(()) | ||||
|                 .take(100) | ||||
|                 .and_then(move |_| send_request(client.clone())) | ||||
|                 .fold(0, move |mut acc, size| { | ||||
|                     acc += size; | ||||
|                     Ok::<_, Error>(acc) | ||||
|                 }) | ||||
|         }) | ||||
|         .then(move |result| { | ||||
|             match result { | ||||
|                 Err(err) => { | ||||
|                     println!("ERROR {}", err); | ||||
|                 } | ||||
|                 Ok(bytes) => { | ||||
|                     let elapsed = start.elapsed().unwrap(); | ||||
|                     let elapsed = (elapsed.as_secs() as f64) + | ||||
|                         (elapsed.subsec_millis() as f64)/1000.0; | ||||
|  | ||||
|                     println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0)); | ||||
|                 } | ||||
|     let mut bytes = 0; | ||||
|     for _ in 0..100 { | ||||
|         match send_request(client.clone()).await { | ||||
|             Ok(b) => { | ||||
|                 bytes += b; | ||||
|             } | ||||
|             Ok(()) | ||||
|         }); | ||||
|             Err(e) => { | ||||
|                 println!("ERROR {}", e); | ||||
|                 return Ok(()); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     tokio::run(tcp); | ||||
|     let elapsed = start.elapsed().unwrap(); | ||||
|     let elapsed = (elapsed.as_secs() as f64) + | ||||
|         (elapsed.subsec_millis() as f64)/1000.0; | ||||
|  | ||||
|     println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0)); | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
		Reference in New Issue
	
	Block a user