src/client/http_client.rs - BackupClient: use async

Dietmar Maurer 2019-09-05 12:55:22 +02:00
parent 2a1e6d7dea
commit 2f831baec0

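Every hunk below applies the same refactor: a method that returned `impl Future` by cloning `h2` and taking owned copies of its arguments into an `async move` block becomes a plain `async fn` that borrows `&self` for the duration of the call. A minimal sketch of that pattern, using stand-in types rather than the repository's real `H2Client`/`BackupStats` API:

    use std::future::Future;

    // Stand-in types, just enough for the sketch to type-check; the real client
    // in src/client/http_client.rs is richer than this.
    #[derive(Clone)]
    struct H2Client;
    struct Error;

    impl H2Client {
        async fn upload(&self, _file_name: &str, data: Vec<u8>) -> Result<u64, Error> {
            Ok(data.len() as u64)
        }
    }

    struct BackupClient {
        h2: H2Client,
    }

    impl BackupClient {
        // Before: build the future by hand, so everything it captures has to be
        // cloned into owned values up front.
        fn upload_old(&self, file_name: &str, data: Vec<u8>) -> impl Future<Output = Result<u64, Error>> {
            let h2 = self.h2.clone();
            let file_name = file_name.to_owned();
            async move { h2.upload(&file_name, data).await }
        }

        // After: the `async fn` borrows `&self` and `file_name`, so the clones
        // and the `async move` wrapper disappear while callers still just `.await`.
        async fn upload_new(&self, file_name: &str, data: Vec<u8>) -> Result<u64, Error> {
            self.h2.upload(file_name, data).await
        }
    }

The same simplification removes the `let h2 = self.h2.clone()` / `to_owned()` preambles and one level of nesting in each of the methods changed below.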

@@ -613,102 +613,91 @@ impl BackupClient
         self.canceller.cancel();
     }
 
-    pub fn upload_blob<R: std::io::Read>(
+    pub async fn upload_blob<R: std::io::Read>(
         &self,
         mut reader: R,
         file_name: &str,
-    ) -> impl Future<Output = Result<BackupStats, Error>> {
-
-        let h2 = self.h2.clone();
-        let file_name = file_name.to_owned();
-
-        async move {
-            let mut raw_data = Vec::new();
-            // fixme: avoid loading into memory
-            reader.read_to_end(&mut raw_data)?;
-
-            let csum = openssl::sha::sha256(&raw_data);
-            let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
-            let size = raw_data.len() as u64; // fixme: should be decoded size instead??
-            let _value = h2.upload("blob", Some(param), raw_data).await?;
-            Ok(BackupStats { size, csum })
-        }
+    ) -> Result<BackupStats, Error> {
+        let mut raw_data = Vec::new();
+        // fixme: avoid loading into memory
+        reader.read_to_end(&mut raw_data)?;
+
+        let csum = openssl::sha::sha256(&raw_data);
+        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
+        let size = raw_data.len() as u64; // fixme: should be decoded size instead??
+        let _value = self.h2.upload("blob", Some(param), raw_data).await?;
+        Ok(BackupStats { size, csum })
     }
 
-    pub fn upload_blob_from_data(
+    pub async fn upload_blob_from_data(
         &self,
         data: Vec<u8>,
         file_name: &str,
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
         sign_only: bool,
-    ) -> impl Future<Output = Result<BackupStats, Error>> {
-
-        let h2 = self.h2.clone();
-        let file_name = file_name.to_owned();
+    ) -> Result<BackupStats, Error> {
+
         let size = data.len() as u64;
 
-        async move {
-            let blob = if let Some(crypt_config) = crypt_config {
-                if sign_only {
-                    DataBlob::create_signed(&data, crypt_config, compress)?
-                } else {
-                    DataBlob::encode(&data, Some(crypt_config.clone()), compress)?
-                }
+        let blob = if let Some(crypt_config) = crypt_config {
+            if sign_only {
+                DataBlob::create_signed(&data, crypt_config, compress)?
             } else {
-                DataBlob::encode(&data, None, compress)?
-            };
+                DataBlob::encode(&data, Some(crypt_config.clone()), compress)?
+            }
+        } else {
+            DataBlob::encode(&data, None, compress)?
+        };
 
-            let raw_data = blob.into_inner();
-            let csum = openssl::sha::sha256(&raw_data);
-            let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
-            let _value = h2.upload("blob", Some(param), raw_data).await?;
-            Ok(BackupStats { size, csum })
-        }
+        let raw_data = blob.into_inner();
+        let csum = openssl::sha::sha256(&raw_data);
+        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
+        let _value = self.h2.upload("blob", Some(param), raw_data).await?;
+        Ok(BackupStats { size, csum })
     }
 
-    pub fn upload_blob_from_file<P: AsRef<std::path::Path>>(
+    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
         &self,
         src_path: P,
         file_name: &str,
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
-    ) -> impl Future<Output = Result<BackupStats, Error>> {
-
-        let h2 = self.h2.clone();
-        let file_name = file_name.to_owned();
-        let src_path = src_path.as_ref().to_owned();
-
-        async move {
-            let mut file = tokio::fs::File::open(src_path.clone())
-                .await
-                .map_err(move |err| format_err!("unable to open file {:?} - {}", src_path, err))?;
-
-            let mut contents = Vec::new();
-
-            file.read_to_end(&mut contents).await.map_err(Error::from)?;
-
-            let size: u64 = contents.len() as u64;
-            let blob = DataBlob::encode(&contents, crypt_config, compress)?;
-            let raw_data = blob.into_inner();
-            let csum = openssl::sha::sha256(&raw_data);
-            let param = json!({
-                "encoded-size": raw_data.len(),
-                "file-name": file_name,
-            });
-            h2.upload("blob", Some(param), raw_data).await?;
-            Ok(BackupStats { size, csum })
-        }
+    ) -> Result<BackupStats, Error> {
+
+        let src_path = src_path.as_ref();
+
+        let mut file = tokio::fs::File::open(src_path.clone())
+            .await
+            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;
+
+        let mut contents = Vec::new();
+
+        file.read_to_end(&mut contents)
+            .await
+            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
+
+        let size: u64 = contents.len() as u64;
+        let blob = DataBlob::encode(&contents, crypt_config, compress)?;
+        let raw_data = blob.into_inner();
+        let csum = openssl::sha::sha256(&raw_data);
+        let param = json!({
+            "encoded-size": raw_data.len(),
+            "file-name": file_name,
+        });
+        self.h2.upload("blob", Some(param), raw_data).await?;
+        Ok(BackupStats { size, csum })
     }
 
-    pub fn upload_stream(
+    pub async fn upload_stream(
         &self,
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
         prefix: &str,
         fixed_size: Option<u64>,
         crypt_config: Option<Arc<CryptConfig>>,
-    ) -> impl Future<Output = Result<BackupStats, Error>> {
+    ) -> Result<BackupStats, Error> {
 
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
 
         let mut param = json!({ "archive-name": archive_name });
@@ -719,20 +708,13 @@ impl BackupClient
         let index_path = format!("{}_index", prefix);
         let close_path = format!("{}_close", prefix);
 
-        let prefix = prefix.to_owned();
-
-        let h2 = self.h2.clone();
-
-        let download_future =
-            Self::download_chunk_list(h2.clone(), &index_path, archive_name, known_chunks.clone());
-
-        async move {
-            download_future.await?;
-
-            let wid = h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
-
-            let (chunk_count, size, _speed, csum) = Self::upload_chunk_info_stream(
-                h2.clone(),
+        Self::download_chunk_list(self.h2.clone(), &index_path, archive_name, known_chunks.clone()).await?;
+
+        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
+
+        let (chunk_count, size, _speed, csum) =
+            Self::upload_chunk_info_stream(
+                self.h2.clone(),
                 wid,
                 stream,
                 &prefix,
@@ -741,17 +723,16 @@ impl BackupClient
             )
             .await?;
 
-            let param = json!({
-                "wid": wid ,
-                "chunk-count": chunk_count,
-                "size": size,
-            });
-            let _value = h2.post(&close_path, Some(param)).await?;
-            Ok(BackupStats {
-                size: size as u64,
-                csum,
-            })
-        }
+        let param = json!({
+            "wid": wid ,
+            "chunk-count": chunk_count,
+            "size": size,
+        });
+        let _value = self.h2.post(&close_path, Some(param)).await?;
+        Ok(BackupStats {
+            size: size as u64,
+            csum,
+        })
     }
 
     fn response_queue() -> (
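With the conversion in place, a caller simply `.await`s these methods from an async context. A hypothetical call site for the new `upload_blob_from_data` signature (the helper name and file name are illustrative, not from the repository; it assumes the crate's existing `BackupClient`, `CryptConfig`, `Error` and `Arc` imports and public `BackupStats` fields):

    // Hypothetical helper, not part of the repository.
    async fn push_config_blob(
        client: &BackupClient,
        data: Vec<u8>,
        crypt_config: Option<Arc<CryptConfig>>,
    ) -> Result<(), Error> {
        // Arguments follow the signature above: compress = true, sign_only = false.
        let stats = client
            .upload_blob_from_data(data, "qemu-server.conf.blob", crypt_config, true, false)
            .await?;

        // csum is the sha256 of the encoded blob, computed inside upload_blob_from_data.
        let hex_csum: String = stats.csum.iter().map(|b| format!("{:02x}", b)).collect();
        println!("uploaded {} bytes, csum {}", stats.size, hex_csum);
        Ok(())
    }

Before this commit the same call returned a future that had to be wrapped in an `async` block or driven with combinators; now the borrow of `&self` only lives across the `.await`.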