diff --git a/src/tools.rs b/src/tools.rs index da1c95f4..baf8cc4b 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -20,6 +20,7 @@ use std::os::unix::io::AsRawFd; use serde_json::Value; pub mod timer; +pub mod wrapped_reader_stream; /// The `BufferedReader` trait provides a single function /// `buffered_read`. It returns a reference to an internal buffer. The diff --git a/src/tools/wrapped_reader_stream.rs b/src/tools/wrapped_reader_stream.rs new file mode 100644 index 00000000..d930e022 --- /dev/null +++ b/src/tools/wrapped_reader_stream.rs @@ -0,0 +1,55 @@ +use failure::*; +use tokio::io::{AsyncRead}; +use std::io::Read; +use futures::Async; +use futures::stream::Stream; + +pub struct WrappedReaderStream { + reader: R, +} + +impl WrappedReaderStream { + + pub fn new(reader: R) -> Self { + Self { reader } + } +} + +impl Read for WrappedReaderStream { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + //tokio::io::would_block(|| self.reader.read(buf)) + // fixme: howto?? + self.reader.read(buf) + } +} + +impl AsyncRead for WrappedReaderStream { + // fixme:???!!? + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } +} + +impl Stream for WrappedReaderStream { + + type Item = Vec; + type Error = std::io::Error; + + fn poll(&mut self) -> Result>>, std::io::Error> { + let mut buf = [0u8;64*1024]; + match self.poll_read(&mut buf) { + Ok(Async::Ready(n)) => { + // By convention, if an AsyncRead says that it read 0 bytes, + // we should assume that it has got to the end, so we signal that + // the Stream is done in this case by returning None: + if n == 0 { + Ok(Async::Ready(None)) + } else { + Ok(Async::Ready(Some(buf[..n].to_vec()))) + } + }, + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => Err(e) + } + } +}