src/bin/h2client.rs: switch to async
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
		| @ -1,10 +1,14 @@ | ||||
| use std::future::Future; | ||||
| use std::pin::Pin; | ||||
| use std::task::{Context, Poll}; | ||||
|  | ||||
| use failure::*; | ||||
| use futures::*; | ||||
| use futures::future::TryFutureExt; | ||||
| use futures::stream::Stream; | ||||
| use tokio::net::TcpStream; | ||||
|  | ||||
| // Simple H2 client to test H2 download speed using h2server.rs | ||||
|  | ||||
| use tokio::net::TcpStream; | ||||
|  | ||||
| struct Process { | ||||
|     body: h2::RecvStream, | ||||
|     trailers: bool, | ||||
| @ -12,28 +16,32 @@ struct Process { | ||||
| } | ||||
|  | ||||
| impl Future for Process { | ||||
|     type Item = usize; | ||||
|     type Error = Error; | ||||
|     type Output = Result<usize, Error>; | ||||
|  | ||||
|     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { | ||||
|         let this = self.get_mut(); | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<usize, Error> { | ||||
|         loop { | ||||
|             if self.trailers { | ||||
|                 let trailers = try_ready!(self.body.poll_trailers()); | ||||
|                 if let Some(trailers) = trailers { | ||||
|                     println!("trailers: {:?}", trailers); | ||||
|             if this.trailers { | ||||
|                 match futures::ready!(this.body.poll_trailers(cx)) { | ||||
|                     Ok(Some(trailers)) => println!("trailers: {:?}", trailers), | ||||
|                     Ok(None) => (), | ||||
|                     Err(err) => return Poll::Ready(Err(Error::from(err))), | ||||
|                 } | ||||
|                 println!("Received {} bytes", self.bytes); | ||||
|  | ||||
|                 return Ok(Async::Ready(self.bytes)); | ||||
|                 println!("Received {} bytes", this.bytes); | ||||
|  | ||||
|                 return Poll::Ready(Ok(this.bytes)); | ||||
|             } else { | ||||
|                 match try_ready!(self.body.poll()) { | ||||
|                     Some(chunk) => { | ||||
|                         self.body.release_capacity().release_capacity(chunk.len())?; | ||||
|                         self.bytes += chunk.len(); | ||||
|                 match futures::ready!(Pin::new(&mut this.body).poll_next(cx)) { | ||||
|                     Some(Ok(chunk)) => { | ||||
|                         this.body.release_capacity().release_capacity(chunk.len())?; | ||||
|                         this.bytes += chunk.len(); | ||||
|                         // println!("GOT FRAME {}", chunk.len()); | ||||
|                     }, | ||||
|                     Some(Err(err)) => return Poll::Ready(Err(Error::from(err))), | ||||
|                     None => { | ||||
|                         self.trailers = true; | ||||
|                         this.trailers = true; | ||||
|                     }, | ||||
|                 } | ||||
|             } | ||||
| @ -41,7 +49,9 @@ impl Future for Process { | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn send_request(mut client: h2::client::SendRequest<bytes::Bytes>) -> impl Future<Item=usize, Error=Error> { | ||||
| fn send_request( | ||||
|     mut client: h2::client::SendRequest<bytes::Bytes>, | ||||
| ) -> impl Future<Output = Result<usize, Error>> { | ||||
|  | ||||
|     println!("sending request"); | ||||
|  | ||||
| @ -59,52 +69,37 @@ fn send_request(mut client: h2::client::SendRequest<bytes::Bytes>) -> impl Futur | ||||
|         }) | ||||
| } | ||||
|  | ||||
| pub fn main() -> Result<(), Error> { | ||||
|  | ||||
|     let tcp_stream = TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()); | ||||
| #[tokio::main] | ||||
| async fn main() -> Result<(), Error> { | ||||
|  | ||||
|     let start = std::time::SystemTime::now(); | ||||
|  | ||||
|     let tcp = tcp_stream | ||||
|         .map_err(Error::from) | ||||
|         .and_then(|c| { | ||||
|             h2::client::Builder::new() | ||||
|                 .initial_connection_window_size(1024*1024*1024) | ||||
|                 .initial_window_size(1024*1024*1024) | ||||
|                 .max_frame_size(4*1024*1024) | ||||
|                 .handshake(c) | ||||
|                 .map_err(Error::from) | ||||
|         }) | ||||
|         .and_then(|(client, h2)| { | ||||
|     let conn = TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()) | ||||
|         .await?; | ||||
|  | ||||
|             // Spawn a task to run the conn... | ||||
|             tokio::spawn(h2.map_err(|e| println!("GOT ERR={:?}", e))); | ||||
|     let (client, h2) = h2::client::Builder::new() | ||||
|         .initial_connection_window_size(1024*1024*1024) | ||||
|         .initial_window_size(1024*1024*1024) | ||||
|         .max_frame_size(4*1024*1024) | ||||
|         .handshake(conn) | ||||
|         .await?; | ||||
|  | ||||
|             futures::stream::repeat(()) | ||||
|                 .take(2000) | ||||
|                 .and_then(move |_| send_request(client.clone())) | ||||
|                 .fold(0, move |mut acc, size| { | ||||
|                     acc += size; | ||||
|                     Ok::<_, Error>(acc) | ||||
|                 }) | ||||
|         }) | ||||
|         .then(move |result| { | ||||
|             match result { | ||||
|                 Err(err) => { | ||||
|                     println!("ERROR {}", err); | ||||
|                 } | ||||
|                 Ok(bytes) => { | ||||
|                     let elapsed = start.elapsed().unwrap(); | ||||
|                     let elapsed = (elapsed.as_secs() as f64) + | ||||
|                         (elapsed.subsec_millis() as f64)/1000.0; | ||||
|     tokio::spawn(async move { | ||||
|         if let Err(err) = h2.await { | ||||
|             println!("GOT ERR={:?}", err); | ||||
|         } | ||||
|     }); | ||||
|  | ||||
|                     println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0)); | ||||
|                 } | ||||
|             } | ||||
|             Ok(()) | ||||
|         }); | ||||
|     let mut bytes = 0; | ||||
|     for _ in 0..2000 { | ||||
|         bytes += send_request(client.clone()).await?; | ||||
|     } | ||||
|  | ||||
|     tokio::run(tcp); | ||||
|     let elapsed = start.elapsed().unwrap(); | ||||
|     let elapsed = (elapsed.as_secs() as f64) + | ||||
|         (elapsed.subsec_millis() as f64)/1000.0; | ||||
|  | ||||
|     println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0)); | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
		Reference in New Issue
	
	Block a user