display_task_log: make it possible to abort tasks with CTRL-C
This commit is contained in:
parent
bdb6e6b83f
commit
52f7a73009
|
@ -1,22 +1,57 @@
|
||||||
|
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
|
||||||
|
|
||||||
use anyhow::{bail, Error};
|
use anyhow::{bail, Error};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
use tokio::signal::unix::{signal, SignalKind};
|
||||||
|
use futures::*;
|
||||||
|
|
||||||
use super::HttpClient;
|
use super::HttpClient;
|
||||||
use crate::tools;
|
use crate::tools;
|
||||||
|
|
||||||
|
/// Display task log on console
|
||||||
|
///
|
||||||
|
/// This polls the task API and prints the log to the console. It also
|
||||||
|
/// catches interrupt signals, and sends a abort request to the task if
|
||||||
|
/// the user presses CTRL-C. Two interrupts cause an immediate end of
|
||||||
|
/// the loop. The task may still run in that case.
|
||||||
pub async fn display_task_log(
|
pub async fn display_task_log(
|
||||||
client: HttpClient,
|
mut client: HttpClient,
|
||||||
upid_str: &str,
|
upid_str: &str,
|
||||||
strip_date: bool,
|
strip_date: bool,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
|
||||||
let path = format!("api2/json/nodes/localhost/tasks/{}/log", tools::percent_encode_component(upid_str));
|
let mut signal_stream = signal(SignalKind::interrupt())?;
|
||||||
|
let abort_count = Arc::new(AtomicUsize::new(0));
|
||||||
|
let abort_count2 = Arc::clone(&abort_count);
|
||||||
|
|
||||||
|
let abort_future = async move {
|
||||||
|
while signal_stream.recv().await.is_some() {
|
||||||
|
println!("got shutdown request (SIGINT)");
|
||||||
|
let prev_count = abort_count2.fetch_add(1, Ordering::SeqCst);
|
||||||
|
if prev_count >= 1 {
|
||||||
|
println!("forced exit (task still running)");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok::<_, Error>(())
|
||||||
|
};
|
||||||
|
|
||||||
|
let request_future = async move {
|
||||||
|
|
||||||
let mut start = 1;
|
let mut start = 1;
|
||||||
let limit = 500;
|
let limit = 500;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
|
||||||
|
let abort = abort_count.load(Ordering::Relaxed);
|
||||||
|
if abort > 0 {
|
||||||
|
let path = format!("api2/json/nodes/localhost/tasks/{}", tools::percent_encode_component(upid_str));
|
||||||
|
let _ = client.delete(&path, None).await?;
|
||||||
|
}
|
||||||
|
|
||||||
let param = json!({ "start": start, "limit": limit, "test-status": true });
|
let param = json!({ "start": start, "limit": limit, "test-status": true });
|
||||||
|
|
||||||
|
let path = format!("api2/json/nodes/localhost/tasks/{}/log", tools::percent_encode_component(upid_str));
|
||||||
let result = client.get(&path, Some(param)).await?;
|
let result = client.get(&path, Some(param)).await?;
|
||||||
|
|
||||||
let active = result["active"].as_bool().unwrap();
|
let active = result["active"].as_bool().unwrap();
|
||||||
|
@ -40,7 +75,7 @@ pub async fn display_task_log(
|
||||||
|
|
||||||
if start > total {
|
if start > total {
|
||||||
if active {
|
if active {
|
||||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -49,5 +84,13 @@ pub async fn display_task_log(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
futures::select!{
|
||||||
|
request = request_future.fuse() => request?,
|
||||||
|
abort = abort_future.fuse() => abort?,
|
||||||
|
};
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue