From 824b5ee4eef2eadc205b746848f6b37c7d19d614 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Tue, 30 Apr 2019 08:40:13 +0200 Subject: [PATCH] src/tools/broadcast_future.rs: cleanup, decompose into two classes In order to make it more usable. --- src/tools/broadcast_future.rs | 92 ++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 39 deletions(-) diff --git a/src/tools/broadcast_future.rs b/src/tools/broadcast_future.rs index 87e19f0f..cca7e4d5 100644 --- a/src/tools/broadcast_future.rs +++ b/src/tools/broadcast_future.rs @@ -4,15 +4,57 @@ use std::sync::{Mutex, Arc}; use futures::*; use tokio::sync::oneshot; -struct BroadcastData { +/// Broadcast results to registered listeners using asnyc oneshot channels +pub struct BroadcastData { result: Option>, listeners: Vec>>, - source: Option + Send >>, +} + +impl BroadcastData { + + pub fn new() -> Self { + Self { + result: None, + listeners: vec![], + } + } + + pub fn notify_listeners(&mut self, result: Result) { + + self.result = Some(result.clone()); + + loop { + match self.listeners.pop() { + None => { break; }, + Some(ch) => { + match &result { + Ok(result) => { let _ = ch.send(Ok(result.clone())); }, + Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); }, + } + }, + } + } + } + + pub fn listen(&mut self) -> impl Future { + + match &self.result { + None => {}, + Some(Ok(result)) => return futures::future::Either::A(futures::future::ok(result.clone())), + Some(Err(err)) => return futures::future::Either::A(futures::future::err(format_err!("{}", err))), + } + + let (tx, rx) = oneshot::channel::>(); + + self.listeners.push(tx); + + futures::future::Either::B(rx.flatten()) + } } /// Broadcast future results to registered listeners pub struct BroadcastFuture { - inner: Arc>>, + inner: Arc, Option + Send>>)>>, } impl BroadcastFuture { @@ -21,12 +63,7 @@ impl BroadcastFuture { /// /// The result of the future is sent to all registered listeners. pub fn new(source: Box + Send>) -> Self { - let data = BroadcastData { - result: None, - listeners: vec![], - source: Some(source), - }; - Self { inner: Arc::new(Mutex::new(data)) } + Self { inner: Arc::new(Mutex::new((BroadcastData::new(), Some(source)))) } } /// Creates a new instance with a oneshot channel as trigger @@ -39,53 +76,30 @@ impl BroadcastFuture { (Self::new(test), tx) } - fn update(inner: Arc>>, result: Result) { + fn notify_listeners(inner: Arc, Option + Send>>)>>, result: Result) { let mut data = inner.lock().unwrap(); - - data.result = Some(result.clone()); - - loop { - match data.listeners.pop() { - None => { break; }, - Some(ch) => { - match &result { - Ok(result) => { let _ = ch.send(Ok(result.clone())); }, - Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); }, - } - }, - } - } + data.0.notify_listeners(result); } - fn spawn(inner: Arc>>) -> impl Future { + fn spawn(inner: Arc, Option + Send>>)>>) -> impl Future { let mut data = inner.lock().unwrap(); - match &data.result { - None => {}, - Some(Ok(result)) => return futures::future::Either::A(futures::future::ok(result.clone())), - Some(Err(err)) => return futures::future::Either::A(futures::future::err(format_err!("{}", err))), - } - - let (tx, rx) = oneshot::channel::>(); - - data.listeners.push(tx); - - if let Some(source) = data.source.take() { + if let Some(source) = data.1.take() { let inner1 = inner.clone(); let task = source.then(move |value| { match value { - Ok(value) => Self::update(inner1, Ok(value.clone())), - Err(err) => Self::update(inner1, Err(err.to_string())), + Ok(value) => Self::notify_listeners(inner1, Ok(value.clone())), + Err(err) => Self::notify_listeners(inner1, Err(err.to_string())), } Ok(()) }); tokio::spawn(task); } - futures::future::Either::B(rx.map_err(Error::from).and_then(|result| { result })) + data.0.listen() } /// Register a listener