diff --git a/src/api2/admin/datastore/backup.rs b/src/api2/admin/datastore/backup.rs index e2bbfe53..fd2fcfd6 100644 --- a/src/api2/admin/datastore/backup.rs +++ b/src/api2/admin/datastore/backup.rs @@ -215,9 +215,7 @@ fn create_dynamic_index( let mut path = env.backup_dir.relative_path(); path.push(archive_name); - let chunk_size = 4096*1024; // todo: ?? - - let index = env.datastore.create_dynamic_writer(&path, chunk_size)?; + let index = env.datastore.create_dynamic_writer(&path)?; let wid = env.register_dynamic_writer(index, name)?; env.log(format!("created new dynamic index {} ({:?})", wid, path)); diff --git a/src/api2/admin/datastore/pxar.rs b/src/api2/admin/datastore/pxar.rs index d7cf821c..f2c46bf4 100644 --- a/src/api2/admin/datastore/pxar.rs +++ b/src/api2/admin/datastore/pxar.rs @@ -20,7 +20,7 @@ use hyper::http::request::Parts; pub struct UploadPxar { stream: Body, - index: DynamicIndexWriter, + index: DynamicChunkWriter, count: usize, } @@ -88,7 +88,8 @@ fn upload_pxar( path.push(archive_name); - let index = datastore.create_dynamic_writer(path, chunk_size as usize)?; + let index = datastore.create_dynamic_writer(path)?; + let index = DynamicChunkWriter::new(index, chunk_size as usize); let upload = UploadPxar { stream: req_body, index, count: 0}; diff --git a/src/api2/admin/datastore/upload.rs b/src/api2/admin/datastore/upload.rs index d5b50a15..58bf26bb 100644 --- a/src/api2/admin/datastore/upload.rs +++ b/src/api2/admin/datastore/upload.rs @@ -153,7 +153,7 @@ impl pmx_server::HandleBackup for BackupHandler { None => { path_str.push_str(".didx"); let path = PathBuf::from(path_str.as_str()); - let writer = self.store.create_dynamic_writer(path, chunk_size)?; + let writer = self.store.create_dynamic_writer(path)?; Ok(Box::new(DynamicFile { writer: Some(writer), path: path_str, diff --git a/src/backup/datastore.rs b/src/backup/datastore.rs index 8696cc3a..f0c092b6 100644 --- a/src/backup/datastore.rs +++ b/src/backup/datastore.rs @@ -101,11 +101,10 @@ impl DataStore { pub fn create_dynamic_writer>( &self, filename: P, - chunk_size: usize ) -> Result { let index = DynamicIndexWriter::create( - self.chunk_store.clone(), filename.as_ref(), chunk_size)?; + self.chunk_store.clone(), filename.as_ref())?; Ok(index) } diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs index 2cb205de..11073fbf 100644 --- a/src/backup/dynamic_index.rs +++ b/src/backup/dynamic_index.rs @@ -7,7 +7,7 @@ use super::chunk_store::*; use proxmox_protocol::Chunker; use std::sync::Arc; -use std::io::{Read, Write, BufWriter}; +use std::io::{Write, BufWriter}; use std::fs::File; use std::path::{Path, PathBuf}; use std::os::unix::io::AsRawFd; @@ -364,23 +364,16 @@ impl std::io::Seek for BufferedDynamicReader { } } +/// Create dynamic index files (`.dixd`) pub struct DynamicIndexWriter { store: Arc, _lock: tools::ProcessLockSharedGuard, - - chunker: Chunker, writer: BufWriter, closed: bool, filename: PathBuf, tmp_filename: PathBuf, pub uuid: [u8; 16], pub ctime: u64, - - stat: ChunkStat, - - chunk_offset: usize, - last_chunk: usize, - chunk_buffer: Vec, } impl Drop for DynamicIndexWriter { @@ -392,7 +385,7 @@ impl Drop for DynamicIndexWriter { impl DynamicIndexWriter { - pub fn create(store: Arc, path: &Path, chunk_size: usize) -> Result { + pub fn create(store: Arc, path: &Path) -> Result { let shared_lock = store.try_shared_lock()?; @@ -431,22 +424,19 @@ impl DynamicIndexWriter { Ok(Self { store, _lock: shared_lock, - chunker: Chunker::new(chunk_size), writer: writer, closed: false, filename: full_path, tmp_filename: tmp_path, ctime, uuid: *uuid.as_bytes(), - - stat: ChunkStat::new(0), - - chunk_offset: 0, - last_chunk: 0, - chunk_buffer: Vec::with_capacity(chunk_size*4), }) } + pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32], u64), Error> { + self.store.insert_chunk(chunk) + } + pub fn close(&mut self) -> Result<(), Error> { if self.closed { @@ -455,19 +445,8 @@ impl DynamicIndexWriter { self.closed = true; - self.write_chunk_buffer()?; - self.writer.flush()?; - self.stat.size = self.chunk_offset as u64; - - // add size of index file - self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::()) as u64; - - println!("STAT: {:?}", self.stat); - - // fixme: - if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { bail!("Atomic rename file {:?} failed - {}", self.filename, err); } @@ -475,13 +454,69 @@ impl DynamicIndexWriter { Ok(()) } + // fixme: rename to add_digest + pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> { + if self.closed { + bail!("cannot write to closed dynamic index file {:?}", self.filename); + } + self.writer.write(unsafe { &std::mem::transmute::(offset.to_le()) })?; + self.writer.write(digest)?; + Ok(()) + } +} + +/// Writer which splits a binary stream into dynamic sized chunks +/// +/// And store the resulting chunk list into the index file. +pub struct DynamicChunkWriter { + index: DynamicIndexWriter, + closed: bool, + chunker: Chunker, + stat: ChunkStat, + chunk_offset: usize, + last_chunk: usize, + chunk_buffer: Vec, +} + +impl DynamicChunkWriter { + + pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self { + Self { + index, + closed: false, + chunker: Chunker::new(chunk_size), + stat: ChunkStat::new(0), + chunk_offset: 0, + last_chunk: 0, + chunk_buffer: Vec::with_capacity(chunk_size*4), + } + } + pub fn stat(&self) -> &ChunkStat { &self.stat } - fn write_chunk_buffer(&mut self) -> Result<(), std::io::Error> { + pub fn close(&mut self) -> Result<(), Error> { - use std::io::{Error, ErrorKind}; + if self.closed { + return Ok(()); + } + + self.closed = true; + + self.write_chunk_buffer()?; + + self.index.close()?; + + self.stat.size = self.chunk_offset as u64; + + // add size of index file + self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::()) as u64; + + Ok(()) + } + + fn write_chunk_buffer(&mut self) -> Result<(), Error> { let chunk_size = self.chunk_buffer.len(); @@ -489,16 +524,14 @@ impl DynamicIndexWriter { let expected_chunk_size = self.chunk_offset - self.last_chunk; if expected_chunk_size != self.chunk_buffer.len() { - return Err(Error::new( - ErrorKind::Other, - format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size))); + bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size); } self.stat.chunk_count += 1; self.last_chunk = self.chunk_offset; - match self.store.insert_chunk(&self.chunk_buffer) { + match self.index.insert_chunk(&self.chunk_buffer) { Ok((is_duplicate, digest, compressed_size)) => { self.stat.compressed_size += compressed_size; @@ -510,25 +543,19 @@ impl DynamicIndexWriter { println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size, (compressed_size*100)/(chunk_size as u64), is_duplicate, tools::digest_to_hex(&digest)); - self.add_chunk(self.chunk_offset as u64, &digest)?; + self.index.add_chunk(self.chunk_offset as u64, &digest)?; self.chunk_buffer.truncate(0); return Ok(()); } Err(err) => { self.chunk_buffer.truncate(0); - return Err(Error::new(ErrorKind::Other, err.to_string())); + return Err(err); } } } - - pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), std::io::Error> { - self.writer.write(unsafe { &std::mem::transmute::(offset.to_le()) })?; - self.writer.write(digest)?; - Ok(()) - } } -impl Write for DynamicIndexWriter { +impl Write for DynamicChunkWriter { fn write(&mut self, data: &[u8]) -> std::result::Result { @@ -540,7 +567,9 @@ impl Write for DynamicIndexWriter { self.chunk_buffer.extend(&data[0..pos]); self.chunk_offset += pos; - self.write_chunk_buffer()?; + if let Err(err) = self.write_chunk_buffer() { + return Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string())); + } Ok(pos) } else { @@ -551,9 +580,6 @@ impl Write for DynamicIndexWriter { } fn flush(&mut self) -> std::result::Result<(), std::io::Error> { - - use std::io::{Error, ErrorKind}; - - Err(Error::new(ErrorKind::Other, "please use close() instead of flush()")) + Err(std::io::Error::new(std::io::ErrorKind::Other, "please use close() instead of flush()")) } }