src/client/http_client.rs: compute checksums for chunk streams
This commit is contained in:
parent
bd15e96dd9
commit
dbed4c8cd7
@ -708,7 +708,7 @@ impl BackupClient {
|
|||||||
.and_then(move |res| {
|
.and_then(move |res| {
|
||||||
let wid = res.as_u64().unwrap();
|
let wid = res.as_u64().unwrap();
|
||||||
Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone(), crypt_config)
|
Self::upload_chunk_info_stream(h2_3, wid, stream, &prefix, known_chunks.clone(), crypt_config)
|
||||||
.and_then(move |(chunk_count, size, _speed)| {
|
.and_then(move |(chunk_count, size, _speed, csum)| {
|
||||||
let param = json!({
|
let param = json!({
|
||||||
"wid": wid ,
|
"wid": wid ,
|
||||||
"chunk-count": chunk_count,
|
"chunk-count": chunk_count,
|
||||||
@ -716,7 +716,7 @@ impl BackupClient {
|
|||||||
});
|
});
|
||||||
h2_4.post(&close_path, Some(param))
|
h2_4.post(&close_path, Some(param))
|
||||||
.map(move |_| {
|
.map(move |_| {
|
||||||
BackupStats { size: size as u64, csum: [0u8; 32] }
|
BackupStats { size: size as u64, csum }
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -865,7 +865,7 @@ impl BackupClient {
|
|||||||
prefix: &str,
|
prefix: &str,
|
||||||
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||||
crypt_config: Option<Arc<CryptConfig>>,
|
crypt_config: Option<Arc<CryptConfig>>,
|
||||||
) -> impl Future<Item=(usize, usize, usize), Error=Error> {
|
) -> impl Future<Item=(usize, usize, usize, [u8; 32]), Error=Error> {
|
||||||
|
|
||||||
let repeat = std::sync::Arc::new(AtomicUsize::new(0));
|
let repeat = std::sync::Arc::new(AtomicUsize::new(0));
|
||||||
let repeat2 = repeat.clone();
|
let repeat2 = repeat.clone();
|
||||||
@ -880,6 +880,9 @@ impl BackupClient {
|
|||||||
|
|
||||||
let start_time = std::time::Instant::now();
|
let start_time = std::time::Instant::now();
|
||||||
|
|
||||||
|
let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
|
||||||
|
let index_csum_2 = index_csum.clone();
|
||||||
|
|
||||||
stream
|
stream
|
||||||
.and_then(move |data| {
|
.and_then(move |data| {
|
||||||
|
|
||||||
@ -897,6 +900,12 @@ impl BackupClient {
|
|||||||
|
|
||||||
let mut known_chunks = known_chunks.lock().unwrap();
|
let mut known_chunks = known_chunks.lock().unwrap();
|
||||||
let digest = chunk_builder.digest();
|
let digest = chunk_builder.digest();
|
||||||
|
|
||||||
|
let mut guard = index_csum.lock().unwrap();
|
||||||
|
let csum = guard.as_mut().unwrap();
|
||||||
|
csum.update(&offset.to_le_bytes());
|
||||||
|
csum.update(digest);
|
||||||
|
|
||||||
let chunk_is_known = known_chunks.contains(digest);
|
let chunk_is_known = known_chunks.contains(digest);
|
||||||
if chunk_is_known {
|
if chunk_is_known {
|
||||||
Ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
|
Ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
|
||||||
@ -962,7 +971,11 @@ impl BackupClient {
|
|||||||
println!("Average chunk size was {} bytes.", stream_len/repeat);
|
println!("Average chunk size was {} bytes.", stream_len/repeat);
|
||||||
println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
|
println!("Time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
|
||||||
}
|
}
|
||||||
Ok((repeat, stream_len, speed))
|
|
||||||
|
let mut guard = index_csum_2.lock().unwrap();
|
||||||
|
let csum = guard.take().unwrap().finish();
|
||||||
|
|
||||||
|
Ok((repeat, stream_len, speed, csum))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user