diff --git a/Cargo.toml b/Cargo.toml index 5f305f00..bec61bb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ futures-preview = "0.3.0-alpha" h2 = { version = "0.2.0-alpha", git = "https://github.com/hyperium/h2", features = ["stream"] } http = "0.1" hyper = { version = "0.13.0-a.0", git = "https://github.com/hyperium/hyper" } -hyper-openssl = { version = "0.7", path = "hyper-openssl" } lazy_static = "1.3" libc = "0.2" log = "0.4" @@ -39,9 +38,9 @@ shellwords = "1.0" siphasher = "0.3" syslog = "4.0" textwrap = "0.11" -tokio = { version = "0.2.0-alpha.2" } -tokio-executor = { version = "0.2.0-alpha.2" } -tokio-net = { version = "0.2.0-alpha.2", features = ["signal"] } +tokio = { version = "0.2.0-alpha.4" } +tokio-executor = { version = "0.2.0-alpha.4" } +tokio-net = { version = "0.2.0-alpha.4", features = ["signal"] } tokio-openssl = "0.4.0-alpha.2" tower-service = "0.3.0-alpha.1" url = "1.7" diff --git a/src/bin/h2client.rs b/src/bin/h2client.rs index dde90589..6abb014b 100644 --- a/src/bin/h2client.rs +++ b/src/bin/h2client.rs @@ -74,7 +74,7 @@ async fn main() -> Result<(), Error> { let start = std::time::SystemTime::now(); - let conn = TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()) + let conn = TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008))) .await?; let (client, h2) = h2::client::Builder::new() diff --git a/src/bin/h2s-client.rs b/src/bin/h2s-client.rs index 3eec67fe..70bb088e 100644 --- a/src/bin/h2s-client.rs +++ b/src/bin/h2s-client.rs @@ -71,7 +71,8 @@ fn send_request( async fn main() -> Result<(), Error> { let start = std::time::SystemTime::now(); - let conn = tokio::net::TcpStream::connect(&"127.0.0.1:8008".parse().unwrap()).await?; + let conn = + tokio::net::TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; conn.set_nodelay(true).unwrap(); conn.set_recv_buffer_size(1024*1024).unwrap(); diff --git a/src/bin/h2s-server.rs b/src/bin/h2s-server.rs index 42c9cc19..b8c7926a 100644 --- a/src/bin/h2s-server.rs +++ b/src/bin/h2s-server.rs @@ -24,7 +24,7 @@ async fn main() -> Result<(), Error> { let acceptor = Arc::new(acceptor.build()); - let listener = TcpListener::bind(&"127.0.0.1:8008".parse().unwrap()).unwrap(); + let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; println!("listening on {:?}", listener.local_addr()); diff --git a/src/bin/h2server.rs b/src/bin/h2server.rs index b275c0ca..8477ec71 100644 --- a/src/bin/h2server.rs +++ b/src/bin/h2server.rs @@ -10,7 +10,7 @@ use proxmox_backup::client::pipe_to_stream::PipeToSendStream; #[tokio::main] async fn main() -> Result<(), Error> { - let listener = TcpListener::bind(&"127.0.0.1:8008".parse().unwrap()).unwrap(); + let listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; println!("listening on {:?}", listener.local_addr()); diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs index d09955a4..1537503f 100644 --- a/src/bin/proxmox-backup-api.rs +++ b/src/bin/proxmox-backup-api.rs @@ -55,18 +55,21 @@ async fn run() -> Result<(), Error> { // http server future: let server = daemon::create_daemon( ([127,0,0,1], 82).into(), - move |listener| { - Ok(hyper::Server::builder(listener.incoming()) - .serve(rest_server) - .with_graceful_shutdown(server::shutdown_future()) - .map(|e| { - if let Err(e) = e { - eprintln!("server error: {}", e); - } - }) + move |listener, ready| { + Ok(ready + .and_then(|_| hyper::Server::builder(listener.incoming()) + .serve(rest_server) + .with_graceful_shutdown(server::shutdown_future()) + .map_err(Error::from) + ) + .map(|e| { + if let Err(e) = e { + eprintln!("server error: {}", e); + } + }) ) }, - )?; + ); daemon::systemd_notify(daemon::SystemdNotify::Ready)?; @@ -80,7 +83,7 @@ async fn run() -> Result<(), Error> { bail!("unable to start daemon - {}", err); } - server.await; + server.await?; log::info!("done - exit server"); diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 03ae821e..9c052f09 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -71,7 +71,7 @@ async fn run() -> Result<(), Error> { let server = daemon::create_daemon( ([0,0,0,0,0,0,0,0], 8007).into(), - |listener| { + |listener, ready| { let connections = listener .incoming() .map_err(Error::from) @@ -87,14 +87,18 @@ async fn run() -> Result<(), Error> { ) } }); - Ok(hyper::Server::builder(connections) - .serve(rest_server) - .with_graceful_shutdown(server::shutdown_future()) - .map_err(|err| eprintln!("server error: {}", err)) - .map(|_| ()) + + Ok(ready + .and_then(|_| hyper::Server::builder(connections) + .serve(rest_server) + .with_graceful_shutdown(server::shutdown_future()) + .map_err(Error::from) + ) + .map_err(|err| eprintln!("server error: {}", err)) + .map(|_| ()) ) }, - )?; + ); daemon::systemd_notify(daemon::SystemdNotify::Ready)?; @@ -108,7 +112,7 @@ async fn run() -> Result<(), Error> { bail!("unable to start daemon - {}", err); } - server.await; + server.await?; log::info!("done - exit server"); Ok(()) diff --git a/src/tools/daemon.rs b/src/tools/daemon.rs index 08fcee90..97215d54 100644 --- a/src/tools/daemon.rs +++ b/src/tools/daemon.rs @@ -1,13 +1,15 @@ //! Helpers for daemons/services. use std::ffi::CString; +use std::future::Future; use std::os::raw::{c_char, c_int}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::ffi::OsStrExt; use std::panic::UnwindSafe; +use std::pin::Pin; +use std::task::{Context, Poll}; use failure::*; -use tokio::prelude::*; use proxmox::tools::io::{ReadExt, WriteExt}; @@ -48,14 +50,15 @@ impl Reloader { /// the function provided in the `or_create` parameter to instantiate the new "first" instance. /// /// Values created via this method will be remembered for later re-execution. - pub fn restore(&mut self, name: &'static str, or_create: F) -> Result + pub async fn restore(&mut self, name: &'static str, or_create: F) -> Result where T: Reloadable, - F: FnOnce() -> Result, + F: FnOnce() -> U, + U: Future>, { let res = match std::env::var(name) { Ok(varstr) => T::restore(&varstr)?, - Err(std::env::VarError::NotPresent) => or_create()?, + Err(std::env::VarError::NotPresent) => or_create().await?, Err(_) => bail!("variable {} has invalid value", name), }; @@ -194,48 +197,56 @@ impl Reloadable for tokio::net::TcpListener { } } +pub struct NotifyReady; + +impl Future for NotifyReady { + type Output = Result<(), Error>; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + systemd_notify(SystemdNotify::Ready)?; + Poll::Ready(Ok(())) + } +} + /// This creates a future representing a daemon which reloads itself when receiving a SIGHUP. /// If this is started regularly, a listening socket is created. In this case, the file descriptor /// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`. /// If the variable already exists, its contents will instead be used to restore the listening /// socket. The finished listening socket is then passed to the `create_service` function which /// can be used to setup the TLS and the HTTP daemon. -pub fn create_daemon( +pub async fn create_daemon( address: std::net::SocketAddr, create_service: F, -) -> Result, Error> +) -> Result<(), Error> where - F: FnOnce(tokio::net::TcpListener) -> Result, + F: FnOnce(tokio::net::TcpListener, NotifyReady) -> Result, S: Future, { let mut reloader = Reloader::new(); let listener: tokio::net::TcpListener = reloader.restore( "PROXMOX_BACKUP_LISTEN_FD", - move || Ok(tokio::net::TcpListener::bind(&address)?), - )?; + move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) }, + ).await?; - let service = create_service(listener)?; + create_service(listener, NotifyReady)?.await; let mut reloader = Some(reloader); - Ok(service - .map(move |_| { - crate::tools::request_shutdown(); // make sure we are in shutdown mode - if server::is_reload_request() { - log::info!("daemon reload..."); - if let Err(e) = systemd_notify(SystemdNotify::Reloading) { - log::error!("failed to notify systemd about the state change: {}", e); - } - if let Err(e) = reloader.take().unwrap().fork_restart() { - log::error!("error during reload: {}", e); - let _ = systemd_notify(SystemdNotify::Status(format!("error during reload"))); - } - } else { - log::info!("daemon shutting down..."); - } - }) - ) + crate::tools::request_shutdown(); // make sure we are in shutdown mode + if server::is_reload_request() { + log::info!("daemon reload..."); + if let Err(e) = systemd_notify(SystemdNotify::Reloading) { + log::error!("failed to notify systemd about the state change: {}", e); + } + if let Err(e) = reloader.take().unwrap().fork_restart() { + log::error!("error during reload: {}", e); + let _ = systemd_notify(SystemdNotify::Status(format!("error during reload"))); + } + } else { + log::info!("daemon shutting down..."); + } + Ok(()) } #[link(name = "systemd")]