src/bin/h2s-server.rs: switch to async

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-08-28 16:01:10 +02:00
parent 74be6dc9b7
commit e235c8f719

View File

@ -1,26 +1,17 @@
use std::sync::Arc;
use failure::*; use failure::*;
use futures::*; use futures::*;
// Simple H2 server to test H2 speed with h2s-client.rs
use hyper::{Request, Response, Body}; use hyper::{Request, Response, Body};
use tokio::net::TcpListener; use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
use tokio::net::{TcpListener, TcpStream};
use proxmox_backup::configdir; use proxmox_backup::configdir;
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype}; // Simple H2 server to test H2 speed with h2s-client.rs
use std::sync::Arc;
use tokio_openssl::SslAcceptorExt;
pub fn main() -> Result<(), Error> {
start_h2_server()?;
Ok(())
}
pub fn start_h2_server() -> Result<(), Error> {
#[tokio::main]
async fn main() -> Result<(), Error> {
let key_path = configdir!("/proxy.key"); let key_path = configdir!("/proxy.key");
let cert_path = configdir!("/proxy.pem"); let cert_path = configdir!("/proxy.pem");
@ -37,70 +28,53 @@ pub fn start_h2_server() -> Result<(), Error> {
println!("listening on {:?}", listener.local_addr()); println!("listening on {:?}", listener.local_addr());
let server = listener let mut incoming = listener.incoming();
.incoming() while let Some(socket) = incoming.try_next().await? {
.map_err(Error::from) tokio::spawn(handle_connection(socket, Arc::clone(&acceptor))
.and_then(move |sock| { .map(|res| {
sock.set_nodelay(true).unwrap(); if let Err(err) = res {
sock.set_send_buffer_size(1024*1024).unwrap(); eprintln!("Error: {}", err);
sock.set_recv_buffer_size(1024*1024).unwrap();
acceptor.accept_async(sock).map_err(|e| e.into())
})
.then(|r| match r {
// accept()s can fail here with an Err() when eg. the client rejects
// the cert and closes the connection, so we follow up with mapping
// it to an option and then filtering None with filter_map
Ok(c) => Ok::<_, Error>(Some(c)),
Err(e) => {
if let Some(_io) = e.downcast_ref::<std::io::Error>() {
// "real" IO errors should not simply be ignored
bail!("shutting down...");
} else {
// handshake errors just get filtered by filter_map() below:
Ok(None)
} }
} }));
}) }
.filter_map(|r| {
// Filter out the Nones
r
})
.for_each(move |socket| {
let mut http = hyper::server::conn::Http::new();
http.http2_only(true);
// increase window size: todo - find optiomal size
let max_window_size = (1 << 31) - 2;
http.http2_initial_stream_window_size(max_window_size);
http.http2_initial_connection_window_size(max_window_size);
let service = hyper::service::service_fn(|_req: Request<Body>| {
println!("Got request");
let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A,A...]
let body = Body::from(buffer);
let response = Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/octet-stream")
.body(body)
.unwrap();
Ok::<_, Error>(response)
});
http.serve_connection(socket, service)
.map_err(Error::from)
})
.and_then(|_| {
println!("H2 connection CLOSE !");
Ok(())
})
.then(|res| {
if let Err(e) = res {
println!(" -> err={:?}", e);
}
Ok(())
});
tokio::run(server);
Ok(()) Ok(())
} }
async fn handle_connection(
socket: TcpStream,
acceptor: Arc<SslAcceptor>,
) -> Result<(), Error> {
socket.set_nodelay(true).unwrap();
socket.set_send_buffer_size(1024*1024).unwrap();
socket.set_recv_buffer_size(1024*1024).unwrap();
let socket = tokio_openssl::accept(acceptor.as_ref(), socket).await?;
let mut http = hyper::server::conn::Http::new();
http.http2_only(true);
// increase window size: todo - find optiomal size
let max_window_size = (1 << 31) - 2;
http.http2_initial_stream_window_size(max_window_size);
http.http2_initial_connection_window_size(max_window_size);
let service = hyper::service::service_fn(|_req: Request<Body>| {
println!("Got request");
let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A,A...]
let body = Body::from(buffer);
let response = Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/octet-stream")
.body(body)
.unwrap();
future::ok::<_, Error>(response)
});
http.serve_connection(socket, service)
.map_err(Error::from)
.await?;
println!("H2 connection CLOSE !");
Ok(())
}