src/backup/archive_index.rs: use close() instead of flush()

Also pass a reference to the encoder.
This commit is contained in:
Dietmar Maurer 2019-01-02 11:02:56 +01:00
parent 94a882e900
commit 5e7a09be0d
5 changed files with 69 additions and 61 deletions

View File

@ -23,6 +23,7 @@ pub struct ArchiveIndexWriter<'a> {
store: &'a ChunkStore, store: &'a ChunkStore,
chunker: Chunker, chunker: Chunker,
file: File, file: File,
closed: bool,
filename: PathBuf, filename: PathBuf,
tmp_filename: PathBuf, tmp_filename: PathBuf,
uuid: [u8; 16], uuid: [u8; 16],
@ -71,6 +72,7 @@ impl <'a> ArchiveIndexWriter<'a> {
store, store,
chunker: Chunker::new(chunk_size), chunker: Chunker::new(chunk_size),
file: file, file: file,
closed: false,
filename: full_path, filename: full_path,
tmp_filename: tmp_path, tmp_filename: tmp_path,
ctime, ctime,
@ -81,6 +83,59 @@ impl <'a> ArchiveIndexWriter<'a> {
chunk_buffer: Vec::with_capacity(chunk_size*4), chunk_buffer: Vec::with_capacity(chunk_size*4),
}) })
} }
/// Finalize the archive index file.
///
/// Flushes any buffered chunk data to the chunk store, syncs the
/// temporary file to disk, and atomically renames it to its final
/// filename. May only be called once; a second call fails.
///
/// Returns an error if the writer was already closed, if flushing or
/// syncing fails, or if the atomic rename fails.
pub fn close(&mut self) -> Result<(), Error> {
    if self.closed {
        bail!("cannot close already closed archive index file {:?}", self.filename);
    }
    // Mark closed up front so that even if a step below fails the
    // writer cannot be closed (or written) a second time.
    self.closed = true;
    self.write_chunk_buffer()?;
    // Ensure all data hits the disk before the rename makes it visible.
    self.file.sync_all()?;
    // fixme:
    if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
        bail!("Atomic rename file {:?} failed - {}", self.filename, err);
    }
    Ok(())
}
fn write_chunk_buffer(&mut self) -> Result<(), std::io::Error> {
use std::io::{Error, ErrorKind};
let chunk_size = self.chunk_buffer.len();
if chunk_size == 0 { return Ok(()); }
let expected_chunk_size = self.chunk_offset - self.last_chunk;
if expected_chunk_size != self.chunk_buffer.len() {
return Err(Error::new(
ErrorKind::Other,
format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size)));
}
self.last_chunk = self.chunk_offset;
match self.store.insert_chunk(&self.chunk_buffer) {
Ok((is_duplicate, digest)) => {
println!("ADD CHUNK {} {} {} {}", self.chunk_offset, chunk_size, is_duplicate, digest_to_hex(&digest));
self.chunk_buffer.truncate(0);
return Ok(());
}
Err(err) => {
self.chunk_buffer.truncate(0);
return Err(Error::new(ErrorKind::Other, err.to_string()));
}
}
Ok(())
}
} }
impl <'a> Write for ArchiveIndexWriter<'a> { impl <'a> Write for ArchiveIndexWriter<'a> {
@ -97,32 +152,13 @@ impl <'a> Write for ArchiveIndexWriter<'a> {
self.chunk_buffer.extend(&data[0..pos]); self.chunk_buffer.extend(&data[0..pos]);
self.chunk_offset += pos; self.chunk_offset += pos;
let chunk_size = self.chunk_buffer.len(); self.write_chunk_buffer()?;
Ok(pos)
let expected_chunk_size = self.chunk_offset - self.last_chunk;
if expected_chunk_size != self.chunk_buffer.len() {
panic!("wrong chunk size {} != {}",
expected_chunk_size, chunk_size);
}
self.last_chunk = self.chunk_offset;
match self.store.insert_chunk(&self.chunk_buffer) {
Ok((is_duplicate, digest)) => {
println!("ADD CHUNK {} {} {} {}", self.chunk_offset, chunk_size, is_duplicate, digest_to_hex(&digest));
self.chunk_buffer.truncate(0);
return Ok(pos);
}
Err(err) => {
self.chunk_buffer.truncate(0);
return Err(Error::new(ErrorKind::Other, err.to_string()));
}
}
} else { } else {
self.chunk_offset += data.len(); self.chunk_offset += data.len();
self.chunk_buffer.extend(data); self.chunk_buffer.extend(data);
return Ok(data.len()); Ok(data.len())
} }
} }
@ -130,21 +166,6 @@ impl <'a> Write for ArchiveIndexWriter<'a> {
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
let chunk_size = self.chunk_buffer.len(); Err(Error::new(ErrorKind::Other, "please use close() instead of flush()"))
if chunk_size == 0 { return Ok(()); }
// fixme: finalize index, disable further writes
match self.store.insert_chunk(&self.chunk_buffer) {
Ok((is_duplicate, digest)) => {
println!("ADD LAST CHUNK {} {} {} {}", self.last_chunk, chunk_size, is_duplicate, digest_to_hex(&digest));
self.chunk_buffer.truncate(0);
Ok(())
}
Err(err) => {
self.chunk_buffer.truncate(0);
Err(Error::new(ErrorKind::Other, err.to_string()))
}
}
} }
} }

