From a914a774487d2cd4a28deeaa0c4c83a0c3cf915d Mon Sep 17 00:00:00 2001 From: root Date: Fri, 14 Dec 2018 13:39:41 +0100 Subject: [PATCH] backup-client: read file by chunks --- src/bin/backup-client.rs | 93 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 90 insertions(+), 3 deletions(-) diff --git a/src/bin/backup-client.rs b/src/bin/backup-client.rs index adab220e..e676f432 100644 --- a/src/bin/backup-client.rs +++ b/src/bin/backup-client.rs @@ -3,6 +3,11 @@ extern crate apitest; use failure::*; use std::collections::HashMap; +use std::fs::File; +use std::io::Read; +use std::io::ErrorKind; +use std::io::prelude::*; +use std::iter::Iterator; use apitest::cli::command::*; use apitest::api::schema::*; @@ -13,10 +18,92 @@ use std::path::{Path, PathBuf}; use apitest::config::datastore; +fn required_string_param<'a>(param: &'a Value, name: &str) -> &'a str { + param[name].as_str().expect(&format!("missing parameter '{}'", name)) +} + + +fn file_chunker( + mut file: File, + chunk_cb: C +) -> Result<(), Error> + where C: Fn(usize, &[u8]) -> Result +{ + + let chunk_size = 64*1024; + + let mut buf = vec![0u8; 4*1024*1024]; + + let mut pos = 0; + let mut file_pos = 0; + loop { + let mut eof = false; + let mut tmp = &mut buf[..]; + // try to read large portions, at least chunk_size + while pos < chunk_size { + match file.read(tmp) { + Ok(0) => { eof = true; break; }, + Ok(n) => { + pos += n; + if pos > chunk_size { break; } + tmp = &mut tmp[n..]; + } + Err(ref e) if e.kind() == ErrorKind::Interrupted => { /* try again */ } + Err(e) => bail!("read error - {}", e.to_string()), + } + } + println!("READ {} {}", pos, eof); + + let mut start = 0; + while start + chunk_size <= pos { + if !(chunk_cb)(file_pos, &buf[start..start+chunk_size])? { break; } + file_pos += chunk_size; + start += chunk_size; + } + if eof { + if start < pos { + (chunk_cb)(file_pos, &buf[start..pos])?; + //file_pos += pos - start; + } + break; + } else { + let rest = pos - start; + if rest > 0 { + let ptr = buf.as_mut_ptr(); + unsafe { std::ptr::copy_nonoverlapping(ptr.add(start), ptr, rest); } + pos = rest; + } else { + pos = 0; + } + } + } + + Ok(()) + +} + fn backup_file(param: Value, _info: &ApiMethod) -> Result { - println!("Backup file '{}'", param["filename"].as_str().unwrap()); - + let filename = required_string_param(¶m, "filename"); + let store = required_string_param(¶m, "store"); + + let config = datastore::config()?; + let (_, store_config) = config.sections.get(store) + .ok_or(format_err!("no such datastore '{}'", store))?; + + let path = store_config["path"].as_str().unwrap(); + + let _store = ChunkStore::open(path)?; + + println!("Backup file '{}' to '{}'", filename, store); + + let file = std::fs::File::open(filename)?; + + file_chunker(file, |pos, chunk| { + println!("CHUNK {} {}", pos, chunk.len()); + Ok(true) + })?; + Ok(Value::Null) } @@ -31,7 +118,7 @@ fn main() { .required("store", StringSchema::new("Datastore name.")) )) .arg_param(vec!["filename"]); - + if let Err(err) = run_cli_command(&cmd_def.into()) { eprintln!("Error: {}", err); print_cli_usage();