From 321070b4fa1c052b5afc2046fc939bcc259cb07f Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Wed, 10 Apr 2019 12:42:24 +0200 Subject: [PATCH] src/server/worker_task.rs: implement abort_worker (via command_socket) --- src/server/command_socket.rs | 49 ++++++++++++++++++++++++++++++++++++ src/server/worker_task.rs | 30 ++++++++++++++++++++-- 2 files changed, 77 insertions(+), 2 deletions(-) diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs index 92a1322c..57a54a3a 100644 --- a/src/server/command_socket.rs +++ b/src/server/command_socket.rs @@ -70,3 +70,52 @@ pub fn create_control_socket(path: P, f: F) -> Result( + path: P, + params: Value +) -> impl Future + where P: Into, + +{ + let path: PathBuf = path.into(); + + tokio::net::UnixStream::connect(path) + .map_err(move |err| format_err!("control socket connect failed - {}", err)) + .and_then(move |conn| { + + let (rx, tx) = conn.split(); + + let mut command_string = params.to_string(); + command_string.push('\n'); + + tokio::io::write_all(tx, command_string) + .and_then(|(tx,_)| tokio::io::shutdown(tx)) + .map_err(|err| format_err!("control socket write error - {}", err)) + .and_then(move |_| { + tokio::io::lines(std::io::BufReader::new(rx)) + .into_future() + .then(|test| { + match test { + Ok((Some(data), _)) => { + if data.starts_with("OK: ") { + match data[4..].parse::() { + Ok(v) => Ok(v), + Err(err) => bail!("unable to parse json response - {}", err), + } + } else if data.starts_with("ERROR: ") { + bail!("{}", &data[7..]); + } else { + bail!("unable to parse response: {}", data); + } + } + Ok((None, _)) => { + bail!("no response"); + } + Err((err, _)) => Err(Error::from(err)), + } + }) + }) + }) +} diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index fd8a1440..6140a463 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -9,7 +9,7 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::io::{BufRead, BufReader}; use std::fs::File; -use serde_json::Value; +use serde_json::{json, Value}; use super::UPID; @@ -57,7 +57,7 @@ pub fn create_task_control_socket() -> Result<(), Error> { let control_future = super::create_control_socket(socketname, |param| { let param = param.as_object() .ok_or(format_err!("unable to parse parameters (expected json object)"))?; - if param.keys().count() != 2 { bail!("worng number of parameters"); } + if param.keys().count() != 2 { bail!("wrong number of parameters"); } let command = param.get("command") .ok_or(format_err!("unable to parse parameters (missing command)"))?; @@ -88,6 +88,32 @@ pub fn create_task_control_socket() -> Result<(), Error> { Ok(()) } +pub fn abort_worker_async(upid: UPID) { + let task = abort_worker(upid); + + tokio::spawn(task.then(|res| { + if let Err(err) = res { + eprintln!("abort worker failed - {}", err); + } + Ok(()) + })); +} + +pub fn abort_worker(upid: UPID) -> impl Future { + + let target_pid = upid.pid; + + let socketname = format!( + "\0{}/proxmox-task-control-{}.sock", PROXMOX_BACKUP_VAR_RUN_DIR, target_pid); + + let cmd = json!({ + "command": "abort-task", + "upid": upid.to_string(), + }); + + super::send_command(socketname, cmd).map(|_| {}) +} + fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option<(i64, String)>), Error> { let data = line.splitn(3, ' ').collect::>();