diff --git a/src/tools/broadcast_future.rs b/src/tools/broadcast_future.rs index 924b6441..71118da8 100644 --- a/src/tools/broadcast_future.rs +++ b/src/tools/broadcast_future.rs @@ -1,7 +1,9 @@ -use failure::*; -use std::sync::{Mutex, Arc}; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; -use futures::*; +use failure::*; +use futures::future::{FutureExt, TryFutureExt}; use tokio::sync::oneshot; /// Broadcast results to registered listeners using asnyc oneshot channels @@ -36,65 +38,90 @@ impl BroadcastData { } } - pub fn listen(&mut self) -> impl Future { + pub fn listen(&mut self) -> impl Future> { + use futures::future::{ok, Either}; match &self.result { None => {}, - Some(Ok(result)) => return futures::future::Either::A(futures::future::ok(result.clone())), - Some(Err(err)) => return futures::future::Either::A(futures::future::err(format_err!("{}", err))), + Some(Ok(result)) => return Either::Left(ok(result.clone())), + Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))), } let (tx, rx) = oneshot::channel::>(); self.listeners.push(tx); - futures::future::Either::B(rx.flatten()) + Either::Right(rx + .map(|res| match res { + Ok(Ok(t)) => Ok(t), + Ok(Err(e)) => Err(e), + Err(e) => Err(Error::from(e)), + }) + ) } } /// Broadcast future results to registered listeners pub struct BroadcastFuture { - inner: Arc, Option + Send>>)>>, + inner: Arc< + Mutex<( + BroadcastData, + Option> + Send>>>, + )>, + >, } -impl BroadcastFuture { - +impl BroadcastFuture { /// Create instance for specified source future. /// /// The result of the future is sent to all registered listeners. - pub fn new(source: Box + Send>) -> Self { - Self { inner: Arc::new(Mutex::new((BroadcastData::new(), Some(source)))) } + pub fn new(source: Box> + Send>) -> Self { + Self { inner: Arc::new(Mutex::new((BroadcastData::new(), Some(Pin::from(source))))) } } /// Creates a new instance with a oneshot channel as trigger pub fn new_oneshot() -> (Self, oneshot::Sender>) { let (tx, rx) = oneshot::channel::>(); - let rx = rx.map_err(Error::from).flatten(); - let test = Box::new(rx); + let rx = rx + .map_err(Error::from) + .and_then(|res| futures::future::ready(res)); - (Self::new(test), tx) + (Self::new(Box::new(rx)), tx) } - fn notify_listeners(inner: Arc, Option + Send>>)>>, result: Result) { + fn notify_listeners( + inner: Arc< + Mutex<( + BroadcastData, + Option> + Send>>>, + )>, + >, + result: Result, + ) { let mut data = inner.lock().unwrap(); data.0.notify_listeners(result); } - fn spawn(inner: Arc, Option + Send>>)>>) -> impl Future { - + fn spawn( + inner: Arc< + Mutex<( + BroadcastData, + Option> + Send>>>, + )>, + >, + ) -> impl Future> { let mut data = inner.lock().unwrap(); if let Some(source) = data.1.take() { let inner1 = inner.clone(); - let task = source.then(move |value| { + let task = source.map(move |value| { match value { Ok(value) => Self::notify_listeners(inner1, Ok(value.clone())), Err(err) => Self::notify_listeners(inner1, Err(err.to_string())), } - Ok(()) }); tokio::spawn(task); } @@ -103,9 +130,9 @@ impl BroadcastFuture { } /// Register a listener - pub fn listen(&self) -> impl Future { + pub fn listen(&self) -> impl Future> { let inner2 = self.inner.clone(); - futures::future::lazy(move || { Self::spawn(inner2) }) + async move { Self::spawn(inner2).await } } } @@ -118,28 +145,27 @@ fn test_broadcast_future() { let (sender, trigger) = BroadcastFuture::new_oneshot(); let receiver1 = sender.listen() - .and_then(|res| { + .map_ok(|res| { CHECKSUM.fetch_add(res, Ordering::SeqCst); - Ok(()) }) - .map_err(|err| { panic!("got errror {}", err); }); + .map_err(|err| { panic!("got errror {}", err); }) + .map(|_| ()); let receiver2 = sender.listen() - .and_then(|res| { + .map_ok(|res| { CHECKSUM.fetch_add(res*2, Ordering::SeqCst); - Ok(()) }) - .map_err(|err| { panic!("got errror {}", err); }); - - tokio::run(futures::future::lazy(move || { + .map_err(|err| { panic!("got errror {}", err); }) + .map(|_| ()); + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { tokio::spawn(receiver1); tokio::spawn(receiver2); trigger.send(Ok(1)).unwrap(); - - Ok(()) - })); + }); + rt.shutdown_on_idle(); let result = CHECKSUM.load(Ordering::SeqCst);