src/tools/broadcast_future.rs: add new constructor new_oneshot()

To simplify usage. Also added a test case.
This commit is contained in:
Dietmar Maurer 2019-04-27 10:56:49 +02:00
parent 152764ec15
commit 3dceb9b304
2 changed files with 49 additions and 1 deletions

View File

@ -47,7 +47,8 @@ $(SUBDIRS):
$(MAKE) -C $@
test:
cargo test $(CARGO_BUILD_ARGS)
cargo test test_broadcast_future
#cargo test $(CARGO_BUILD_ARGS)
doc:
cargo doc --no-deps $(CARGO_BUILD_ARGS)

View File

@ -29,6 +29,16 @@ impl <T: Clone + Send + 'static> BroadcastFuture<T> {
Self { inner: Arc::new(Mutex::new(data)) }
}
/// Creates a new instance with a oneshot channel as trigger
pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
let (tx, rx) = oneshot::channel::<Result<T, Error>>();
let rx = rx.map_err(Error::from).flatten();
let test = Box::new(rx);
(Self::new(test), tx)
}
fn update(inner: Arc<Mutex<BroadcastData<T>>>, result: Result<T, String>) {
let mut data = inner.lock().unwrap();
@ -84,3 +94,40 @@ impl <T: Clone + Send + 'static> BroadcastFuture<T> {
futures::future::lazy(move || { Self::spawn(inner2) })
}
}
#[test]
fn test_broadcast_future() {
use std::sync::atomic::{AtomicUsize, Ordering};
static CHECKSUM: AtomicUsize = AtomicUsize::new(0);
let (sender, trigger) = BroadcastFuture::new_oneshot();
let receiver1 = sender.listen()
.and_then(|res| {
CHECKSUM.fetch_add(res, Ordering::SeqCst);
Ok(())
})
.map_err(|err| { panic!("got errror {}", err); });
let receiver2 = sender.listen()
.and_then(|res| {
CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
Ok(())
})
.map_err(|err| { panic!("got errror {}", err); });
tokio::run(futures::future::lazy(move || {
tokio::spawn(receiver1);
tokio::spawn(receiver2);
trigger.send(Ok(1)).unwrap();
Ok(())
}));
let result = CHECKSUM.load(Ordering::SeqCst);
assert!(result == 3);
}