daemon: simlify code (make it easier to use)

This commit is contained in:
Dietmar Maurer 2021-09-29 11:21:32 +02:00
parent 01a080215d
commit d265420025
4 changed files with 35 additions and 43 deletions

View File

@ -2,13 +2,11 @@ use std::sync::{Arc, Mutex};
use std::collections::HashMap; use std::collections::HashMap;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
use futures::{FutureExt, TryFutureExt};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use proxmox::api::{api, router::SubdirMap, Router, RpcEnvironmentType, UserInformation}; use proxmox::api::{api, router::SubdirMap, Router, RpcEnvironmentType, UserInformation};
use proxmox::list_subdirs_api_method; use proxmox::list_subdirs_api_method;
use proxmox_rest_server::{ApiAuth, ApiConfig, AuthError, RestServer}; use proxmox_rest_server::{ApiAuth, ApiConfig, AuthError, RestServer};
// Create a Dummy User info and auth system // Create a Dummy User info and auth system
// Normally this would check and authenticate the user // Normally this would check and authenticate the user
struct DummyUserInfo; struct DummyUserInfo;
@ -197,16 +195,17 @@ async fn run() -> Result<(), Error> {
// the api to clients // the api to clients
proxmox_rest_server::daemon::create_daemon( proxmox_rest_server::daemon::create_daemon(
([127, 0, 0, 1], 65000).into(), ([127, 0, 0, 1], 65000).into(),
move |listener, ready| { move |listener| {
let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?; let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
Ok(ready Ok(async move {
.and_then(|_| hyper::Server::builder(incoming)
hyper::Server::builder(incoming)
.serve(rest_server) .serve(rest_server)
.map_err(Error::from) .await?;
)
.map_err(|err| eprintln!("ERR: {}", err)) Ok(())
.map(|test| println!("OK: {}", test.is_ok()))) })
}, },
"example_server", "example_server",
).await?; ).await?;

View File

@ -7,8 +7,6 @@ use std::os::raw::{c_char, c_uchar, c_int};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::ffi::OsStrExt; use std::os::unix::ffi::OsStrExt;
use std::panic::UnwindSafe; use std::panic::UnwindSafe;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::path::PathBuf; use std::path::PathBuf;
use anyhow::{bail, format_err, Error}; use anyhow::{bail, format_err, Error};
@ -240,31 +238,21 @@ impl Reloadable for tokio::net::TcpListener {
} }
} }
pub struct NotifyReady;
impl Future for NotifyReady {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Error>> {
systemd_notify(SystemdNotify::Ready)?;
Poll::Ready(Ok(()))
}
}
/// This creates a future representing a daemon which reloads itself when receiving a SIGHUP. /// This creates a future representing a daemon which reloads itself when receiving a SIGHUP.
/// If this is started regularly, a listening socket is created. In this case, the file descriptor /// If this is started regularly, a listening socket is created. In this case, the file descriptor
/// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`. /// number will be remembered in `PROXMOX_BACKUP_LISTEN_FD`.
/// If the variable already exists, its contents will instead be used to restore the listening /// If the variable already exists, its contents will instead be used to restore the listening
/// socket. The finished listening socket is then passed to the `create_service` function which /// socket. The finished listening socket is then passed to the `create_service` function which
/// can be used to setup the TLS and the HTTP daemon. /// can be used to setup the TLS and the HTTP daemon. The returned future has to call
/// [systemd_notify] with [SystemdNotify::Ready] when the service is ready.
pub async fn create_daemon<F, S>( pub async fn create_daemon<F, S>(
address: std::net::SocketAddr, address: std::net::SocketAddr,
create_service: F, create_service: F,
service_name: &str, service_name: &str,
) -> Result<(), Error> ) -> Result<(), Error>
where where
F: FnOnce(tokio::net::TcpListener, NotifyReady) -> Result<S, Error>, F: FnOnce(tokio::net::TcpListener) -> Result<S, Error>,
S: Future<Output = ()> + Unpin, S: Future<Output = Result<(), Error>>,
{ {
let mut reloader = Reloader::new()?; let mut reloader = Reloader::new()?;
@ -273,7 +261,15 @@ where
move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) }, move || async move { Ok(tokio::net::TcpListener::bind(&address).await?) },
).await?; ).await?;
let server_future = create_service(listener, NotifyReady)?; let service = create_service(listener)?;
let service = async move {
if let Err(err) = service.await {
log::error!("server error: {}", err);
}
};
let server_future = Box::pin(service);
let shutdown_future = crate::shutdown_future(); let shutdown_future = crate::shutdown_future();
let finish_future = match future::select(server_future, shutdown_future).await { let finish_future = match future::select(server_future, shutdown_future).await {

View File

@ -106,21 +106,18 @@ async fn run() -> Result<(), Error> {
// http server future: // http server future:
let server = daemon::create_daemon( let server = daemon::create_daemon(
([127,0,0,1], 82).into(), ([127,0,0,1], 82).into(),
move |listener, ready| { move |listener| {
let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?; let incoming = hyper::server::conn::AddrIncoming::from_listener(listener)?;
Ok(ready Ok(async {
.and_then(|_| hyper::Server::builder(incoming) daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
hyper::Server::builder(incoming)
.serve(rest_server) .serve(rest_server)
.with_graceful_shutdown(proxmox_rest_server::shutdown_future()) .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
.map_err(Error::from) .map_err(Error::from)
) .await
.map(|e| {
if let Err(e) = e {
eprintln!("server error: {}", e);
}
}) })
)
}, },
"proxmox-backup.service", "proxmox-backup.service",
); );

View File

@ -247,20 +247,20 @@ async fn run() -> Result<(), Error> {
let server = daemon::create_daemon( let server = daemon::create_daemon(
([0,0,0,0,0,0,0,0], 8007).into(), ([0,0,0,0,0,0,0,0], 8007).into(),
move |listener, ready| { move |listener| {
let connections = accept_connections(listener, acceptor, debug); let connections = accept_connections(listener, acceptor, debug);
let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections)); let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections));
Ok(ready Ok(async {
.and_then(|_| hyper::Server::builder(connections) daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
hyper::Server::builder(connections)
.serve(rest_server) .serve(rest_server)
.with_graceful_shutdown(proxmox_rest_server::shutdown_future()) .with_graceful_shutdown(proxmox_rest_server::shutdown_future())
.map_err(Error::from) .map_err(Error::from)
) .await
.map_err(|err| eprintln!("server error: {}", err)) })
.map(|_| ())
)
}, },
"proxmox-backup-proxy.service", "proxmox-backup-proxy.service",
); );