diff --git a/src/tools/wrapped_reader_stream.rs b/src/tools/wrapped_reader_stream.rs index 4ad091b7..965163e6 100644 --- a/src/tools/wrapped_reader_stream.rs +++ b/src/tools/wrapped_reader_stream.rs @@ -1,10 +1,8 @@ use failure::*; use tokio_threadpool; -use tokio::io::{AsyncRead}; use std::io::Read; use futures::Async; use futures::stream::Stream; -use std::io::ErrorKind::{Other, WouldBlock}; pub struct WrappedReaderStream { reader: R, @@ -17,24 +15,10 @@ impl WrappedReaderStream { } } -impl Read for WrappedReaderStream { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - //tokio::io::would_block(|| self.reader.read(buf)) - // fixme: howto?? - match tokio_threadpool::blocking(|| self.reader.read(buf)) { - Ok(Async::Ready(res)) => res, - Ok(Async::NotReady) => Err(WouldBlock.into()), - Err(err) => Err(std::io::Error::new(Other, "`blocking` annotated I/O must be called \ - from the context of the Tokio runtime.")), - } - } -} - -impl AsyncRead for WrappedReaderStream { - // fixme:???!!? - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { - false - } +fn blocking_err() -> std::io::Error { + std::io::Error::new( + std::io::ErrorKind::Other, + "`blocking` annotated I/O must be called from the context of the Tokio runtime.") } impl Stream for WrappedReaderStream { @@ -44,19 +28,17 @@ impl Stream for WrappedReaderStream { fn poll(&mut self) -> Result>>, std::io::Error> { let mut buf = [0u8;64*1024]; - match self.poll_read(&mut buf) { - Ok(Async::Ready(n)) => { - // By convention, if an AsyncRead says that it read 0 bytes, - // we should assume that it has got to the end, so we signal that - // the Stream is done in this case by returning None: - if n == 0 { + match tokio_threadpool::blocking(|| self.reader.read(&mut buf)) { + Ok(Async::Ready(Ok(n))) => { + if n == 0 { // EOF Ok(Async::Ready(None)) } else { Ok(Async::Ready(Some(buf[..n].to_vec()))) } }, + Ok(Async::Ready(Err(err))) => Err(err), Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => Err(e) + Err(err) => Err(blocking_err()), } } }