update backup api for incremental backup

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
This commit is contained in:
Dietmar Maurer
2020-06-25 12:23:30 +02:00
parent 8ea00f6e49
commit b957aa81bd
4 changed files with 141 additions and 305 deletions

View File

@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
@ -22,6 +23,7 @@ pub struct BackupWriter {
h2: H2Client,
abort: AbortHandle,
verbose: bool,
crypt_config: Option<Arc<CryptConfig>>,
}
impl Drop for BackupWriter {
@ -38,12 +40,13 @@ pub struct BackupStats {
impl BackupWriter {
fn new(h2: H2Client, abort: AbortHandle, verbose: bool) -> Arc<Self> {
Arc::new(Self { h2, abort, verbose })
fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
Arc::new(Self { h2, abort, crypt_config, verbose })
}
pub async fn start(
client: HttpClient,
crypt_config: Option<Arc<CryptConfig>>,
datastore: &str,
backup_type: &str,
backup_id: &str,
@ -64,7 +67,7 @@ impl BackupWriter {
let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
Ok(BackupWriter::new(h2, abort, debug))
Ok(BackupWriter::new(h2, abort, crypt_config, debug))
}
pub async fn get(
@ -159,16 +162,19 @@ impl BackupWriter {
&self,
data: Vec<u8>,
file_name: &str,
crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
sign_only: bool,
crypt_or_sign: Option<bool>,
) -> Result<BackupStats, Error> {
let blob = if let Some(ref crypt_config) = crypt_config {
if sign_only {
DataBlob::create_signed(&data, crypt_config, compress)?
let blob = if let Some(ref crypt_config) = self.crypt_config {
if let Some(encrypt) = crypt_or_sign {
if encrypt {
DataBlob::encode(&data, Some(crypt_config), compress)?
} else {
DataBlob::create_signed(&data, crypt_config, compress)?
}
} else {
DataBlob::encode(&data, Some(crypt_config), compress)?
DataBlob::encode(&data, None, compress)?
}
} else {
DataBlob::encode(&data, None, compress)?
@ -187,8 +193,8 @@ impl BackupWriter {
&self,
src_path: P,
file_name: &str,
crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
crypt_or_sign: Option<bool>,
) -> Result<BackupStats, Error> {
let src_path = src_path.as_ref();
@ -203,25 +209,16 @@ impl BackupWriter {
.await
.map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
let blob = DataBlob::encode(&contents, crypt_config.as_ref().map(AsRef::as_ref), compress)?;
let raw_data = blob.into_inner();
let size = raw_data.len() as u64;
let csum = openssl::sha::sha256(&raw_data);
let param = json!({
"encoded-size": size,
"file-name": file_name,
});
self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
Ok(BackupStats { size, csum })
self.upload_blob_from_data(contents, file_name, compress, crypt_or_sign).await
}
pub async fn upload_stream(
&self,
previous_manifest: Option<Arc<BackupManifest>>,
archive_name: &str,
stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
prefix: &str,
fixed_size: Option<u64>,
crypt_config: Option<Arc<CryptConfig>>,
) -> Result<BackupStats, Error> {
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
@ -233,7 +230,18 @@ impl BackupWriter {
let index_path = format!("{}_index", prefix);
let close_path = format!("{}_close", prefix);
self.download_chunk_list(&index_path, archive_name, known_chunks.clone()).await?;
if let Some(manifest) = previous_manifest {
// try, but ignore errors
match archive_type(archive_name) {
Ok(ArchiveType::FixedIndex) => {
let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
}
Ok(ArchiveType::DynamicIndex) => {
let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
}
_ => { /* do nothing */ }
}
}
let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
@ -244,7 +252,7 @@ impl BackupWriter {
stream,
&prefix,
known_chunks.clone(),
crypt_config,
self.crypt_config.clone(),
self.verbose,
)
.await?;
@ -374,41 +382,93 @@ impl BackupWriter {
(verify_queue_tx, verify_result_rx)
}
pub async fn download_chunk_list(
pub async fn download_previous_fixed_index(
&self,
path: &str,
archive_name: &str,
manifest: &BackupManifest,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {
) -> Result<FixedIndexReader, Error> {
let mut tmpfile = std::fs::OpenOptions::new()
.write(true)
.read(true)
.custom_flags(libc::O_TMPFILE)
.open("/tmp")?;
let param = json!({ "archive-name": archive_name });
let request = H2Client::request_builder("localhost", "GET", path, Some(param), None).unwrap();
self.h2.download("previous", Some(param), &mut tmpfile).await?;
let h2request = self.h2.send_request(request, None).await?;
let resp = h2request.await?;
let index = FixedIndexReader::new(tmpfile)
.map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
// Note: do not use values stored in index (not trusted) - instead, computed them again
let (csum, size) = index.compute_csum();
manifest.verify_file(archive_name, &csum, size)?;
let status = resp.status();
if !status.is_success() {
H2Client::h2api_response(resp).await?; // raise error
unreachable!();
}
let mut body = resp.into_body();
let mut flow_control = body.flow_control().clone();
let mut stream = DigestListDecoder::new(body.map_err(Error::from));
while let Some(chunk) = stream.try_next().await? {
let _ = flow_control.release_capacity(chunk.len());
known_chunks.lock().unwrap().insert(chunk);
// add index chunks to known chunks
let mut known_chunks = known_chunks.lock().unwrap();
for i in 0..index.index_count() {
known_chunks.insert(*index.index_digest(i).unwrap());
}
if self.verbose {
println!("{}: known chunks list length is {}", archive_name, known_chunks.lock().unwrap().len());
println!("{}: known chunks list length is {}", archive_name, index.index_count());
}
Ok(())
Ok(index)
}
pub async fn download_previous_dynamic_index(
&self,
archive_name: &str,
manifest: &BackupManifest,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<DynamicIndexReader, Error> {
let mut tmpfile = std::fs::OpenOptions::new()
.write(true)
.read(true)
.custom_flags(libc::O_TMPFILE)
.open("/tmp")?;
let param = json!({ "archive-name": archive_name });
self.h2.download("previous", Some(param), &mut tmpfile).await?;
let index = DynamicIndexReader::new(tmpfile)
.map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
// Note: do not use values stored in index (not trusted) - instead, computed them again
let (csum, size) = index.compute_csum();
manifest.verify_file(archive_name, &csum, size)?;
// add index chunks to known chunks
let mut known_chunks = known_chunks.lock().unwrap();
for i in 0..index.index_count() {
known_chunks.insert(*index.index_digest(i).unwrap());
}
if self.verbose {
println!("{}: known chunks list length is {}", archive_name, index.index_count());
}
Ok(index)
}
/// Download backup manifest (index.json) of last backup
pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
use std::convert::TryFrom;
let mut raw_data = Vec::with_capacity(64 * 1024);
let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
self.h2.download("previous", Some(param), &mut raw_data).await?;
let blob = DataBlob::from_raw(raw_data)?;
blob.verify_crc()?;
let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
let json: Value = serde_json::from_slice(&data[..])?;
let manifest = BackupManifest::try_from(json)?;
Ok(manifest)
}
fn upload_chunk_info_stream(