src/server/worker_task.rs: implement task control socket

This commit is contained in:
Dietmar Maurer 2019-04-09 12:15:06 +02:00
parent e201753629
commit d607b8861b
3 changed files with 65 additions and 13 deletions

View File

@ -1,6 +1,5 @@
extern crate proxmox_backup;
//use proxmox_backup::tools;
use proxmox_backup::try_block;
use proxmox_backup::api_schema::router::*;
use proxmox_backup::api_schema::config::*;
use proxmox_backup::server::rest::*;
@ -33,7 +32,7 @@ fn run() -> Result<(), Error> {
bail!("unable to inititialize syslog - {}", err);
}
server::create_task_log_dir()?;
server::create_task_log_dirs()?;
config::create_configdir()?;
@ -69,7 +68,13 @@ fn run() -> Result<(), Error> {
tokio::run(lazy(|| {
if let Err(err) = server::server_state_init() {
let init_result: Result<(), Error> = try_block!({
server::create_task_control_socket()?;
server::server_state_init()?;
Ok(())
});
if let Err(err) = init_result {
eprintln!("unable to start daemon - {}", err);
} else {
tokio::spawn(server);

View File

@ -1,3 +1,4 @@
use proxmox_backup::try_block;
use proxmox_backup::configdir;
use proxmox_backup::tools;
use proxmox_backup::server;
@ -99,7 +100,13 @@ fn run() -> Result<(), Error> {
tokio::run(lazy(|| {
if let Err(err) = server::server_state_init() {
let init_result: Result<(), Error> = try_block!({
server::create_task_control_socket()?;
server::server_state_init()?;
Ok(())
});
if let Err(err) = init_result {
eprintln!("unable to start daemon - {}", err);
} else {
tokio::spawn(server);

View File

@ -9,14 +9,17 @@ use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::io::{BufRead, BufReader};
use std::fs::File;
use serde_json::Value;
use super::UPID;
use crate::tools::{self, FileLogger};
macro_rules! PROXMOX_BACKUP_VAR_RUN_DIR_M { () => ("/var/run/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_LOG_DIR_M { () => ("/var/log/proxmox-backup") }
macro_rules! PROXMOX_BACKUP_TASK_DIR_M { () => (concat!( PROXMOX_BACKUP_LOG_DIR_M!(), "/tasks")) }
pub const PROXMOX_BACKUP_VAR_RUN_DIR: &str = PROXMOX_BACKUP_VAR_RUN_DIR_M!();
pub const PROXMOX_BACKUP_LOG_DIR: &str = PROXMOX_BACKUP_LOG_DIR_M!();
pub const PROXMOX_BACKUP_TASK_DIR: &str = PROXMOX_BACKUP_TASK_DIR_M!();
pub const PROXMOX_BACKUP_TASK_LOCK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_M!(), "/.active.lock");
@ -24,16 +27,14 @@ pub const PROXMOX_BACKUP_ACTIVE_TASK_FN: &str = concat!(PROXMOX_BACKUP_TASK_DIR_
lazy_static! {
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
static ref MY_PID: i32 = unsafe { libc::getpid() };
static ref MY_PID_PSTART: u64 = tools::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime;
}
/// Test if the task is still running
pub fn worker_is_active(upid: &UPID) -> bool {
lazy_static! {
static ref MY_PID: i32 = unsafe { libc::getpid() };
static ref MY_PID_PSTART: u64 = tools::procfs::read_proc_pid_stat(*MY_PID).unwrap().starttime;
}
if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) {
if WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) {
true
@ -48,6 +49,45 @@ pub fn worker_is_active(upid: &UPID) -> bool {
}
}
pub fn create_task_control_socket() -> Result<(), Error> {
let socketname = format!(
"{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, *MY_PID);
let control_future = super::create_control_socket(socketname, true, |param| {
let param = param.as_object()
.ok_or(format_err!("unable to parse parameters (expected json object)"))?;
if param.keys().count() != 2 { bail!("worng number of parameters"); }
let command = param.get("command")
.ok_or(format_err!("unable to parse parameters (missing command)"))?;
// this is the only command for now
if command != "abort-task" { bail!("got unknown command '{}'", command); }
let upid_str = param["upid"].as_str()
.ok_or(format_err!("unable to parse parameters (missing upid)"))?;
let upid = upid_str.parse::<UPID>()?;
if !((upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART)) {
bail!("upid does not belong to this process");
}
let hash = WORKER_TASK_LIST.lock().unwrap();
if let Some(ref worker) = hash.get(&upid.task_id) {
worker.request_abort();
} else {
// assume task is already stopped
}
Ok(Value::Null)
})?;
tokio::spawn(control_future);
Ok(())
}
fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> {
let data = line.splitn(3, ' ').collect::<Vec<&str>>();
@ -65,7 +105,7 @@ fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, St
}
/// Create task log directory with correct permissions
pub fn create_task_log_dir() -> Result<(), Error> {
pub fn create_task_log_dirs() -> Result<(), Error> {
try_block!({
let (backup_uid, backup_gid) = tools::getpwnam_ugid("backup")?;
@ -74,7 +114,7 @@ pub fn create_task_log_dir() -> Result<(), Error> {
tools::create_dir_chown(PROXMOX_BACKUP_LOG_DIR, None, uid, gid)?;
tools::create_dir_chown(PROXMOX_BACKUP_TASK_DIR, None, uid, gid)?;
tools::create_dir_chown(PROXMOX_BACKUP_VAR_RUN_DIR, None, uid, gid)?;
Ok(())
}).map_err(|err: Error| format_err!("unable to create task log dir - {}", err))?;
@ -402,7 +442,7 @@ impl WorkerTask {
}
/// Request abort
pub fn request_abort(self) {
pub fn request_abort(&self) {
self.abort_requested.store(true, Ordering::SeqCst);
}