diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs index 67da7615..20e33c4f 100644 --- a/src/bin/proxmox-backup-api.rs +++ b/src/bin/proxmox-backup-api.rs @@ -4,7 +4,7 @@ extern crate proxmox_backup; use proxmox_backup::api_schema::router::*; use proxmox_backup::api_schema::config::*; use proxmox_backup::server::rest::*; -use proxmox_backup::tools::daemon::Reloader; +use proxmox_backup::tools::daemon; use proxmox_backup::auth_helpers::*; use proxmox_backup::config; @@ -12,12 +12,9 @@ use failure::*; use lazy_static::lazy_static; use futures::future::Future; -use tokio::prelude::*; use hyper; -static mut QUIT_MAIN: bool = false; - fn main() { if let Err(err) = run() { @@ -27,9 +24,6 @@ fn main() { } fn run() -> Result<(), Error> { - // This manages data for reloads: - let mut reloader = Reloader::new(); - if let Err(err) = syslog::init( syslog::Facility::LOG_DAEMON, log::LevelFilter::Info, @@ -59,75 +53,17 @@ fn run() -> Result<(), Error> { let rest_server = RestServer::new(config); // http server future: - - let listener: tokio::net::TcpListener = reloader.restore( - "PROXMOX_BACKUP_LISTEN_FD", - || { - let addr = ([127,0,0,1], 82).into(); - Ok(tokio::net::TcpListener::bind(&addr)?) + let server = daemon::create_daemon( + ([127,0,0,1], 82).into(), + |listener| { + Ok(hyper::Server::builder(listener.incoming()) + .serve(rest_server) + .map_err(|e| eprintln!("server error: {}", e)) + ) }, )?; - let mut http_server = hyper::Server::builder(listener.incoming()) - .serve(rest_server) - .map_err(|e| eprintln!("server error: {}", e)); - - // signalfd future: - - let signal_handler = - proxmox_backup::tools::daemon::default_signalfd_stream( - reloader, - || { - unsafe { QUIT_MAIN = true; } - Ok(()) - }, - )? - .map(|si| { - // debugging... - eprintln!("received signal: {}", si.ssi_signo); - }) - .map_err(|e| { - eprintln!("error from signalfd: {}, shutting down...", e); - unsafe { - QUIT_MAIN = true; - } - }); - - - // Combined future for signalfd & http server, we want to quit as soon as either of them ends. - // Neither of them is supposed to end unless some weird error happens, so just bail out if is - // the case... - let mut signal_handler = signal_handler.into_future(); - let main = futures::future::poll_fn(move || { - // Helper for some diagnostic error messages: - fn poll_helper(stream: &mut S, name: &'static str) -> bool { - match stream.poll() { - Ok(Async::Ready(_)) => { - eprintln!("{} ended, shutting down", name); - true - } - Err(_) => { - eprintln!("{} error, shutting down", name); - true - }, - _ => false, - } - } - if poll_helper(&mut http_server, "http server") || - poll_helper(&mut signal_handler, "signalfd handler") - { - return Ok(Async::Ready(())); - } - - if unsafe { QUIT_MAIN } { - eprintln!("shutdown requested"); - Ok(Async::Ready(())) - } else { - Ok(Async::NotReady) - } - }); - - hyper::rt::run(main); + hyper::rt::run(server); Ok(()) } diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index cb906d8e..eded263f 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -1,6 +1,6 @@ use proxmox_backup::configdir; use proxmox_backup::tools; -use proxmox_backup::tools::daemon::Reloader; +use proxmox_backup::tools::daemon; use proxmox_backup::api_schema::router::*; use proxmox_backup::api_schema::config::*; use proxmox_backup::server::rest::*; @@ -14,8 +14,6 @@ use tokio::prelude::*; use hyper; -static mut QUIT_MAIN: bool = false; - fn main() { if let Err(err) = run() { @@ -63,101 +61,41 @@ fn run() -> Result<(), Error> { Err(err) => bail!("unabled to decode pkcs12 identity {} - {}", cert_path, err), }; - // This manages data for reloads: - let mut reloader = Reloader::new(); - - // http server future: - - let listener: tokio::net::TcpListener = reloader.restore( - "PROXMOX_BACKUP_LISTEN_FD", - || { - let addr = ([0,0,0,0,0,0,0,0], 8007).into(); - Ok(tokio::net::TcpListener::bind(&addr)?) + let server = daemon::create_daemon( + ([0,0,0,0,0,0,0,0], 8007).into(), + |listener| { + let acceptor = native_tls::TlsAcceptor::new(identity)?; + let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor)); + let connections = listener + .incoming() + .map_err(Error::from) + .and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into())) + .then(|r| match r { + // accept()s can fail here with an Err() when eg. the client rejects + // the cert and closes the connection, so we follow up with mapping + // it to an option and then filtering None with filter_map + Ok(c) => Ok::<_, Error>(Some(c)), + Err(e) => { + if let Some(_io) = e.downcast_ref::() { + // "real" IO errors should not simply be ignored + bail!("shutting down..."); + } else { + // handshake errors just get filtered by filter_map() below: + Ok(None) + } + } + }) + .filter_map(|r| { + // Filter out the Nones + r + }); + Ok(hyper::Server::builder(connections) + .serve(rest_server) + .map_err(|e| eprintln!("server error: {}", e)) + ) }, )?; - let acceptor = native_tls::TlsAcceptor::new(identity)?; - let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor)); - let connections = listener - .incoming() - .map_err(Error::from) - .and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into())) - .then(|r| match r { - // accept()s can fail here with an Err() when eg. the client rejects - // the cert and closes the connection, so we follow up with mapping - // it to an option and then filtering None with filter_map - Ok(c) => Ok::<_, Error>(Some(c)), - Err(e) => { - if let Some(_io) = e.downcast_ref::() { - // "real" IO errors should not simply be ignored - bail!("shutting down..."); - } else { - // handshake errors just get filtered by filter_map() below: - Ok(None) - } - } - }) - .filter_map(|r| { - // Filter out the Nones - r - }); - let mut http_server = hyper::Server::builder(connections) - .serve(rest_server) - .map_err(|e| eprintln!("server error: {}", e)); - - // signalfd future: - let signal_handler = - proxmox_backup::tools::daemon::default_signalfd_stream( - reloader, - || { - unsafe { QUIT_MAIN = true; } - Ok(()) - }, - )? - .map(|si| { - // debugging... - eprintln!("received signal: {}", si.ssi_signo); - }) - .map_err(|e| { - eprintln!("error from signalfd: {}, shutting down...", e); - unsafe { - QUIT_MAIN = true; - } - }); - - // Combined future for signalfd & http server, we want to quit as soon as either of them ends. - // Neither of them is supposed to end unless some weird error happens, so just bail out if is - // the case... - let mut signal_handler = signal_handler.into_future(); - let main = futures::future::poll_fn(move || { - // Helper for some diagnostic error messages: - fn poll_helper(stream: &mut S, name: &'static str) -> bool { - match stream.poll() { - Ok(Async::Ready(_)) => { - eprintln!("{} ended, shutting down", name); - true - } - Err(_) => { - eprintln!("{} error, shutting down", name); - true - }, - _ => false, - } - } - if poll_helper(&mut http_server, "http server") || - poll_helper(&mut signal_handler, "signalfd handler") - { - return Ok(Async::Ready(())); - } - - if unsafe { QUIT_MAIN } { - eprintln!("shutdown requested"); - Ok(Async::Ready(())) - } else { - Ok(Async::NotReady) - } - }); - - hyper::rt::run(main); + hyper::rt::run(server); Ok(()) } diff --git a/src/tools/daemon.rs b/src/tools/daemon.rs index f51dab1c..db66c316 100644 --- a/src/tools/daemon.rs +++ b/src/tools/daemon.rs @@ -6,7 +6,8 @@ use std::os::unix::ffi::OsStrExt; use std::panic::UnwindSafe; use failure::*; -use nix::sys::signalfd::siginfo; +use futures::future::poll_fn; +use futures::try_ready; use tokio::prelude::*; use crate::tools::fd_change_cloexec; @@ -120,58 +121,6 @@ impl Reloader { } } -/// Provide a default signal handler for daemons (daemon & proxy). -/// When the first `SIGHUP` is received, the `reloader`'s `fork_restart` method will be -/// triggered. Any further `SIGHUP` is "passed through". -pub fn default_signalfd_stream( - reloader: Reloader, - before_reload: F, -) -> Result, Error> -where - F: FnOnce() -> Result<(), Error>, -{ - use nix::sys::signal::{SigmaskHow, Signal, sigprocmask}; - - // Block SIGHUP for *all* threads and use it for a signalfd handler: - let mut sigs = SigSet::empty(); - sigs.add(Signal::SIGHUP); - sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigs), None)?; - - let sigfdstream = SignalFd::new(&sigs)?; - let mut reloader = Some(reloader); - let mut before_reload = Some(before_reload); - - Ok(sigfdstream - .filter_map(move |si| { - // FIXME: logging should be left to the user of this: - eprintln!("received signal: {}", si.ssi_signo); - - if si.ssi_signo == Signal::SIGHUP as u32 { - // The firs time this happens we will try to start a new process which should take - // over. - if let Some(reloader) = reloader.take() { - if let Err(e) = (before_reload.take().unwrap())() { - return Some(Err(e)); - } - - match reloader.fork_restart() { - Ok(_) => return None, - Err(e) => return Some(Err(e)), - } - } - } - - // pass the rest through: - Some(Ok(si)) - }) - // filter_map cannot produce errors, so we create Result<> items instead, iow: - // before: Stream - // after: Stream, Error>. - // use and_then to lift out the wrapped result: - .and_then(|si_res| si_res) - ) -} - // For now all we need to do is store and reuse a tcp listening socket: impl Reloadable for tokio::net::TcpListener { // NOTE: The socket must not be closed when the store-function is called: @@ -196,3 +145,62 @@ impl Reloadable for tokio::net::TcpListener { )?) } } + +/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP. +/// If this is started regularly, a listening socket is created. In this case, the file descriptor +/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`. +/// If the variable already exists, its contents will instead be used to restore the listening +/// socket. The finished listening socket is then passed to the `create_service` function which +/// can be used to setup the TLS and the HTTP daemon. +pub fn create_daemon( + address: std::net::SocketAddr, + create_service: F, +) -> Result, Error> +where + F: FnOnce(tokio::net::TcpListener) -> Result, + S: Future, +{ + let mut reloader = Reloader::new(); + + let listener: tokio::net::TcpListener = reloader.restore( + "PROXMOX_BACKUP_LISTEN_FD", + move || Ok(tokio::net::TcpListener::bind(&address)?), + )?; + + let service = create_service(listener)?; + + // Block SIGHUP for *all* threads and use it for a signalfd handler: + use nix::sys::signal; + let mut sigs = SigSet::empty(); + sigs.add(signal::Signal::SIGHUP); + signal::sigprocmask(signal::SigmaskHow::SIG_BLOCK, Some(&sigs), None)?; + + let mut sigfdstream = SignalFd::new(&sigs)? + .map_err(|e| log::error!("error in signal handler: {}", e)); + + let mut reloader = Some(reloader); + + // Use a Future instead of a Stream for ease-of-use: Poll until we receive a SIGHUP. + let signal_handler = poll_fn(move || { + match try_ready!(sigfdstream.poll()) { + Some(si) => { + log::info!("received signal {}", si.ssi_signo); + if si.ssi_signo == signal::Signal::SIGHUP as u32 { + if let Err(e) = reloader.take().unwrap().fork_restart() { + log::error!("error during reload: {}", e); + } + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + } + // or the stream ended (which it can't, really) + None => Ok(Async::Ready(())) + } + }); + + Ok(service.select(signal_handler) + .map(|_| log::info!("daemon shutting down...")) + .map_err(|_| ()) + ) +}