From e8edbbd49cdb7c9061459b3732b391172d06793a Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Wed, 16 Jan 2019 10:15:39 +0100 Subject: [PATCH] client/catar_backup_stream.rs: new helper for catar uploads to server --- src/client/catar_backup_stream.rs | 86 +++++++++++++++++++++++++++++++ src/lib.rs | 5 ++ 2 files changed, 91 insertions(+) create mode 100644 src/client/catar_backup_stream.rs diff --git a/src/client/catar_backup_stream.rs b/src/client/catar_backup_stream.rs new file mode 100644 index 00000000..f61fad37 --- /dev/null +++ b/src/client/catar_backup_stream.rs @@ -0,0 +1,86 @@ +use failure::*; + +use std::thread; +use std::os::unix::io::FromRawFd; + +use futures::{Async, Poll}; +use futures::stream::Stream; + +use nix::fcntl::OFlag; +use nix::sys::stat::Mode; + +use crate::catar::encoder::*; + +pub struct CaTarBackupStream { + pipe: Option, + buffer: Vec, + child: Option>, +} + +impl Drop for CaTarBackupStream { + + fn drop(&mut self) { + drop(self.pipe.take()); + self.child.take().unwrap().join().unwrap(); + } +} + +impl CaTarBackupStream { + + pub fn new(dirname: &str) -> Result { + let mut buffer = Vec::with_capacity(4096); + unsafe { buffer.set_len(buffer.capacity()); } + + let (rx, tx) = nix::unistd::pipe()?; + + let mut dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; + let path = std::path::PathBuf::from(dirname); + + let child = thread::spawn(move|| { + let mut writer = unsafe { std::fs::File::from_raw_fd(tx) }; + if let Err(err) = CaTarEncoder::encode(path, &mut dir, None, &mut writer) { + eprintln!("catar encode failed - {}", err); + } + }); + + let pipe = unsafe { std::fs::File::from_raw_fd(rx) }; + + Ok(Self { pipe: Some(pipe), buffer, child: Some(child) }) + } +} + +impl Stream for CaTarBackupStream { + + type Item = Vec; + type Error = Error; + + // Note: This is not async!! + + fn poll(&mut self) -> Poll>, Error> { + + use std::io::Read; + + loop { + let pipe = match self.pipe { + Some(ref mut pipe) => pipe, + None => unreachable!(), + }; + match pipe.read(&mut self.buffer) { + Ok(n) => { + if n == 0 { + return Ok(Async::Ready(None)) + } else { + let data = self.buffer[..n].to_vec(); + return Ok(Async::Ready(Some(data))) + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => { + // try again + } + Err(err) => { + return Err(err.into()) + } + }; + } + } +} diff --git a/src/lib.rs b/src/lib.rs index af2b3870..725d8e7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,3 +56,8 @@ pub mod cli { pub mod api3; + +pub mod client { + + pub mod catar_backup_stream; +}