From 75bc49bed70333e5856b4b520b1420fdf2519c1f Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Mon, 15 Apr 2019 09:38:05 +0200 Subject: [PATCH] src/server/worker_task.rs: implement abort channel --- src/api2/admin/datastore/pxar.rs | 13 ++++++++++++- src/server/worker_task.rs | 25 +++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/api2/admin/datastore/pxar.rs b/src/api2/admin/datastore/pxar.rs index a44f75f5..051c55e5 100644 --- a/src/api2/admin/datastore/pxar.rs +++ b/src/api2/admin/datastore/pxar.rs @@ -93,10 +93,21 @@ fn upload_pxar( let upload = UploadPxar { stream: req_body, index, count: 0}; let worker = server::WorkerTask::new("upload", Some(worker_id), &rpcenv.get_user().unwrap(), false)?; + let worker1 = worker.clone(); + let abort_future = worker.abort_future(); let resp = upload + .select(abort_future.map_err(|_| {}) + .then(move |_| { + worker1.log("aborting task..."); + bail!("task aborted"); + }) + ) .then(move |result| { - worker.log_result(result); + match result { + Ok((result,_)) => worker.log_result(Ok(result)), + Err((err, _)) => worker.log_result(Err(err)), + } Ok(()) }) .and_then(|_| { diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index 11aeba3f..a4a84e01 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -341,6 +341,7 @@ impl std::fmt::Display for WorkerTask { struct WorkerTaskData { logger: FileLogger, progress: f64, // 0..1 + pub abort_listeners: Vec>, } impl Drop for WorkerTask { @@ -383,6 +384,7 @@ impl WorkerTask { data: Mutex::new(WorkerTaskData { logger, progress: 0.0, + abort_listeners: vec![], }), }); @@ -492,6 +494,16 @@ impl WorkerTask { pub fn request_abort(&self) { eprintln!("set abort flag for worker {}", self.upid); self.abort_requested.store(true, Ordering::SeqCst); + // noitify listeners + let mut data = self.data.lock().unwrap(); + loop { + match data.abort_listeners.pop() { + None => { break; }, + Some(ch) => { + let _ = ch.send(()); // ignore erros here + }, + } + } } /// Test if abort was requested. @@ -506,4 +518,17 @@ impl WorkerTask { } Ok(()) } + + /// Get a future which resolves on task abort + pub fn abort_future(&self) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel::<()>(); + + let mut data = self.data.lock().unwrap(); + if self.abort_requested() { + let _ = tx.send(()); + } else { + data.abort_listeners.push(tx); + } + rx + } }