src/server/worker_task.rs: implement abort channel

This commit is contained in:
Dietmar Maurer 2019-04-15 09:38:05 +02:00
parent 912524752e
commit 75bc49bed7
2 changed files with 37 additions and 1 deletions

View File

@ -93,10 +93,21 @@ fn upload_pxar(
let upload = UploadPxar { stream: req_body, index, count: 0}; let upload = UploadPxar { stream: req_body, index, count: 0};
let worker = server::WorkerTask::new("upload", Some(worker_id), &rpcenv.get_user().unwrap(), false)?; let worker = server::WorkerTask::new("upload", Some(worker_id), &rpcenv.get_user().unwrap(), false)?;
let worker1 = worker.clone();
let abort_future = worker.abort_future();
let resp = upload let resp = upload
.select(abort_future.map_err(|_| {})
.then(move |_| {
worker1.log("aborting task...");
bail!("task aborted");
})
)
.then(move |result| { .then(move |result| {
worker.log_result(result); match result {
Ok((result,_)) => worker.log_result(Ok(result)),
Err((err, _)) => worker.log_result(Err(err)),
}
Ok(()) Ok(())
}) })
.and_then(|_| { .and_then(|_| {

View File

@ -341,6 +341,7 @@ impl std::fmt::Display for WorkerTask {
struct WorkerTaskData { struct WorkerTaskData {
logger: FileLogger, logger: FileLogger,
progress: f64, // 0..1 progress: f64, // 0..1
pub abort_listeners: Vec<oneshot::Sender<()>>,
} }
impl Drop for WorkerTask { impl Drop for WorkerTask {
@ -383,6 +384,7 @@ impl WorkerTask {
data: Mutex::new(WorkerTaskData { data: Mutex::new(WorkerTaskData {
logger, logger,
progress: 0.0, progress: 0.0,
abort_listeners: vec![],
}), }),
}); });
@ -492,6 +494,16 @@ impl WorkerTask {
pub fn request_abort(&self) { pub fn request_abort(&self) {
eprintln!("set abort flag for worker {}", self.upid); eprintln!("set abort flag for worker {}", self.upid);
self.abort_requested.store(true, Ordering::SeqCst); self.abort_requested.store(true, Ordering::SeqCst);
// noitify listeners
let mut data = self.data.lock().unwrap();
loop {
match data.abort_listeners.pop() {
None => { break; },
Some(ch) => {
let _ = ch.send(()); // ignore erros here
},
}
}
} }
/// Test if abort was requested. /// Test if abort was requested.
@ -506,4 +518,17 @@ impl WorkerTask {
} }
Ok(()) Ok(())
} }
/// Get a future which resolves on task abort
pub fn abort_future(&self) -> oneshot::Receiver<()> {
let (tx, rx) = oneshot::channel::<()>();
let mut data = self.data.lock().unwrap();
if self.abort_requested() {
let _ = tx.send(());
} else {
data.abort_listeners.push(tx);
}
rx
}
} }