diff --git a/src/backup/archive_index.rs b/src/backup/archive_index.rs index 8a5a89cf..b6b7542c 100644 --- a/src/backup/archive_index.rs +++ b/src/backup/archive_index.rs @@ -23,6 +23,7 @@ pub struct ArchiveIndexWriter<'a> { store: &'a ChunkStore, chunker: Chunker, file: File, + closed: bool, filename: PathBuf, tmp_filename: PathBuf, uuid: [u8; 16], @@ -71,6 +72,7 @@ impl <'a> ArchiveIndexWriter<'a> { store, chunker: Chunker::new(chunk_size), file: file, + closed: false, filename: full_path, tmp_filename: tmp_path, ctime, @@ -81,6 +83,59 @@ impl <'a> ArchiveIndexWriter<'a> { chunk_buffer: Vec::with_capacity(chunk_size*4), }) } + + pub fn close(&mut self) -> Result<(), Error> { + + if self.closed { + bail!("cannot close already closed archive index file {:?}", self.filename); + } + + self.closed = true; + + self.write_chunk_buffer()?; + + self.file.sync_all()?; + + // fixme: + + if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { + bail!("Atomic rename file {:?} failed - {}", self.filename, err); + } + + Ok(()) + } + + fn write_chunk_buffer(&mut self) -> Result<(), std::io::Error> { + + use std::io::{Error, ErrorKind}; + + let chunk_size = self.chunk_buffer.len(); + + if chunk_size == 0 { return Ok(()); } + + let expected_chunk_size = self.chunk_offset - self.last_chunk; + if expected_chunk_size != self.chunk_buffer.len() { + return Err(Error::new( + ErrorKind::Other, + format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size))); + } + + self.last_chunk = self.chunk_offset; + + match self.store.insert_chunk(&self.chunk_buffer) { + Ok((is_duplicate, digest)) => { + println!("ADD CHUNK {} {} {} {}", self.chunk_offset, chunk_size, is_duplicate, digest_to_hex(&digest)); + self.chunk_buffer.truncate(0); + return Ok(()); + } + Err(err) => { + self.chunk_buffer.truncate(0); + return Err(Error::new(ErrorKind::Other, err.to_string())); + } + } + + Ok(()) + } } impl <'a> Write for ArchiveIndexWriter<'a> { @@ -97,32 +152,13 @@ impl <'a> Write for ArchiveIndexWriter<'a> { self.chunk_buffer.extend(&data[0..pos]); self.chunk_offset += pos; - let chunk_size = self.chunk_buffer.len(); - - let expected_chunk_size = self.chunk_offset - self.last_chunk; - if expected_chunk_size != self.chunk_buffer.len() { - panic!("wrong chunk size {} != {}", - expected_chunk_size, chunk_size); - } - - self.last_chunk = self.chunk_offset; - - match self.store.insert_chunk(&self.chunk_buffer) { - Ok((is_duplicate, digest)) => { - println!("ADD CHUNK {} {} {} {}", self.chunk_offset, chunk_size, is_duplicate, digest_to_hex(&digest)); - self.chunk_buffer.truncate(0); - return Ok(pos); - } - Err(err) => { - self.chunk_buffer.truncate(0); - return Err(Error::new(ErrorKind::Other, err.to_string())); - } - } + self.write_chunk_buffer()?; + Ok(pos) } else { self.chunk_offset += data.len(); self.chunk_buffer.extend(data); - return Ok(data.len()); + Ok(data.len()) } } @@ -130,21 +166,6 @@ impl <'a> Write for ArchiveIndexWriter<'a> { use std::io::{Error, ErrorKind}; - let chunk_size = self.chunk_buffer.len(); - - if chunk_size == 0 { return Ok(()); } - - // fixme: finalize index, disable further writes - match self.store.insert_chunk(&self.chunk_buffer) { - Ok((is_duplicate, digest)) => { - println!("ADD LAST CHUNK {} {} {} {}", self.last_chunk, chunk_size, is_duplicate, digest_to_hex(&digest)); - self.chunk_buffer.truncate(0); - Ok(()) - } - Err(err) => { - self.chunk_buffer.truncate(0); - Err(Error::new(ErrorKind::Other, err.to_string())) - } - } + Err(Error::new(ErrorKind::Other, "please use close() instead of flush()")) } } diff --git a/src/backup/chunker.rs b/src/backup/chunker.rs index f652deb0..eee2dd79 100644 --- a/src/backup/chunker.rs +++ b/src/backup/chunker.rs @@ -24,9 +24,6 @@ pub struct Chunker { discriminator: u32, window: [u8; CA_CHUNKER_WINDOW_SIZE], - - offset: usize, // only used for debug - last_offset: usize, // only used for debug } const BUZHASH_TABLE: [u32; 256] = [ @@ -122,8 +119,6 @@ impl Chunker { chunk_size_avg: chunk_size_avg, discriminator: discriminator, window: [0u8; CA_CHUNKER_WINDOW_SIZE], - offset: 0, - last_offset: 0, } } diff --git a/src/bin/backup-client.rs b/src/bin/backup-client.rs index d807f46e..5c728aa5 100644 --- a/src/bin/backup-client.rs +++ b/src/bin/backup-client.rs @@ -36,18 +36,13 @@ fn backup_dir( target.set_extension("aidx"); } - // fixme: implement chunked writer - // let writer = std::fs::OpenOptions::new() - // .create(true) - // .write(true) - // .truncate(true) - // .open("mytest.catar")?; - - let index = datastore.create_archive_writer(&target, chunk_size)?; + let mut index = datastore.create_archive_writer(&target, chunk_size)?; let path = std::path::PathBuf::from(path); - CaTarEncoder::encode(path, dir, index)?; + CaTarEncoder::encode(path, dir, &mut index)?; + + index.close()?; // commit changes Ok(()) } diff --git a/src/catar/encoder.rs b/src/catar/encoder.rs index df8cac68..0e4abcc6 100644 --- a/src/catar/encoder.rs +++ b/src/catar/encoder.rs @@ -26,18 +26,17 @@ use nix::sys::stat::FileStat; /// maximum memory usage. pub const MAX_DIRECTORY_ENTRIES: usize = 256*1024; -pub struct CaTarEncoder { +pub struct CaTarEncoder<'a, W: Write> { current_path: PathBuf, // used for error reporting - writer: W, + writer: &'a mut W, writer_pos: usize, size: usize, file_copy_buffer: Vec, } +impl <'a, W: Write> CaTarEncoder<'a, W> { -impl CaTarEncoder { - - pub fn encode(path: PathBuf, dir: &mut nix::dir::Dir, writer: W) -> Result<(), Error> { + pub fn encode(path: PathBuf, dir: &mut nix::dir::Dir, writer: &'a mut W) -> Result<(), Error> { const FILE_COPY_BUFFER_SIZE: usize = 1024*1024; @@ -55,8 +54,6 @@ impl CaTarEncoder { // todo: use scandirat?? me.encode_dir(dir)?; - me.writer.flush()?; - Ok(()) } diff --git a/tests/catar.rs b/tests/catar.rs index 6b274ce0..cca9b94f 100644 --- a/tests/catar.rs +++ b/tests/catar.rs @@ -14,7 +14,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { .status() .expect("failed to execute casync"); - let writer = std::fs::OpenOptions::new() + let mut writer = std::fs::OpenOptions::new() .create(true) .write(true) .truncate(true) @@ -26,7 +26,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { let path = std::path::PathBuf::from(dir_name); - CaTarEncoder::encode(path, &mut dir, writer)?; + CaTarEncoder::encode(path, &mut dir, &mut writer)?; Command::new("cmp") .arg("--verbose")