diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs index 1b58551a..ccea6485 100644 --- a/src/backup/fixed_index.rs +++ b/src/backup/fixed_index.rs @@ -1,4 +1,5 @@ use failure::*; +use std::io::{Seek, SeekFrom}; use crate::tools; use super::IndexFile; @@ -12,7 +13,9 @@ use std::path::{Path, PathBuf}; use std::os::unix::io::AsRawFd; use uuid::Uuid; use chrono::{Local, TimeZone}; + use super::ChunkInfo; +use super::read_chunk::*; /// Header format definition for fixed index files (`.fidx`) #[repr(C)] @@ -68,6 +71,8 @@ impl FixedIndexReader { bail!("unable to get shared lock - {}", err); } + file.seek(SeekFrom::Start(0))?; + let header_size = std::mem::size_of::(); // todo: use static assertion when available in rust @@ -136,6 +141,38 @@ impl FixedIndexReader { Ok(()) } + pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> { + + if pos >= self.index_length { + bail!("chunk index out of range"); + } + let start = (pos * self.chunk_size) as u64; + let mut end = start + self.chunk_size as u64; + + if end > self.size { + end = self.size; + } + + let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() }; + unsafe { std::ptr::copy_nonoverlapping(self.index.add(pos*32), digest.as_mut_ptr(), 32); } + + Ok((start, end, digest)) + } + + #[inline] + fn chunk_end(&self, pos: usize) -> u64 { + if pos >= self.index_length { + panic!("chunk index out of range"); + } + + let end = ((pos+1) * self.chunk_size) as u64; + if end > self.size { + self.size + } else { + end + } + } + pub fn print_info(&self) { println!("Size: {}", self.size); println!("ChunkSize: {}", self.chunk_size); @@ -284,10 +321,8 @@ impl FixedIndexWriter { self.unmap()?; - use std::io::Seek; - let csum_offset = proxmox::tools::offsetof!(FixedIndexHeader, index_csum); - self.file.seek(std::io::SeekFrom::Start(csum_offset as u64))?; + self.file.seek(SeekFrom::Start(csum_offset as u64))?; self.file.write_all(&index_csum)?; self.file.flush()?; @@ -367,3 +402,135 @@ impl FixedIndexWriter { Ok(()) } } + +pub struct BufferedFixedReader { + store: S, + index: FixedIndexReader, + archive_size: u64, + read_buffer: Vec, + buffered_chunk_idx: usize, + buffered_chunk_start: u64, + read_offset: u64, +} + +impl BufferedFixedReader { + + pub fn new(index: FixedIndexReader, store: S) -> Self { + + let archive_size = index.size; + Self { + store, + index: index, + archive_size: archive_size, + read_buffer: Vec::with_capacity(1024*1024), + buffered_chunk_idx: 0, + buffered_chunk_start: 0, + read_offset: 0, + } + } + + pub fn archive_size(&self) -> u64 { self.archive_size } + + fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { + + let index = &self.index; + let (start, end, digest) = index.chunk_info(idx)?; + + // fixme: avoid copy + + let data = self.store.read_chunk(&digest)?; + + if (end - start) != data.len() as u64 { + bail!("read chunk with wrong size ({} != {}", (end - start), data.len()); + } + + self.read_buffer.clear(); + self.read_buffer.extend_from_slice(&data); + + self.buffered_chunk_idx = idx; + + self.buffered_chunk_start = start as u64; + //println!("BUFFER {} {}", self.buffered_chunk_start, end); + Ok(()) + } +} + +impl crate::tools::BufferedRead for BufferedFixedReader { + + fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> { + + if offset == self.archive_size { return Ok(&self.read_buffer[0..0]); } + + let buffer_len = self.read_buffer.len(); + let index = &self.index; + + // optimization for sequential read + if buffer_len > 0 && + ((self.buffered_chunk_idx + 1) < index.index_length) && + (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) + { + let next_idx = self.buffered_chunk_idx + 1; + let next_end = index.chunk_end(next_idx); + if offset < next_end { + self.buffer_chunk(next_idx)?; + let buffer_offset = (offset - self.buffered_chunk_start) as usize; + return Ok(&self.read_buffer[buffer_offset..]); + } + } + + if (buffer_len == 0) || + (offset < self.buffered_chunk_start) || + (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) + { + let idx = (offset / index.chunk_size as u64) as usize; + self.buffer_chunk(idx)?; + } + + let buffer_offset = (offset - self.buffered_chunk_start) as usize; + Ok(&self.read_buffer[buffer_offset..]) + } +} + +impl std::io::Read for BufferedFixedReader { + + fn read(&mut self, buf: &mut [u8]) -> Result { + + use std::io::{Error, ErrorKind}; + use crate::tools::BufferedRead; + + let data = match self.buffered_read(self.read_offset) { + Ok(v) => v, + Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())), + }; + + let n = if data.len() > buf.len() { buf.len() } else { data.len() }; + + unsafe { std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); } + + self.read_offset += n as u64; + + return Ok(n); + } +} + +impl Seek for BufferedFixedReader { + + fn seek(&mut self, pos: SeekFrom) -> Result { + + let new_offset = match pos { + SeekFrom::Start(start_offset) => start_offset as i64, + SeekFrom::End(end_offset) => (self.archive_size as i64)+ end_offset, + SeekFrom::Current(offset) => (self.read_offset as i64) + offset, + }; + + use std::io::{Error, ErrorKind}; + if (new_offset < 0) || (new_offset > (self.archive_size as i64)) { + return Err(Error::new( + ErrorKind::Other, + format!("seek is out of range {} ([0..{}])", new_offset, self.archive_size))); + } + self.read_offset = new_offset as u64; + + Ok(self.read_offset) + } +} diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 6e316b1b..447bc108 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -626,11 +626,16 @@ fn restore( } }; - if !archive_name.ends_with(".pxar") { - bail!("unknown file extensions - unable to restore '{}'", archive_name); - } + let server_archive_name = if archive_name.ends_with(".pxar") { + format!("{}.didx", archive_name) + } else if archive_name.ends_with(".img") { + format!("{}.fidx", archive_name) + } else { + bail!("unknown archive file extension (expected .pxar of .img)"); + }; let client = client.start_backup_reader(repo.store(), &backup_type, &backup_id, backup_time, true).wait()?; + let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config); use std::os::unix::fs::OpenOptionsExt; @@ -640,23 +645,42 @@ fn restore( .custom_flags(libc::O_TMPFILE) .open("/tmp")?; - let tmpfile = client.download(&format!("{}.didx", archive_name), tmpfile).wait()?; + if server_archive_name.ends_with(".didx") { + let tmpfile = client.download(&server_archive_name, tmpfile).wait()?; - let index = DynamicIndexReader::new(tmpfile)?; + let index = DynamicIndexReader::new(tmpfile) + .map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?; - let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config); + let mut reader = BufferedDynamicReader::new(index, chunk_reader); - let mut reader = BufferedDynamicReader::new(index, chunk_reader); + let feature_flags = pxar::CA_FORMAT_DEFAULT; + let mut decoder = pxar::SequentialDecoder::new(&mut reader, feature_flags, |path| { + if verbose { + println!("{:?}", path); + } + Ok(()) + }); - let feature_flags = pxar::CA_FORMAT_DEFAULT; - let mut decoder = pxar::SequentialDecoder::new(&mut reader, feature_flags, |path| { - if verbose { - println!("{:?}", path); - } - Ok(()) - }); + decoder.restore(Path::new(target))?; - decoder.restore(Path::new(target))?; + } else if server_archive_name.ends_with(".fidx") { + let tmpfile = client.download(&server_archive_name, tmpfile).wait()?; + + let index = FixedIndexReader::new(tmpfile) + .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?; + + let mut reader = BufferedFixedReader::new(index, chunk_reader); + + let mut writer = std::fs::OpenOptions::new() + .write(true) + .create(true) + .create_new(true) + .open(target) + .map_err(|err| format_err!("unable to create target file {:?} - {}", target, err))?; + + std::io::copy(&mut reader, &mut writer) + .map_err(|err| format_err!("unable to store data - {}", err))?; + } Ok(Value::Null) }