server: use generalized commando socket for worker tasks commands

Allows to extend the use of that socket in the future, e.g., for log
rotate re-open signaling.

To reflect this we use a more general name, and change the commandos
to a more clear namespace.

Both are actually somewhat a breaking change, but the single real
world issue it should be able to cause is, that one won't be able to
stop task from older daemons, which still use the older abstract
socket name format.

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
This commit is contained in:
Thomas Lamprecht 2020-11-02 19:13:36 +01:00
parent f3df613cb7
commit a68768cf31
5 changed files with 77 additions and 70 deletions

View File

@ -52,6 +52,8 @@ async fn run() -> Result<(), Error> {
let mut config = server::ApiConfig::new(
buildcfg::JS_DIR, &proxmox_backup::api2::ROUTER, RpcEnvironmentType::PRIVILEGED)?;
let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
config.enable_file_log(buildcfg::API_ACCESS_LOG_FN)?;
let rest_server = RestServer::new(config);
@ -79,7 +81,8 @@ async fn run() -> Result<(), Error> {
daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
let init_result: Result<(), Error> = try_block!({
server::create_task_control_socket()?;
server::register_task_control_commands(&mut commando_sock)?;
commando_sock.spawn()?;
server::server_state_init()?;
Ok(())
});

View File

@ -94,6 +94,8 @@ async fn run() -> Result<(), Error> {
config.register_template("index", &indexpath)?;
config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?;
let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
config.enable_file_log(buildcfg::API_ACCESS_LOG_FN)?;
let rest_server = RestServer::new(config);
@ -146,7 +148,8 @@ async fn run() -> Result<(), Error> {
daemon::systemd_notify(daemon::SystemdNotify::Ready)?;
let init_result: Result<(), Error> = try_block!({
server::create_task_control_socket()?;
server::register_task_control_commands(&mut commando_sock)?;
commando_sock.spawn()?;
server::server_state_init()?;
Ok(())
});

View File

@ -4,6 +4,34 @@
//! services. We want async IO, so this is built on top of
//! tokio/hyper.
use lazy_static::lazy_static;
use nix::unistd::Pid;
use proxmox::sys::linux::procfs::PidStat;
use crate::buildcfg;
lazy_static! {
static ref PID: i32 = unsafe { libc::getpid() };
static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime;
}
pub fn pid() -> i32 {
*PID
}
pub fn pstart() -> u64 {
*PSTART
}
pub fn ctrl_sock_from_pid(pid: i32) -> String {
format!("\0{}/control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, pid)
}
pub fn our_ctrl_sock() -> String {
ctrl_sock_from_pid(*PID)
}
mod environment;
pub use environment::*;

View File

@ -8,7 +8,6 @@ use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
use futures::*;
use lazy_static::lazy_static;
use nix::unistd::Pid;
use serde_json::{json, Value};
use serde::{Serialize, Deserialize};
use tokio::sync::oneshot;
@ -20,6 +19,7 @@ use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOpti
use super::UPID;
use crate::buildcfg;
use crate::server;
use crate::tools::logrotate::{LogRotate, LogRotateFiles};
use crate::tools::{FileLogger, FileLogOptions};
use crate::api2::types::Authid;
@ -35,16 +35,16 @@ pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive");
lazy_static! {
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
}
static ref MY_PID: i32 = unsafe { libc::getpid() };
static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID))
.unwrap()
.starttime;
/// checks if the task UPID refers to a worker from this process
fn is_local_worker(upid: &UPID) -> bool {
upid.pid == server::pid() && upid.pstart == server::pstart()
}
/// Test if the task is still running
pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
if is_local_worker(upid) {
return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id));
}
@ -52,15 +52,12 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
return Ok(false);
}
let socketname = format!(
"\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, upid.pid);
let sock = server::ctrl_sock_from_pid(upid.pid);
let cmd = json!({
"command": "status",
"command": "worker-task-status",
"upid": upid.to_string(),
});
let status = super::send_command(socketname, cmd).await?;
let status = super::send_command(sock, cmd).await?;
if let Some(active) = status.as_bool() {
Ok(active)
@ -71,69 +68,48 @@ pub async fn worker_is_active(upid: &UPID) -> Result<bool, Error> {
/// Test if the task is still running (fast but inaccurate implementation)
///
/// If the task is spanned from a different process, we simply return if
/// If the task is spawned from a different process, we simply return if
/// that process is still running. This information is good enough to detect
/// stale tasks...
pub fn worker_is_active_local(upid: &UPID) -> bool {
if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
if is_local_worker(upid) {
WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)
} else {
procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some()
}
}
pub fn create_task_control_socket() -> Result<(), Error> {
let socketname = format!(
"\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, *MY_PID);
let control_future = super::create_control_socket(socketname, |param| {
let param = param
.as_object()
.ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?;
if param.keys().count() != 2 { bail!("wrong number of parameters"); }
let command = param["command"]
.as_str()
.ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?;
// we have only two commands for now
if !(command == "abort-task" || command == "status") {
bail!("got unknown command '{}'", command);
}
let upid_str = param["upid"]
.as_str()
.ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?;
let upid = upid_str.parse::<UPID>()?;
if !(upid.pid == *MY_PID && upid.pstart == *MY_PID_PSTART) {
pub fn register_task_control_commands(
commando_sock: &mut super::CommandoSocket,
) -> Result<(), Error> {
fn get_upid(args: Option<&Value>) -> Result<UPID, Error> {
let args = if let Some(args) = args { args } else { bail!("missing args") };
let upid = match args.get("upid") {
Some(Value::String(upid)) => upid.parse::<UPID>()?,
None => bail!("no upid in args"),
_ => bail!("unable to parse upid"),
};
if !is_local_worker(&upid) {
bail!("upid does not belong to this process");
}
Ok(upid)
}
let hash = WORKER_TASK_LIST.lock().unwrap();
commando_sock.register_command("worker-task-abort".into(), move |args| {
let upid = get_upid(args)?;
match command {
"abort-task" => {
if let Some(ref worker) = hash.get(&upid.task_id) {
worker.request_abort();
} else {
// assume task is already stopped
}
Ok(Value::Null)
}
"status" => {
let active = hash.contains_key(&upid.task_id);
Ok(active.into())
}
_ => {
bail!("got unknown command '{}'", command);
}
if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) {
worker.request_abort();
}
Ok(Value::Null)
})?;
commando_sock.register_command("worker-task-status".into(), move |args| {
let upid = get_upid(args)?;
tokio::spawn(control_future);
let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id);
Ok(active.into())
})?;
Ok(())
}
@ -148,17 +124,12 @@ pub fn abort_worker_async(upid: UPID) {
pub async fn abort_worker(upid: UPID) -> Result<(), Error> {
let target_pid = upid.pid;
let socketname = format!(
"\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, target_pid);
let sock = server::ctrl_sock_from_pid(upid.pid);
let cmd = json!({
"command": "abort-task",
"command": "worker-task-abort",
"upid": upid.to_string(),
});
super::send_command(socketname, cmd).map_ok(|_| ()).await
super::send_command(sock, cmd).map_ok(|_| ()).await
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<TaskState>), Error> {

View File

@ -42,8 +42,10 @@ fn worker_task_abort() -> Result<(), Error> {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock());
let init_result: Result<(), Error> = try_block!({
server::create_task_control_socket()?;
server::register_task_control_commands(&mut commando_sock)?;
server::server_state_init()?;
Ok(())
});