src/backup/dynamic_index.rs: split class DynamicIndexWriter

This commit is contained in:
Dietmar Maurer 2019-05-29 08:49:57 +02:00
parent cb0708dd46
commit 976595e1a9
5 changed files with 80 additions and 56 deletions

View File

@ -215,9 +215,7 @@ fn create_dynamic_index(
let mut path = env.backup_dir.relative_path(); let mut path = env.backup_dir.relative_path();
path.push(archive_name); path.push(archive_name);
let chunk_size = 4096*1024; // todo: ?? let index = env.datastore.create_dynamic_writer(&path)?;
let index = env.datastore.create_dynamic_writer(&path, chunk_size)?;
let wid = env.register_dynamic_writer(index, name)?; let wid = env.register_dynamic_writer(index, name)?;
env.log(format!("created new dynamic index {} ({:?})", wid, path)); env.log(format!("created new dynamic index {} ({:?})", wid, path));

View File

@ -20,7 +20,7 @@ use hyper::http::request::Parts;
pub struct UploadPxar { pub struct UploadPxar {
stream: Body, stream: Body,
index: DynamicIndexWriter, index: DynamicChunkWriter,
count: usize, count: usize,
} }
@ -88,7 +88,8 @@ fn upload_pxar(
path.push(archive_name); path.push(archive_name);
let index = datastore.create_dynamic_writer(path, chunk_size as usize)?; let index = datastore.create_dynamic_writer(path)?;
let index = DynamicChunkWriter::new(index, chunk_size as usize);
let upload = UploadPxar { stream: req_body, index, count: 0}; let upload = UploadPxar { stream: req_body, index, count: 0};

View File

@ -153,7 +153,7 @@ impl pmx_server::HandleBackup for BackupHandler {
None => { None => {
path_str.push_str(".didx"); path_str.push_str(".didx");
let path = PathBuf::from(path_str.as_str()); let path = PathBuf::from(path_str.as_str());
let writer = self.store.create_dynamic_writer(path, chunk_size)?; let writer = self.store.create_dynamic_writer(path)?;
Ok(Box::new(DynamicFile { Ok(Box::new(DynamicFile {
writer: Some(writer), writer: Some(writer),
path: path_str, path: path_str,

View File

@ -101,11 +101,10 @@ impl DataStore {
pub fn create_dynamic_writer<P: AsRef<Path>>( pub fn create_dynamic_writer<P: AsRef<Path>>(
&self, filename: P, &self, filename: P,
chunk_size: usize
) -> Result<DynamicIndexWriter, Error> { ) -> Result<DynamicIndexWriter, Error> {
let index = DynamicIndexWriter::create( let index = DynamicIndexWriter::create(
self.chunk_store.clone(), filename.as_ref(), chunk_size)?; self.chunk_store.clone(), filename.as_ref())?;
Ok(index) Ok(index)
} }

View File

@ -7,7 +7,7 @@ use super::chunk_store::*;
use proxmox_protocol::Chunker; use proxmox_protocol::Chunker;
use std::sync::Arc; use std::sync::Arc;
use std::io::{Read, Write, BufWriter}; use std::io::{Write, BufWriter};
use std::fs::File; use std::fs::File;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
@ -364,23 +364,16 @@ impl std::io::Seek for BufferedDynamicReader {
} }
} }
/// Create dynamic index files (`.dixd`)
pub struct DynamicIndexWriter { pub struct DynamicIndexWriter {
store: Arc<ChunkStore>, store: Arc<ChunkStore>,
_lock: tools::ProcessLockSharedGuard, _lock: tools::ProcessLockSharedGuard,
chunker: Chunker,
writer: BufWriter<File>, writer: BufWriter<File>,
closed: bool, closed: bool,
filename: PathBuf, filename: PathBuf,
tmp_filename: PathBuf, tmp_filename: PathBuf,
pub uuid: [u8; 16], pub uuid: [u8; 16],
pub ctime: u64, pub ctime: u64,
stat: ChunkStat,
chunk_offset: usize,
last_chunk: usize,
chunk_buffer: Vec<u8>,
} }
impl Drop for DynamicIndexWriter { impl Drop for DynamicIndexWriter {
@ -392,7 +385,7 @@ impl Drop for DynamicIndexWriter {
impl DynamicIndexWriter { impl DynamicIndexWriter {
pub fn create(store: Arc<ChunkStore>, path: &Path, chunk_size: usize) -> Result<Self, Error> { pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
let shared_lock = store.try_shared_lock()?; let shared_lock = store.try_shared_lock()?;
@ -431,22 +424,19 @@ impl DynamicIndexWriter {
Ok(Self { Ok(Self {
store, store,
_lock: shared_lock, _lock: shared_lock,
chunker: Chunker::new(chunk_size),
writer: writer, writer: writer,
closed: false, closed: false,
filename: full_path, filename: full_path,
tmp_filename: tmp_path, tmp_filename: tmp_path,
ctime, ctime,
uuid: *uuid.as_bytes(), uuid: *uuid.as_bytes(),
stat: ChunkStat::new(0),
chunk_offset: 0,
last_chunk: 0,
chunk_buffer: Vec::with_capacity(chunk_size*4),
}) })
} }
pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32], u64), Error> {
self.store.insert_chunk(chunk)
}
pub fn close(&mut self) -> Result<(), Error> { pub fn close(&mut self) -> Result<(), Error> {
if self.closed { if self.closed {
@ -455,19 +445,8 @@ impl DynamicIndexWriter {
self.closed = true; self.closed = true;
self.write_chunk_buffer()?;
self.writer.flush()?; self.writer.flush()?;
self.stat.size = self.chunk_offset as u64;
// add size of index file
self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
println!("STAT: {:?}", self.stat);
// fixme:
if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) { if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
bail!("Atomic rename file {:?} failed - {}", self.filename, err); bail!("Atomic rename file {:?} failed - {}", self.filename, err);
} }
@ -475,13 +454,69 @@ impl DynamicIndexWriter {
Ok(()) Ok(())
} }
// fixme: rename to add_digest
pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), Error> {
if self.closed {
bail!("cannot write to closed dynamic index file {:?}", self.filename);
}
self.writer.write(unsafe { &std::mem::transmute::<u64, [u8;8]>(offset.to_le()) })?;
self.writer.write(digest)?;
Ok(())
}
}
/// Writer which splits a binary stream into dynamic sized chunks
///
/// And store the resulting chunk list into the index file.
pub struct DynamicChunkWriter {
index: DynamicIndexWriter,
closed: bool,
chunker: Chunker,
stat: ChunkStat,
chunk_offset: usize,
last_chunk: usize,
chunk_buffer: Vec<u8>,
}
impl DynamicChunkWriter {
pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self {
Self {
index,
closed: false,
chunker: Chunker::new(chunk_size),
stat: ChunkStat::new(0),
chunk_offset: 0,
last_chunk: 0,
chunk_buffer: Vec::with_capacity(chunk_size*4),
}
}
pub fn stat(&self) -> &ChunkStat { pub fn stat(&self) -> &ChunkStat {
&self.stat &self.stat
} }
fn write_chunk_buffer(&mut self) -> Result<(), std::io::Error> { pub fn close(&mut self) -> Result<(), Error> {
use std::io::{Error, ErrorKind}; if self.closed {
return Ok(());
}
self.closed = true;
self.write_chunk_buffer()?;
self.index.close()?;
self.stat.size = self.chunk_offset as u64;
// add size of index file
self.stat.size += (self.stat.chunk_count*40 + std::mem::size_of::<DynamicIndexHeader>()) as u64;
Ok(())
}
fn write_chunk_buffer(&mut self) -> Result<(), Error> {
let chunk_size = self.chunk_buffer.len(); let chunk_size = self.chunk_buffer.len();
@ -489,16 +524,14 @@ impl DynamicIndexWriter {
let expected_chunk_size = self.chunk_offset - self.last_chunk; let expected_chunk_size = self.chunk_offset - self.last_chunk;
if expected_chunk_size != self.chunk_buffer.len() { if expected_chunk_size != self.chunk_buffer.len() {
return Err(Error::new( bail!("wrong chunk size {} != {}", expected_chunk_size, chunk_size);
ErrorKind::Other,
format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size)));
} }
self.stat.chunk_count += 1; self.stat.chunk_count += 1;
self.last_chunk = self.chunk_offset; self.last_chunk = self.chunk_offset;
match self.store.insert_chunk(&self.chunk_buffer) { match self.index.insert_chunk(&self.chunk_buffer) {
Ok((is_duplicate, digest, compressed_size)) => { Ok((is_duplicate, digest, compressed_size)) => {
self.stat.compressed_size += compressed_size; self.stat.compressed_size += compressed_size;
@ -510,25 +543,19 @@ impl DynamicIndexWriter {
println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size, println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size,
(compressed_size*100)/(chunk_size as u64), is_duplicate, tools::digest_to_hex(&digest)); (compressed_size*100)/(chunk_size as u64), is_duplicate, tools::digest_to_hex(&digest));
self.add_chunk(self.chunk_offset as u64, &digest)?; self.index.add_chunk(self.chunk_offset as u64, &digest)?;
self.chunk_buffer.truncate(0); self.chunk_buffer.truncate(0);
return Ok(()); return Ok(());
} }
Err(err) => { Err(err) => {
self.chunk_buffer.truncate(0); self.chunk_buffer.truncate(0);
return Err(Error::new(ErrorKind::Other, err.to_string())); return Err(err);
} }
} }
} }
pub fn add_chunk(&mut self, offset: u64, digest: &[u8; 32]) -> Result<(), std::io::Error> {
self.writer.write(unsafe { &std::mem::transmute::<u64, [u8;8]>(offset.to_le()) })?;
self.writer.write(digest)?;
Ok(())
}
} }
impl Write for DynamicIndexWriter { impl Write for DynamicChunkWriter {
fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> { fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
@ -540,7 +567,9 @@ impl Write for DynamicIndexWriter {
self.chunk_buffer.extend(&data[0..pos]); self.chunk_buffer.extend(&data[0..pos]);
self.chunk_offset += pos; self.chunk_offset += pos;
self.write_chunk_buffer()?; if let Err(err) = self.write_chunk_buffer() {
return Err(std::io::Error::new(std::io::ErrorKind::Other, err.to_string()));
}
Ok(pos) Ok(pos)
} else { } else {
@ -551,9 +580,6 @@ impl Write for DynamicIndexWriter {
} }
fn flush(&mut self) -> std::result::Result<(), std::io::Error> { fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
Err(std::io::Error::new(std::io::ErrorKind::Other, "please use close() instead of flush()"))
use std::io::{Error, ErrorKind};
Err(Error::new(ErrorKind::Other, "please use close() instead of flush()"))
} }
} }