parallel_handler: formatting cleanup, doc comment typo fixup

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2020-09-28 09:43:03 +02:00
parent b56c111e93
commit ae3cfa8f0d
1 changed files with 25 additions and 29 deletions

View File

@ -1,16 +1,16 @@
use std::thread::{JoinHandle};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use crossbeam_channel::{bounded, Sender}; use std::thread::JoinHandle;
use anyhow::{bail, format_err, Error};
/// A handle to send data toö the worker thread (implements clone) use anyhow::{bail, format_err, Error};
use crossbeam_channel::{bounded, Sender};
/// A handle to send data to the worker thread (implements clone)
pub struct SendHandle<I> { pub struct SendHandle<I> {
input: Sender<I>, input: Sender<I>,
abort: Arc<Mutex<Option<String>>>, abort: Arc<Mutex<Option<String>>>,
} }
impl <I: Send> SendHandle<I> { impl<I: Send> SendHandle<I> {
/// Returns the first error happened, if any /// Returns the first error happened, if any
pub fn check_abort(&self) -> Result<(), Error> { pub fn check_abort(&self) -> Result<(), Error> {
let guard = self.abort.lock().unwrap(); let guard = self.abort.lock().unwrap();
@ -45,21 +45,19 @@ pub struct ParallelHandler<'a, I> {
_marker: std::marker::PhantomData<&'a ()>, _marker: std::marker::PhantomData<&'a ()>,
} }
impl <I> Clone for SendHandle<I> { impl<I> Clone for SendHandle<I> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { input: self.input.clone(), abort: self.abort.clone() } Self {
input: self.input.clone(),
abort: self.abort.clone(),
}
} }
} }
impl <'a, I: Send + 'static> ParallelHandler<'a, I> { impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
/// Create a new thread pool, each thread processing incoming data /// Create a new thread pool, each thread processing incoming data
/// with 'handler_fn'. /// with 'handler_fn'.
pub fn new<F>( pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
name: &str,
threads: usize,
handler_fn: F,
) -> Self
where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a, where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a,
{ {
let mut handles = Vec::new(); let mut handles = Vec::new();
@ -81,19 +79,17 @@ impl <'a, I: Send + 'static> ParallelHandler<'a, I> {
handles.push( handles.push(
std::thread::Builder::new() std::thread::Builder::new()
.name(format!("{} ({})", name, i)) .name(format!("{} ({})", name, i))
.spawn(move || { .spawn(move || loop {
loop { let data = match input_rx.recv() {
let data = match input_rx.recv() { Ok(data) => data,
Ok(data) => data, Err(_) => return,
Err(_) => return, };
}; match (handler_fn)(data) {
match (handler_fn)(data) { Ok(()) => (),
Ok(()) => {}, Err(err) => {
Err(err) => { let mut guard = abort.lock().unwrap();
let mut guard = abort.lock().unwrap(); if guard.is_none() {
if guard.is_none() { *guard = Some(err.to_string());
*guard = Some(err.to_string());
}
} }
} }
} }
@ -163,7 +159,7 @@ impl <'a, I: Send + 'static> ParallelHandler<'a, I> {
} }
// Note: We make sure that all threads will be joined // Note: We make sure that all threads will be joined
impl <'a, I> Drop for ParallelHandler<'a, I> { impl<'a, I> Drop for ParallelHandler<'a, I> {
fn drop(&mut self) { fn drop(&mut self) {
drop(self.input.take()); drop(self.input.take());
loop { loop {