View File

@ -24,9 +24,6 @@ pub struct Chunker {
discriminator: u32, discriminator: u32,
window: [u8; CA_CHUNKER_WINDOW_SIZE], window: [u8; CA_CHUNKER_WINDOW_SIZE],
offset: usize, // only used for debug
last_offset: usize, // only used for debug
} }
const BUZHASH_TABLE: [u32; 256] = [ const BUZHASH_TABLE: [u32; 256] = [
@ -122,8 +119,6 @@ impl Chunker {
chunk_size_avg: chunk_size_avg, chunk_size_avg: chunk_size_avg,
discriminator: discriminator, discriminator: discriminator,
window: [0u8; CA_CHUNKER_WINDOW_SIZE], window: [0u8; CA_CHUNKER_WINDOW_SIZE],
offset: 0,
last_offset: 0,
} }
} }

View File

@ -36,18 +36,13 @@ fn backup_dir(
target.set_extension("aidx"); target.set_extension("aidx");
} }
// fixme: implement chunked writer let mut index = datastore.create_archive_writer(&target, chunk_size)?;
// let writer = std::fs::OpenOptions::new()
// .create(true)
// .write(true)
// .truncate(true)
// .open("mytest.catar")?;
let index = datastore.create_archive_writer(&target, chunk_size)?;
let path = std::path::PathBuf::from(path); let path = std::path::PathBuf::from(path);
CaTarEncoder::encode(path, dir, index)?; CaTarEncoder::encode(path, dir, &mut index)?;
index.close()?; // commit changes
Ok(()) Ok(())
} }

View File

@ -26,18 +26,17 @@ use nix::sys::stat::FileStat;
/// maximum memory usage. /// maximum memory usage.
pub const MAX_DIRECTORY_ENTRIES: usize = 256*1024; pub const MAX_DIRECTORY_ENTRIES: usize = 256*1024;
pub struct CaTarEncoder<W: Write> { pub struct CaTarEncoder<'a, W: Write> {
current_path: PathBuf, // used for error reporting current_path: PathBuf, // used for error reporting
writer: W, writer: &'a mut W,
writer_pos: usize, writer_pos: usize,
size: usize, size: usize,
file_copy_buffer: Vec<u8>, file_copy_buffer: Vec<u8>,
} }
impl <'a, W: Write> CaTarEncoder<'a, W> {
impl <W: Write> CaTarEncoder<W> { pub fn encode(path: PathBuf, dir: &mut nix::dir::Dir, writer: &'a mut W) -> Result<(), Error> {
pub fn encode(path: PathBuf, dir: &mut nix::dir::Dir, writer: W) -> Result<(), Error> {
const FILE_COPY_BUFFER_SIZE: usize = 1024*1024; const FILE_COPY_BUFFER_SIZE: usize = 1024*1024;
@ -55,8 +54,6 @@ impl <W: Write> CaTarEncoder<W> {
// todo: use scandirat?? // todo: use scandirat??
me.encode_dir(dir)?; me.encode_dir(dir)?;
me.writer.flush()?;
Ok(()) Ok(())
} }

View File

@ -14,7 +14,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
.status() .status()
.expect("failed to execute casync"); .expect("failed to execute casync");
let writer = std::fs::OpenOptions::new() let mut writer = std::fs::OpenOptions::new()
.create(true) .create(true)
.write(true) .write(true)
.truncate(true) .truncate(true)
@ -26,7 +26,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
let path = std::path::PathBuf::from(dir_name); let path = std::path::PathBuf::from(dir_name);
CaTarEncoder::encode(path, &mut dir, writer)?; CaTarEncoder::encode(path, &mut dir, &mut writer)?;
Command::new("cmp") Command::new("cmp")
.arg("--verbose") .arg("--verbose")