diff --git a/src/backup.rs b/src/backup.rs index 905f60b7..0cb9ad75 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -103,7 +103,8 @@ //! //! Not sure if this is better. TODO -pub const CATALOG_BLOB_NAME: &str = "catalog.blob"; +// Note: .pcat1 => Proxmox Catalog Format version 1 +pub const CATALOG_NAME: &str = "catalog.pcat1.didx"; #[macro_export] macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 { diff --git a/src/backup/catalog_blob.rs b/src/backup/catalog_blob.rs index 478c1a1f..ac6898e3 100644 --- a/src/backup/catalog_blob.rs +++ b/src/backup/catalog_blob.rs @@ -1,7 +1,8 @@ use failure::*; -use std::sync::Arc; use std::ffi::{CStr, CString}; -use std::io::{Read, BufRead, BufReader, Write, Seek}; +use std::os::unix::ffi::OsStringExt; +use std::convert::TryInto; +use std::io::{Read, Write, Seek, SeekFrom}; use std::convert::TryFrom; use chrono::offset::{TimeZone, Local}; @@ -10,97 +11,207 @@ use proxmox::tools::io::ReadExt; use crate::pxar::catalog::{BackupCatalogWriter, CatalogEntryType}; -use super::{DataBlobWriter, DataBlobReader, CryptConfig}; +enum DirEntry { + Directory { name: Vec, start: u64 }, + File { name: Vec, size: u64, mtime: u64 }, + Symlink { name: Vec }, + Hardlink { name: Vec }, + BlockDevice { name: Vec }, + CharDevice { name: Vec }, + Fifo { name: Vec }, + Socket { name: Vec }, +} -pub struct CatalogBlobWriter { - writer: DataBlobWriter, - level: usize, +struct DirInfo { + name: CString, + entries: Vec, +} + +impl DirInfo { + + fn new(name: CString) -> Self { + DirInfo { name, entries: Vec::new() } + } + + fn new_rootdir() -> Self { + DirInfo::new(CString::new(b"/".to_vec()).unwrap()) + } + + fn encode_entry(data: &mut Vec, entry: &DirEntry, pos: u64) { + match entry { + DirEntry::Directory { name, start } => { + data.push(CatalogEntryType::Directory as u8); + data.extend_from_slice(&(name.len() as u32).to_le_bytes()); + data.extend_from_slice(name); + data.extend_from_slice(&(pos-start).to_le_bytes()); + } + DirEntry::File { name, size, mtime } => 
{ + data.push(CatalogEntryType::File as u8); + data.extend_from_slice(&(name.len() as u32).to_le_bytes()); + data.extend_from_slice(name); + data.extend_from_slice(&size.to_le_bytes()); + data.extend_from_slice(&mtime.to_le_bytes()); + } + DirEntry::Symlink { name } => { + data.push(CatalogEntryType::Symlink as u8); + data.extend_from_slice(&(name.len() as u32).to_le_bytes()); + data.extend_from_slice(name); + } + DirEntry::Hardlink { name } => { + data.push(CatalogEntryType::Hardlink as u8); + data.extend_from_slice(&(name.len() as u32).to_le_bytes()); + data.extend_from_slice(name); + } + DirEntry::BlockDevice { name } => { + data.push(CatalogEntryType::BlockDevice as u8); + data.extend_from_slice(&(name.len() as u32).to_le_bytes()); + data.extend_from_slice(name); + } + DirEntry::CharDevice { name } => { + data.push(CatalogEntryType::CharDevice as u8); + data.extend_from_slice(&(name.len() as u32).to_le_bytes()); + data.extend_from_slice(name); + } + DirEntry::Fifo { name } => { + data.push(CatalogEntryType::Fifo as u8); + data.extend_from_slice(&(name.len() as u32).to_le_bytes()); + data.extend_from_slice(name); + } + DirEntry::Socket { name } => { + data.push(CatalogEntryType::Socket as u8); + data.extend_from_slice(&(name.len() as u32).to_le_bytes()); + data.extend_from_slice(name); + } + } + } + + fn encode(self, start: u64) -> Result<(CString, Vec), Error> { + let mut table = Vec::new(); + let count: u32 = self.entries.len().try_into()?; + for entry in self.entries { + Self::encode_entry(&mut table, &entry, start); + } + + let data = Vec::new(); + let mut writer = std::io::Cursor::new(data); + let size: u32 = (4 + 4 + table.len()).try_into()?; + writer.write_all(&size.to_le_bytes())?; + writer.write_all(&count.to_le_bytes())?; + writer.write_all(&table)?; + Ok((self.name, writer.into_inner())) + } +} + +pub struct CatalogWriter { + writer: W, + dirstack: Vec, pos: u64, } -impl CatalogBlobWriter { - pub fn new_compressed(writer: W) -> Result { - let writer = 
DataBlobWriter::new_compressed(writer)?; - Ok(Self { writer, level: 0, pos: 0 }) +impl CatalogWriter { + + pub fn new(writer: W) -> Result { + Ok(Self { writer, dirstack: vec![ DirInfo::new_rootdir() ], pos: 0 }) } - pub fn new_signed_compressed(writer: W, config: Arc) -> Result { - let writer = DataBlobWriter::new_signed_compressed(writer, config)?; - Ok(Self { writer, level: 0, pos: 0 }) - } - pub fn new_encrypted_compressed(writer: W, config: Arc) -> Result { - let writer = DataBlobWriter::new_encrypted_compressed(writer, config)?; - Ok(Self { writer, level: 0, pos: 0 }) - } - pub fn finish(self) -> Result { - self.writer.finish() + + pub fn finish(&mut self) -> Result<(), Error> { + if self.dirstack.len() != 1 { + bail!("unable to finish catalog at level {}", self.dirstack.len()); + } + + let dir = self.dirstack.pop().unwrap(); + + let start = self.pos; + let (_, data) = dir.encode(start)?; + self.write_all(&data)?; + + self.write_all(&start.to_le_bytes())?; + + self.writer.flush()?; + + Ok(()) } } -impl BackupCatalogWriter for CatalogBlobWriter { +impl BackupCatalogWriter for CatalogWriter { fn start_directory(&mut self, name: &CStr) -> Result<(), Error> { - self.write_all(&[CatalogEntryType::Directory as u8])?; - self.write_all(name.to_bytes_with_nul())?; - self.write_all(b"{")?; - self.level += 1; + let new = DirInfo::new(name.to_owned()); + self.dirstack.push(new); Ok(()) } fn end_directory(&mut self) -> Result<(), Error> { - if self.level == 0 { - bail!("got unexpected end_directory level 0"); - } - self.write_all(b"}")?; - self.level -= 1; + let (start, name) = match self.dirstack.pop() { + Some(dir) => { + let start = self.pos; + let (name, data) = dir.encode(start)?; + self.write_all(&data)?; + (start, name) + } + None => { + bail!("got unexpected end_directory level 0"); + } + }; + + let current = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let name = name.to_bytes().to_vec(); + current.entries.push(DirEntry::Directory { 
name, start }); + Ok(()) } fn add_file(&mut self, name: &CStr, size: u64, mtime: u64) -> Result<(), Error> { - self.write_all(&[CatalogEntryType::File as u8])?; - self.write_all(&size.to_le_bytes())?; - self.write_all(&mtime.to_le_bytes())?; - self.write_all(name.to_bytes_with_nul())?; + let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let name = name.to_bytes().to_vec(); + dir.entries.push(DirEntry::File { name, size, mtime }); Ok(()) } fn add_symlink(&mut self, name: &CStr) -> Result<(), Error> { - self.write_all(&[CatalogEntryType::Symlink as u8])?; - self.write_all(name.to_bytes_with_nul())?; + let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let name = name.to_bytes().to_vec(); + dir.entries.push(DirEntry::Symlink { name }); Ok(()) } fn add_hardlink(&mut self, name: &CStr) -> Result<(), Error> { - self.write_all(&[CatalogEntryType::Hardlink as u8])?; - self.write_all(name.to_bytes_with_nul())?; + let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let name = name.to_bytes().to_vec(); + dir.entries.push(DirEntry::Hardlink { name }); Ok(()) } fn add_block_device(&mut self, name: &CStr) -> Result<(), Error> { - self.write_all(&[CatalogEntryType::BlockDevice as u8])?; - self.write_all(name.to_bytes_with_nul())?; - Ok(()) + let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let name = name.to_bytes().to_vec(); + dir.entries.push(DirEntry::BlockDevice { name }); + Ok(()) } fn add_char_device(&mut self, name: &CStr) -> Result<(), Error> { - self.write_all(&[CatalogEntryType::CharDevice as u8])?; - self.write_all(name.to_bytes_with_nul())?; + let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let name = name.to_bytes().to_vec(); + dir.entries.push(DirEntry::CharDevice { name }); Ok(()) } fn add_fifo(&mut self, name: &CStr) -> Result<(), Error> { - self.write_all(&[CatalogEntryType::Fifo as u8])?; - 
self.write_all(name.to_bytes_with_nul())?; + let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let name = name.to_bytes().to_vec(); + dir.entries.push(DirEntry::Fifo { name }); Ok(()) } fn add_socket(&mut self, name: &CStr) -> Result<(), Error> { - self.write_all(&[CatalogEntryType::Socket as u8])?; - self.write_all(name.to_bytes_with_nul())?; + let dir = self.dirstack.last_mut().ok_or_else(|| format_err!("outside root"))?; + let name = name.to_bytes().to_vec(); + dir.entries.push(DirEntry::Socket { name }); Ok(()) } } -impl CatalogBlobWriter { +impl CatalogWriter { fn write_all(&mut self, data: &[u8]) -> Result<(), Error> { self.writer.write_all(data)?; self.pos += u64::try_from(data.len())?; @@ -108,125 +219,109 @@ impl CatalogBlobWriter { } } -pub struct CatalogBlobReader { - reader: BufReader>, - dir_stack: Vec, +// fixme: move to somewhere else? +/// Implement Write to tokio mpsc channel Sender +pub struct SenderWriter(tokio::sync::mpsc::Sender, Error>>); + +impl SenderWriter { + pub fn new(sender: tokio::sync::mpsc::Sender, Error>>) -> Self { + Self(sender) + } } -impl CatalogBlobReader { - - pub fn new(reader: R, crypt_config: Option>) -> Result { - let dir_stack = Vec::new(); - - let reader = BufReader::new(DataBlobReader::new(reader, crypt_config)?); - - Ok(Self { reader, dir_stack }) +impl Write for SenderWriter { + fn write(&mut self, buf: &[u8]) -> Result { + futures::executor::block_on(async move { + self.0.send(Ok(buf.to_vec())).await + .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?; + Ok(buf.len()) + }) } - fn read_filename(&mut self) -> Result { - let mut filename = Vec::new(); - self.reader.read_until(0u8, &mut filename)?; - if filename.len() > 0 && filename[filename.len()-1] == 0u8 { - filename.pop(); - } - if filename.len() == 0 { - bail!("got zero length filename"); - } - if filename.iter().find(|b| **b == b'/').is_some() { - bail!("found invalid filename with slashes."); - } - 
Ok(unsafe { CString::from_vec_unchecked(filename) }) + fn flush(&mut self) -> Result<(), std::io::Error> { + Ok(()) + } +} + +pub struct CatalogReader { + reader: R, +} + +impl CatalogReader { + + pub fn new(reader: R) -> Self { + Self { reader } } - fn next_byte(&mut self) -> Result { + fn next_byte(mut reader: C) -> Result { let mut buf = [0u8; 1]; - self.reader.read_exact(&mut buf)?; + reader.read_exact(&mut buf)?; Ok(buf[0]) } - fn expect_next(&mut self, expect: u8) -> Result<(), Error> { - let next = self.next_byte()?; - if next != expect { - bail!("got unexpected byte ({} != {})", next, expect); - } - Ok(()) + pub fn dump(&mut self) -> Result<(), Error> { + + self.reader.seek(SeekFrom::End(-8))?; + + let start = unsafe { self.reader.read_le_value::()? }; + + self.dump_dir(std::path::Path::new("./"), start) } - fn print_entry(&self, etype: CatalogEntryType, filename: &CStr, size: u64, mtime: u64) -> Result<(), Error> { - let mut out = Vec::new(); + pub fn dump_dir(&mut self, prefix: &std::path::Path, start: u64) -> Result<(), Error> { - write!(out, "{} ", char::from(etype as u8))?; + self.reader.seek(SeekFrom::Start(start))?; - for name in &self.dir_stack { - out.extend(name.to_bytes()); - out.push(b'/'); - } + let size = unsafe { self.reader.read_le_value::()? } as usize; - out.extend(filename.to_bytes()); + if size < 8 { bail!("got small directory size {}", size) }; - let dt = Local.timestamp(mtime as i64, 0); + let data = self.reader.read_exact_allocated(size - 4)?; - if etype == CatalogEntryType::File { - write!(out, " {} {}", size, dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, false))?; - } + let mut cursor = &data[..]; - writeln!(out)?; - std::io::stdout().write_all(&out)?; + let entries = unsafe { cursor.read_le_value::()? 
}; - Ok(()) - } + //println!("TEST {} {} size {}", start, entries, size); - fn parse_entries(&mut self) -> Result<(), Error> { + for _ in 0..entries { + let etype = CatalogEntryType::try_from(Self::next_byte(&mut cursor)?)?; + let name_len = unsafe { cursor.read_le_value::()? }; + let name = cursor.read_exact_allocated(name_len as usize)?; - loop { - let etype = match self.next_byte() { - Ok(v) => v, - Err(err) => { - if err.kind() == std::io::ErrorKind::UnexpectedEof && self.dir_stack.len() == 0 { - break; - } + let mut path = std::path::PathBuf::from(prefix); + path.push(std::ffi::OsString::from_vec(name)); - return Err(err.into()); - } - }; - if etype == b'}' { - if self.dir_stack.pop().is_none() { - bail!("got unexpected '}'"); - } - break; - } - - let etype = CatalogEntryType::try_from(etype)?; match etype { CatalogEntryType::Directory => { - let filename = self.read_filename()?; - self.print_entry(etype.into(), &filename, 0, 0)?; - self.dir_stack.push(filename); - self.expect_next(b'{')?; - self.parse_entries()?; + println!("{} {:?}", char::from(etype as u8), path); + let offset = unsafe { cursor.read_le_value::()? }; + if offset > start { + bail!("got wrong directory offset ({} > {})", offset, start); + } + let pos = start - offset; + self.dump_dir(&path, pos)?; } CatalogEntryType::File => { - let size = unsafe { self.reader.read_le_value::()? }; - let mtime = unsafe { self.reader.read_le_value::()? }; - let filename = self.read_filename()?; - self.print_entry(etype.into(), &filename, size, mtime)?; + let size = unsafe { cursor.read_le_value::()? }; + let mtime = unsafe { cursor.read_le_value::()? 
}; + + let dt = Local.timestamp(mtime as i64, 0); + + println!("{} {:?} {} {}", + char::from(etype as u8), + path, + size, + dt.to_rfc3339_opts(chrono::SecondsFormat::Secs, false) + ); } - CatalogEntryType::Symlink | - CatalogEntryType::Hardlink | - CatalogEntryType::Fifo | - CatalogEntryType::Socket | - CatalogEntryType::BlockDevice | - CatalogEntryType::CharDevice => { - let filename = self.read_filename()?; - self.print_entry(etype.into(), &filename, 0, 0)?; + _ => { + println!("{} {:?}", char::from(etype as u8), path); } } } + Ok(()) } - pub fn dump(&mut self) -> Result<(), Error> { - self.parse_entries()?; - Ok(()) - } } diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 38bdb26e..1fd2c54a 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -8,7 +8,7 @@ use chrono::{Local, Utc, TimeZone}; use std::path::{Path, PathBuf}; use std::collections::{HashSet, HashMap}; use std::ffi::OsStr; -use std::io::{BufReader, Read, Write, Seek, SeekFrom}; +use std::io::{Read, Write, Seek, SeekFrom}; use std::os::unix::fs::OpenOptionsExt; use proxmox::tools::fs::{file_get_contents, file_get_json, file_set_contents, image_size}; @@ -188,7 +188,7 @@ async fn backup_directory>( verbose: bool, skip_lost_and_found: bool, crypt_config: Option>, - catalog: Arc>>, + catalog: Arc>>, ) -> Result { let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), device_set, verbose, skip_lost_and_found, catalog)?; @@ -485,23 +485,41 @@ fn dump_catalog( true, ).await?; - let manifest = client.download_manifest().await?; - - let blob_file = std::fs::OpenOptions::new() - .read(true) + let tmpfile = std::fs::OpenOptions::new() .write(true) + .read(true) .custom_flags(libc::O_TMPFILE) .open("/tmp")?; - let mut blob_file = client.download(CATALOG_BLOB_NAME, blob_file).await?; + let manifest = client.download_manifest().await?; - let (csum, size) = compute_file_csum(&mut blob_file)?; - manifest.verify_file(CATALOG_BLOB_NAME, &csum, 
size)?; + let tmpfile = client.download(CATALOG_NAME, tmpfile).await?; - blob_file.seek(SeekFrom::Start(0))?; + let index = DynamicIndexReader::new(tmpfile) + .map_err(|err| format_err!("unable to read catalog index - {}", err))?; - let reader = BufReader::new(blob_file); - let mut catalog_reader = CatalogBlobReader::new(reader, crypt_config)?; + // Note: do not use values stored in index (not trusted) - instead, compute them again + let (csum, size) = index.compute_csum(); + manifest.verify_file(CATALOG_NAME, &csum, size)?; + + let most_used = index.find_most_used_chunks(8); + + let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used); + + let mut reader = BufferedDynamicReader::new(index, chunk_reader); + + let mut catalogfile = std::fs::OpenOptions::new() + .write(true) + .read(true) + .custom_flags(libc::O_TMPFILE) + .open("/tmp")?; + + std::io::copy(&mut reader, &mut catalogfile) + .map_err(|err| format_err!("unable to download catalog - {}", err))?; + + catalogfile.seek(SeekFrom::Start(0))?; + + let mut catalog_reader = CatalogReader::new(catalogfile); catalog_reader.dump()?; @@ -584,6 +602,40 @@ fn parse_backupspec(value: &str) -> Result<(&str, &str), Error> { bail!("unable to parse directory specification '{}'", value); } +fn spawn_catalog_upload( + client: Arc, + crypt_config: Option>, +) -> Result< + ( + Arc>>, + tokio::sync::oneshot::Receiver> + ), Error> +{ + let (catalog_tx, catalog_rx) = mpsc::channel(10); // allow to buffer 10 writes + let catalog_stream = catalog_rx.map_err(Error::from); + let catalog_chunk_size = 512*1024; + let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size)); + + let catalog = Arc::new(Mutex::new(CatalogWriter::new(SenderWriter::new(catalog_tx))?)); + + let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel(); + + tokio::spawn(async move { + let catalog_upload_result = client + .upload_stream(CATALOG_NAME, catalog_chunk_stream, "dynamic", None, 
crypt_config) + .await; + + if let Err(ref err) = catalog_upload_result { + eprintln!("catalog upload error - {}", err); + client.cancel(); + } + + let _ = catalog_result_tx.send(catalog_upload_result); + }); + + Ok((catalog, catalog_result_rx)) +} + fn create_backup( param: Value, _info: &ApiMethod, @@ -637,6 +689,8 @@ fn create_backup( enum BackupType { PXAR, IMAGE, CONFIG, LOGFILE }; + let mut upload_catalog = false; + for backupspec in backupspec_list { let (target, filename) = parse_backupspec(backupspec.as_str().unwrap())?; @@ -655,6 +709,7 @@ fn create_backup( bail!("got unexpected file type (expected directory)"); } upload_list.push((BackupType::PXAR, filename.to_owned(), format!("{}.didx", target), 0)); + upload_catalog = true; } "img" => { @@ -731,15 +786,7 @@ fn create_backup( let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp()); let mut manifest = BackupManifest::new(snapshot); - // fixme: encrypt/sign catalog? - let catalog_file = std::fs::OpenOptions::new() - .write(true) - .read(true) - .custom_flags(libc::O_TMPFILE) - .open("/tmp")?; - - let catalog = Arc::new(Mutex::new(CatalogBlobWriter::new_compressed(catalog_file)?)); - let mut upload_catalog = false; + let (catalog, catalog_result_rx) = spawn_catalog_upload(client.clone(), crypt_config.clone())?; for (backup_type, filename, target, size) in upload_list { match backup_type { @@ -758,7 +805,6 @@ fn create_backup( manifest.add_file(target, stats.size, stats.csum); } BackupType::PXAR => { - upload_catalog = true; println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target); catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?; let stats = backup_directory( @@ -795,14 +841,15 @@ fn create_backup( if upload_catalog { let mutex = Arc::try_unwrap(catalog) .map_err(|_| format_err!("unable to get catalog (still used)"))?; - let mut catalog_file = mutex.into_inner().unwrap().finish()?; + let mut catalog = 
mutex.into_inner().unwrap(); - let target = CATALOG_BLOB_NAME; + catalog.finish()?; - catalog_file.seek(SeekFrom::Start(0))?; + drop(catalog); // close upload stream - let stats = client.upload_blob(catalog_file, target).await?; - manifest.add_file(target.to_owned(), stats.size, stats.csum); + let stats = catalog_result_rx.await??; + + manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum); } if let Some(rsa_encrypted_key) = rsa_encrypted_key { diff --git a/src/client/pxar_backup_stream.rs b/src/client/pxar_backup_stream.rs index 62994366..8a1abd03 100644 --- a/src/client/pxar_backup_stream.rs +++ b/src/client/pxar_backup_stream.rs @@ -1,5 +1,5 @@ use std::collections::HashSet; -use std::io::{Seek, Write}; +use std::io::Write; use std::os::unix::io::FromRawFd; use std::path::{Path, PathBuf}; use std::pin::Pin; @@ -15,7 +15,7 @@ use nix::sys::stat::Mode; use nix::dir::Dir; use crate::pxar; -use crate::backup::CatalogBlobWriter; +use crate::backup::CatalogWriter; use crate::tools::wrapped_reader_stream::WrappedReaderStream; @@ -41,13 +41,13 @@ impl Drop for PxarBackupStream { impl PxarBackupStream { pin_utils::unsafe_pinned!(stream: Option>); - pub fn new( + pub fn new( mut dir: Dir, path: PathBuf, device_set: Option>, verbose: bool, skip_lost_and_found: bool, - catalog: Arc>>, + catalog: Arc>>, ) -> Result { let (rx, tx) = nix::unistd::pipe()?; @@ -89,12 +89,12 @@ impl PxarBackupStream { }) } - pub fn open( + pub fn open( dirname: &Path, device_set: Option>, verbose: bool, skip_lost_and_found: bool, - catalog: Arc>>, + catalog: Arc>>, ) -> Result { let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;