backup: Add support for client side encryption

first try ...
This commit is contained in:
Dietmar Maurer 2019-06-13 11:47:23 +02:00
parent 51929e4532
commit f98ac774ee
7 changed files with 187 additions and 110 deletions

View File

@ -10,20 +10,23 @@ use crate::tools;
use crate::backup::*; use crate::backup::*;
use crate::api_schema::*; use crate::api_schema::*;
use crate::api_schema::router::*; use crate::api_schema::router::*;
use crate::api2::types::*;
use super::environment::*; use super::environment::*;
pub struct UploadChunk { pub struct UploadChunk {
stream: Body, stream: Body,
store: Arc<DataStore>, store: Arc<DataStore>,
digest: [u8; 32],
size: u32, size: u32,
chunk: Vec<u8>, encoded_size: u32,
raw_data: Option<Vec<u8>>,
} }
impl UploadChunk { impl UploadChunk {
pub fn new(stream: Body, store: Arc<DataStore>, size: u32) -> Self { pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self {
Self { stream, store, size, chunk: vec![] } Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
} }
} }
@ -34,20 +37,30 @@ impl Future for UploadChunk {
fn poll(&mut self) -> Poll<([u8; 32], u32, u32, bool), failure::Error> { fn poll(&mut self) -> Poll<([u8; 32], u32, u32, bool), failure::Error> {
loop { loop {
match try_ready!(self.stream.poll()) { match try_ready!(self.stream.poll()) {
Some(chunk) => { Some(input) => {
if (self.chunk.len() + chunk.len()) > (self.size as usize) { if let Some(ref mut raw_data) = self.raw_data {
if (raw_data.len() + input.len()) > (self.encoded_size as usize) {
bail!("uploaded chunk is larger than announced."); bail!("uploaded chunk is larger than announced.");
} }
self.chunk.extend_from_slice(&chunk); raw_data.extend_from_slice(&input);
} else {
bail!("poll upload chunk stream failed - already finished.");
}
} }
None => { None => {
if self.chunk.len() != (self.size as usize) { if let Some(raw_data) = self.raw_data.take() {
if raw_data.len() != (self.encoded_size as usize) {
bail!("uploaded chunk has unexpected size."); bail!("uploaded chunk has unexpected size.");
} }
let (is_duplicate, digest, compressed_size) = self.store.insert_chunk(&self.chunk)?; let chunk = DataChunk::from_raw(raw_data, self.digest)?;
return Ok(Async::Ready((digest, self.size, compressed_size as u32, is_duplicate))) let (is_duplicate, compressed_size) = self.store.insert_chunk(&chunk)?;
return Ok(Async::Ready((self.digest, self.size, compressed_size as u32, is_duplicate)))
} else {
bail!("poll upload chunk stream failed - already finished.");
}
} }
} }
} }
@ -62,10 +75,15 @@ pub fn api_method_upload_fixed_chunk() -> ApiAsyncMethod {
.minimum(1) .minimum(1)
.maximum(256) .maximum(256)
) )
.required("digest", CHUNK_DIGEST_SCHEMA.clone())
.required("size", IntegerSchema::new("Chunk size.") .required("size", IntegerSchema::new("Chunk size.")
.minimum(1) .minimum(1)
.maximum(1024*1024*16) .maximum(1024*1024*16)
) )
.required("encoded-size", IntegerSchema::new("Encoded chunk size.")
.minimum(9)
// fixme: .maximum(1024*1024*16+40)
)
) )
} }
@ -79,10 +97,14 @@ fn upload_fixed_chunk(
let wid = tools::required_integer_param(&param, "wid")? as usize; let wid = tools::required_integer_param(&param, "wid")? as usize;
let size = tools::required_integer_param(&param, "size")? as u32; let size = tools::required_integer_param(&param, "size")? as u32;
let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
let digest_str = tools::required_string_param(&param, "digest")?;
let digest = crate::tools::hex_to_digest(digest_str)?;
let env: &BackupEnvironment = rpcenv.as_ref(); let env: &BackupEnvironment = rpcenv.as_ref();
let upload = UploadChunk::new(req_body, env.datastore.clone(), size); let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);
let resp = upload let resp = upload
.then(move |result| { .then(move |result| {
@ -109,10 +131,15 @@ pub fn api_method_upload_dynamic_chunk() -> ApiAsyncMethod {
.minimum(1) .minimum(1)
.maximum(256) .maximum(256)
) )
.required("digest", CHUNK_DIGEST_SCHEMA.clone())
.required("size", IntegerSchema::new("Chunk size.") .required("size", IntegerSchema::new("Chunk size.")
.minimum(1) .minimum(1)
.maximum(1024*1024*16) .maximum(1024*1024*16)
) )
.required("encoded-size", IntegerSchema::new("Encoded chunk size.")
.minimum(9)
// fixme: .maximum(1024*1024*16+40)
)
) )
} }
@ -126,10 +153,14 @@ fn upload_dynamic_chunk(
let wid = tools::required_integer_param(&param, "wid")? as usize; let wid = tools::required_integer_param(&param, "wid")? as usize;
let size = tools::required_integer_param(&param, "size")? as u32; let size = tools::required_integer_param(&param, "size")? as u32;
let encoded_size = tools::required_integer_param(&param, "encoded-size")? as u32;
let digest_str = tools::required_string_param(&param, "digest")?;
let digest = crate::tools::hex_to_digest(digest_str)?;
let env: &BackupEnvironment = rpcenv.as_ref(); let env: &BackupEnvironment = rpcenv.as_ref();
let upload = UploadChunk::new(req_body, env.datastore.clone(), size); let upload = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size);
let resp = upload let resp = upload
.then(move |result| { .then(move |result| {

View File

@ -1,14 +1,13 @@
use failure::*; use failure::*;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::io::{Read, Write}; use std::io::Write;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use serde_derive::Serialize; use serde_derive::Serialize;
use openssl::sha;
use crate::tools; use crate::tools;
use super::DataChunk;
#[derive(Clone, Serialize)] #[derive(Clone, Serialize)]
pub struct GarbageCollectionStatus { pub struct GarbageCollectionStatus {
@ -173,21 +172,19 @@ impl ChunkStore {
Ok(()) Ok(())
} }
pub fn read_chunk(&self, digest:&[u8], buffer: &mut Vec<u8>) -> Result<(), Error> { pub fn read_chunk(&self, digest:&[u8; 32]) -> Result<DataChunk, Error> {
let mut chunk_path = self.chunk_dir.clone(); let mut chunk_path = self.chunk_dir.clone();
let prefix = digest_to_prefix(&digest); let prefix = digest_to_prefix(digest);
chunk_path.push(&prefix); chunk_path.push(&prefix);
let digest_str = tools::digest_to_hex(&digest); let digest_str = tools::digest_to_hex(digest);
chunk_path.push(&digest_str); chunk_path.push(&digest_str);
buffer.clear(); let mut file = std::fs::File::open(&chunk_path)
let f = std::fs::File::open(&chunk_path)?; .map_err(|err| format_err!(
let mut decoder = zstd::stream::Decoder::new(f)?; "store '{}', unable to read chunk '{}' - {}", self.name, digest_str, err))?;
decoder.read_to_end(buffer)?; DataChunk::load(&mut file, *digest)
Ok(())
} }
pub fn get_chunk_iterator( pub fn get_chunk_iterator(
@ -320,21 +317,14 @@ impl ChunkStore {
Ok(()) Ok(())
} }
pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32], u64), Error> { pub fn insert_chunk(
// fixme: use Sha512/256 when available
let digest = sha::sha256(chunk);
let (new, csize) = self.insert_chunk_noverify(&digest, chunk)?;
Ok((new, digest, csize))
}
pub fn insert_chunk_noverify(
&self, &self,
digest: &[u8; 32], chunk: &DataChunk,
chunk: &[u8],
) -> Result<(bool, u64), Error> { ) -> Result<(bool, u64), Error> {
//println!("DIGEST {}", tools::digest_to_hex(&digest)); let digest = chunk.digest();
//println!("DIGEST {}", tools::digest_to_hex(digest));
let mut chunk_path = self.chunk_dir.clone(); let mut chunk_path = self.chunk_dir.clone();
let prefix = digest_to_prefix(digest); let prefix = digest_to_prefix(digest);
@ -355,12 +345,12 @@ impl ChunkStore {
let mut tmp_path = chunk_path.clone(); let mut tmp_path = chunk_path.clone();
tmp_path.set_extension("tmp"); tmp_path.set_extension("tmp");
let f = std::fs::File::create(&tmp_path)?; let mut file = std::fs::File::create(&tmp_path)?;
let mut encoder = zstd::stream::Encoder::new(f, 1)?; let raw_data = chunk.raw_data();
let encoded_size = raw_data.len() as u64;
encoder.write_all(chunk)?; file.write_all(raw_data)?;
let f = encoder.finish()?;
if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) { if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) {
if let Err(_) = std::fs::remove_file(&tmp_path) { /* ignore */ } if let Err(_) = std::fs::remove_file(&tmp_path) { /* ignore */ }
@ -372,15 +362,9 @@ impl ChunkStore {
); );
} }
// fixme: is there a better way to get the compressed size?
let stat = nix::sys::stat::fstat(f.as_raw_fd())?;
let compressed_size = stat.st_size as u64;
//println!("PATH {:?}", chunk_path);
drop(lock); drop(lock);
Ok((false, compressed_size)) Ok((false, encoded_size))
} }
pub fn relative_path(&self, path: &Path) -> PathBuf { pub fn relative_path(&self, path: &Path) -> PathBuf {
@ -416,10 +400,13 @@ fn test_chunk_store1() {
assert!(chunk_store.is_err()); assert!(chunk_store.is_err());
let chunk_store = ChunkStore::create("test", &path).unwrap(); let chunk_store = ChunkStore::create("test", &path).unwrap();
let (exists, _, _) = chunk_store.insert_chunk(&[0u8, 1u8]).unwrap();
let chunk = super::DataChunkBuilder::new(&[0u8, 1u8]).build().unwrap();
let (exists, _) = chunk_store.insert_chunk(&chunk).unwrap();
assert!(!exists); assert!(!exists);
let (exists, _, _) = chunk_store.insert_chunk(&[0u8, 1u8]).unwrap(); let (exists, _) = chunk_store.insert_chunk(&chunk).unwrap();
assert!(exists); assert!(exists);

View File

@ -13,6 +13,7 @@ use super::fixed_index::*;
use super::dynamic_index::*; use super::dynamic_index::*;
use super::index::*; use super::index::*;
use super::backup_info::*; use super::backup_info::*;
use super::DataChunk;
use crate::server::WorkerTask; use crate::server::WorkerTask;
lazy_static!{ lazy_static!{
@ -256,15 +257,10 @@ impl DataStore {
Ok(()) Ok(())
} }
pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32], u64), Error> { pub fn insert_chunk(
&self,
chunk: &DataChunk,
) -> Result<(bool, u64), Error> {
self.chunk_store.insert_chunk(chunk) self.chunk_store.insert_chunk(chunk)
} }
pub fn insert_chunk_noverify(
&self,
digest: &[u8; 32],
chunk: &[u8],
) -> Result<(bool, u64), Error> {
self.chunk_store.insert_chunk_noverify(digest, chunk)
}
} }

View File

@ -1,4 +1,5 @@
use failure::*; use failure::*;
use std::convert::TryInto;
use crate::tools; use crate::tools;
use super::IndexFile; use super::IndexFile;
@ -17,6 +18,8 @@ use uuid::Uuid;
use crate::tools::io::ops::*; use crate::tools::io::ops::*;
use crate::tools::vec; use crate::tools::vec;
use super::{DataChunk, DataChunkBuilder};
/// Header format definition for dynamic index files (`.dixd`) /// Header format definition for dynamic index files (`.dixd`)
#[repr(C)] #[repr(C)]
pub struct DynamicIndexHeader { pub struct DynamicIndexHeader {
@ -158,11 +161,12 @@ impl DynamicIndexReader {
} }
#[inline] #[inline]
fn chunk_digest(&self, pos: usize) -> &[u8] { fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
if pos >= self.index_entries { if pos >= self.index_entries {
panic!("chunk index out of range"); panic!("chunk index out of range");
} }
unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) } let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos*40+8), 32) };
slice.try_into().unwrap()
} }
pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> { pub fn mark_used_chunks(&self, _status: &mut GarbageCollectionStatus) -> Result<(), Error> {
@ -182,15 +186,14 @@ impl DynamicIndexReader {
pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> { pub fn dump_pxar(&self, mut writer: Box<dyn Write>) -> Result<(), Error> {
let mut buffer = Vec::with_capacity(1024*1024);
for pos in 0..self.index_entries { for pos in 0..self.index_entries {
let _end = self.chunk_end(pos); let _end = self.chunk_end(pos);
let digest = self.chunk_digest(pos); let digest = self.chunk_digest(pos);
//println!("Dump {:08x}", end ); //println!("Dump {:08x}", end );
self.store.read_chunk(digest, &mut buffer)?; let chunk = self.store.read_chunk(digest)?;
writer.write_all(&buffer)?; // fixme: handle encrypted chunks
let data = chunk.decode(None)?;
writer.write_all(&data)?;
} }
Ok(()) Ok(())
@ -270,7 +273,14 @@ impl BufferedDynamicReader {
let index = &self.index; let index = &self.index;
let end = index.chunk_end(idx); let end = index.chunk_end(idx);
let digest = index.chunk_digest(idx); let digest = index.chunk_digest(idx);
index.store.read_chunk(digest, &mut self.read_buffer)?;
let chunk = index.store.read_chunk(digest)?;
// fixme: handle encrypted chunks
// fixme: avoid copy
let data = chunk.decode(None)?;
self.read_buffer.clear();
self.read_buffer.extend_from_slice(&data);
self.buffered_chunk_idx = idx; self.buffered_chunk_idx = idx;
self.buffered_chunk_start = end - (self.read_buffer.len() as u64); self.buffered_chunk_start = end - (self.read_buffer.len() as u64);
@ -433,7 +443,8 @@ impl DynamicIndexWriter {
}) })
} }
pub fn insert_chunk(&self, chunk: &[u8]) -> Result<(bool, [u8; 32], u64), Error> { // fixme: use add_chunk instead?
pub fn insert_chunk(&self, chunk: &DataChunk) -> Result<(bool, u64), Error> {
self.store.insert_chunk(chunk) self.store.insert_chunk(chunk)
} }
@ -531,8 +542,14 @@ impl DynamicChunkWriter {
self.last_chunk = self.chunk_offset; self.last_chunk = self.chunk_offset;
match self.index.insert_chunk(&self.chunk_buffer) { let chunk = DataChunkBuilder::new(&self.chunk_buffer)
Ok((is_duplicate, digest, compressed_size)) => { .compress(true)
.build()?;
let digest = chunk.digest();
match self.index.insert_chunk(&chunk) {
Ok((is_duplicate, compressed_size)) => {
self.stat.compressed_size += compressed_size; self.stat.compressed_size += compressed_size;
if is_duplicate { if is_duplicate {
@ -542,7 +559,7 @@ impl DynamicChunkWriter {
} }
println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size, println!("ADD CHUNK {:016x} {} {}% {} {}", self.chunk_offset, chunk_size,
(compressed_size*100)/(chunk_size as u64), is_duplicate, tools::digest_to_hex(&digest)); (compressed_size*100)/(chunk_size as u64), is_duplicate, tools::digest_to_hex(digest));
self.index.add_chunk(self.chunk_offset as u64, &digest)?; self.index.add_chunk(self.chunk_offset as u64, &digest)?;
self.chunk_buffer.truncate(0); self.chunk_buffer.truncate(0);
return Ok(()); return Ok(());

View File

@ -12,6 +12,7 @@ use std::path::{Path, PathBuf};
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use uuid::Uuid; use uuid::Uuid;
use chrono::{Local, TimeZone}; use chrono::{Local, TimeZone};
use super::ChunkInfo;
/// Header format definition for fixed index files (`.fidx`) /// Header format definition for fixed index files (`.fidx`)
#[repr(C)] #[repr(C)]
@ -307,29 +308,42 @@ impl FixedIndexWriter {
} }
// Note: We want to add data out of order, so do not assume any order here. // Note: We want to add data out of order, so do not assume any order here.
pub fn add_chunk(&mut self, pos: usize, chunk: &[u8], stat: &mut ChunkStat) -> Result<(), Error> { pub fn add_chunk(&mut self, chunk_info: &ChunkInfo, stat: &mut ChunkStat) -> Result<(), Error> {
let end = pos + chunk.len(); let chunk_len = chunk_info.chunk_len as usize;
let end = chunk_info.offset as usize;
if end < chunk_len {
bail!("got chunk with small offset ({} < {}", end, chunk_len);
}
let pos = end - chunk_len;
if end > self.size { if end > self.size {
bail!("write chunk data exceeds size ({} >= {})", end, self.size); bail!("write chunk data exceeds size ({} >= {})", end, self.size);
} }
// last chunk can be smaller // last chunk can be smaller
if ((end != self.size) && (chunk.len() != self.chunk_size)) || if ((end != self.size) && (chunk_len != self.chunk_size)) ||
(chunk.len() > self.chunk_size) || (chunk.len() == 0) { (chunk_len > self.chunk_size) || (chunk_len == 0) {
bail!("got chunk with wrong length ({} != {}", chunk.len(), self.chunk_size); bail!("got chunk with wrong length ({} != {}", chunk_len, self.chunk_size);
} }
if pos & (self.chunk_size-1) != 0 { bail!("add unaligned chunk (pos = {})", pos); } if pos & (self.chunk_size-1) != 0 { bail!("add unaligned chunk (pos = {})", pos); }
let (is_duplicate, digest, compressed_size) = self.store.insert_chunk(chunk)?; if (end as u64) != chunk_info.offset {
bail!("got chunk with wrong offset ({} != {}", end, chunk_info.offset);
}
let (is_duplicate, compressed_size) = self.store.insert_chunk(&chunk_info.chunk)?;
stat.chunk_count += 1; stat.chunk_count += 1;
stat.compressed_size += compressed_size; stat.compressed_size += compressed_size;
println!("ADD CHUNK {} {} {}% {} {}", pos, chunk.len(), let digest = chunk_info.chunk.digest();
(compressed_size*100)/(chunk.len() as u64), is_duplicate, tools::digest_to_hex(&digest));
println!("ADD CHUNK {} {} {}% {} {}", pos, chunk_len,
(compressed_size*100)/(chunk_len as u64), is_duplicate, tools::digest_to_hex(digest));
if is_duplicate { if is_duplicate {
stat.duplicate_chunks += 1; stat.duplicate_chunks += 1;
@ -337,7 +351,7 @@ impl FixedIndexWriter {
stat.disk_size += compressed_size; stat.disk_size += compressed_size;
} }
self.add_digest(pos / self.chunk_size, &digest) self.add_digest(pos / self.chunk_size, digest)
} }
pub fn add_digest(&mut self, index: usize, digest: &[u8; 32]) -> Result<(), Error> { pub fn add_digest(&mut self, index: usize, digest: &[u8; 32]) -> Result<(), Error> {

View File

@ -113,6 +113,7 @@ fn backup_directory<P: AsRef<Path>>(
chunk_size: Option<usize>, chunk_size: Option<usize>,
all_file_systems: bool, all_file_systems: bool,
verbose: bool, verbose: bool,
crypt_config: Option<Arc<CryptConfig>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?; let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), all_file_systems, verbose)?;
@ -130,7 +131,7 @@ fn backup_directory<P: AsRef<Path>>(
.map_err(|_| {}).map(|_| ()) .map_err(|_| {}).map(|_| ())
); );
client.upload_stream(archive_name, stream, "dynamic", None).wait()?; client.upload_stream(archive_name, stream, "dynamic", None, crypt_config).wait()?;
Ok(()) Ok(())
} }
@ -142,6 +143,7 @@ fn backup_image<P: AsRef<Path>>(
image_size: u64, image_size: u64,
chunk_size: Option<usize>, chunk_size: Option<usize>,
_verbose: bool, _verbose: bool,
crypt_config: Option<Arc<CryptConfig>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let path = image_path.as_ref().to_owned(); let path = image_path.as_ref().to_owned();
@ -153,7 +155,7 @@ fn backup_image<P: AsRef<Path>>(
let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024)); let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
client.upload_stream(archive_name, stream, "fixed", Some(image_size)).wait()?; client.upload_stream(archive_name, stream, "fixed", Some(image_size), crypt_config).wait()?;
Ok(()) Ok(())
} }
@ -456,6 +458,8 @@ fn create_backup(
println!("Client name: {}", tools::nodename()); println!("Client name: {}", tools::nodename());
println!("Start Time: {}", backup_time.to_rfc3339()); println!("Start Time: {}", backup_time.to_rfc3339());
let crypt_config = None;
let client = client.start_backup(repo.store(), "host", &backup_id, verbose).wait()?; let client = client.start_backup(repo.store(), "host", &backup_id, verbose).wait()?;
for (backup_type, filename, target, size) in upload_list { for (backup_type, filename, target, size) in upload_list {
@ -466,11 +470,27 @@ fn create_backup(
} }
BackupType::PXAR => { BackupType::PXAR => {
println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target); println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);
backup_directory(&client, &filename, &target, chunk_size_opt, all_file_systems, verbose)?; backup_directory(
&client,
&filename,
&target,
chunk_size_opt,
all_file_systems,
verbose,
crypt_config.clone(),
)?;
} }
BackupType::IMAGE => { BackupType::IMAGE => {
println!("Upload image '{}' to '{:?}' as {}", filename, repo, target); println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
backup_image(&client, &filename, &target, size, chunk_size_opt, verbose)?; backup_image(
&client,
&filename,
&target,
size,
chunk_size_opt,
verbose,
crypt_config.clone(),
)?;
} }
} }
} }

