src/backup/dynamic_index.rs: introduce ReadChunk trait

This commit is contained in:
Dietmar Maurer 2019-06-28 16:35:00 +02:00
parent 09d7dc5024
commit d48a9955a5
2 changed files with 54 additions and 39 deletions

View File

@ -112,7 +112,9 @@ impl DataStore {
pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> { pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
let index = DynamicIndexReader::open(self.chunk_store.clone(), filename.as_ref())?; let full_path = self.chunk_store.relative_path(filename.as_ref());
let index = DynamicIndexReader::open(&full_path)?;
Ok(index) Ok(index)
} }
@ -257,6 +259,10 @@ impl DataStore {
Ok(()) Ok(())
} }
pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
self.chunk_store.chunk_path(digest)
}
pub fn insert_chunk( pub fn insert_chunk(
&self, &self,
chunk: &DataChunk, chunk: &DataChunk,

View File

@ -1,5 +1,6 @@
use failure::*; use failure::*;
use std::convert::TryInto; use std::convert::TryInto;
use std::io::{Seek, SeekFrom};
use crate::tools; use crate::tools;
use super::IndexFile; use super::IndexFile;
@ -23,7 +24,6 @@ use super::{DataChunk, DataChunkBuilder};
/// Header format definition for dynamic index files (`.dixd`) /// Header format definition for dynamic index files (`.dixd`)
#[repr(C)] #[repr(C)]
pub struct DynamicIndexHeader { pub struct DynamicIndexHeader {
/// The string `PROXMOX-DIDX`
pub magic: [u8; 8], pub magic: [u8; 8],
pub uuid: [u8; 16], pub uuid: [u8; 16],
pub ctime: u64, pub ctime: u64,
@ -34,10 +34,8 @@ pub struct DynamicIndexHeader {
pub struct DynamicIndexReader { pub struct DynamicIndexReader {
store: Arc<ChunkStore>,
_file: File, _file: File,
pub size: usize, pub size: usize,
filename: PathBuf,
index: *const u8, index: *const u8,
index_entries: usize, index_entries: usize,
pub uuid: [u8; 16], pub uuid: [u8; 16],
@ -53,50 +51,52 @@ impl Drop for DynamicIndexReader {
fn drop(&mut self) { fn drop(&mut self) {
if let Err(err) = self.unmap() { if let Err(err) = self.unmap() {
eprintln!("Unable to unmap file {:?} - {}", self.filename, err); eprintln!("Unable to unmap dynamic index - {}", err);
} }
} }
} }
impl DynamicIndexReader { impl DynamicIndexReader {
pub fn open(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> { pub fn open(path: &Path) -> Result<Self, Error> {
let full_path = store.relative_path(path); let file = std::fs::File::open(&path)?;
let mut file = std::fs::File::open(&full_path)?;
if let Err(err) = nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock) { if let Err(err) = nix::fcntl::flock(file.as_raw_fd(), nix::fcntl::FlockArg::LockSharedNonblock) {
bail!("unable to get shared lock on {:?} - {}", full_path, err); bail!("unable to get shared lock on {:?} - {}", path, err);
} }
Self::new(file)
}
pub fn new(mut file: std::fs::File) -> Result<Self, Error> {
file.seek(SeekFrom::Start(0))?;
let header_size = std::mem::size_of::<DynamicIndexHeader>(); let header_size = std::mem::size_of::<DynamicIndexHeader>();
// todo: use static assertion when available in rust // todo: use static assertion when available in rust
if header_size != 4096 { bail!("got unexpected header size for {:?}", path); } if header_size != 4096 { bail!("got unexpected header size"); }
let buffer = file.read_exact_allocated(header_size)?; let buffer = file.read_exact_allocated(header_size)?;
let header = unsafe { &* (buffer.as_ptr() as *const DynamicIndexHeader) }; let header = unsafe { &* (buffer.as_ptr() as *const DynamicIndexHeader) };
if header.magic != super::DYNAMIC_SIZED_CHUNK_INDEX_1_0 { if header.magic != super::DYNAMIC_SIZED_CHUNK_INDEX_1_0 {
bail!("got unknown magic number for {:?}", path); bail!("got unknown magic number");
} }
let ctime = u64::from_le(header.ctime); let ctime = u64::from_le(header.ctime);
let rawfd = file.as_raw_fd(); let rawfd = file.as_raw_fd();
let stat = match nix::sys::stat::fstat(rawfd) { let stat = nix::sys::stat::fstat(rawfd)?;
Ok(stat) => stat,
Err(err) => bail!("fstat {:?} failed - {}", path, err),
};
let size = stat.st_size as usize; let size = stat.st_size as usize;
let index_size = size - header_size; let index_size = size - header_size;
if (index_size % 40) != 0 { if (index_size % 40) != 0 {
bail!("got unexpected file size for {:?}", path); bail!("got unexpected file size");
} }
let data = unsafe { nix::sys::mman::mmap( let data = unsafe { nix::sys::mman::mmap(
@ -108,8 +108,6 @@ impl DynamicIndexReader {
header_size as i64) }? as *const u8; header_size as i64) }? as *const u8;
Ok(Self { Ok(Self {
store,
filename: full_path,
_file: file, _file: file,
size, size,
index: data, index: data,
@ -125,7 +123,7 @@ impl DynamicIndexReader {
if self.index == std::ptr::null_mut() { return Ok(()); } if self.index == std::ptr::null_mut() { return Ok(()); }
if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries*40) } { if let Err(err) = unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, self.index_entries*40) } {
bail!("unmap file {:?} failed - {}", self.filename, err); bail!("unmap dynamic index failed - {}", err);
} }
self.index = std::ptr::null_mut(); self.index = std::ptr::null_mut();
@ -167,6 +165,10 @@ impl DynamicIndexReader {
slice.try_into().unwrap() slice.try_into().unwrap()
} }
pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> {
unimplemented!();
}
/*
pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> { pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> {
for pos in 0..self.index_entries { for pos in 0..self.index_entries {
@ -196,6 +198,7 @@ impl DynamicIndexReader {
Ok(()) Ok(())
} }
*/
fn binary_search( fn binary_search(
&self, &self,
@ -240,7 +243,14 @@ impl IndexFile for DynamicIndexReader {
} }
} }
pub struct BufferedDynamicReader { /// The ReadChunk trait allows reading backup data chunks (local or remote)
pub trait ReadChunk {
/// Returns the decoded chunk data
fn read_chunk(&self, digest:&[u8; 32]) -> Result<Vec<u8>, Error>;
}
pub struct BufferedDynamicReader<S> {
store: S,
index: DynamicIndexReader, index: DynamicIndexReader,
archive_size: u64, archive_size: u64,
read_buffer: Vec<u8>, read_buffer: Vec<u8>,
@ -249,12 +259,13 @@ pub struct BufferedDynamicReader {
read_offset: u64, read_offset: u64,
} }
impl BufferedDynamicReader { impl <S: ReadChunk> BufferedDynamicReader<S> {
pub fn new(index: DynamicIndexReader) -> Self { pub fn new(index: DynamicIndexReader, store: S) -> Self {
let archive_size = index.chunk_end(index.index_entries - 1); let archive_size = index.chunk_end(index.index_entries - 1);
Self { Self {
store,
index: index, index: index,
archive_size: archive_size, archive_size: archive_size,
read_buffer: Vec::with_capacity(1024*1024), read_buffer: Vec::with_capacity(1024*1024),
@ -269,25 +280,28 @@ impl BufferedDynamicReader {
fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
let index = &self.index; let index = &self.index;
let end = index.chunk_end(idx); let (start, end, digest) = index.chunk_info(idx)?;
let digest = index.chunk_digest(idx);
let chunk = index.store.read_chunk(digest)?;
// fimxe: handle encrypted chunks
// fixme: avoid copy // fixme: avoid copy
let data = chunk.decode(None)?;
let data = self.store.read_chunk(&digest)?;
if (end - start) != data.len() as u64 {
bail!("read chunk with wrong size ({} != {}", (end - start), data.len());
}
self.read_buffer.clear(); self.read_buffer.clear();
self.read_buffer.extend_from_slice(&data); self.read_buffer.extend_from_slice(&data);
self.buffered_chunk_idx = idx; self.buffered_chunk_idx = idx;
self.buffered_chunk_start = end - (self.read_buffer.len() as u64);
self.buffered_chunk_start = start as u64;
//println!("BUFFER {} {}", self.buffered_chunk_start, end); //println!("BUFFER {} {}", self.buffered_chunk_start, end);
Ok(()) Ok(())
} }
} }
impl crate::tools::BufferedRead for BufferedDynamicReader { impl <S: ReadChunk> crate::tools::BufferedRead for BufferedDynamicReader<S> {
fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> { fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
@ -323,10 +337,9 @@ impl crate::tools::BufferedRead for BufferedDynamicReader {
let buffer_offset = (offset - self.buffered_chunk_start) as usize; let buffer_offset = (offset - self.buffered_chunk_start) as usize;
Ok(&self.read_buffer[buffer_offset..]) Ok(&self.read_buffer[buffer_offset..])
} }
} }
impl std::io::Read for BufferedDynamicReader { impl <S: ReadChunk> std::io::Read for BufferedDynamicReader<S> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> { fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
@ -348,11 +361,9 @@ impl std::io::Read for BufferedDynamicReader {
} }
} }
impl std::io::Seek for BufferedDynamicReader { impl <S: ReadChunk> std::io::Seek for BufferedDynamicReader<S> {
fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64, std::io::Error> { fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
use std::io::{SeekFrom};
let new_offset = match pos { let new_offset = match pos {
SeekFrom::Start(start_offset) => start_offset as i64, SeekFrom::Start(start_offset) => start_offset as i64,
@ -461,10 +472,8 @@ impl DynamicIndexWriter {
self.writer.flush()?; self.writer.flush()?;
use std::io::Seek;
let csum_offset = proxmox::tools::offsetof!(DynamicIndexHeader, index_csum); let csum_offset = proxmox::tools::offsetof!(DynamicIndexHeader, index_csum);
self.writer.seek(std::io::SeekFrom::Start(csum_offset as u64))?; self.writer.seek(SeekFrom::Start(csum_offset as u64))?;
let csum = self.csum.take().unwrap(); let csum = self.csum.take().unwrap();
let index_csum = csum.finish(); let index_csum = csum.finish();