broadcast_future: refactor broadcast/future binding

into its own, private struct.

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
This commit is contained in:
Fabian Grünbichler 2021-01-25 14:42:47 +01:00 committed by Wolfgang Bumiller
parent 432fe44187
commit 905a570489

View File

@ -62,14 +62,16 @@ impl <T: Clone> BroadcastData<T> {
} }
} }
type SourceFuture<T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>;
struct BroadCastFutureBinding<T> {
broadcast: BroadcastData<T>,
future: Option<SourceFuture<T>>,
}
/// Broadcast future results to registered listeners /// Broadcast future results to registered listeners
pub struct BroadcastFuture<T> { pub struct BroadcastFuture<T> {
inner: Arc< inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
Mutex<(
BroadcastData<T>,
Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
)>,
>,
} }
impl<T: Clone + Send + 'static> BroadcastFuture<T> { impl<T: Clone + Send + 'static> BroadcastFuture<T> {
@ -77,7 +79,11 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
/// ///
/// The result of the future is sent to all registered listeners. /// The result of the future is sent to all registered listeners.
pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self { pub fn new(source: Box<dyn Future<Output = Result<T, Error>> + Send>) -> Self {
Self { inner: Arc::new(Mutex::new((BroadcastData::new(), Some(Pin::from(source))))) } let inner = BroadCastFutureBinding {
broadcast: BroadcastData::new(),
future: Some(Pin::from(source)),
};
Self { inner: Arc::new(Mutex::new(inner)) }
} }
/// Creates a new instance with a oneshot channel as trigger /// Creates a new instance with a oneshot channel as trigger
@ -92,29 +98,17 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
} }
fn notify_listeners( fn notify_listeners(
inner: Arc< inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
Mutex<(
BroadcastData<T>,
Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
)>,
>,
result: Result<T, String>, result: Result<T, String>,
) { ) {
let mut data = inner.lock().unwrap(); let mut data = inner.lock().unwrap();
data.0.notify_listeners(result); data.broadcast.notify_listeners(result);
} }
fn spawn( fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
inner: Arc<
Mutex<(
BroadcastData<T>,
Option<Pin<Box<dyn Future<Output = Result<T, Error>> + Send>>>,
)>,
>,
) -> impl Future<Output = Result<T, Error>> {
let mut data = inner.lock().unwrap(); let mut data = inner.lock().unwrap();
if let Some(source) = data.1.take() { if let Some(source) = data.future.take() {
let inner1 = inner.clone(); let inner1 = inner.clone();
@ -127,7 +121,7 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
tokio::spawn(task); tokio::spawn(task);
} }
data.0.listen() data.broadcast.listen()
} }
/// Register a listener /// Register a listener