diff --git a/src/backup.rs b/src/backup.rs index 232a9330..ce6d5922 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -146,6 +146,9 @@ pub use data_blob_reader::*; mod data_blob_writer; pub use data_blob_writer::*; +mod catalog_blob; +pub use catalog_blob::*; + mod chunk_stream; pub use chunk_stream::*; diff --git a/src/backup/catalog_blob.rs b/src/backup/catalog_blob.rs new file mode 100644 index 00000000..0d53b3de --- /dev/null +++ b/src/backup/catalog_blob.rs @@ -0,0 +1,225 @@ +use failure::*; +use std::sync::Arc; +use std::ffi::{CStr, CString}; +use std::io::{Read, BufRead, BufReader, Write, Seek}; +use std::convert::TryFrom; + +use chrono::offset::{TimeZone, Local}; + +use proxmox::tools::io::ReadExt; + +use crate::pxar::catalog::{BackupCatalogWriter, CatalogEntryType}; + +use super::{DataBlobWriter, DataBlobReader, CryptConfig}; + +pub struct CatalogBlobWriter { + writer: DataBlobWriter, + level: usize, +} + +impl CatalogBlobWriter { + pub fn new_compressed(writer: W) -> Result { + let writer = DataBlobWriter::new_compressed(writer)?; + Ok(Self { writer, level: 0 }) + } + pub fn new_signed_compressed(writer: W, config: Arc) -> Result { + let writer = DataBlobWriter::new_signed_compressed(writer, config)?; + Ok(Self { writer, level: 0 }) + } + pub fn new_encrypted_compressed(writer: W, config: Arc) -> Result { + let writer = DataBlobWriter::new_encrypted_compressed(writer, config)?; + Ok(Self { writer, level: 0 }) + } + pub fn finish(self) -> Result { + self.writer.finish() + } +} + +impl BackupCatalogWriter for CatalogBlobWriter { + + fn start_directory(&mut self, name: &CStr) -> Result<(), Error> { + self.writer.write(&[CatalogEntryType::Directory as u8])?; + self.writer.write(name.to_bytes_with_nul())?; + self.writer.write(b"{")?; + self.level += 1; + Ok(()) + } + + fn end_directory(&mut self) -> Result<(), Error> { + if self.level == 0 { + bail!("got unexpected end_directory level 0"); + } + self.writer.write(b"}")?; + self.level -= 1; + Ok(()) + } + + fn add_file(&mut self, name: &CStr, size: u64, mtime: u64) -> Result<(), Error> { + self.writer.write(&[CatalogEntryType::File as u8])?; + self.writer.write(&size.to_le_bytes())?; + self.writer.write(&mtime.to_le_bytes())?; + self.writer.write(name.to_bytes_with_nul())?; + Ok(()) + } + + fn add_symlink(&mut self, name: &CStr) -> Result<(), Error> { + self.writer.write(&[CatalogEntryType::Symlink as u8])?; + self.writer.write(name.to_bytes_with_nul())?; + Ok(()) + } + + fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error> { + self.writer.write(&[CatalogEntryType::Hardlink as u8])?; + self.writer.write(name.to_bytes_with_nul())?; + Ok(()) + } + + fn add_block_device(&mut self, name: &CStr) -> Result<(), Error> { + self.writer.write(&[CatalogEntryType::BlockDevice as u8])?; + self.writer.write(name.to_bytes_with_nul())?; + Ok(()) + } + + fn add_char_device(&mut self, name: &CStr) -> Result<(), Error> { + self.writer.write(&[CatalogEntryType::CharDevice as u8])?; + self.writer.write(name.to_bytes_with_nul())?; + Ok(()) + } + + fn add_fifo(&mut self, name: &CStr) -> Result<(), Error> { + self.writer.write(&[CatalogEntryType::Fifo as u8])?; + self.writer.write(name.to_bytes_with_nul())?; + Ok(()) + } + + fn add_socket(&mut self, name: &CStr) -> Result<(), Error> { + self.writer.write(&[CatalogEntryType::Socket as u8])?; + self.writer.write(name.to_bytes_with_nul())?; + Ok(()) + } +} + + +pub struct CatalogBlobReader { + reader: BufReader>, + dir_stack: Vec, +} + +impl CatalogBlobReader { + + pub fn new(reader: R, crypt_config: Option>) -> Result { + let dir_stack = Vec::new(); + + let reader = BufReader::new(DataBlobReader::new(reader, crypt_config)?); + + Ok(Self { reader, dir_stack }) + } + + fn read_filename(&mut self) -> Result { + let mut filename = Vec::new(); + self.reader.read_until(0u8, &mut filename)?; + if filename.len() > 0 && filename[filename.len()-1] == 0u8 { + filename.pop(); + } + if filename.len() == 0 { + bail!("got zero length filename"); + } + if filename.iter().find(|b| **b == b'/').is_some() { + bail!("found invalid filename with slashes."); + } + Ok(unsafe { CString::from_vec_unchecked(filename) }) + } + + fn next_byte(&mut self) -> Result { + let mut buf = [0u8; 1]; + self.reader.read_exact(&mut buf)?; + Ok(buf[0]) + } + + fn expect_next(&mut self, expect: u8) -> Result<(), Error> { + let next = self.next_byte()?; + if next != expect { + bail!("got unexpected byte ({} != {})", next, expect); + } + Ok(()) + } + + fn print_entry(&self, etype: CatalogEntryType, filename: &CStr, size: u64, mtime: u64) -> Result<(), Error> { + let mut out = Vec::new(); + + write!(out, "{} ", char::from(etype as u8))?; + + for name in &self.dir_stack { + out.extend(name.to_bytes()); + out.push(b'/'); + } + + out.extend(filename.to_bytes()); + + let dt = Local.timestamp(mtime as i64, 0); + + if etype == CatalogEntryType::File { + write!(out, " {} {}", size, dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, false))?; + } + + write!(out, "\n")?; + std::io::stdout().write_all(&out)?; + + Ok(()) + } + + fn parse_entries(&mut self) -> Result<(), Error> { + + loop { + let etype = match self.next_byte() { + Ok(v) => v, + Err(err) => { + if err.kind() == std::io::ErrorKind::UnexpectedEof { + if self.dir_stack.len() == 0 { + break; + } + } + return Err(err.into()); + } + }; + if etype == b'}' { + if self.dir_stack.pop().is_none() { + bail!("got unexpected '}'"); + } + break; + } + + let etype = CatalogEntryType::try_from(etype)?; + match etype { + CatalogEntryType::Directory => { + let filename = self.read_filename()?; + self.print_entry(etype.into(), &filename, 0, 0)?; + self.dir_stack.push(filename); + self.expect_next(b'{')?; + self.parse_entries()?; + } + CatalogEntryType::File => { + let size = unsafe { self.reader.read_le_value::()? }; + let mtime = unsafe { self.reader.read_le_value::()? }; + let filename = self.read_filename()?; + self.print_entry(etype.into(), &filename, size, mtime)?; + } + CatalogEntryType::Symlink | + CatalogEntryType::Hardlink | + CatalogEntryType::Fifo | + CatalogEntryType::Socket | + CatalogEntryType::BlockDevice | + CatalogEntryType::CharDevice => { + let filename = self.read_filename()?; + self.print_entry(etype.into(), &filename, 0, 0)?; + } + } + } + Ok(()) + } + + pub fn dump(&mut self) -> Result<(), Error> { + self.parse_entries()?; + Ok(()) + } +} diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 9b5082d1..f726008b 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -159,7 +159,7 @@ fn backup_directory>( verbose: bool, skip_lost_and_found: bool, crypt_config: Option>, - catalog: Arc>, + catalog: Arc>>, ) -> Result { let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), device_set, verbose, skip_lost_and_found, catalog)?; @@ -457,9 +457,8 @@ fn dump_catalog( blob_file.seek(SeekFrom::Start(0))?; - let reader = BufReader::new(DataBlobReader::new(blob_file, crypt_config)?); - - let mut catalog_reader = pxar::catalog::SimpleCatalogReader::new(reader); + let reader = BufReader::new(blob_file); + let mut catalog_reader = CatalogBlobReader::new(reader, crypt_config)?; catalog_reader.dump()?; @@ -677,8 +676,14 @@ fn create_backup( let mut file_list = vec![]; - let catalog_filename = format!("/tmp/pbs-catalog-{}.cat", std::process::id()); - let catalog = Arc::new(Mutex::new(SimpleCatalog::new(&catalog_filename)?)); + // fixme: encrypt/sign catalog? + let catalog_file = std::fs::OpenOptions::new() + .write(true) + .read(true) + .custom_flags(libc::O_TMPFILE) + .open("/tmp")?; + + let catalog = Arc::new(Mutex::new(CatalogBlobWriter::new_compressed(catalog_file)?)); let mut upload_catalog = false; for (backup_type, filename, target, size) in upload_list { @@ -731,13 +736,14 @@ fn create_backup( if upload_catalog { let mutex = Arc::try_unwrap(catalog) .map_err(|_| format_err!("unable to get catalog (still used)"))?; - drop(mutex); // close catalog + let mut catalog_file = mutex.into_inner().unwrap().finish()?; let target = "catalog.blob"; - let stats = client.upload_blob_from_file(&catalog_filename, target, crypt_config.clone(), true).wait()?; - file_list.push((target.to_owned(), stats)); - let _ = std::fs::remove_file(&catalog_filename); + catalog_file.seek(SeekFrom::Start(0))?; + + let stats = client.upload_blob(catalog_file, target).wait()?; + file_list.push((target.to_owned(), stats)); } if let Some(rsa_encrypted_key) = rsa_encrypted_key { diff --git a/src/bin/pxar.rs b/src/bin/pxar.rs index d4770171..0f80d94d 100644 --- a/src/bin/pxar.rs +++ b/src/bin/pxar.rs @@ -209,7 +209,7 @@ fn create_archive( feature_flags ^= pxar::flags::WITH_SOCKETS; } - let catalog = None::<&mut pxar::catalog::SimpleCatalog>; + let catalog = None::<&mut pxar::catalog::DummyCatalogWriter>; pxar::Encoder::encode(source, &mut dir, &mut writer, catalog, devices, verbose, false, feature_flags)?; writer.flush()?; diff --git a/src/client/http_client.rs b/src/client/http_client.rs index ab68582a..a190ce53 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -626,6 +626,33 @@ impl BackupClient { self.canceller.cancel(); } + pub fn upload_blob( + &self, + mut reader: R, + file_name: &str, + ) -> impl Future { + + let h2 = self.h2.clone(); + let file_name = file_name.to_owned(); + + futures::future::ok(()) + .and_then(move |_| { + let mut raw_data = Vec::new(); + // fixme: avoid loading into memory + reader.read_to_end(&mut raw_data)?; + Ok(raw_data) + }) + .and_then(move |raw_data| { + let csum = openssl::sha::sha256(&raw_data); + let param = json!({"encoded-size": raw_data.len(), "file-name": file_name }); + let size = raw_data.len() as u64; // fixme: should be decoded size instead?? + h2.upload("blob", Some(param), raw_data) + .map(move |_| { + BackupStats { size, csum } + }) + }) + } + pub fn upload_blob_from_data( &self, data: Vec, diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs index 7db60bd3..0e837991 100644 --- a/src/client/pxar_backup_stream.rs +++ b/src/client/pxar_backup_stream.rs @@ -1,5 +1,5 @@ use failure::*; - +use std::io::{Write, Seek}; use std::thread; use std::sync::{Arc, Mutex}; use std::os::unix::io::FromRawFd; @@ -14,6 +14,8 @@ use nix::sys::stat::Mode; use nix::dir::Dir; use crate::pxar; +use crate::backup::CatalogBlobWriter; + use crate::tools::wrapped_reader_stream::WrappedReaderStream; /// Stream implementation to encode and upload .pxar archives. @@ -37,13 +39,13 @@ impl Drop for PxarBackupStream { impl PxarBackupStream { - pub fn new( + pub fn new( mut dir: Dir, path: PathBuf, device_set: Option>, verbose: bool, skip_lost_and_found: bool, - catalog: Arc>, + catalog: Arc>>, ) -> Result { let (rx, tx) = nix::unistd::pipe()?; @@ -74,12 +76,12 @@ impl PxarBackupStream { }) } - pub fn open( + pub fn open( dirname: &Path, device_set: Option>, verbose: bool, skip_lost_and_found: bool, - catalog: Arc>, + catalog: Arc>>, ) -> Result { let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; diff --git a/src/pxar/catalog.rs b/src/pxar/catalog.rs index 1a6449ac..fa2db236 100644 --- a/src/pxar/catalog.rs +++ b/src/pxar/catalog.rs @@ -1,18 +1,11 @@ -//! File list Catalog +//! Trait for file list catalog //! //! A file list catalog simply store a directory tree. Such catalogs //! may be used as index to do a fast search for files. use failure::*; - -use std::io::{Read, BufRead, Write}; use std::convert::TryFrom; - -use std::ffi::{CStr, CString}; - -use chrono::offset::{TimeZone, Local}; - -use proxmox::tools::io::ReadExt; +use std::ffi::CStr; #[repr(u8)] #[derive(Copy,Clone,PartialEq)] @@ -57,204 +50,16 @@ pub trait BackupCatalogWriter { fn add_socket(&mut self, name: &CStr) -> Result<(), Error>; } -pub struct SimpleCatalog { - writer: std::fs::File, - level: usize, -} - -impl SimpleCatalog { - pub fn new>(path: P) -> Result { - let writer = std::fs::OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(path.as_ref())?; - - Ok(Self { writer, level: 0 }) - } -} - -impl BackupCatalogWriter for SimpleCatalog { - - fn start_directory(&mut self, name: &CStr) -> Result<(), Error> { - self.writer.write(&[CatalogEntryType::Directory as u8])?; - self.writer.write(name.to_bytes_with_nul())?; - self.writer.write(b"{")?; - self.level += 1; - Ok(()) - } - - fn end_directory(&mut self) -> Result<(), Error> { - if self.level == 0 { - bail!("got unexpected end_directory level 0"); - } - self.writer.write(b"}")?; - self.level -= 1; - Ok(()) - } - - fn add_file(&mut self, name: &CStr, size: u64, mtime: u64) -> Result<(), Error> { - self.writer.write(&[CatalogEntryType::File as u8])?; - self.writer.write(&size.to_le_bytes())?; - self.writer.write(&mtime.to_le_bytes())?; - self.writer.write(name.to_bytes_with_nul())?; - Ok(()) - } - - fn add_symlink(&mut self, name: &CStr) -> Result<(), Error> { - self.writer.write(&[CatalogEntryType::Symlink as u8])?; - self.writer.write(name.to_bytes_with_nul())?; - Ok(()) - } - - fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error> { - self.writer.write(&[CatalogEntryType::Hardlink as u8])?; - self.writer.write(name.to_bytes_with_nul())?; - Ok(()) - } - - fn add_block_device(&mut self, name: &CStr) -> Result<(), Error> { - self.writer.write(&[CatalogEntryType::BlockDevice as u8])?; - self.writer.write(name.to_bytes_with_nul())?; - Ok(()) - } - - fn add_char_device(&mut self, name: &CStr) -> Result<(), Error> { - self.writer.write(&[CatalogEntryType::CharDevice as u8])?; - self.writer.write(name.to_bytes_with_nul())?; - Ok(()) - } - - fn add_fifo(&mut self, name: &CStr) -> Result<(), Error> { - self.writer.write(&[CatalogEntryType::Fifo as u8])?; - self.writer.write(name.to_bytes_with_nul())?; - Ok(()) - } - - fn add_socket(&mut self, name: &CStr) -> Result<(), Error> { - self.writer.write(&[CatalogEntryType::Socket as u8])?; - self.writer.write(name.to_bytes_with_nul())?; - Ok(()) - } -} - -pub struct SimpleCatalogReader { - reader: R, - dir_stack: Vec, -} - -impl SimpleCatalogReader { - - pub fn new(reader: R) -> Self { - let dir_stack = Vec::new(); - Self { reader, dir_stack } - } - - fn read_filename(&mut self) -> Result { - let mut filename = Vec::new(); - self.reader.read_until(0u8, &mut filename)?; - if filename.len() > 0 && filename[filename.len()-1] == 0u8 { - filename.pop(); - } - if filename.len() == 0 { - bail!("got zero length filename"); - } - if filename.iter().find(|b| **b == b'/').is_some() { - bail!("found invalid filename with slashes."); - } - Ok(unsafe { CString::from_vec_unchecked(filename) }) - } - - fn next_byte(&mut self) -> Result { - let mut buf = [0u8; 1]; - self.reader.read_exact(&mut buf)?; - Ok(buf[0]) - } - - fn expect_next(&mut self, expect: u8) -> Result<(), Error> { - let next = self.next_byte()?; - if next != expect { - bail!("got unexpected byte ({} != {})", next, expect); - } - Ok(()) - } - - fn print_entry(&self, etype: CatalogEntryType, filename: &CStr, size: u64, mtime: u64) -> Result<(), Error> { - let mut out = Vec::new(); - - write!(out, "{} ", char::from(etype as u8))?; - - for name in &self.dir_stack { - out.extend(name.to_bytes()); - out.push(b'/'); - } - - out.extend(filename.to_bytes()); - - let dt = Local.timestamp(mtime as i64, 0); - - if etype == CatalogEntryType::File { - write!(out, " {} {}", size, dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, false))?; - } - - write!(out, "\n")?; - std::io::stdout().write_all(&out)?; - - Ok(()) - } - - fn parse_entries(&mut self) -> Result<(), Error> { - - loop { - let etype = match self.next_byte() { - Ok(v) => v, - Err(err) => { - if err.kind() == std::io::ErrorKind::UnexpectedEof { - if self.dir_stack.len() == 0 { - break; - } - } - return Err(err.into()); - } - }; - if etype == b'}' { - if self.dir_stack.pop().is_none() { - bail!("got unexpected '}'"); - } - break; - } - - let etype = CatalogEntryType::try_from(etype)?; - match etype { - CatalogEntryType::Directory => { - let filename = self.read_filename()?; - self.print_entry(etype.into(), &filename, 0, 0)?; - self.dir_stack.push(filename); - self.expect_next(b'{')?; - self.parse_entries()?; - } - CatalogEntryType::File => { - let size = unsafe { self.reader.read_le_value::()? }; - let mtime = unsafe { self.reader.read_le_value::()? }; - let filename = self.read_filename()?; - self.print_entry(etype.into(), &filename, size, mtime)?; - } - CatalogEntryType::Symlink | - CatalogEntryType::Hardlink | - CatalogEntryType::Fifo | - CatalogEntryType::Socket | - CatalogEntryType::BlockDevice | - CatalogEntryType::CharDevice => { - let filename = self.read_filename()?; - self.print_entry(etype.into(), &filename, 0, 0)?; - } - } - } - Ok(()) - } - - pub fn dump(&mut self) -> Result<(), Error> { - self.parse_entries()?; - Ok(()) - } +pub struct DummyCatalogWriter(); + +impl BackupCatalogWriter for DummyCatalogWriter { + fn start_directory(&mut self, _name: &CStr) -> Result<(), Error> { Ok(()) } + fn end_directory(&mut self) -> Result<(), Error> { Ok(()) } + fn add_file(&mut self, _name: &CStr, _size: u64, _mtime: u64) -> Result<(), Error> { Ok(()) } + fn add_symlink(&mut self, _name: &CStr) -> Result<(), Error> { Ok(()) } + fn add_hardlink(&mut self, _name: &CStr) -> Result<(), Error> { Ok(()) } + fn add_block_device(&mut self, _name: &CStr) -> Result<(), Error> { Ok(()) } + fn add_char_device(&mut self, _name: &CStr) -> Result<(), Error> { Ok(()) } + fn add_fifo(&mut self, _name: &CStr) -> Result<(), Error> { Ok(()) } + fn add_socket(&mut self, _name: &CStr) -> Result<(), Error> { Ok(()) } } diff --git a/tests/catar.rs b/tests/catar.rs index 46b82f17..fdbb7135 100644 --- a/tests/catar.rs +++ b/tests/catar.rs @@ -26,7 +26,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { let path = std::path::PathBuf::from(dir_name); - let catalog = None::<&mut catalog::SimpleCatalog>; + let catalog = None::<&mut catalog::DummyCatalogWriter>; Encoder::encode(path, &mut dir, &mut writer, catalog, None, false, false, flags::DEFAULT)?; Command::new("cmp")