View File

@ -484,20 +484,11 @@ impl BackupClient {
stream: impl Stream<Item=bytes::BytesMut, Error=Error>, stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
prefix: &str, prefix: &str,
fixed_size: Option<u64>, fixed_size: Option<u64>,
crypt_config: Option<Arc<CryptConfig>>,
) -> impl Future<Item=(), Error=Error> { ) -> impl Future<Item=(), Error=Error> {
let known_chunks = Arc::new(Mutex::new(HashSet::new())); let known_chunks = Arc::new(Mutex::new(HashSet::new()));
let mut stream_len = 0u64;
let stream = stream.
map(move |data| {
let digest = openssl::sha::sha256(&data);
let offset = stream_len;
stream_len += data.len() as u64;
ChunkInfo { data, digest, offset }
});
let h2 = self.h2.clone(); let h2 = self.h2.clone();
let h2_2 = self.h2.clone(); let h2_2 = self.h2.clone();
let h2_3 = self.h2.clone(); let h2_3 = self.h2.clone();
@ -519,7 +510,7 @@ impl BackupClient {
}) })
.and_then(move |res| { .and_then(move |res| {
let wid = res.as_u64().unwrap(); let wid = res.as_u64().unwrap();
Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone()) Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone(), crypt_config)
.and_then(move |(chunk_count, size, _speed)| { .and_then(move |(chunk_count, size, _speed)| {
let param = json!({ let param = json!({
"wid": wid , "wid": wid ,
@ -671,9 +662,10 @@ impl BackupClient {
fn upload_chunk_info_stream( fn upload_chunk_info_stream(
h2: H2Client, h2: H2Client,
wid: u64, wid: u64,
stream: impl Stream<Item=ChunkInfo, Error=Error>, stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
prefix: &str, prefix: &str,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
crypt_config: Option<Arc<CryptConfig>>,
) -> impl Future<Item=(usize, usize, usize), Error=Error> { ) -> impl Future<Item=(usize, usize, usize), Error=Error> {
let repeat = std::sync::Arc::new(AtomicUsize::new(0)); let repeat = std::sync::Arc::new(AtomicUsize::new(0));
@ -690,17 +682,29 @@ impl BackupClient {
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
stream stream
.map(move |chunk_info| { .and_then(move |data| {
let chunk_len = data.len();
repeat.fetch_add(1, Ordering::SeqCst); repeat.fetch_add(1, Ordering::SeqCst);
stream_len.fetch_add(chunk_info.data.len(), Ordering::SeqCst); let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
let mut chunk_builder = DataChunkBuilder::new(data.as_ref())
.compress(true);
if let Some(ref crypt_config) = crypt_config {
chunk_builder = chunk_builder.crypt_config(crypt_config);
}
let mut known_chunks = known_chunks.lock().unwrap(); let mut known_chunks = known_chunks.lock().unwrap();
let chunk_is_known = known_chunks.contains(&chunk_info.digest); let digest = chunk_builder.digest();
let chunk_is_known = known_chunks.contains(digest);
if chunk_is_known { if chunk_is_known {
MergedChunkInfo::Known(vec![(chunk_info.offset, chunk_info.digest)]) Ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
} else { } else {
known_chunks.insert(chunk_info.digest); known_chunks.insert(*digest);
MergedChunkInfo::New(chunk_info) let chunk = chunk_builder.build()?;
Ok(MergedChunkInfo::New(ChunkInfo { chunk, chunk_len: chunk_len as u64, offset }))
} }
}) })
.merge_known_chunks() .merge_known_chunks()
@ -708,15 +712,23 @@ impl BackupClient {
if let MergedChunkInfo::New(chunk_info) = merged_chunk_info { if let MergedChunkInfo::New(chunk_info) = merged_chunk_info {
let offset = chunk_info.offset; let offset = chunk_info.offset;
let digest = chunk_info.digest; let digest = *chunk_info.chunk.digest();
let digest_str = tools::digest_to_hex(&digest);
let upload_queue = upload_queue.clone(); let upload_queue = upload_queue.clone();
println!("upload new chunk {} ({} bytes, offset {})", tools::digest_to_hex(&digest), println!("upload new chunk {} ({} bytes, offset {})", digest_str,
chunk_info.data.len(), offset); chunk_info.chunk_len, offset);
let chunk_data = chunk_info.chunk.raw_data();
let param = json!({
"wid": wid,
"digest": digest_str,
"size": chunk_info.chunk_len,
"encoded-size": chunk_data.len(),
});
let param = json!({ "wid": wid, "size" : chunk_info.data.len() });
let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param)).unwrap(); let request = H2Client::request_builder("localhost", "POST", &upload_chunk_path, Some(param)).unwrap();
let upload_data = Some(chunk_info.data.freeze()); let upload_data = Some(bytes::Bytes::from(chunk_data));
let new_info = MergedChunkInfo::Known(vec![(offset, digest)]); let new_info = MergedChunkInfo::Known(vec![(offset, digest)]);
@ -883,7 +895,7 @@ impl H2Client {
.and_then(move |mut send_request| { .and_then(move |mut send_request| {
if let Some(data) = data { if let Some(data) = data {
let (response, stream) = send_request.send_request(request, false).unwrap(); let (response, stream) = send_request.send_request(request, false).unwrap();
future::Either::A(PipeToSendStream::new(bytes::Bytes::from(data), stream) future::Either::A(PipeToSendStream::new(data, stream)
.and_then(move |_| { .and_then(move |_| {
future::ok(response) future::ok(response)
})) }))