From 882594c5e955dd5f721d055020b8dfb56b20b639 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Sat, 6 Apr 2019 09:17:25 +0200 Subject: [PATCH] src/server.rs: improve crate layout --- src/lib.rs | 11 +---------- src/server.rs | 15 +++++++++++++++ src/server/environment.rs | 1 + src/server/worker_task.rs | 33 ++++++++++++++++++++++++++++++++- 4 files changed, 49 insertions(+), 11 deletions(-) create mode 100644 src/server.rs diff --git a/src/lib.rs b/src/lib.rs index 56accfe1..4c5e94b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,16 +8,7 @@ pub mod tools; pub mod api_schema; #[macro_use] -pub mod server { - - pub mod environment; - mod worker_task; - pub use worker_task::*; - pub mod formatter; - #[macro_use] - pub mod rest; - -} +pub mod server; pub mod pxar; diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 00000000..1c91042b --- /dev/null +++ b/src/server.rs @@ -0,0 +1,15 @@ +//! Proxmox Server/Service framework +//! +//! This code provides basic primitives to build our REST API +//! services. We want async IO, so this is built on top of +//! tokio/hyper. + +mod environment; +pub use environment::*; + +mod worker_task; +pub use worker_task::*; +pub mod formatter; +#[macro_use] +pub mod rest; + diff --git a/src/server/environment.rs b/src/server/environment.rs index 36bbcd9a..b002b24f 100644 --- a/src/server/environment.rs +++ b/src/server/environment.rs @@ -3,6 +3,7 @@ use crate::api_schema::router::*; use std::collections::HashMap; use serde_json::Value; +/// Encapsulates information about the runtime environment pub struct RestEnvironment { env_type: RpcEnvironmentType, result_attributes: HashMap, diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 04180274..ea309d3f 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -24,15 +24,33 @@ lazy_static! { static WORKER_TASK_NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; +/// Unique Process/Task Identifier +/// +/// We use this to uniquely identify worker task. UPIDs have a short +/// string repesentaion, which gives additional information about the +/// type of the task. for example: +/// +/// UPID:elsa:00004F37:0039E469:00000000:5CA78B83:garbage_collection::root@pam: +/// +/// Please note that we use tokio, so a single thread can run multiple +/// tasks. #[derive(Debug, Clone)] pub struct UPID { + /// The Unix PID pub pid: libc::pid_t, + /// The Unix process start time from `/proc/pid/stat` pub pstart: u64, + /// The task start time (Epoch) pub starttime: i64, + /// The task ID (inside the process/thread) pub task_id: usize, + /// Worker type (arbitrary ASCII string) pub worker_type: String, + /// Worker ID (arbitrary ASCII string) pub worker_id: Option, + /// The user who started the task pub username: String, + /// The node name. pub node: String, } @@ -82,6 +100,7 @@ impl std::fmt::Display for UPID { } } + #[derive(Debug)] pub struct WorkerTaskInfo { upid: UPID, @@ -290,6 +309,12 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> { } +/// Launch long running worker tasks. +/// +/// A worker task can either be a whole thread, or a simply tokio +/// task/future. Each task can `log()` messages, which are stored +/// persistently to files. Task should poll the `abort_requested` +/// flag, and stop execution when requested. #[derive(Debug)] pub struct WorkerTask { upid: UPID, @@ -364,6 +389,7 @@ impl WorkerTask { Ok(worker) } + /// Spawn a new tokio task/future. pub fn spawn(worker_type: &str, worker_id: Option, username: &str, to_stdout: bool, f: F) -> Result<(), Error> where F: Send + 'static + FnOnce(Arc) -> T, T: Send + 'static + Future, @@ -381,6 +407,7 @@ impl WorkerTask { Ok(()) } + /// Create a new worker thread. pub fn new_thread(worker_type: &str, worker_id: Option, username: &str, to_stdout: bool, f: F) -> Result<(), Error> where F: Send + 'static + FnOnce(Arc) -> Result<(), Error> { @@ -412,11 +439,13 @@ impl WorkerTask { } } + /// Log a message. pub fn log>(&self, msg: S) { let mut data = self.data.lock().unwrap(); data.logger.log(msg); } + /// Set progress indicator pub fn progress(&self, progress: f64) { if progress >= 0.0 && progress <= 1.0 { let mut data = self.data.lock().unwrap(); @@ -426,15 +455,17 @@ impl WorkerTask { } } - // request_abort + /// Request abort pub fn request_abort(self) { self.abort_requested.store(true, Ordering::SeqCst); } + /// Test if abort was requested. pub fn abort_requested(&self) -> bool { self.abort_requested.load(Ordering::SeqCst) } + /// Fail if abort was requested. pub fn fail_on_abort(&self) -> Result<(), Error> { if self.abort_requested() { bail!("task '{}': abort requested - aborting task", self.upid);