diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs index f1d9adec..185ac2fc 100644 --- a/src/tools/parallel_handler.rs +++ b/src/tools/parallel_handler.rs @@ -10,19 +10,19 @@ pub struct SendHandle { abort: Arc>>, } -impl SendHandle { - /// Returns the first error happened, if any - pub fn check_abort(&self) -> Result<(), Error> { - let guard = self.abort.lock().unwrap(); - if let Some(err_msg) = &*guard { - return Err(format_err!("{}", err_msg)); - } - Ok(()) +/// Returns the first error happened, if any +pub fn check_abort(abort: Arc>>) -> Result<(), Error> { + let guard = abort.lock().unwrap(); + if let Some(err_msg) = &*guard { + return Err(format_err!("{}", err_msg)); } + Ok(()) +} +impl SendHandle { /// Send data to the worker threads pub fn send(&self, input: I) -> Result<(), Error> { - self.check_abort()?; + check_abort(Arc::clone(&self.abort))?; match self.input.send(input) { Ok(()) => Ok(()), Err(_) => bail!("send failed - channel closed"), @@ -121,12 +121,16 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> { /// Wait for worker threads to complete and check for errors pub fn complete(mut self) -> Result<(), Error> { - self.input.as_ref().unwrap().check_abort()?; - drop(self.input.take()); + let input = self.input.take().unwrap(); + let abort = Arc::clone(&input.abort); + check_abort(Arc::clone(&abort))?; + drop(input); let msg_list = self.join_threads(); if msg_list.is_empty() { + // an error might be encountered while waiting for the join + check_abort(abort)?; return Ok(()); } Err(format_err!("{}", msg_list.join("\n")))