ParallelHandler: check for errors during thread join

Fix a potential bug where errors that happen after the SendHandle has
been dropped while doing the thread join might have been ignored.
Requires internal check_abort to be moved out of 'impl SendHandle' since
we only have the Mutex left, not the SendHandle.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
This commit is contained in:
Stefan Reiter 2020-10-01 11:38:42 +02:00 committed by Dietmar Maurer
parent 5637087cc9
commit dec00364b3
1 changed files with 15 additions and 11 deletions

View File

@ -10,19 +10,19 @@ pub struct SendHandle<I> {
abort: Arc<Mutex<Option<String>>>, abort: Arc<Mutex<Option<String>>>,
} }
impl<I: Send> SendHandle<I> {
/// Returns the first error happened, if any /// Returns the first error happened, if any
pub fn check_abort(&self) -> Result<(), Error> { pub fn check_abort(abort: Arc<Mutex<Option<String>>>) -> Result<(), Error> {
let guard = self.abort.lock().unwrap(); let guard = abort.lock().unwrap();
if let Some(err_msg) = &*guard { if let Some(err_msg) = &*guard {
return Err(format_err!("{}", err_msg)); return Err(format_err!("{}", err_msg));
} }
Ok(()) Ok(())
} }
impl<I: Send> SendHandle<I> {
/// Send data to the worker threads /// Send data to the worker threads
pub fn send(&self, input: I) -> Result<(), Error> { pub fn send(&self, input: I) -> Result<(), Error> {
self.check_abort()?; check_abort(Arc::clone(&self.abort))?;
match self.input.send(input) { match self.input.send(input) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(_) => bail!("send failed - channel closed"), Err(_) => bail!("send failed - channel closed"),
@ -121,12 +121,16 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
/// Wait for worker threads to complete and check for errors /// Wait for worker threads to complete and check for errors
pub fn complete(mut self) -> Result<(), Error> { pub fn complete(mut self) -> Result<(), Error> {
self.input.as_ref().unwrap().check_abort()?; let input = self.input.take().unwrap();
drop(self.input.take()); let abort = Arc::clone(&input.abort);
check_abort(Arc::clone(&abort))?;
drop(input);
let msg_list = self.join_threads(); let msg_list = self.join_threads();
if msg_list.is_empty() { if msg_list.is_empty() {
// an error might be encountered while waiting for the join
check_abort(abort)?;
return Ok(()); return Ok(());
} }
Err(format_err!("{}", msg_list.join("\n"))) Err(format_err!("{}", msg_list.join("\n")))