src/api2/node/tasks.rs: start upid API

This commit is contained in:
Dietmar Maurer 2019-04-07 14:36:57 +02:00
parent b75b968147
commit 5a12c0e2fb

View File

@ -1,11 +1,85 @@
use failure::*;
//use crate::tools;
use crate::tools;
use crate::api_schema::*;
use crate::api_schema::router::*;
use serde_json::{json, Value};
use std::sync::Arc;
use std::fs::File;
use std::io::{BufRead,BufReader};
use crate::server;
use crate::server::{self, UPID};
fn get_task_status(
param: Value,
_info: &ApiMethod,
_rpcenv: &mut RpcEnvironment,
) -> Result<Value, Error> {
let upid = extract_upid(&param)?;
let result = if upid.is_active() {
json!({
"status": "running",
})
} else {
json!({
"status": "running",
})
};
Ok(result)
}
fn extract_upid(param: &Value) -> Result<UPID, Error> {
let upid_str = tools::required_string_param(&param, "upid")?;
let upid = match upid_str.parse::<UPID>() {
Ok(v) => v,
Err(err) => bail!("unable to parse UPID '{}' - {}", upid_str, err),
};
Ok(upid)
}
fn read_task_log(
param: Value,
_info: &ApiMethod,
_rpcenv: &mut RpcEnvironment,
) -> Result<Value, Error> {
let upid = extract_upid(&param)?;
let start = param["start"].as_u64().unwrap_or(0);
let mut limit = param["limit"].as_u64().unwrap_or(50);
let mut count: u64 = 0;
let path = upid.log_path();
let file = File::open(path)?;
let mut lines: Vec<Value> = vec![];
for line in BufReader::new(file).lines() {
match line {
Ok(line) => {
count += 1;
if count < start { continue };
if limit <= 0 { continue };
lines.push(json!({ "n": count, "t": line }));
limit -= 1;
}
Err(err) => {
log::error!("reading task log failed: {}", err);
break;
}
}
}
Ok(json!(lines))
}
fn list_tasks(
param: Value,
@ -67,6 +141,57 @@ fn list_tasks(
pub fn router() -> Router {
let upid_schema : Arc<Schema> = Arc::new(
StringSchema::new("Unique Process/Task ID.")
.max_length(256)
.into()
);
let upid_api = Router::new()
.get(ApiMethod::new(
|_,_,_| {
let mut result = vec![];
for cmd in &["log", "status"] {
result.push(json!({"subdir": cmd }));
}
Ok(Value::from(result))
},
ObjectSchema::new("Directory index.")
.required("upid", upid_schema.clone()))
)
.subdir(
"log", Router::new()
.get(
ApiMethod::new(
read_task_log,
ObjectSchema::new("Read task log.")
.required("upid", upid_schema.clone())
.optional(
"start",
IntegerSchema::new("Start at this line.")
.minimum(0)
.default(0)
)
.optional(
"limit",
IntegerSchema::new("Only list this amount of lines.")
.minimum(0)
.default(50)
)
)
)
)
.subdir(
"status", Router::new()
.get(
ApiMethod::new(
get_task_status,
ObjectSchema::new("Get task status.")
.required("upid", upid_schema.clone()))
)
);
let route = Router::new()
.get(ApiMethod::new(
list_tasks,
@ -92,7 +217,8 @@ pub fn router() -> Router {
StringSchema::new("Only list tasks from this user.")
)
)
);
)
.match_all("upid", upid_api);
route
}