From 490be29ed6d4bb4914e1c8fd8ce0440cfd089f8a Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Fri, 26 Apr 2019 17:56:41 +0200 Subject: [PATCH] src/tools/broadcast_future.rs: new helper class --- src/tools.rs | 3 ++ src/tools/broadcast_future.rs | 86 +++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 src/tools/broadcast_future.rs diff --git a/src/tools.rs b/src/tools.rs index 43a9bfe8..5e0a6c01 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -41,6 +41,9 @@ pub use process_locker::*; mod file_logger; pub use file_logger::*; +mod broadcast_future; +pub use broadcast_future::*; + /// Macro to write error-handling blocks (like perl eval {}) /// /// #### Example: diff --git a/src/tools/broadcast_future.rs b/src/tools/broadcast_future.rs new file mode 100644 index 00000000..917c343e --- /dev/null +++ b/src/tools/broadcast_future.rs @@ -0,0 +1,86 @@ +use failure::*; +use std::sync::{Mutex, Arc}; + +use futures::*; +use tokio::sync::oneshot; + +struct BroadcastData { + result: Option>, + listeners: Vec>>, + source: Option + Send >>, +} + +/// Broadcast future results to registered listeners +pub struct BroadcastFuture { + inner: Arc>>, +} + +impl BroadcastFuture { + + /// Create instance for specified source future. + /// + /// The result of the future is sent to all registered listeners. + pub fn new(source: Box + Send>) -> Self { + let data = BroadcastData { + result: None, + listeners: vec![], + source: Some(source), + }; + Self { inner: Arc::new(Mutex::new(data)) } + } + + fn update(inner: Arc>>, result: Result) { + let mut data = inner.lock().unwrap(); + + data.result = Some(result.clone()); + + loop { + match data.listeners.pop() { + None => { break; }, + Some(ch) => { + match &result { + Ok(result) => { let _ = ch.send(Ok(result.clone())); }, + Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); }, + } + }, + } + } + } + + fn spawn(inner: Arc>>) -> impl Future { + + let mut data = inner.lock().unwrap(); + + match &data.result { + None => {}, + Some(Ok(result)) => return futures::future::Either::A(futures::future::ok(result.clone())), + Some(Err(err)) => return futures::future::Either::A(futures::future::err(format_err!("{}", err))), + } + + let (tx, rx) = oneshot::channel::>(); + + data.listeners.push(tx); + + if let Some(source) = data.source.take() { + + let inner1 = inner.clone(); + + let task = source.then(move |value| { + match value { + Ok(value) => Self::update(inner1, Ok(value.clone())), + Err(err) => Self::update(inner1, Err(err.to_string())), + } + Ok(()) + }); + tokio::spawn(task); + } + + futures::future::Either::B(rx.map_err(Error::from).and_then(|result| { result })) + } + + /// Register a listener + pub fn listen(&self) -> impl Future { + let inner2 = self.inner.clone(); + futures::future::lazy(move || { Self::spawn(inner2) }) + } +}