move ApiConfig, FileLogger and CommandoSocket to proxmox-rest-server workspace
ApiConfig: avoid using pbs_config::backup_user() CommandoSocket: avoid using pbs_config::backup_user() FileLogger: avoid using pbs_config::backup_user() - use atomic_open_or_create_file() Auth Trait: moved definitions to proxmox-rest-server/src/lib.rs - removed CachedUserInfo patrameter - return user as String (not Authid) Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
committed by
Thomas Lamprecht
parent
037f6b6d5e
commit
fd6d243843
@ -1505,7 +1505,7 @@ pub fn pxar_file_download(
|
||||
EntryKind::Directory => {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(100);
|
||||
let channelwriter = AsyncChannelWriter::new(sender, 1024 * 1024);
|
||||
crate::server::spawn_internal_task(
|
||||
proxmox_rest_server::spawn_internal_task(
|
||||
create_zip(channelwriter, decoder, path.clone(), false)
|
||||
);
|
||||
Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| {
|
||||
|
@ -295,12 +295,12 @@ fn upgrade_to_websocket(
|
||||
|
||||
let (ws, response) = WebSocket::new(parts.headers.clone())?;
|
||||
|
||||
crate::server::spawn_internal_task(async move {
|
||||
proxmox_rest_server::spawn_internal_task(async move {
|
||||
let conn: Upgraded = match hyper::upgrade::on(Request::from_parts(parts, req_body))
|
||||
.map_err(Error::from)
|
||||
.await
|
||||
{
|
||||
Ok(upgraded) => upgraded,
|
||||
Ok(upgraded) => upgraded,
|
||||
_ => bail!("error"),
|
||||
};
|
||||
|
||||
|
@ -29,8 +29,7 @@ use pbs_tools::format::HumanByte;
|
||||
use pbs_tools::fs::{lock_dir_noblock, DirLockGuard};
|
||||
use pbs_tools::process_locker::ProcessLockSharedGuard;
|
||||
use pbs_config::{open_backup_lockfile, BackupLockGuard};
|
||||
|
||||
use crate::tools::fail_on_shutdown;
|
||||
use proxmox_rest_server::fail_on_shutdown;
|
||||
|
||||
lazy_static! {
|
||||
static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
|
||||
|
@ -172,7 +172,7 @@ fn verify_index_chunks(
|
||||
let check_abort = |pos: usize| -> Result<(), Error> {
|
||||
if pos & 1023 == 0 {
|
||||
verify_worker.worker.check_abort()?;
|
||||
crate::tools::fail_on_shutdown()?;
|
||||
proxmox_rest_server::fail_on_shutdown()?;
|
||||
}
|
||||
Ok(())
|
||||
};
|
||||
@ -184,7 +184,7 @@ fn verify_index_chunks(
|
||||
|
||||
for (pos, _) in chunk_list {
|
||||
verify_worker.worker.check_abort()?;
|
||||
crate::tools::fail_on_shutdown()?;
|
||||
proxmox_rest_server::fail_on_shutdown()?;
|
||||
|
||||
let info = index.chunk_info(pos).unwrap();
|
||||
|
||||
@ -376,7 +376,7 @@ pub fn verify_backup_dir_with_lock(
|
||||
});
|
||||
|
||||
verify_worker.worker.check_abort()?;
|
||||
crate::tools::fail_on_shutdown()?;
|
||||
proxmox_rest_server::fail_on_shutdown()?;
|
||||
|
||||
if let Err(err) = result {
|
||||
task_log!(
|
||||
|
@ -3,8 +3,10 @@ use futures::*;
|
||||
|
||||
use proxmox::try_block;
|
||||
use proxmox::api::RpcEnvironmentType;
|
||||
use proxmox::tools::fs::CreateOptions;
|
||||
|
||||
use pbs_tools::auth::private_auth_key;
|
||||
use proxmox_rest_server::ApiConfig;
|
||||
|
||||
use proxmox_backup::server::{
|
||||
self,
|
||||
@ -57,16 +59,25 @@ async fn run() -> Result<(), Error> {
|
||||
}
|
||||
let _ = csrf_secret(); // load with lazy_static
|
||||
|
||||
let mut config = server::ApiConfig::new(
|
||||
let mut config = ApiConfig::new(
|
||||
pbs_buildcfg::JS_DIR,
|
||||
&proxmox_backup::api2::ROUTER,
|
||||
RpcEnvironmentType::PRIVILEGED,
|
||||
default_api_auth(),
|
||||
)?;
|
||||
|
||||
let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
|
||||
let backup_user = pbs_config::backup_user()?;
|
||||
let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
|
||||
|
||||
config.enable_file_log(pbs_buildcfg::API_ACCESS_LOG_FN, &mut commando_sock)?;
|
||||
let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
|
||||
let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
|
||||
|
||||
config.enable_file_log(
|
||||
pbs_buildcfg::API_ACCESS_LOG_FN,
|
||||
Some(dir_opts),
|
||||
Some(file_opts),
|
||||
&mut commando_sock,
|
||||
)?;
|
||||
|
||||
let rest_server = RestServer::new(config);
|
||||
|
||||
@ -78,7 +89,7 @@ async fn run() -> Result<(), Error> {
|
||||
Ok(ready
|
||||
.and_then(|_| hyper::Server::builder(incoming)
|
||||
.serve(rest_server)
|
||||
.with_graceful_shutdown(server::shutdown_future())
|
||||
.with_graceful_shutdown(proxmox_rest_server::shutdown_future())
|
||||
.map_err(Error::from)
|
||||
)
|
||||
.map(|e| {
|
||||
@ -97,7 +108,7 @@ async fn run() -> Result<(), Error> {
|
||||
let init_result: Result<(), Error> = try_block!({
|
||||
server::register_task_control_commands(&mut commando_sock)?;
|
||||
commando_sock.spawn()?;
|
||||
server::server_state_init()?;
|
||||
proxmox_rest_server::server_state_init()?;
|
||||
Ok(())
|
||||
});
|
||||
|
||||
@ -107,7 +118,7 @@ async fn run() -> Result<(), Error> {
|
||||
|
||||
server.await?;
|
||||
log::info!("server shutting down, waiting for active workers to complete");
|
||||
proxmox_backup::server::last_worker_future().await?;
|
||||
proxmox_rest_server::last_worker_future().await?;
|
||||
|
||||
log::info!("done - exit server");
|
||||
|
||||
|
@ -12,13 +12,15 @@ use serde_json::Value;
|
||||
use proxmox::try_block;
|
||||
use proxmox::api::RpcEnvironmentType;
|
||||
use proxmox::sys::linux::socket::set_tcp_keepalive;
|
||||
use proxmox::tools::fs::CreateOptions;
|
||||
|
||||
use proxmox_rest_server::ApiConfig;
|
||||
|
||||
use proxmox_backup::{
|
||||
backup::DataStore,
|
||||
server::{
|
||||
auth::default_api_auth,
|
||||
WorkerTask,
|
||||
ApiConfig,
|
||||
rest::*,
|
||||
jobstate::{
|
||||
self,
|
||||
@ -106,9 +108,18 @@ async fn run() -> Result<(), Error> {
|
||||
config.register_template("index", &indexpath)?;
|
||||
config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
|
||||
|
||||
let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
|
||||
let backup_user = pbs_config::backup_user()?;
|
||||
let mut commando_sock = proxmox_rest_server::CommandoSocket::new(crate::server::our_ctrl_sock(), backup_user.gid);
|
||||
|
||||
config.enable_file_log(pbs_buildcfg::API_ACCESS_LOG_FN, &mut commando_sock)?;
|
||||
let dir_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
|
||||
let file_opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
|
||||
|
||||
config.enable_file_log(
|
||||
pbs_buildcfg::API_ACCESS_LOG_FN,
|
||||
Some(dir_opts),
|
||||
Some(file_opts),
|
||||
&mut commando_sock,
|
||||
)?;
|
||||
|
||||
let rest_server = RestServer::new(config);
|
||||
|
||||
@ -158,7 +169,7 @@ async fn run() -> Result<(), Error> {
|
||||
Ok(ready
|
||||
.and_then(|_| hyper::Server::builder(connections)
|
||||
.serve(rest_server)
|
||||
.with_graceful_shutdown(server::shutdown_future())
|
||||
.with_graceful_shutdown(proxmox_rest_server::shutdown_future())
|
||||
.map_err(Error::from)
|
||||
)
|
||||
.map_err(|err| eprintln!("server error: {}", err))
|
||||
@ -174,7 +185,7 @@ async fn run() -> Result<(), Error> {
|
||||
let init_result: Result<(), Error> = try_block!({
|
||||
server::register_task_control_commands(&mut commando_sock)?;
|
||||
commando_sock.spawn()?;
|
||||
server::server_state_init()?;
|
||||
proxmox_rest_server::server_state_init()?;
|
||||
Ok(())
|
||||
});
|
||||
|
||||
@ -187,7 +198,7 @@ async fn run() -> Result<(), Error> {
|
||||
|
||||
server.await?;
|
||||
log::info!("server shutting down, waiting for active workers to complete");
|
||||
proxmox_backup::server::last_worker_future().await?;
|
||||
proxmox_rest_server::last_worker_future().await?;
|
||||
log::info!("done - exit server");
|
||||
|
||||
Ok(())
|
||||
@ -304,14 +315,14 @@ async fn accept_connection(
|
||||
}
|
||||
|
||||
fn start_stat_generator() {
|
||||
let abort_future = server::shutdown_future();
|
||||
let abort_future = proxmox_rest_server::shutdown_future();
|
||||
let future = Box::pin(run_stat_generator());
|
||||
let task = futures::future::select(future, abort_future);
|
||||
tokio::spawn(task.map(|_| ()));
|
||||
}
|
||||
|
||||
fn start_task_scheduler() {
|
||||
let abort_future = server::shutdown_future();
|
||||
let abort_future = proxmox_rest_server::shutdown_future();
|
||||
let future = Box::pin(run_task_scheduler());
|
||||
let task = futures::future::select(future, abort_future);
|
||||
tokio::spawn(task.map(|_| ()));
|
||||
@ -706,12 +717,12 @@ async fn schedule_task_log_rotate() {
|
||||
async fn command_reopen_logfiles() -> Result<(), Error> {
|
||||
// only care about the most recent daemon instance for each, proxy & api, as other older ones
|
||||
// should not respond to new requests anyway, but only finish their current one and then exit.
|
||||
let sock = server::our_ctrl_sock();
|
||||
let f1 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
|
||||
let sock = crate::server::our_ctrl_sock();
|
||||
let f1 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
|
||||
|
||||
let pid = server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
|
||||
let sock = server::ctrl_sock_from_pid(pid);
|
||||
let f2 = server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
|
||||
let pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_API_PID_FN)?;
|
||||
let sock = crate::server::ctrl_sock_from_pid(pid);
|
||||
let f2 = proxmox_rest_server::send_command(sock, "{\"command\":\"api-access-log-reopen\"}\n");
|
||||
|
||||
match futures::join!(f1, f2) {
|
||||
(Err(e1), Err(e2)) => Err(format_err!("reopen commands failed, proxy: {}; api: {}", e1, e2)),
|
||||
|
@ -15,9 +15,11 @@ use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use proxmox::api::RpcEnvironmentType;
|
||||
use proxmox_backup::server::{rest::*, ApiConfig};
|
||||
|
||||
use pbs_client::DEFAULT_VSOCK_PORT;
|
||||
use proxmox_rest_server::ApiConfig;
|
||||
|
||||
use proxmox_backup::server::rest::*;
|
||||
|
||||
mod proxmox_restore_daemon;
|
||||
use proxmox_restore_daemon::*;
|
||||
|
@ -4,10 +4,7 @@ use std::io::prelude::*;
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
|
||||
use pbs_api_types::Authid;
|
||||
|
||||
use pbs_config::CachedUserInfo;
|
||||
use proxmox_backup::server::auth::{ApiAuth, AuthError};
|
||||
use proxmox_rest_server::{ApiAuth, AuthError};
|
||||
|
||||
const TICKET_FILE: &str = "/ticket";
|
||||
|
||||
@ -20,11 +17,10 @@ impl ApiAuth for StaticAuth {
|
||||
&self,
|
||||
headers: &http::HeaderMap,
|
||||
_method: &hyper::Method,
|
||||
_user_info: &CachedUserInfo,
|
||||
) -> Result<Authid, AuthError> {
|
||||
) -> Result<String, AuthError> {
|
||||
match headers.get(hyper::header::AUTHORIZATION) {
|
||||
Some(header) if header.to_str().unwrap_or("") == &self.ticket => {
|
||||
Ok(Authid::root_auth_id().to_owned())
|
||||
Ok(String::from("root@pam"))
|
||||
}
|
||||
_ => {
|
||||
return Err(AuthError::Generic(format_err!(
|
||||
|
@ -1,11 +1,12 @@
|
||||
//! Provides authentication primitives for the HTTP server
|
||||
use anyhow::{format_err, Error};
|
||||
use anyhow::format_err;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use pbs_tools::ticket::{self, Ticket};
|
||||
use pbs_config::{token_shadow, CachedUserInfo};
|
||||
use pbs_api_types::{Authid, Userid};
|
||||
use proxmox_rest_server::{ApiAuth, AuthError};
|
||||
|
||||
use crate::auth_helpers::*;
|
||||
use crate::tools;
|
||||
@ -13,26 +14,6 @@ use crate::tools;
|
||||
use hyper::header;
|
||||
use percent_encoding::percent_decode_str;
|
||||
|
||||
pub enum AuthError {
|
||||
Generic(Error),
|
||||
NoData,
|
||||
}
|
||||
|
||||
impl From<Error> for AuthError {
|
||||
fn from(err: Error) -> Self {
|
||||
AuthError::Generic(err)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ApiAuth {
|
||||
fn check_auth(
|
||||
&self,
|
||||
headers: &http::HeaderMap,
|
||||
method: &hyper::Method,
|
||||
user_info: &CachedUserInfo,
|
||||
) -> Result<Authid, AuthError>;
|
||||
}
|
||||
|
||||
struct UserAuthData {
|
||||
ticket: String,
|
||||
csrf_token: Option<String>,
|
||||
@ -80,8 +61,10 @@ impl ApiAuth for UserApiAuth {
|
||||
&self,
|
||||
headers: &http::HeaderMap,
|
||||
method: &hyper::Method,
|
||||
user_info: &CachedUserInfo,
|
||||
) -> Result<Authid, AuthError> {
|
||||
) -> Result<String, AuthError> {
|
||||
|
||||
let user_info = CachedUserInfo::new()?;
|
||||
|
||||
let auth_data = Self::extract_auth_data(headers);
|
||||
match auth_data {
|
||||
Some(AuthData::User(user_auth_data)) => {
|
||||
@ -111,7 +94,7 @@ impl ApiAuth for UserApiAuth {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(auth_id)
|
||||
Ok(auth_id.to_string())
|
||||
}
|
||||
Some(AuthData::ApiToken(api_token)) => {
|
||||
let mut parts = api_token.splitn(2, ':');
|
||||
@ -133,7 +116,7 @@ impl ApiAuth for UserApiAuth {
|
||||
|
||||
token_shadow::verify_secret(&tokenid, &tokensecret)?;
|
||||
|
||||
Ok(tokenid)
|
||||
Ok(tokenid.to_string())
|
||||
}
|
||||
None => Err(AuthError::NoData),
|
||||
}
|
||||
|
@ -1,220 +0,0 @@
|
||||
use anyhow::{bail, format_err, Error};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::path::{PathBuf, Path};
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::*;
|
||||
use tokio::net::UnixListener;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use nix::sys::socket;
|
||||
|
||||
/// Listens on a Unix Socket to handle simple command asynchronously
|
||||
fn create_control_socket<P, F>(path: P, func: F) -> Result<impl Future<Output = ()>, Error>
|
||||
where
|
||||
P: Into<PathBuf>,
|
||||
F: Fn(Value) -> Result<Value, Error> + Send + Sync + 'static,
|
||||
{
|
||||
let path: PathBuf = path.into();
|
||||
|
||||
let backup_user = pbs_config::backup_user()?;
|
||||
let backup_gid = backup_user.gid.as_raw();
|
||||
|
||||
let socket = UnixListener::bind(&path)?;
|
||||
|
||||
let func = Arc::new(func);
|
||||
|
||||
let control_future = async move {
|
||||
loop {
|
||||
let (conn, _addr) = match socket.accept().await {
|
||||
Ok(data) => data,
|
||||
Err(err) => {
|
||||
eprintln!("failed to accept on control socket {:?}: {}", path, err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let opt = socket::sockopt::PeerCredentials {};
|
||||
let cred = match socket::getsockopt(conn.as_raw_fd(), opt) {
|
||||
Ok(cred) => cred,
|
||||
Err(err) => {
|
||||
eprintln!("no permissions - unable to read peer credential - {}", err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// check permissions (same gid, root user, or backup group)
|
||||
let mygid = unsafe { libc::getgid() };
|
||||
if !(cred.uid() == 0 || cred.gid() == mygid || cred.gid() == backup_gid) {
|
||||
eprintln!("no permissions for {:?}", cred);
|
||||
continue;
|
||||
}
|
||||
|
||||
let (rx, mut tx) = tokio::io::split(conn);
|
||||
|
||||
let abort_future = super::last_worker_future().map(|_| ());
|
||||
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
|
||||
let func = Arc::clone(&func);
|
||||
let path = path.clone();
|
||||
tokio::spawn(futures::future::select(
|
||||
async move {
|
||||
let mut rx = tokio::io::BufReader::new(rx);
|
||||
let mut line = String::new();
|
||||
loop {
|
||||
line.clear();
|
||||
match rx.read_line({ line.clear(); &mut line }).await {
|
||||
Ok(0) => break,
|
||||
Ok(_) => (),
|
||||
Err(err) => {
|
||||
eprintln!("control socket {:?} read error: {}", path, err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let response = match line.parse::<Value>() {
|
||||
Ok(param) => match func(param) {
|
||||
Ok(res) => format!("OK: {}\n", res),
|
||||
Err(err) => format!("ERROR: {}\n", err),
|
||||
}
|
||||
Err(err) => format!("ERROR: {}\n", err),
|
||||
};
|
||||
|
||||
if let Err(err) = tx.write_all(response.as_bytes()).await {
|
||||
eprintln!("control socket {:?} write response error: {}", path, err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}.boxed(),
|
||||
abort_future,
|
||||
).map(|_| ()));
|
||||
}
|
||||
}.boxed();
|
||||
|
||||
let abort_future = super::last_worker_future().map_err(|_| {});
|
||||
let task = futures::future::select(
|
||||
control_future,
|
||||
abort_future,
|
||||
).map(|_: futures::future::Either<(Result<(), Error>, _), _>| ());
|
||||
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
|
||||
pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
T: ?Sized + Serialize,
|
||||
{
|
||||
let mut command_string = serde_json::to_string(params)?;
|
||||
command_string.push('\n');
|
||||
send_raw_command(path.as_ref(), &command_string).await
|
||||
}
|
||||
|
||||
pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
|
||||
|
||||
let mut conn = tokio::net::UnixStream::connect(path)
|
||||
.map_err(move |err| format_err!("control socket connect failed - {}", err))
|
||||
.await?;
|
||||
|
||||
conn.write_all(command_string.as_bytes()).await?;
|
||||
if !command_string.as_bytes().ends_with(b"\n") {
|
||||
conn.write_all(b"\n").await?;
|
||||
}
|
||||
|
||||
AsyncWriteExt::shutdown(&mut conn).await?;
|
||||
let mut rx = tokio::io::BufReader::new(conn);
|
||||
let mut data = String::new();
|
||||
if rx.read_line(&mut data).await? == 0 {
|
||||
bail!("no response");
|
||||
}
|
||||
if let Some(res) = data.strip_prefix("OK: ") {
|
||||
match res.parse::<Value>() {
|
||||
Ok(v) => Ok(v),
|
||||
Err(err) => bail!("unable to parse json response - {}", err),
|
||||
}
|
||||
} else if let Some(err) = data.strip_prefix("ERROR: ") {
|
||||
bail!("{}", err);
|
||||
} else {
|
||||
bail!("unable to parse response: {}", data);
|
||||
}
|
||||
}
|
||||
|
||||
/// A callback for a specific commando socket.
|
||||
pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
|
||||
|
||||
/// Tooling to get a single control command socket where one can register multiple commands
|
||||
/// dynamically.
|
||||
/// You need to call `spawn()` to make the socket active.
|
||||
pub struct CommandoSocket {
|
||||
socket: PathBuf,
|
||||
commands: HashMap<String, CommandoSocketFn>,
|
||||
}
|
||||
|
||||
impl CommandoSocket {
|
||||
pub fn new<P>(path: P) -> Self
|
||||
where P: Into<PathBuf>,
|
||||
{
|
||||
CommandoSocket {
|
||||
socket: path.into(),
|
||||
commands: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the socket and consume self, meaning you cannot register commands anymore after
|
||||
/// calling this.
|
||||
pub fn spawn(self) -> Result<(), Error> {
|
||||
let control_future = create_control_socket(self.socket.to_owned(), move |param| {
|
||||
let param = param
|
||||
.as_object()
|
||||
.ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
|
||||
|
||||
let command = match param.get("command") {
|
||||
Some(Value::String(command)) => command.as_str(),
|
||||
None => bail!("no command"),
|
||||
_ => bail!("unable to parse command"),
|
||||
};
|
||||
|
||||
if !self.commands.contains_key(command) {
|
||||
bail!("got unknown command '{}'", command);
|
||||
}
|
||||
|
||||
match self.commands.get(command) {
|
||||
None => bail!("got unknown command '{}'", command),
|
||||
Some(handler) => {
|
||||
let args = param.get("args"); //.unwrap_or(&Value::Null);
|
||||
(handler)(args)
|
||||
},
|
||||
}
|
||||
})?;
|
||||
|
||||
tokio::spawn(control_future);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Register a new command with a callback.
|
||||
pub fn register_command<F>(
|
||||
&mut self,
|
||||
command: String,
|
||||
handler: F,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
F: Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static,
|
||||
{
|
||||
|
||||
if self.commands.contains_key(&command) {
|
||||
bail!("command '{}' already exists!", command);
|
||||
}
|
||||
|
||||
self.commands.insert(command, Box::new(handler));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,171 +0,0 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::time::SystemTime;
|
||||
use std::fs::metadata;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
use anyhow::{bail, Error, format_err};
|
||||
use hyper::Method;
|
||||
use handlebars::Handlebars;
|
||||
use serde::Serialize;
|
||||
|
||||
use proxmox::api::{ApiMethod, Router, RpcEnvironmentType};
|
||||
use proxmox::tools::fs::{create_path, CreateOptions};
|
||||
|
||||
use crate::tools::{FileLogger, FileLogOptions};
|
||||
use super::auth::ApiAuth;
|
||||
|
||||
pub struct ApiConfig {
|
||||
basedir: PathBuf,
|
||||
router: &'static Router,
|
||||
aliases: HashMap<String, PathBuf>,
|
||||
env_type: RpcEnvironmentType,
|
||||
templates: RwLock<Handlebars<'static>>,
|
||||
template_files: RwLock<HashMap<String, (SystemTime, PathBuf)>>,
|
||||
request_log: Option<Arc<Mutex<FileLogger>>>,
|
||||
pub api_auth: Arc<dyn ApiAuth + Send + Sync>,
|
||||
}
|
||||
|
||||
impl ApiConfig {
|
||||
pub fn new<B: Into<PathBuf>>(
|
||||
basedir: B,
|
||||
router: &'static Router,
|
||||
env_type: RpcEnvironmentType,
|
||||
api_auth: Arc<dyn ApiAuth + Send + Sync>,
|
||||
) -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
basedir: basedir.into(),
|
||||
router,
|
||||
aliases: HashMap::new(),
|
||||
env_type,
|
||||
templates: RwLock::new(Handlebars::new()),
|
||||
template_files: RwLock::new(HashMap::new()),
|
||||
request_log: None,
|
||||
api_auth,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn find_method(
|
||||
&self,
|
||||
components: &[&str],
|
||||
method: Method,
|
||||
uri_param: &mut HashMap<String, String>,
|
||||
) -> Option<&'static ApiMethod> {
|
||||
|
||||
self.router.find_method(components, method, uri_param)
|
||||
}
|
||||
|
||||
pub fn find_alias(&self, components: &[&str]) -> PathBuf {
|
||||
|
||||
let mut prefix = String::new();
|
||||
let mut filename = self.basedir.clone();
|
||||
let comp_len = components.len();
|
||||
if comp_len >= 1 {
|
||||
prefix.push_str(components[0]);
|
||||
if let Some(subdir) = self.aliases.get(&prefix) {
|
||||
filename.push(subdir);
|
||||
components.iter().skip(1).for_each(|comp| filename.push(comp));
|
||||
} else {
|
||||
components.iter().for_each(|comp| filename.push(comp));
|
||||
}
|
||||
}
|
||||
filename
|
||||
}
|
||||
|
||||
pub fn add_alias<S, P>(&mut self, alias: S, path: P)
|
||||
where S: Into<String>,
|
||||
P: Into<PathBuf>,
|
||||
{
|
||||
self.aliases.insert(alias.into(), path.into());
|
||||
}
|
||||
|
||||
pub fn env_type(&self) -> RpcEnvironmentType {
|
||||
self.env_type
|
||||
}
|
||||
|
||||
pub fn register_template<P>(&self, name: &str, path: P) -> Result<(), Error>
|
||||
where
|
||||
P: Into<PathBuf>
|
||||
{
|
||||
if self.template_files.read().unwrap().contains_key(name) {
|
||||
bail!("template already registered");
|
||||
}
|
||||
|
||||
let path: PathBuf = path.into();
|
||||
let metadata = metadata(&path)?;
|
||||
let mtime = metadata.modified()?;
|
||||
|
||||
self.templates.write().unwrap().register_template_file(name, &path)?;
|
||||
self.template_files.write().unwrap().insert(name.to_string(), (mtime, path));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Checks if the template was modified since the last rendering
|
||||
/// if yes, it loads a the new version of the template
|
||||
pub fn render_template<T>(&self, name: &str, data: &T) -> Result<String, Error>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
let path;
|
||||
let mtime;
|
||||
{
|
||||
let template_files = self.template_files.read().unwrap();
|
||||
let (old_mtime, old_path) = template_files.get(name).ok_or_else(|| format_err!("template not found"))?;
|
||||
|
||||
mtime = metadata(old_path)?.modified()?;
|
||||
if mtime <= *old_mtime {
|
||||
return self.templates.read().unwrap().render(name, data).map_err(|err| format_err!("{}", err));
|
||||
}
|
||||
path = old_path.to_path_buf();
|
||||
}
|
||||
|
||||
{
|
||||
let mut template_files = self.template_files.write().unwrap();
|
||||
let mut templates = self.templates.write().unwrap();
|
||||
|
||||
templates.register_template_file(name, &path)?;
|
||||
template_files.insert(name.to_string(), (mtime, path));
|
||||
|
||||
templates.render(name, data).map_err(|err| format_err!("{}", err))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enable_file_log<P>(
|
||||
&mut self,
|
||||
path: P,
|
||||
commando_sock: &mut super::CommandoSocket,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
P: Into<PathBuf>
|
||||
{
|
||||
let path: PathBuf = path.into();
|
||||
if let Some(base) = path.parent() {
|
||||
if !base.exists() {
|
||||
let backup_user = pbs_config::backup_user()?;
|
||||
let opts = CreateOptions::new().owner(backup_user.uid).group(backup_user.gid);
|
||||
create_path(base, None, Some(opts)).map_err(|err| format_err!("{}", err))?;
|
||||
}
|
||||
}
|
||||
|
||||
let logger_options = FileLogOptions {
|
||||
append: true,
|
||||
owned_by_backup: true,
|
||||
..Default::default()
|
||||
};
|
||||
let request_log = Arc::new(Mutex::new(FileLogger::new(&path, logger_options)?));
|
||||
self.request_log = Some(Arc::clone(&request_log));
|
||||
|
||||
commando_sock.register_command("api-access-log-reopen".into(), move |_args| {
|
||||
println!("re-opening log file");
|
||||
request_log.lock().unwrap().reopen()?;
|
||||
Ok(serde_json::Value::Null)
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_file_log(&self) -> Option<&Arc<Mutex<FileLogger>>> {
|
||||
self.request_log.as_ref()
|
||||
}
|
||||
}
|
@ -52,21 +52,12 @@ pub use environment::*;
|
||||
mod upid;
|
||||
pub use upid::*;
|
||||
|
||||
mod state;
|
||||
pub use state::*;
|
||||
|
||||
mod command_socket;
|
||||
pub use command_socket::*;
|
||||
|
||||
mod worker_task;
|
||||
pub use worker_task::*;
|
||||
|
||||
mod h2service;
|
||||
pub use h2service::*;
|
||||
|
||||
pub mod config;
|
||||
pub use config::*;
|
||||
|
||||
pub mod formatter;
|
||||
|
||||
#[macro_use]
|
||||
@ -98,7 +89,7 @@ pub mod pull;
|
||||
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
|
||||
let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
|
||||
let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
|
||||
let _: Value = crate::server::send_raw_command(sock, "{\"command\":\"reload-certificate\"}\n")
|
||||
let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"reload-certificate\"}\n")
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@ -106,7 +97,7 @@ pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
|
||||
pub(crate) async fn notify_datastore_removed() -> Result<(), Error> {
|
||||
let proxy_pid = crate::server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
|
||||
let sock = crate::server::ctrl_sock_from_pid(proxy_pid);
|
||||
let _: Value = crate::server::send_raw_command(sock, "{\"command\":\"datastore-removed\"}\n")
|
||||
let _: Value = proxmox_rest_server::send_raw_command(sock, "{\"command\":\"datastore-removed\"}\n")
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -29,21 +29,20 @@ use proxmox::api::{
|
||||
RpcEnvironmentType,
|
||||
};
|
||||
use proxmox::http_err;
|
||||
use proxmox::tools::fs::CreateOptions;
|
||||
|
||||
use pbs_tools::compression::{DeflateEncoder, Level};
|
||||
use pbs_tools::stream::AsyncReaderStream;
|
||||
use pbs_api_types::{Authid, Userid};
|
||||
use proxmox_rest_server::{ApiConfig, FileLogger, FileLogOptions, AuthError};
|
||||
|
||||
use super::auth::AuthError;
|
||||
use super::environment::RestEnvironment;
|
||||
use super::formatter::*;
|
||||
use super::ApiConfig;
|
||||
|
||||
use crate::auth_helpers::*;
|
||||
use pbs_config::CachedUserInfo;
|
||||
use crate::tools;
|
||||
use crate::tools::compression::CompressionMethod;
|
||||
use crate::tools::FileLogger;
|
||||
|
||||
extern "C" {
|
||||
fn tzset();
|
||||
@ -196,10 +195,16 @@ fn log_response(
|
||||
}
|
||||
}
|
||||
pub fn auth_logger() -> Result<FileLogger, Error> {
|
||||
let logger_options = tools::FileLogOptions {
|
||||
let backup_user = pbs_config::backup_user()?;
|
||||
|
||||
let file_opts = CreateOptions::new()
|
||||
.owner(backup_user.uid)
|
||||
.group(backup_user.gid);
|
||||
|
||||
let logger_options = FileLogOptions {
|
||||
append: true,
|
||||
prefix_time: true,
|
||||
owned_by_backup: true,
|
||||
file_opts,
|
||||
..Default::default()
|
||||
};
|
||||
FileLogger::new(pbs_buildcfg::API_AUTH_LOG_FN, logger_options)
|
||||
@ -681,7 +686,6 @@ async fn handle_request(
|
||||
|
||||
rpcenv.set_client_ip(Some(*peer));
|
||||
|
||||
let user_info = CachedUserInfo::new()?;
|
||||
let auth = &api.api_auth;
|
||||
|
||||
let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
|
||||
@ -708,8 +712,8 @@ async fn handle_request(
|
||||
}
|
||||
|
||||
if auth_required {
|
||||
match auth.check_auth(&parts.headers, &method, &user_info) {
|
||||
Ok(authid) => rpcenv.set_auth_id(Some(authid.to_string())),
|
||||
match auth.check_auth(&parts.headers, &method) {
|
||||
Ok(authid) => rpcenv.set_auth_id(Some(authid)),
|
||||
Err(auth_err) => {
|
||||
let err = match auth_err {
|
||||
AuthError::Generic(err) => err,
|
||||
@ -738,6 +742,8 @@ async fn handle_request(
|
||||
}
|
||||
Some(api_method) => {
|
||||
let auth_id = rpcenv.get_auth_id();
|
||||
let user_info = CachedUserInfo::new()?;
|
||||
|
||||
if !check_api_permission(
|
||||
api_method.access.permission,
|
||||
auth_id.as_deref(),
|
||||
@ -779,8 +785,9 @@ async fn handle_request(
|
||||
|
||||
if comp_len == 0 {
|
||||
let language = extract_lang_header(&parts.headers);
|
||||
match auth.check_auth(&parts.headers, &method, &user_info) {
|
||||
match auth.check_auth(&parts.headers, &method) {
|
||||
Ok(auth_id) => {
|
||||
let auth_id: Authid = auth_id.parse()?;
|
||||
if !auth_id.is_token() {
|
||||
let userid = auth_id.user();
|
||||
let new_csrf_token = assemble_csrf_prevention_token(csrf_secret(), userid);
|
||||
|
@ -1,142 +0,0 @@
|
||||
use anyhow::{Error};
|
||||
use lazy_static::lazy_static;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use futures::*;
|
||||
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
|
||||
use pbs_tools::broadcast_future::BroadcastData;
|
||||
|
||||
#[derive(PartialEq, Copy, Clone, Debug)]
|
||||
pub enum ServerMode {
|
||||
Normal,
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
pub struct ServerState {
|
||||
pub mode: ServerMode,
|
||||
pub shutdown_listeners: BroadcastData<()>,
|
||||
pub last_worker_listeners: BroadcastData<()>,
|
||||
pub worker_count: usize,
|
||||
pub internal_task_count: usize,
|
||||
pub reload_request: bool,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref SERVER_STATE: Mutex<ServerState> = Mutex::new(ServerState {
|
||||
mode: ServerMode::Normal,
|
||||
shutdown_listeners: BroadcastData::new(),
|
||||
last_worker_listeners: BroadcastData::new(),
|
||||
worker_count: 0,
|
||||
internal_task_count: 0,
|
||||
reload_request: false,
|
||||
});
|
||||
}
|
||||
|
||||
pub fn server_state_init() -> Result<(), Error> {
|
||||
|
||||
let mut stream = signal(SignalKind::interrupt())?;
|
||||
|
||||
let future = async move {
|
||||
while stream.recv().await.is_some() {
|
||||
println!("got shutdown request (SIGINT)");
|
||||
SERVER_STATE.lock().unwrap().reload_request = false;
|
||||
crate::tools::request_shutdown();
|
||||
}
|
||||
}.boxed();
|
||||
|
||||
let abort_future = last_worker_future().map_err(|_| {});
|
||||
let task = futures::future::select(future, abort_future);
|
||||
|
||||
tokio::spawn(task.map(|_| ()));
|
||||
|
||||
let mut stream = signal(SignalKind::hangup())?;
|
||||
|
||||
let future = async move {
|
||||
while stream.recv().await.is_some() {
|
||||
println!("got reload request (SIGHUP)");
|
||||
SERVER_STATE.lock().unwrap().reload_request = true;
|
||||
crate::tools::request_shutdown();
|
||||
}
|
||||
}.boxed();
|
||||
|
||||
let abort_future = last_worker_future().map_err(|_| {});
|
||||
let task = futures::future::select(future, abort_future);
|
||||
|
||||
tokio::spawn(task.map(|_| ()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn is_reload_request() -> bool {
|
||||
let data = SERVER_STATE.lock().unwrap();
|
||||
|
||||
data.mode == ServerMode::Shutdown && data.reload_request
|
||||
}
|
||||
|
||||
pub fn server_shutdown() {
|
||||
let mut data = SERVER_STATE.lock().unwrap();
|
||||
|
||||
println!("SET SHUTDOWN MODE");
|
||||
|
||||
data.mode = ServerMode::Shutdown;
|
||||
|
||||
data.shutdown_listeners.notify_listeners(Ok(()));
|
||||
|
||||
drop(data); // unlock
|
||||
|
||||
check_last_worker();
|
||||
}
|
||||
|
||||
pub fn shutdown_future() -> impl Future<Output = ()> {
|
||||
let mut data = SERVER_STATE.lock().unwrap();
|
||||
data
|
||||
.shutdown_listeners
|
||||
.listen()
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
|
||||
let mut data = SERVER_STATE.lock().unwrap();
|
||||
data.last_worker_listeners.listen()
|
||||
}
|
||||
|
||||
pub fn set_worker_count(count: usize) {
|
||||
SERVER_STATE.lock().unwrap().worker_count = count;
|
||||
|
||||
check_last_worker();
|
||||
}
|
||||
|
||||
pub fn check_last_worker() {
|
||||
let mut data = SERVER_STATE.lock().unwrap();
|
||||
|
||||
if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }
|
||||
|
||||
data.last_worker_listeners.notify_listeners(Ok(()));
|
||||
}
|
||||
|
||||
/// Spawns a tokio task that will be tracked for reload
|
||||
/// and if it is finished, notify the last_worker_listener if we
|
||||
/// are in shutdown mode
|
||||
pub fn spawn_internal_task<T>(task: T)
|
||||
where
|
||||
T: Future + Send + 'static,
|
||||
T::Output: Send + 'static,
|
||||
{
|
||||
let mut data = SERVER_STATE.lock().unwrap();
|
||||
data.internal_task_count += 1;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let _ = tokio::spawn(task).await; // ignore errors
|
||||
|
||||
{ // drop mutex
|
||||
let mut data = SERVER_STATE.lock().unwrap();
|
||||
if data.internal_task_count > 0 {
|
||||
data.internal_task_count -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
check_last_worker();
|
||||
});
|
||||
}
|
@ -20,12 +20,10 @@ use pbs_buildcfg;
|
||||
use pbs_tools::logrotate::{LogRotate, LogRotateFiles};
|
||||
use pbs_api_types::{Authid, TaskStateType, UPID};
|
||||
use pbs_config::{open_backup_lockfile, BackupLockGuard};
|
||||
use proxmox_rest_server::{CommandoSocket, FileLogger, FileLogOptions};
|
||||
|
||||
use super::UPIDExt;
|
||||
|
||||
use crate::server;
|
||||
use crate::tools::{FileLogger, FileLogOptions};
|
||||
|
||||
macro_rules! taskdir {
|
||||
($subdir:expr) => (concat!(pbs_buildcfg::PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks", $subdir))
|
||||
}
|
||||
@ -41,7 +39,7 @@ lazy_static! {
|
||||
|
||||
/// checks if the task UPID refers to a worker from this process
|
||||
fn is_local_worker(upid: &UPID) -> bool {
|
||||
upid.pid == server::pid() && upid.pstart == server::pstart()
|
||||
upid.pid == crate::server::pid() && upid.pstart == crate::server::pstart()
|
||||
}
|
||||
|
||||
/// Test if the task is still running
|
||||
@ -54,14 +52,14 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let sock = server::ctrl_sock_from_pid(upid.pid);
|
||||
let sock = crate::server::ctrl_sock_from_pid(upid.pid);
|
||||
let cmd = json!({
|
||||
"command": "worker-task-status",
|
||||
"args": {
|
||||
"upid": upid.to_string(),
|
||||
},
|
||||
});
|
||||
let status = super::send_command(sock, &cmd).await?;
|
||||
let status = proxmox_rest_server::send_command(sock, &cmd).await?;
|
||||
|
||||
if let Some(active) = status.as_bool() {
|
||||
Ok(active)
|
||||
@ -84,7 +82,7 @@ pub fn worker_is_active_local(upid: &UPID) -> bool {
|
||||
}
|
||||
|
||||
pub fn register_task_control_commands(
|
||||
commando_sock: &mut super::CommandoSocket,
|
||||
commando_sock: &mut CommandoSocket,
|
||||
) -> Result<(), Error> {
|
||||
fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
|
||||
let args = if let Some(args) = args { args } else { bail!("missing args") };
|
||||
@ -128,14 +126,14 @@ pub fn abort_worker_async(upid: UPID) {
|
||||
|
||||
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
|
||||
|
||||
let sock = server::ctrl_sock_from_pid(upid.pid);
|
||||
let sock = crate::server::ctrl_sock_from_pid(upid.pid);
|
||||
let cmd = json!({
|
||||
"command": "worker-task-abort",
|
||||
"args": {
|
||||
"upid": upid.to_string(),
|
||||
},
|
||||
});
|
||||
super::send_command(sock, &cmd).map_ok(|_| ()).await
|
||||
proxmox_rest_server::send_command(sock, &cmd).map_ok(|_| ()).await
|
||||
}
|
||||
|
||||
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {
|
||||
@ -579,7 +577,6 @@ impl Iterator for TaskListInfoIterator {
|
||||
/// task/future. Each task can `log()` messages, which are stored
|
||||
/// persistently to files. Task should poll the `abort_requested`
|
||||
/// flag, and stop execution when requested.
|
||||
#[derive(Debug)]
|
||||
pub struct WorkerTask {
|
||||
upid: UPID,
|
||||
data: Mutex<WorkerTaskData>,
|
||||
@ -593,7 +590,6 @@ impl std::fmt::Display for WorkerTask {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct WorkerTaskData {
|
||||
logger: FileLogger,
|
||||
progress: f64, // 0..1
|
||||
@ -642,7 +638,7 @@ impl WorkerTask {
|
||||
{
|
||||
let mut hash = WORKER_TASK_LIST.lock().unwrap();
|
||||
hash.insert(task_id, worker.clone());
|
||||
super::set_worker_count(hash.len());
|
||||
proxmox_rest_server::set_worker_count(hash.len());
|
||||
}
|
||||
|
||||
update_active_workers(Some(&upid))?;
|
||||
@ -729,7 +725,7 @@ impl WorkerTask {
|
||||
|
||||
WORKER_TASK_LIST.lock().unwrap().remove(&self.upid.task_id);
|
||||
let _ = update_active_workers(None);
|
||||
super::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
|
||||
proxmox_rest_server::set_worker_count(WORKER_TASK_LIST.lock().unwrap().len());
|
||||
}
|
||||
|
||||
/// Log a message.
|
||||
|
@ -16,7 +16,6 @@ use futures::future::{self, Either};
|
||||
|
||||
use proxmox::tools::io::{ReadExt, WriteExt};
|
||||
|
||||
use crate::server;
|
||||
use crate::tools::{fd_change_cloexec, self};
|
||||
|
||||
#[link(name = "systemd")]
|
||||
@ -274,11 +273,11 @@ where
|
||||
).await?;
|
||||
|
||||
let server_future = create_service(listener, NotifyReady)?;
|
||||
let shutdown_future = server::shutdown_future();
|
||||
let shutdown_future = proxmox_rest_server::shutdown_future();
|
||||
|
||||
let finish_future = match future::select(server_future, shutdown_future).await {
|
||||
Either::Left((_, _)) => {
|
||||
crate::tools::request_shutdown(); // make sure we are in shutdown mode
|
||||
proxmox_rest_server::request_shutdown(); // make sure we are in shutdown mode
|
||||
None
|
||||
}
|
||||
Either::Right((_, server_future)) => Some(server_future),
|
||||
@ -286,7 +285,7 @@ where
|
||||
|
||||
let mut reloader = Some(reloader);
|
||||
|
||||
if server::is_reload_request() {
|
||||
if proxmox_rest_server::is_reload_request() {
|
||||
log::info!("daemon reload...");
|
||||
if let Err(e) = systemd_notify(SystemdNotify::Reloading) {
|
||||
log::error!("failed to notify systemd about the state change: {}", e);
|
||||
@ -305,7 +304,7 @@ where
|
||||
}
|
||||
|
||||
// FIXME: this is a hack, replace with sd_notify_barrier when available
|
||||
if server::is_reload_request() {
|
||||
if proxmox_rest_server::is_reload_request() {
|
||||
wait_service_is_not_state(service_name, "reloading").await?;
|
||||
}
|
||||
|
||||
|
@ -1,142 +0,0 @@
|
||||
use anyhow::Error;
|
||||
use std::io::Write;
|
||||
|
||||
/// Log messages with optional automatically added timestamps into files
|
||||
///
|
||||
/// Logs messages to file, and optionally to standard output.
|
||||
///
|
||||
///
|
||||
/// #### Example:
|
||||
/// ```
|
||||
/// # use anyhow::{bail, format_err, Error};
|
||||
/// use proxmox_backup::flog;
|
||||
/// use proxmox_backup::tools::{FileLogger, FileLogOptions};
|
||||
///
|
||||
/// # std::fs::remove_file("test.log");
|
||||
/// let options = FileLogOptions {
|
||||
/// to_stdout: true,
|
||||
/// exclusive: true,
|
||||
/// ..Default::default()
|
||||
/// };
|
||||
/// let mut log = FileLogger::new("test.log", options).unwrap();
|
||||
/// flog!(log, "A simple log: {}", "Hello!");
|
||||
/// # std::fs::remove_file("test.log");
|
||||
/// ```
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
/// Options to control the behavior of a ['FileLogger'] instance
|
||||
pub struct FileLogOptions {
|
||||
/// Open underlying log file in append mode, useful when multiple concurrent processes
|
||||
/// want to log to the same file (e.g., HTTP access log). Note that it is only atomic
|
||||
/// for writes smaller than the PIPE_BUF (4k on Linux).
|
||||
/// Inside the same process you may need to still use an mutex, for shared access.
|
||||
pub append: bool,
|
||||
/// Open underlying log file as readable
|
||||
pub read: bool,
|
||||
/// If set, ensure that the file is newly created or error out if already existing.
|
||||
pub exclusive: bool,
|
||||
/// Duplicate logged messages to STDOUT, like tee
|
||||
pub to_stdout: bool,
|
||||
/// Prefix messages logged to the file with the current local time as RFC 3339
|
||||
pub prefix_time: bool,
|
||||
/// if set, the file is tried to be chowned by the backup:backup user/group
|
||||
/// Note, this is not designed race free as anybody could set it to another user afterwards
|
||||
/// anyway. It must thus be used by all processes which doe not run as backup uid/gid.
|
||||
pub owned_by_backup: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FileLogger {
|
||||
file: std::fs::File,
|
||||
file_name: std::path::PathBuf,
|
||||
options: FileLogOptions,
|
||||
}
|
||||
|
||||
/// Log messages to [`FileLogger`](tools/struct.FileLogger.html)
|
||||
#[macro_export]
|
||||
macro_rules! flog {
|
||||
($log:expr, $($arg:tt)*) => ({
|
||||
$log.log(format!($($arg)*));
|
||||
})
|
||||
}
|
||||
|
||||
impl FileLogger {
|
||||
pub fn new<P: AsRef<std::path::Path>>(
|
||||
file_name: P,
|
||||
options: FileLogOptions,
|
||||
) -> Result<Self, Error> {
|
||||
let file = Self::open(&file_name, &options)?;
|
||||
|
||||
let file_name: std::path::PathBuf = file_name.as_ref().to_path_buf();
|
||||
|
||||
Ok(Self { file, file_name, options })
|
||||
}
|
||||
|
||||
pub fn reopen(&mut self) -> Result<&Self, Error> {
|
||||
let file = Self::open(&self.file_name, &self.options)?;
|
||||
self.file = file;
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn open<P: AsRef<std::path::Path>>(
|
||||
file_name: P,
|
||||
options: &FileLogOptions,
|
||||
) -> Result<std::fs::File, Error> {
|
||||
let file = std::fs::OpenOptions::new()
|
||||
.read(options.read)
|
||||
.write(true)
|
||||
.append(options.append)
|
||||
.create_new(options.exclusive)
|
||||
.create(!options.exclusive)
|
||||
.open(&file_name)?;
|
||||
|
||||
if options.owned_by_backup {
|
||||
let backup_user = pbs_config::backup_user()?;
|
||||
nix::unistd::chown(file_name.as_ref(), Some(backup_user.uid), Some(backup_user.gid))?;
|
||||
}
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
pub fn log<S: AsRef<str>>(&mut self, msg: S) {
|
||||
let msg = msg.as_ref();
|
||||
|
||||
if self.options.to_stdout {
|
||||
let mut stdout = std::io::stdout();
|
||||
stdout.write_all(msg.as_bytes()).unwrap();
|
||||
stdout.write_all(b"\n").unwrap();
|
||||
}
|
||||
|
||||
let line = if self.options.prefix_time {
|
||||
let now = proxmox::tools::time::epoch_i64();
|
||||
let rfc3339 = match proxmox::tools::time::epoch_to_rfc3339(now) {
|
||||
Ok(rfc3339) => rfc3339,
|
||||
Err(_) => "1970-01-01T00:00:00Z".into(), // for safety, should really not happen!
|
||||
};
|
||||
format!("{}: {}\n", rfc3339, msg)
|
||||
} else {
|
||||
format!("{}\n", msg)
|
||||
};
|
||||
if let Err(err) = self.file.write_all(line.as_bytes()) {
|
||||
// avoid panicking, log methods should not do that
|
||||
// FIXME: or, return result???
|
||||
eprintln!("error writing to log file - {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::io::Write for FileLogger {
|
||||
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
||||
if self.options.to_stdout {
|
||||
let _ = std::io::stdout().write(buf);
|
||||
}
|
||||
self.file.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> Result<(), std::io::Error> {
|
||||
if self.options.to_stdout {
|
||||
let _ = std::io::stdout().flush();
|
||||
}
|
||||
self.file.flush()
|
||||
}
|
||||
}
|
@ -31,9 +31,6 @@ pub mod ticket;
|
||||
pub mod parallel_handler;
|
||||
pub use parallel_handler::ParallelHandler;
|
||||
|
||||
mod file_logger;
|
||||
pub use file_logger::{FileLogger, FileLogOptions};
|
||||
|
||||
/// Shortcut for md5 sums.
|
||||
pub fn md5sum(data: &[u8]) -> Result<DigestBytes, Error> {
|
||||
hash(MessageDigest::md5(), data).map_err(Error::from)
|
||||
@ -123,27 +120,6 @@ pub fn fd_change_cloexec(fd: RawFd, on: bool) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
static mut SHUTDOWN_REQUESTED: bool = false;
|
||||
|
||||
pub fn request_shutdown() {
|
||||
unsafe {
|
||||
SHUTDOWN_REQUESTED = true;
|
||||
}
|
||||
crate::server::server_shutdown();
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn shutdown_requested() -> bool {
|
||||
unsafe { SHUTDOWN_REQUESTED }
|
||||
}
|
||||
|
||||
pub fn fail_on_shutdown() -> Result<(), Error> {
|
||||
if shutdown_requested() {
|
||||
bail!("Server shutdown requested - aborting task");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// safe wrapper for `nix::sys::socket::socketpair` defaulting to `O_CLOEXEC` and guarding the file
|
||||
/// descriptors.
|
||||
pub fn socketpair() -> Result<(Fd, Fd), Error> {
|
||||
|
Reference in New Issue
Block a user