src/server.rs: improve crate layout

This commit is contained in:
Dietmar Maurer 2019-04-06 09:17:25 +02:00
parent 4b01c983f0
commit 882594c5e9
4 changed files with 49 additions and 11 deletions

View File

@ -8,16 +8,7 @@ pub mod tools;
pub mod api_schema; pub mod api_schema;
#[macro_use] #[macro_use]
pub mod server { pub mod server;
pub mod environment;
mod worker_task;
pub use worker_task::*;
pub mod formatter;
#[macro_use]
pub mod rest;
}
pub mod pxar; pub mod pxar;

15
src/server.rs Normal file
View File

@ -0,0 +1,15 @@
//! Proxmox Server/Service framework
//!
//! This code provides basic primitives to build our REST API
//! services. We want async IO, so this is built on top of
//! tokio/hyper.
mod environment;
pub use environment::*;
mod worker_task;
pub use worker_task::*;
pub mod formatter;
#[macro_use]
pub mod rest;

View File

@ -3,6 +3,7 @@ use crate::api_schema::router::*;
use std::collections::HashMap; use std::collections::HashMap;
use serde_json::Value; use serde_json::Value;
/// Encapsulates information about the runtime environment
pub struct RestEnvironment { pub struct RestEnvironment {
env_type: RpcEnvironmentType, env_type: RpcEnvironmentType,
result_attributes: HashMap<String, Value>, result_attributes: HashMap<String, Value>,

View File

@ -24,15 +24,33 @@ lazy_static! {
static WORKER_TASK_NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; static WORKER_TASK_NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
/// Unique Process/Task Identifier
///
/// We use this to uniquely identify worker task. UPIDs have a short
/// string repesentaion, which gives additional information about the
/// type of the task. for example:
///
/// UPID:elsa:00004F37:0039E469:00000000:5CA78B83:garbage_collection::root@pam:
///
/// Please note that we use tokio, so a single thread can run multiple
/// tasks.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct UPID { pub struct UPID {
/// The Unix PID
pub pid: libc::pid_t, pub pid: libc::pid_t,
/// The Unix process start time from `/proc/pid/stat`
pub pstart: u64, pub pstart: u64,
/// The task start time (Epoch)
pub starttime: i64, pub starttime: i64,
/// The task ID (inside the process/thread)
pub task_id: usize, pub task_id: usize,
/// Worker type (arbitrary ASCII string)
pub worker_type: String, pub worker_type: String,
/// Worker ID (arbitrary ASCII string)
pub worker_id: Option<String>, pub worker_id: Option<String>,
/// The user who started the task
pub username: String, pub username: String,
/// The node name.
pub node: String, pub node: String,
} }
@ -82,6 +100,7 @@ impl std::fmt::Display for UPID {
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct WorkerTaskInfo { pub struct WorkerTaskInfo {
upid: UPID, upid: UPID,
@ -290,6 +309,12 @@ fn update_active_workers(new_upid: Option<&UPID>) -> Result<(), Error> {
} }
/// Launch long running worker tasks.
///
/// A worker task can either be a whole thread, or a simply tokio
/// task/future. Each task can `log()` messages, which are stored
/// persistently to files. Task should poll the `abort_requested`
/// flag, and stop execution when requested.
#[derive(Debug)] #[derive(Debug)]
pub struct WorkerTask { pub struct WorkerTask {
upid: UPID, upid: UPID,
@ -364,6 +389,7 @@ impl WorkerTask {
Ok(worker) Ok(worker)
} }
/// Spawn a new tokio task/future.
pub fn spawn<F, T>(worker_type: &str, worker_id: Option<String>, username: &str, to_stdout: bool, f: F) -> Result<(), Error> pub fn spawn<F, T>(worker_type: &str, worker_id: Option<String>, username: &str, to_stdout: bool, f: F) -> Result<(), Error>
where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T,
T: Send + 'static + Future<Item=(), Error=Error>, T: Send + 'static + Future<Item=(), Error=Error>,
@ -381,6 +407,7 @@ impl WorkerTask {
Ok(()) Ok(())
} }
/// Create a new worker thread.
pub fn new_thread<F>(worker_type: &str, worker_id: Option<String>, username: &str, to_stdout: bool, f: F) -> Result<(), Error> pub fn new_thread<F>(worker_type: &str, worker_id: Option<String>, username: &str, to_stdout: bool, f: F) -> Result<(), Error>
where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error> where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> Result<(), Error>
{ {
@ -412,11 +439,13 @@ impl WorkerTask {
} }
} }
/// Log a message.
pub fn log<S: AsRef<str>>(&self, msg: S) { pub fn log<S: AsRef<str>>(&self, msg: S) {
let mut data = self.data.lock().unwrap(); let mut data = self.data.lock().unwrap();
data.logger.log(msg); data.logger.log(msg);
} }
/// Set progress indicator
pub fn progress(&self, progress: f64) { pub fn progress(&self, progress: f64) {
if progress >= 0.0 && progress <= 1.0 { if progress >= 0.0 && progress <= 1.0 {
let mut data = self.data.lock().unwrap(); let mut data = self.data.lock().unwrap();
@ -426,15 +455,17 @@ impl WorkerTask {
} }
} }
// request_abort /// Request abort
pub fn request_abort(self) { pub fn request_abort(self) {
self.abort_requested.store(true, Ordering::SeqCst); self.abort_requested.store(true, Ordering::SeqCst);
} }
/// Test if abort was requested.
pub fn abort_requested(&self) -> bool { pub fn abort_requested(&self) -> bool {
self.abort_requested.load(Ordering::SeqCst) self.abort_requested.load(Ordering::SeqCst)
} }
/// Fail if abort was requested.
pub fn fail_on_abort(&self) -> Result<(), Error> { pub fn fail_on_abort(&self) -> Result<(), Error> {
if self.abort_requested() { if self.abort_requested() {
bail!("task '{}': abort requested - aborting task", self.upid); bail!("task '{}': abort requested - aborting task", self.upid);