src/client/http_client.rs: use ChunkInfo streams

This will make out of order uploads possible...
This commit is contained in:
Dietmar Maurer 2019-05-23 09:42:37 +02:00
parent 8ea3b1d188
commit 91320f0879
1 changed files with 25 additions and 12 deletions

View File

@ -405,6 +405,12 @@ pub struct BackupClient {
h2: H2Client, h2: H2Client,
} }
struct ChunkInfo {
digest: [u8; 32],
data: bytes::BytesMut,
offset: u64,
}
impl BackupClient { impl BackupClient {
pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self { pub fn new(h2: h2::client::SendRequest<bytes::Bytes>) -> Self {
@ -435,6 +441,15 @@ impl BackupClient {
let known_chunks = Arc::new(Mutex::new(HashSet::new())); let known_chunks = Arc::new(Mutex::new(HashSet::new()));
let mut stream_len = 0u64;
let stream = stream.
map(move |data| {
let digest = openssl::sha::sha256(&data);
stream_len += data.len() as u64;
ChunkInfo { data, digest, offset: stream_len }
});
let h2 = self.h2.clone(); let h2 = self.h2.clone();
let h2_2 = self.h2.clone(); let h2_2 = self.h2.clone();
let h2_3 = self.h2.clone(); let h2_3 = self.h2.clone();
@ -532,7 +547,7 @@ impl BackupClient {
fn upload_stream( fn upload_stream(
h2: H2Client, h2: H2Client,
wid: u64, wid: u64,
stream: impl Stream<Item=bytes::BytesMut, Error=Error>, stream: impl Stream<Item=ChunkInfo, Error=Error>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>, known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> impl Future<Item=(usize, usize, usize), Error=Error> { ) -> impl Future<Item=(usize, usize, usize), Error=Error> {
@ -547,33 +562,31 @@ impl BackupClient {
let start_time = std::time::Instant::now(); let start_time = std::time::Instant::now();
stream stream
.for_each(move |data| { .for_each(move |chunk_info| {
let h2 = h2.clone(); let h2 = h2.clone();
repeat.fetch_add(1, Ordering::SeqCst); repeat.fetch_add(1, Ordering::SeqCst);
stream_len.fetch_add(data.len(), Ordering::SeqCst); stream_len.fetch_add(chunk_info.data.len(), Ordering::SeqCst);
let upload_queue = upload_queue.clone(); let upload_queue = upload_queue.clone();
let digest = openssl::sha::sha256(&data);
let mut known_chunks = known_chunks.lock().unwrap(); let mut known_chunks = known_chunks.lock().unwrap();
let chunk_is_known = known_chunks.contains(&digest); let chunk_is_known = known_chunks.contains(&chunk_info.digest);
let upload_data; let upload_data;
let request; let request;
if chunk_is_known { if chunk_is_known {
println!("append existing chunk ({} bytes)", data.len()); println!("append existing chunk ({} bytes)", chunk_info.data.len());
let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) }); let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&chunk_info.digest) });
request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap(); request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
upload_data = None; upload_data = None;
} else { } else {
println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&digest), data.len()); println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&chunk_info.digest), chunk_info.data.len());
known_chunks.insert(digest); known_chunks.insert(chunk_info.digest);
let param = json!({ "wid": wid, "size" : data.len() }); let param = json!({ "wid": wid, "size" : chunk_info.data.len() });
request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
upload_data = Some(bytes::Bytes::from(data)); upload_data = Some(chunk_info.data.freeze());
} }
h2.send_request(request, upload_data) h2.send_request(request, upload_data)