src/backup/chunk_stream.rs: add optional chunk_size parameter

This commit is contained in:
Dietmar Maurer 2019-05-30 13:28:24 +02:00
parent 49ef316bcd
commit 36898ffce6
5 changed files with 11 additions and 15 deletions

View File

@ -78,7 +78,7 @@ fn upload_pxar(
bail!("got wrong content-type for pxar archive upload"); bail!("got wrong content-type for pxar archive upload");
} }
let chunk_size = param["chunk-size"].as_u64().unwrap_or(4096*1024); let chunk_size = param["chunk-size"].as_u64().unwrap_or(4096*1024) as usize;
verify_chunk_size(chunk_size)?; verify_chunk_size(chunk_size)?;
let datastore = DataStore::lookup_datastore(store)?; let datastore = DataStore::lookup_datastore(store)?;

View File

@ -42,9 +42,9 @@ pub struct ChunkStore {
// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ? // TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
pub fn verify_chunk_size(size: u64) -> Result<(), Error> { pub fn verify_chunk_size(size: usize) -> Result<(), Error> {
static SIZES: [u64; 7] = [64*1024, 128*1024, 256*1024, 512*1024, 1024*1024, 2048*1024, 4096*1024]; static SIZES: [usize; 7] = [64*1024, 128*1024, 256*1024, 512*1024, 1024*1024, 2048*1024, 4096*1024];
if !SIZES.contains(&size) { if !SIZES.contains(&size) {
bail!("Got unsupported chunk size '{}'", size); bail!("Got unsupported chunk size '{}'", size);

View File

@ -15,8 +15,8 @@ pub struct ChunkStream<S> {
} }
impl <S> ChunkStream<S> { impl <S> ChunkStream<S> {
pub fn new(input: S) -> Self { pub fn new(input: S, chunk_size: Option<usize>) -> Self {
Self { input, chunker: Chunker::new(4 * 1024 * 1024), buffer: BytesMut::new(), scan_pos: 0} Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0}
} }
} }

View File

@ -110,17 +110,13 @@ fn backup_directory<P: AsRef<Path>>(
client: &BackupClient, client: &BackupClient,
dir_path: P, dir_path: P,
archive_name: &str, archive_name: &str,
chunk_size: Option<u64>, chunk_size: Option<usize>,
all_file_systems: bool, all_file_systems: bool,
verbose: bool, verbose: bool,
) -> Result<(), Error> { ) -> Result<(), Error> {
if let Some(_size) = chunk_size {
unimplemented!();
}
let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?; let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?;
let chunk_stream = ChunkStream::new(pxar_stream); let chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
@ -144,7 +140,7 @@ fn backup_image<P: AsRef<Path>>(
image_path: P, image_path: P,
archive_name: &str, archive_name: &str,
image_size: u64, image_size: u64,
chunk_size: Option<u64>, chunk_size: Option<usize>,
verbose: bool, verbose: bool,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -155,7 +151,7 @@ fn backup_image<P: AsRef<Path>>(
let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
.map_err(Error::from); .map_err(Error::from);
let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024) as usize); let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
client.upload_stream(archive_name, stream, "fixed", Some(image_size)).wait()?; client.upload_stream(archive_name, stream, "fixed", Some(image_size)).wait()?;
@ -395,7 +391,7 @@ fn create_backup(
let verbose = param["verbose"].as_bool().unwrap_or(false); let verbose = param["verbose"].as_bool().unwrap_or(false);
let chunk_size_opt = param["chunk-size"].as_u64().map(|v| v*1024); let chunk_size_opt = param["chunk-size"].as_u64().map(|v| (v*1024) as usize);
if let Some(size) = chunk_size_opt { if let Some(size) = chunk_size_opt {
verify_chunk_size(size)?; verify_chunk_size(size)?;

View File

@ -27,7 +27,7 @@ fn main() {
let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
.map(|bytes| bytes.to_vec()).map_err(Error::from); .map(|bytes| bytes.to_vec()).map_err(Error::from);
//let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024); //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
let chunk_stream = ChunkStream::new(stream); let chunk_stream = ChunkStream::new(stream, None);
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();