diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs index bc807700..199bb97f 100644 --- a/src/tools/parallel_handler.rs +++ b/src/tools/parallel_handler.rs @@ -1,16 +1,16 @@ -use std::thread::{JoinHandle}; use std::sync::{Arc, Mutex}; -use crossbeam_channel::{bounded, Sender}; -use anyhow::{bail, format_err, Error}; +use std::thread::JoinHandle; -/// A handle to send data toƶ the worker thread (implements clone) +use anyhow::{bail, format_err, Error}; +use crossbeam_channel::{bounded, Sender}; + +/// A handle to send data to the worker thread (implements clone) pub struct SendHandle { input: Sender, abort: Arc>>, } -impl SendHandle { - +impl SendHandle { /// Returns the first error happened, if any pub fn check_abort(&self) -> Result<(), Error> { let guard = self.abort.lock().unwrap(); @@ -45,21 +45,19 @@ pub struct ParallelHandler<'a, I> { _marker: std::marker::PhantomData<&'a ()>, } -impl Clone for SendHandle { +impl Clone for SendHandle { fn clone(&self) -> Self { - Self { input: self.input.clone(), abort: self.abort.clone() } + Self { + input: self.input.clone(), + abort: self.abort.clone(), + } } } -impl <'a, I: Send + 'static> ParallelHandler<'a, I> { - +impl<'a, I: Send + 'static> ParallelHandler<'a, I> { /// Create a new thread pool, each thread processing incoming data /// with 'handler_fn'. - pub fn new( - name: &str, - threads: usize, - handler_fn: F, - ) -> Self + pub fn new(name: &str, threads: usize, handler_fn: F) -> Self where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a, { let mut handles = Vec::new(); @@ -81,19 +79,17 @@ impl <'a, I: Send + 'static> ParallelHandler<'a, I> { handles.push( std::thread::Builder::new() .name(format!("{} ({})", name, i)) - .spawn(move || { - loop { - let data = match input_rx.recv() { - Ok(data) => data, - Err(_) => return, - }; - match (handler_fn)(data) { - Ok(()) => {}, - Err(err) => { - let mut guard = abort.lock().unwrap(); - if guard.is_none() { - *guard = Some(err.to_string()); - } + .spawn(move || loop { + let data = match input_rx.recv() { + Ok(data) => data, + Err(_) => return, + }; + match (handler_fn)(data) { + Ok(()) => (), + Err(err) => { + let mut guard = abort.lock().unwrap(); + if guard.is_none() { + *guard = Some(err.to_string()); } } } @@ -163,7 +159,7 @@ impl <'a, I: Send + 'static> ParallelHandler<'a, I> { } // Note: We make sure that all threads will be joined -impl <'a, I> Drop for ParallelHandler<'a, I> { +impl<'a, I> Drop for ParallelHandler<'a, I> { fn drop(&mut self) { drop(self.input.take()); loop {