proxmox-rest-server: improve docs

And renames abort_worker_async to abort_worker_nowait (avoid confusion,
because the function itself is not async).
This commit is contained in:
Dietmar Maurer 2021-09-30 10:33:57 +02:00
parent c76ff4b472
commit 2e44983a37
6 changed files with 66 additions and 17 deletions

View File

@ -102,7 +102,7 @@ where
Ok(task)
}
/// Send a command to the specified socket
pub async fn send_command<P, T>(path: P, params: &T) -> Result<Value, Error>
where
P: AsRef<Path>,
@ -113,6 +113,7 @@ where
send_raw_command(path.as_ref(), &command_string).await
}
/// Send a raw command (string) to the specified socket
pub async fn send_raw_command<P>(path: P, command_string: &str) -> Result<Value, Error>
where
P: AsRef<Path>,
@ -146,11 +147,12 @@ where
}
}
/// A callback for a specific commando socket.
pub type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
// A callback for a specific commando socket.
type CommandoSocketFn = Box<(dyn Fn(Option<&Value>) -> Result<Value, Error> + Send + Sync + 'static)>;
/// Tooling to get a single control command socket where one can register multiple commands
/// dynamically.
/// Tooling to get a single control command socket where one can
/// register multiple commands dynamically.
///
/// You need to call `spawn()` to make the socket active.
pub struct CommandoSocket {
socket: PathBuf,

View File

@ -40,6 +40,7 @@ pub use worker_task::*;
mod h2service;
pub use h2service::*;
/// Authentification Error
pub enum AuthError {
Generic(Error),
NoData,
@ -51,7 +52,12 @@ impl From<Error> for AuthError {
}
}
/// User Authentification trait
pub trait ApiAuth {
/// Extract user credentials from headers and check them.
///
/// If credenthials are valid, returns the username and a
/// [UserInformation] object to query additional user data.
fn check_auth(
&self,
headers: &http::HeaderMap,
@ -64,47 +70,64 @@ lazy_static::lazy_static!{
static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
}
/// Retruns the current process ID (see [libc::getpid])
///
/// The value is cached at startup (so it is invalid after a fork)
pub fn pid() -> i32 {
*PID
}
/// Returns the starttime of the process (see [PidStat])
///
/// The value is cached at startup (so it is invalid after a fork)
pub fn pstart() -> u64 {
*PSTART
}
/// Helper to write the PID into a file
pub fn write_pid(pid_fn: &str) -> Result<(), Error> {
let pid_str = format!("{}\n", *PID);
proxmox::tools::fs::replace_file(pid_fn, pid_str.as_bytes(), CreateOptions::new())
}
/// Helper to read the PID from a file
pub fn read_pid(pid_fn: &str) -> Result<i32, Error> {
let pid = proxmox::tools::fs::file_get_contents(pid_fn)?;
let pid = std::str::from_utf8(&pid)?.trim();
pid.parse().map_err(|err| format_err!("could not parse pid - {}", err))
}
/// Returns the control socket path for a specific process ID.
///
/// Note: The control socket always uses @/run/proxmox-backup/ as
/// prefix for historic reason. This does not matter because the
/// generated path is unique for each ``pid`` anyways.
pub fn ctrl_sock_from_pid(pid: i32) -> String {
// Note: The control socket always uses @/run/proxmox-backup/ as prefix
// for historc reason.
format!("\0{}/control-{}.sock", "/run/proxmox-backup", pid)
}
/// Returns the control socket path for this server.
pub fn our_ctrl_sock() -> String {
ctrl_sock_from_pid(*PID)
}
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Request a server shutdown (usually called from [catch_shutdown_signal])
pub fn request_shutdown() {
SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
crate::server_shutdown();
}
/// Returns true if there was a shutdown request.
#[inline(always)]
pub fn shutdown_requested() -> bool {
SHUTDOWN_REQUESTED.load(Ordering::SeqCst)
}
/// Raise an error if there was a shutdown request.
pub fn fail_on_shutdown() -> Result<(), Error> {
if shutdown_requested() {
bail!("Server shutdown requested - aborting task");

View File

@ -8,6 +8,8 @@ use tokio::signal::unix::{signal, SignalKind};
use pbs_tools::broadcast_future::BroadcastData;
use crate::request_shutdown;
#[derive(PartialEq, Copy, Clone, Debug)]
enum ServerMode {
Normal,
@ -35,6 +37,8 @@ lazy_static! {
}
/// Listen to ``SIGINT`` for server shutdown
///
/// This calls [request_shutdown] when receiving the signal.
pub fn catch_shutdown_signal() -> Result<(), Error> {
let mut stream = signal(SignalKind::interrupt())?;
@ -43,7 +47,7 @@ pub fn catch_shutdown_signal() -> Result<(), Error> {
while stream.recv().await.is_some() {
log::info!("got shutdown request (SIGINT)");
SERVER_STATE.lock().unwrap().reload_request = false;
crate::request_shutdown();
request_shutdown();
}
}.boxed();
@ -56,6 +60,9 @@ pub fn catch_shutdown_signal() -> Result<(), Error> {
}
/// Listen to ``SIGHUP`` for server reload
///
/// This calls [request_shutdown] when receiving the signal, and tries
/// to restart the server.
pub fn catch_reload_signal() -> Result<(), Error> {
let mut stream = signal(SignalKind::hangup())?;
@ -76,13 +83,14 @@ pub fn catch_reload_signal() -> Result<(), Error> {
Ok(())
}
pub fn is_reload_request() -> bool {
pub(crate) fn is_reload_request() -> bool {
let data = SERVER_STATE.lock().unwrap();
data.mode == ServerMode::Shutdown && data.reload_request
}
pub fn server_shutdown() {
pub(crate) fn server_shutdown() {
let mut data = SERVER_STATE.lock().unwrap();
log::info!("request_shutdown");
@ -96,6 +104,7 @@ pub fn server_shutdown() {
check_last_worker();
}
/// Future to signal server shutdown
pub fn shutdown_future() -> impl Future<Output = ()> {
let mut data = SERVER_STATE.lock().unwrap();
data
@ -104,18 +113,19 @@ pub fn shutdown_future() -> impl Future<Output = ()> {
.map(|_| ())
}
/// Future to signal when last worker task finished
pub fn last_worker_future() -> impl Future<Output = Result<(), Error>> {
let mut data = SERVER_STATE.lock().unwrap();
data.last_worker_listeners.listen()
}
pub fn set_worker_count(count: usize) {
pub(crate) fn set_worker_count(count: usize) {
SERVER_STATE.lock().unwrap().worker_count = count;
check_last_worker();
}
pub fn check_last_worker() {
pub(crate) fn check_last_worker() {
let mut data = SERVER_STATE.lock().unwrap();
if !(data.mode == ServerMode::Shutdown && data.worker_count == 0 && data.internal_task_count == 0) { return; }

View File

@ -324,6 +324,14 @@ pub fn worker_is_active_local(upid: &UPID) -> bool {
}
}
/// Register task control command on a [CommandoSocket].
///
/// This create two commands:
///
/// * ``worker-task-abort <UPID>``: calls [abort_local_worker]
///
/// * ``worker-task-status <UPID>``: return true of false, depending on
/// whether the worker is running or stopped.
pub fn register_task_control_commands(
commando_sock: &mut CommandoSocket,
) -> Result<(), Error> {
@ -358,14 +366,20 @@ pub fn register_task_control_commands(
Ok(())
}
pub fn abort_worker_async(upid: UPID) {
/// Try to abort a worker task, but do no wait
///
/// Errors (if any) are simply logged.
pub fn abort_worker_nowait(upid: UPID) {
tokio::spawn(async move {
if let Err(err) = abort_worker(upid).await {
eprintln!("abort worker failed - {}", err);
log::error!("abort worker task failed - {}", err);
}
});
}
/// Abort a worker task
///
/// By sending ``worker-task-abort`` to the control socket.
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
let sock = crate::ctrl_sock_from_pid(upid.pid);
@ -513,7 +527,7 @@ fn read_task_file<R: Read>(reader: R) -> Result<Vec<TaskListInfo>, Error>
state
}),
Err(err) => {
eprintln!("unable to parse worker status '{}' - {}", line, err);
log::warn!("unable to parse worker status '{}' - {}", line, err);
continue;
}
};
@ -536,6 +550,7 @@ where
read_task_file(file)
}
/// Iterate over existing/active worker tasks
pub struct TaskListInfoIterator {
list: VecDeque<TaskListInfo>,
end: bool,
@ -544,6 +559,7 @@ pub struct TaskListInfoIterator {
}
impl TaskListInfoIterator {
/// Creates a new iterator instance.
pub fn new(active_only: bool) -> Result<Self, Error> {
let setup = worker_task_setup()?;
@ -811,8 +827,6 @@ impl WorkerTask {
/// Request abort
pub fn request_abort(&self) {
eprintln!("set abort flag for worker {}", self.upid);
let prev_abort = self.abort_requested.swap(true, Ordering::SeqCst);
if !prev_abort { // log abort one time
self.log_message(format!("received abort request ..."));

View File

@ -374,7 +374,7 @@ fn stop_task(
user_info.check_privs(&auth_id, &["system", "tasks"], PRIV_SYS_MODIFY, false)?;
}
proxmox_rest_server::abort_worker_async(upid);
proxmox_rest_server::abort_worker_nowait(upid);
Ok(Value::Null)
}

View File

@ -95,7 +95,7 @@ fn worker_task_abort() -> Result<(), Error> {
}
Ok(wid) => {
println!("WORKER: {}", wid);
proxmox_rest_server::abort_worker_async(wid.parse::<UPID>().unwrap());
proxmox_rest_server::abort_worker_nowait(wid.parse::<UPID>().unwrap());
proxmox_rest_server::wait_for_local_worker(&wid).await.unwrap();
}
}