diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs index 185ac2fc..36a7b859 100644 --- a/src/tools/parallel_handler.rs +++ b/src/tools/parallel_handler.rs @@ -11,7 +11,7 @@ pub struct SendHandle { } /// Returns the first error happened, if any -pub fn check_abort(abort: Arc>>) -> Result<(), Error> { +pub fn check_abort(abort: &Mutex>) -> Result<(), Error> { let guard = abort.lock().unwrap(); if let Some(err_msg) = &*guard { return Err(format_err!("{}", err_msg)); @@ -22,7 +22,7 @@ pub fn check_abort(abort: Arc>>) -> Result<(), Error> { impl SendHandle { /// Send data to the worker threads pub fn send(&self, input: I) -> Result<(), Error> { - check_abort(Arc::clone(&self.abort))?; + check_abort(&self.abort)?; match self.input.send(input) { Ok(()) => Ok(()), Err(_) => bail!("send failed - channel closed"), @@ -123,14 +123,15 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> { pub fn complete(mut self) -> Result<(), Error> { let input = self.input.take().unwrap(); let abort = Arc::clone(&input.abort); - check_abort(Arc::clone(&abort))?; + check_abort(&abort)?; drop(input); let msg_list = self.join_threads(); + // an error might be encountered while waiting for the join + check_abort(&abort)?; + if msg_list.is_empty() { - // an error might be encountered while waiting for the join - check_abort(abort)?; return Ok(()); } Err(format_err!("{}", msg_list.join("\n")))