src/bin/proxmox-backup-proxy.rs: switch to async

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-08-29 09:45:34 +02:00
parent e9722f8bde
commit fda5797b8a

View File

@ -11,23 +11,21 @@ use proxmox::tools::try_block;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use futures::*; use futures::*;
use futures::stream::Stream;
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype}; use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
use std::sync::Arc; use std::sync::Arc;
use tokio_openssl::SslAcceptorExt;
use hyper; use hyper;
fn main() { #[tokio::main]
async fn main() {
if let Err(err) = run() { if let Err(err) = run().await {
eprintln!("Error: {}", err); eprintln!("Error: {}", err);
std::process::exit(-1); std::process::exit(-1);
} }
} }
fn run() -> Result<(), Error> { async fn run() -> Result<(), Error> {
if let Err(err) = syslog::init( if let Err(err) = syslog::init(
syslog::Facility::LOG_DAEMON, syslog::Facility::LOG_DAEMON,
log::LevelFilter::Info, log::LevelFilter::Info,
@ -77,61 +75,41 @@ fn run() -> Result<(), Error> {
let connections = listener let connections = listener
.incoming() .incoming()
.map_err(Error::from) .map_err(Error::from)
.and_then(move |sock| { .try_filter_map(move |sock| {
sock.set_nodelay(true).unwrap(); let acceptor = Arc::clone(&acceptor);
sock.set_send_buffer_size(1024*1024).unwrap(); async move {
sock.set_recv_buffer_size(1024*1024).unwrap(); sock.set_nodelay(true).unwrap();
acceptor.accept_async(sock).map_err(|e| e.into()) sock.set_send_buffer_size(1024*1024).unwrap();
}) sock.set_recv_buffer_size(1024*1024).unwrap();
.then(|r| match r { Ok(tokio_openssl::accept(&acceptor, sock)
// accept()s can fail here with an Err() when eg. the client rejects .await
// the cert and closes the connection, so we follow up with mapping .ok() // handshake errors aren't be fatal, so return None to filter
// it to an option and then filtering None with filter_map )
Ok(c) => Ok::<_, Error>(Some(c)),
Err(e) => {
if let Some(_io) = e.downcast_ref::<std::io::Error>() {
// "real" IO errors should not simply be ignored
bail!("shutting down...");
} else {
// handshake errors just get filtered by filter_map() below:
Ok(None)
}
} }
})
.filter_map(|r| {
// Filter out the Nones
r
}); });
Ok(hyper::Server::builder(connections) Ok(hyper::Server::builder(connections)
.serve(rest_server) .serve(rest_server)
.with_graceful_shutdown(server::shutdown_future()) .with_graceful_shutdown(server::shutdown_future())
.map_err(|err| eprintln!("server error: {}", err)) .map_err(|err| eprintln!("server error: {}", err))
.map(|_| ())
) )
}, },
)?; )?;
daemon::systemd_notify(daemon::SystemdNotify::Ready)?; daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
tokio::run(lazy(|| { let init_result: Result<(), Error> = try_block!({
server::create_task_control_socket()?;
let init_result: Result<(), Error> = try_block!({ server::server_state_init()?;
server::create_task_control_socket()?;
server::server_state_init()?;
Ok(())
});
if let Err(err) = init_result {
eprintln!("unable to start daemon - {}", err);
} else {
tokio::spawn(server.then(|_| {
log::info!("done - exit server");
Ok(())
}));
}
Ok(()) Ok(())
})); });
if let Err(err) = init_result {
bail!("unable to start daemon - {}", err);
}
server.await;
log::info!("done - exit server");
Ok(()) Ok(())
} }