daemon: simplify daemon creation
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
		| @ -4,7 +4,7 @@ extern crate proxmox_backup; | ||||
| use proxmox_backup::api_schema::router::*; | ||||
| use proxmox_backup::api_schema::config::*; | ||||
| use proxmox_backup::server::rest::*; | ||||
| use proxmox_backup::tools::daemon::Reloader; | ||||
| use proxmox_backup::tools::daemon; | ||||
| use proxmox_backup::auth_helpers::*; | ||||
| use proxmox_backup::config; | ||||
|  | ||||
| @ -12,12 +12,9 @@ use failure::*; | ||||
| use lazy_static::lazy_static; | ||||
|  | ||||
| use futures::future::Future; | ||||
| use tokio::prelude::*; | ||||
|  | ||||
| use hyper; | ||||
|  | ||||
| static mut QUIT_MAIN: bool = false; | ||||
|  | ||||
| fn main() { | ||||
|  | ||||
|     if let Err(err) = run() { | ||||
| @ -27,9 +24,6 @@ fn main() { | ||||
| } | ||||
|  | ||||
| fn run() -> Result<(), Error> { | ||||
|     // This manages data for reloads: | ||||
|     let mut reloader = Reloader::new(); | ||||
|  | ||||
|     if let Err(err) = syslog::init( | ||||
|         syslog::Facility::LOG_DAEMON, | ||||
|         log::LevelFilter::Info, | ||||
| @ -59,75 +53,17 @@ fn run() -> Result<(), Error> { | ||||
|     let rest_server = RestServer::new(config); | ||||
|  | ||||
|     // http server future: | ||||
|  | ||||
|     let listener: tokio::net::TcpListener = reloader.restore( | ||||
|         "PROXMOX_BACKUP_LISTEN_FD", | ||||
|         || { | ||||
|             let addr = ([127,0,0,1], 82).into(); | ||||
|             Ok(tokio::net::TcpListener::bind(&addr)?) | ||||
|     let server = daemon::create_daemon( | ||||
|         ([127,0,0,1], 82).into(), | ||||
|         |listener| { | ||||
|             Ok(hyper::Server::builder(listener.incoming()) | ||||
|                 .serve(rest_server) | ||||
|                 .map_err(|e| eprintln!("server error: {}", e)) | ||||
|             ) | ||||
|         }, | ||||
|     )?; | ||||
|  | ||||
|     let mut http_server = hyper::Server::builder(listener.incoming()) | ||||
|         .serve(rest_server) | ||||
|         .map_err(|e| eprintln!("server error: {}", e)); | ||||
|  | ||||
|     // signalfd future: | ||||
|  | ||||
|     let signal_handler = | ||||
|         proxmox_backup::tools::daemon::default_signalfd_stream( | ||||
|             reloader, | ||||
|             || { | ||||
|                 unsafe { QUIT_MAIN = true; } | ||||
|                 Ok(()) | ||||
|             }, | ||||
|         )? | ||||
|         .map(|si| { | ||||
|             // debugging... | ||||
|             eprintln!("received signal: {}", si.ssi_signo); | ||||
|         }) | ||||
|         .map_err(|e| { | ||||
|             eprintln!("error from signalfd: {}, shutting down...", e); | ||||
|             unsafe { | ||||
|                 QUIT_MAIN = true; | ||||
|             } | ||||
|         }); | ||||
|  | ||||
|  | ||||
|     // Combined future for signalfd & http server, we want to quit as soon as either of them ends. | ||||
|     // Neither of them is supposed to end unless some weird error happens, so just bail out if is | ||||
|     // the case... | ||||
|     let mut signal_handler = signal_handler.into_future(); | ||||
|     let main = futures::future::poll_fn(move || { | ||||
|         // Helper for some diagnostic error messages: | ||||
|         fn poll_helper<S: Future>(stream: &mut S, name: &'static str) -> bool { | ||||
|             match stream.poll() { | ||||
|                 Ok(Async::Ready(_)) => { | ||||
|                     eprintln!("{} ended, shutting down", name); | ||||
|                     true | ||||
|                 } | ||||
|                 Err(_) => { | ||||
|                     eprintln!("{} error, shutting down", name); | ||||
|                     true | ||||
|                 }, | ||||
|                 _ => false, | ||||
|             } | ||||
|         } | ||||
|         if poll_helper(&mut http_server, "http server") || | ||||
|            poll_helper(&mut signal_handler, "signalfd handler") | ||||
|         { | ||||
|             return Ok(Async::Ready(())); | ||||
|         } | ||||
|  | ||||
|         if unsafe { QUIT_MAIN } { | ||||
|             eprintln!("shutdown requested"); | ||||
|             Ok(Async::Ready(())) | ||||
|         } else { | ||||
|             Ok(Async::NotReady) | ||||
|         } | ||||
|     }); | ||||
|  | ||||
|     hyper::rt::run(main); | ||||
|     hyper::rt::run(server); | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| @ -1,6 +1,6 @@ | ||||
| use proxmox_backup::configdir; | ||||
| use proxmox_backup::tools; | ||||
| use proxmox_backup::tools::daemon::Reloader; | ||||
| use proxmox_backup::tools::daemon; | ||||
| use proxmox_backup::api_schema::router::*; | ||||
| use proxmox_backup::api_schema::config::*; | ||||
| use proxmox_backup::server::rest::*; | ||||
| @ -14,8 +14,6 @@ use tokio::prelude::*; | ||||
|  | ||||
| use hyper; | ||||
|  | ||||
| static mut QUIT_MAIN: bool = false; | ||||
|  | ||||
| fn main() { | ||||
|  | ||||
|     if let Err(err) = run() { | ||||
| @ -63,101 +61,41 @@ fn run() -> Result<(), Error> { | ||||
|         Err(err) => bail!("unabled to decode pkcs12 identity {} - {}", cert_path, err), | ||||
|     }; | ||||
|  | ||||
|     // This manages data for reloads: | ||||
|     let mut reloader = Reloader::new(); | ||||
|  | ||||
|     // http server future: | ||||
|  | ||||
|     let listener: tokio::net::TcpListener = reloader.restore( | ||||
|         "PROXMOX_BACKUP_LISTEN_FD", | ||||
|         || { | ||||
|             let addr = ([0,0,0,0,0,0,0,0], 8007).into(); | ||||
|             Ok(tokio::net::TcpListener::bind(&addr)?) | ||||
|     let server = daemon::create_daemon( | ||||
|         ([0,0,0,0,0,0,0,0], 8007).into(), | ||||
|         |listener| { | ||||
|             let acceptor = native_tls::TlsAcceptor::new(identity)?; | ||||
|             let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor)); | ||||
|             let connections = listener | ||||
|                 .incoming() | ||||
|                 .map_err(Error::from) | ||||
|                 .and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into())) | ||||
|                 .then(|r| match r { | ||||
|                     // accept()s can fail here with an Err() when eg. the client rejects | ||||
|                     // the cert and closes the connection, so we follow up with mapping | ||||
|                     // it to an option and then filtering None with filter_map | ||||
|                     Ok(c) => Ok::<_, Error>(Some(c)), | ||||
|                     Err(e) => { | ||||
|                         if let Some(_io) = e.downcast_ref::<std::io::Error>() { | ||||
|                             // "real" IO errors should not simply be ignored | ||||
|                             bail!("shutting down..."); | ||||
|                         } else { | ||||
|                             // handshake errors just get filtered by filter_map() below: | ||||
|                             Ok(None) | ||||
|                         } | ||||
|                     } | ||||
|                 }) | ||||
|                 .filter_map(|r| { | ||||
|                     // Filter out the Nones | ||||
|                     r | ||||
|                 }); | ||||
|             Ok(hyper::Server::builder(connections) | ||||
|                 .serve(rest_server) | ||||
|                 .map_err(|e| eprintln!("server error: {}", e)) | ||||
|             ) | ||||
|         }, | ||||
|     )?; | ||||
|     let acceptor = native_tls::TlsAcceptor::new(identity)?; | ||||
|     let acceptor = std::sync::Arc::new(tokio_tls::TlsAcceptor::from(acceptor)); | ||||
|     let connections = listener | ||||
|         .incoming() | ||||
|         .map_err(Error::from) | ||||
|         .and_then(move |sock| acceptor.accept(sock).map_err(|e| e.into())) | ||||
|         .then(|r| match r { | ||||
|             // accept()s can fail here with an Err() when eg. the client rejects | ||||
|             // the cert and closes the connection, so we follow up with mapping | ||||
|             // it to an option and then filtering None with filter_map | ||||
|             Ok(c) => Ok::<_, Error>(Some(c)), | ||||
|             Err(e) => { | ||||
|                 if let Some(_io) = e.downcast_ref::<std::io::Error>() { | ||||
|                     // "real" IO errors should not simply be ignored | ||||
|                     bail!("shutting down..."); | ||||
|                 } else { | ||||
|                     // handshake errors just get filtered by filter_map() below: | ||||
|                     Ok(None) | ||||
|                 } | ||||
|             } | ||||
|         }) | ||||
|         .filter_map(|r| { | ||||
|             // Filter out the Nones | ||||
|             r | ||||
|         }); | ||||
|  | ||||
|     let mut http_server = hyper::Server::builder(connections) | ||||
|         .serve(rest_server) | ||||
|         .map_err(|e| eprintln!("server error: {}", e)); | ||||
|  | ||||
|     // signalfd future: | ||||
|     let signal_handler = | ||||
|         proxmox_backup::tools::daemon::default_signalfd_stream( | ||||
|             reloader, | ||||
|             || { | ||||
|                 unsafe { QUIT_MAIN = true; } | ||||
|                 Ok(()) | ||||
|             }, | ||||
|         )? | ||||
|         .map(|si| { | ||||
|             // debugging... | ||||
|             eprintln!("received signal: {}", si.ssi_signo); | ||||
|         }) | ||||
|         .map_err(|e| { | ||||
|             eprintln!("error from signalfd: {}, shutting down...", e); | ||||
|             unsafe { | ||||
|                 QUIT_MAIN = true; | ||||
|             } | ||||
|         }); | ||||
|  | ||||
|     // Combined future for signalfd & http server, we want to quit as soon as either of them ends. | ||||
|     // Neither of them is supposed to end unless some weird error happens, so just bail out if is | ||||
|     // the case... | ||||
|     let mut signal_handler = signal_handler.into_future(); | ||||
|     let main = futures::future::poll_fn(move || { | ||||
|         // Helper for some diagnostic error messages: | ||||
|         fn poll_helper<S: Future>(stream: &mut S, name: &'static str) -> bool { | ||||
|             match stream.poll() { | ||||
|                 Ok(Async::Ready(_)) => { | ||||
|                     eprintln!("{} ended, shutting down", name); | ||||
|                     true | ||||
|                 } | ||||
|                 Err(_) => { | ||||
|                     eprintln!("{} error, shutting down", name); | ||||
|                     true | ||||
|                 }, | ||||
|                 _ => false, | ||||
|             } | ||||
|         } | ||||
|         if poll_helper(&mut http_server, "http server") || | ||||
|            poll_helper(&mut signal_handler, "signalfd handler") | ||||
|         { | ||||
|             return Ok(Async::Ready(())); | ||||
|         } | ||||
|  | ||||
|         if unsafe { QUIT_MAIN } { | ||||
|             eprintln!("shutdown requested"); | ||||
|             Ok(Async::Ready(())) | ||||
|         } else { | ||||
|             Ok(Async::NotReady) | ||||
|         } | ||||
|     }); | ||||
|  | ||||
|     hyper::rt::run(main); | ||||
|     hyper::rt::run(server); | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| @ -6,7 +6,8 @@ use std::os::unix::ffi::OsStrExt; | ||||
| use std::panic::UnwindSafe; | ||||
|  | ||||
| use failure::*; | ||||
| use nix::sys::signalfd::siginfo; | ||||
| use futures::future::poll_fn; | ||||
| use futures::try_ready; | ||||
| use tokio::prelude::*; | ||||
|  | ||||
| use crate::tools::fd_change_cloexec; | ||||
| @ -120,58 +121,6 @@ impl Reloader { | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Provide a default signal handler for daemons (daemon & proxy). | ||||
| /// When the first `SIGHUP` is received, the `reloader`'s `fork_restart` method will be | ||||
| /// triggered. Any further `SIGHUP` is "passed through". | ||||
| pub fn default_signalfd_stream<F>( | ||||
|     reloader: Reloader, | ||||
|     before_reload: F, | ||||
| ) -> Result<impl Stream<Item = siginfo, Error = Error>, Error> | ||||
| where | ||||
|     F: FnOnce() -> Result<(), Error>, | ||||
| { | ||||
|     use nix::sys::signal::{SigmaskHow, Signal, sigprocmask}; | ||||
|  | ||||
|     // Block SIGHUP for *all* threads and use it for a signalfd handler: | ||||
|     let mut sigs = SigSet::empty(); | ||||
|     sigs.add(Signal::SIGHUP); | ||||
|     sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigs), None)?; | ||||
|  | ||||
|     let sigfdstream = SignalFd::new(&sigs)?; | ||||
|     let mut reloader = Some(reloader); | ||||
|     let mut before_reload = Some(before_reload); | ||||
|  | ||||
|     Ok(sigfdstream | ||||
|         .filter_map(move |si| { | ||||
|             // FIXME: logging should be left to the user of this: | ||||
|             eprintln!("received signal: {}", si.ssi_signo); | ||||
|  | ||||
|             if si.ssi_signo == Signal::SIGHUP as u32 { | ||||
|                 // The firs time this happens we will try to start a new process which should take | ||||
|                 // over. | ||||
|                 if let Some(reloader) = reloader.take() { | ||||
|                     if let Err(e) = (before_reload.take().unwrap())() { | ||||
|                         return Some(Err(e)); | ||||
|                     } | ||||
|  | ||||
|                     match reloader.fork_restart() { | ||||
|                         Ok(_) => return None, | ||||
|                         Err(e) => return Some(Err(e)), | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             // pass the rest through: | ||||
|             Some(Ok(si)) | ||||
|         }) | ||||
|         // filter_map cannot produce errors, so we create Result<> items instead, iow: | ||||
|         //   before: Stream<Item = siginfo, Error> | ||||
|         //   after:  Stream<Item = Result<siginfo, Error>, Error>. | ||||
|         // use and_then to lift out the wrapped result: | ||||
|         .and_then(|si_res| si_res) | ||||
|     ) | ||||
| } | ||||
|  | ||||
| // For now all we need to do is store and reuse a tcp listening socket: | ||||
| impl Reloadable for tokio::net::TcpListener { | ||||
|     // NOTE: The socket must not be closed when the store-function is called: | ||||
| @ -196,3 +145,62 @@ impl Reloadable for tokio::net::TcpListener { | ||||
|         )?) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// This creates a future representing a daemon which reloads itself when receiving a SIGHUP. | ||||
| /// If this is started regularly, a listening socket is created. In this case, the file descriptor | ||||
| /// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`. | ||||
| /// If the variable already exists, its contents will instead be used to restore the listening | ||||
| /// socket.  The finished listening socket is then passed to the `create_service` function which | ||||
| /// can be used to setup the TLS and the HTTP daemon. | ||||
| pub fn create_daemon<F, S>( | ||||
|     address: std::net::SocketAddr, | ||||
|     create_service: F, | ||||
| ) -> Result<impl Future<Item = (), Error = ()>, Error> | ||||
| where | ||||
|     F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>, | ||||
|     S: Future<Item = (), Error = ()>, | ||||
| { | ||||
|     let mut reloader = Reloader::new(); | ||||
|  | ||||
|     let listener: tokio::net::TcpListener = reloader.restore( | ||||
|         "PROXMOX_BACKUP_LISTEN_FD", | ||||
|         move || Ok(tokio::net::TcpListener::bind(&address)?), | ||||
|     )?; | ||||
|  | ||||
|     let service = create_service(listener)?; | ||||
|  | ||||
|     // Block SIGHUP for *all* threads and use it for a signalfd handler: | ||||
|     use nix::sys::signal; | ||||
|     let mut sigs = SigSet::empty(); | ||||
|     sigs.add(signal::Signal::SIGHUP); | ||||
|     signal::sigprocmask(signal::SigmaskHow::SIG_BLOCK, Some(&sigs), None)?; | ||||
|  | ||||
|     let mut sigfdstream = SignalFd::new(&sigs)? | ||||
|         .map_err(|e| log::error!("error in signal handler: {}", e)); | ||||
|  | ||||
|     let mut reloader = Some(reloader); | ||||
|  | ||||
|     // Use a Future instead of a Stream for ease-of-use: Poll until we receive a SIGHUP. | ||||
|     let signal_handler = poll_fn(move || { | ||||
|         match try_ready!(sigfdstream.poll()) { | ||||
|             Some(si) => { | ||||
|                 log::info!("received signal {}", si.ssi_signo); | ||||
|                 if si.ssi_signo == signal::Signal::SIGHUP as u32 { | ||||
|                     if let Err(e) = reloader.take().unwrap().fork_restart() { | ||||
|                         log::error!("error during reload: {}", e); | ||||
|                     } | ||||
|                     Ok(Async::Ready(())) | ||||
|                 } else { | ||||
|                     Ok(Async::NotReady) | ||||
|                 } | ||||
|             } | ||||
|             // or the stream ended (which it can't, really) | ||||
|             None => Ok(Async::Ready(())) | ||||
|         } | ||||
|     }); | ||||
|  | ||||
|     Ok(service.select(signal_handler) | ||||
|         .map(|_| log::info!("daemon shutting down...")) | ||||
|         .map_err(|_| ()) | ||||
|     ) | ||||
| } | ||||
|  | ||||
		Reference in New Issue
	
	Block a user