use failure::*; use std::thread; use std::os::unix::io::FromRawFd; use std::path::{Path, PathBuf}; use futures::{Async, Poll}; use futures::stream::Stream; use nix::fcntl::OFlag; use nix::sys::stat::Mode; use nix::dir::Dir; use crate::catar::encoder::*; /// Stream implementation to encode and upload .catar archives. /// /// The hyper client needs an async Stream for file upload, so we /// spawn an extra thread to encode the .catar data and pipe it to the /// consumer. /// /// Note: The currect implementation is not fully ansync and can block. pub struct CaTarBackupStream { pipe: Option, buffer: Vec, child: Option>, } impl Drop for CaTarBackupStream { fn drop(&mut self) { drop(self.pipe.take()); self.child.take().unwrap().join().unwrap(); } } impl CaTarBackupStream { pub fn new(mut dir: Dir, path: PathBuf, verbose: bool) -> Result { let mut buffer = Vec::with_capacity(4096); unsafe { buffer.set_len(buffer.capacity()); } let (rx, tx) = nix::unistd::pipe()?; let child = thread::spawn(move|| { let mut writer = unsafe { std::fs::File::from_raw_fd(tx) }; let device_list = vec![]; if let Err(err) = CaTarEncoder::encode(path, &mut dir, Some(device_list), &mut writer, verbose) { eprintln!("catar encode failed - {}", err); } }); let pipe = unsafe { std::fs::File::from_raw_fd(rx) }; Ok(Self { pipe: Some(pipe), buffer, child: Some(child) }) } pub fn open(dirname: &Path, verbose: bool) -> Result { let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; let path = std::path::PathBuf::from(dirname); Self::new(dir, path, verbose) } } impl Stream for CaTarBackupStream { type Item = Vec; type Error = Error; // Note: This is not async!! fn poll(&mut self) -> Poll>, Error> { use std::io::Read; loop { let pipe = match self.pipe { Some(ref mut pipe) => pipe, None => unreachable!(), }; match pipe.read(&mut self.buffer) { Ok(n) => { if n == 0 { return Ok(Async::Ready(None)) } else { let data = self.buffer[..n].to_vec(); return Ok(Async::Ready(Some(data))) } } Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => { // try again } Err(err) => { return Err(err.into()) } }; } } }