From dec00364b315f5e20ab5f59a6c6b971bc2c809c8 Mon Sep 17 00:00:00 2001 From: Stefan Reiter Date: Thu, 1 Oct 2020 11:38:42 +0200 Subject: [PATCH] ParallelHandler: check for errors during thread join Fix a potential bug where errors that happen after the SendHandle has been dropped while doing the thread join might have been ignored. Requires internal check_abort to be moved out of 'impl SendHandle' since we only have the Mutex left, not the SendHandle. Signed-off-by: Stefan Reiter --- src/tools/parallel_handler.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs index f1d9adec..185ac2fc 100644 --- a/src/tools/parallel_handler.rs +++ b/src/tools/parallel_handler.rs @@ -10,19 +10,19 @@ pub struct SendHandle { abort: Arc>>, } -impl SendHandle { - /// Returns the first error happened, if any - pub fn check_abort(&self) -> Result<(), Error> { - let guard = self.abort.lock().unwrap(); - if let Some(err_msg) = &*guard { - return Err(format_err!("{}", err_msg)); - } - Ok(()) +/// Returns the first error happened, if any +pub fn check_abort(abort: Arc>>) -> Result<(), Error> { + let guard = abort.lock().unwrap(); + if let Some(err_msg) = &*guard { + return Err(format_err!("{}", err_msg)); } + Ok(()) +} +impl SendHandle { /// Send data to the worker threads pub fn send(&self, input: I) -> Result<(), Error> { - self.check_abort()?; + check_abort(Arc::clone(&self.abort))?; match self.input.send(input) { Ok(()) => Ok(()), Err(_) => bail!("send failed - channel closed"), @@ -121,12 +121,16 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> { /// Wait for worker threads to complete and check for errors pub fn complete(mut self) -> Result<(), Error> { - self.input.as_ref().unwrap().check_abort()?; - drop(self.input.take()); + let input = self.input.take().unwrap(); + let abort = Arc::clone(&input.abort); + check_abort(Arc::clone(&abort))?; + drop(input); let msg_list = self.join_threads(); if msg_list.is_empty() { + // an error might be encountered while waiting for the join + check_abort(abort)?; return Ok(()); } Err(format_err!("{}", msg_list.join("\n")))