src/client/http_client.rs: implement download_chunk_list

This commit is contained in:
Dietmar Maurer 2019-05-22 09:46:02 +02:00
parent 0cc0fffd1e
commit 553610b43e
1 changed files with 48 additions and 4 deletions

View File

@ -5,6 +5,8 @@ use hyper::Body;
use hyper::client::Client; use hyper::client::Client;
use xdg::BaseDirectories; use xdg::BaseDirectories;
use chrono::Utc; use chrono::Utc;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use http::{Request, Response}; use http::{Request, Response};
use http::header::HeaderValue; use http::header::HeaderValue;
@ -465,10 +467,50 @@ impl H2Client {
(verify_queue_tx, verify_result_rx) (verify_queue_tx, verify_result_rx)
} }
pub fn download_chunk_list(
&self,
path: &str,
archive_name: &str,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> impl Future<Item=(), Error=Error> {
let param = json!({ "archive-name": archive_name });
let request = Self::request_builder("localhost", "GET", path, Some(param)).unwrap();
self.send_request(request, None)
.and_then(move |response| {
response
.map_err(Error::from)
.and_then(move |resp| {
let status = resp.status();
if !status.is_success() {
bail!("download chunk list failed with status {}", status);
}
let (_head, body) = resp.into_parts();
Ok(body)
})
.and_then(move |mut body| {
let mut release_capacity = body.release_capacity().clone();
crate::backup::DigestListDecoder::new(body.map_err(Error::from))
.for_each(move |chunk| {
let _ = release_capacity.release_capacity(chunk.len());
println!("GOT DOWNLOAD {}", tools::digest_to_hex(&chunk));
known_chunks.lock().unwrap().insert(chunk);
Ok(())
})
})
})
}
pub fn upload_stream( pub fn upload_stream(
&self, &self,
wid: u64, wid: u64,
stream: impl Stream<Item=Vec<u8>, Error=Error>, stream: impl Stream<Item=bytes::BytesMut, Error=Error>,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> impl Future<Item=usize, Error=Error> { ) -> impl Future<Item=usize, Error=Error> {
let repeat = std::sync::Arc::new(AtomicUsize::new(0)); let repeat = std::sync::Arc::new(AtomicUsize::new(0));
@ -492,18 +534,20 @@ impl H2Client {
let digest = openssl::sha::sha256(&data); let digest = openssl::sha::sha256(&data);
let chunk_is_known = false; let mut known_chunks = known_chunks.lock().unwrap();
let chunk_is_known = known_chunks.contains(&digest);
let upload_data; let upload_data;
let request; let request;
println!("upload chunk ({} bytes)", data.len());
if chunk_is_known { if chunk_is_known {
println!("append existing chunk ({} bytes)", data.len());
let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) }); let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) });
request = Self::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap(); request = Self::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap();
upload_data = None; upload_data = None;
} else { } else {
println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&digest), data.len());
known_chunks.insert(digest);
let param = json!({ "wid": wid, "size" : data.len() }); let param = json!({ "wid": wid, "size" : data.len() });
request = Self::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); request = Self::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap();
upload_data = Some(bytes::Bytes::from(data)); upload_data = Some(bytes::Bytes::from(data));