//use failure::*; use tokio_threadpool; use std::io::Read; use futures::Async; use futures::stream::Stream; pub struct WrappedReaderStream { reader: R, buffer: Vec, } impl WrappedReaderStream { pub fn new(reader: R) -> Self { let mut buffer = Vec::with_capacity(64*1024); unsafe { buffer.set_len(buffer.capacity()); } Self { reader, buffer } } } fn blocking_err() -> std::io::Error { std::io::Error::new( std::io::ErrorKind::Other, "`blocking` annotated I/O must be called from the context of the Tokio runtime.") } impl Stream for WrappedReaderStream { type Item = Vec; type Error = std::io::Error; fn poll(&mut self) -> Result>>, std::io::Error> { match tokio_threadpool::blocking(|| self.reader.read(&mut self.buffer)) { Ok(Async::Ready(Ok(n))) => { if n == 0 { // EOF Ok(Async::Ready(None)) } else { Ok(Async::Ready(Some(self.buffer[..n].to_vec()))) } }, Ok(Async::Ready(Err(err))) => Err(err), Ok(Async::NotReady) => Ok(Async::NotReady), Err(_) => Err(blocking_err()), } } }