change catalog format, use dynamic index to store catalog.

In order to remove the size restriction of a single blob.
Dietmar Maurer
2019-11-08 10:35:48 +01:00
parent e016f9ff2f
commit bf6e321744
4 changed files with 313 additions and 170 deletions
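The change below replaces the single-blob catalog (a CatalogBlobWriter writing into a tmpfile, uploaded at the end via upload_blob) with a streamed one: CatalogWriter now writes into a channel-backed SenderWriter, and the receiving end is chunked and uploaded as a dynamic index, so the catalog is no longer limited by the maximum size of a single blob. A minimal, std-only sketch of that write-to-channel idea (ChannelWriter is a hypothetical stand-in for SenderWriter, not the commit's actual type):

use std::io::Write;
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for SenderWriter: implements std::io::Write by
// forwarding each buffer into a bounded channel.
struct ChannelWriter {
    tx: mpsc::SyncSender<Vec<u8>>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.tx
            .send(buf.to_vec())
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "receiver dropped"))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    // Bounded channel: the writer blocks when the consumer falls behind
    // (the commit uses an async mpsc::channel(10) for the same effect).
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(10);

    // Stand-in for the background upload task: drain buffers as they arrive.
    let uploader = thread::spawn(move || rx.into_iter().map(|b| b.len()).sum::<usize>());

    let mut catalog = ChannelWriter { tx };
    catalog.write_all(b"catalog entries stream out as they are produced")?;
    drop(catalog); // closing the sender ends the upload stream

    println!("streamed {} bytes", uploader.join().unwrap());
    Ok(())
}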


@@ -8,7 +8,7 @@ use chrono::{Local, Utc, TimeZone};
use std::path::{Path, PathBuf};
use std::collections::{HashSet, HashMap};
use std::ffi::OsStr;
use std::io::{BufReader, Read, Write, Seek, SeekFrom};
use std::io::{Read, Write, Seek, SeekFrom};
use std::os::unix::fs::OpenOptionsExt;
use proxmox::tools::fs::{file_get_contents, file_get_json, file_set_contents, image_size};
@@ -188,7 +188,7 @@ async fn backup_directory<P: AsRef<Path>>(
verbose: bool,
skip_lost_and_found: bool,
crypt_config: Option<Arc<CryptConfig>>,
catalog: Arc<Mutex<CatalogBlobWriter<std::fs::File>>>,
catalog: Arc<Mutex<CatalogWriter<SenderWriter>>>,
) -> Result<BackupStats, Error> {
let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), device_set, verbose, skip_lost_and_found, catalog)?;
@@ -485,23 +485,41 @@ fn dump_catalog(
true,
).await?;
let manifest = client.download_manifest().await?;
let blob_file = std::fs::OpenOptions::new()
.read(true)
let tmpfile = std::fs::OpenOptions::new()
.write(true)
.read(true)
.custom_flags(libc::O_TMPFILE)
.open("/tmp")?;
let mut blob_file = client.download(CATALOG_BLOB_NAME, blob_file).await?;
let manifest = client.download_manifest().await?;
let (csum, size) = compute_file_csum(&mut blob_file)?;
manifest.verify_file(CATALOG_BLOB_NAME, &csum, size)?;
let tmpfile = client.download(CATALOG_NAME, tmpfile).await?;
blob_file.seek(SeekFrom::Start(0))?;
let index = DynamicIndexReader::new(tmpfile)
.map_err(|err| format_err!("unable to read catalog index - {}", err))?;
let reader = BufReader::new(blob_file);
let mut catalog_reader = CatalogBlobReader::new(reader, crypt_config)?;
// Note: do not use values stored in index (not trusted) - instead, compute them again
let (csum, size) = index.compute_csum();
manifest.verify_file(CATALOG_NAME, &csum, size)?;
let most_used = index.find_most_used_chunks(8);
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
let mut reader = BufferedDynamicReader::new(index, chunk_reader);
let mut catalogfile = std::fs::OpenOptions::new()
.write(true)
.read(true)
.custom_flags(libc::O_TMPFILE)
.open("/tmp")?;
std::io::copy(&mut reader, &mut catalogfile)
.map_err(|err| format_err!("unable to download catalog - {}", err))?;
catalogfile.seek(SeekFrom::Start(0))?;
let mut catalog_reader = CatalogReader::new(catalogfile);
catalog_reader.dump()?;
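Note the order of operations in the new dump_catalog: the raw index is downloaded first, its checksum is recomputed from the downloaded bytes, and only then is it checked against the manifest; nothing recorded inside the index itself is trusted. A hedged sketch of that verify-before-use rule, using the sha2 crate (an assumption for illustration; the commit recomputes the digest via index.compute_csum()):

use sha2::{Digest, Sha256};

// Recompute the checksum from the downloaded bytes and compare it against
// the value recorded in the manifest; never trust metadata inside the
// downloaded index file itself.
fn verify_file(data: &[u8], manifest_csum: &[u8; 32], manifest_size: u64) -> Result<(), String> {
    if data.len() as u64 != manifest_size {
        return Err(format!("wrong size ({} != {})", data.len(), manifest_size));
    }
    let csum: [u8; 32] = Sha256::digest(data).into();
    if &csum != manifest_csum {
        return Err("checksum mismatch".to_string());
    }
    Ok(())
}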
@@ -584,6 +602,40 @@ fn parse_backupspec(value: &str) -> Result<(&str, &str), Error> {
bail!("unable to parse directory specification '{}'", value);
}
fn spawn_catalog_upload(
client: Arc<BackupWriter>,
crypt_config: Option<Arc<CryptConfig>>,
) -> Result<
(
Arc<Mutex<CatalogWriter<SenderWriter>>>,
tokio::sync::oneshot::Receiver<Result<BackupStats, Error>>
), Error>
{
let (catalog_tx, catalog_rx) = mpsc::channel(10); // allow buffering up to 10 writes
let catalog_stream = catalog_rx.map_err(Error::from);
let catalog_chunk_size = 512*1024;
let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
let catalog = Arc::new(Mutex::new(CatalogWriter::new(SenderWriter::new(catalog_tx))?));
let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
let catalog_upload_result = client
.upload_stream(CATALOG_NAME, catalog_chunk_stream, "dynamic", None, crypt_config)
.await;
if let Err(ref err) = catalog_upload_result {
eprintln!("catalog upload error - {}", err);
client.cancel();
}
let _ = catalog_result_tx.send(catalog_upload_result);
});
Ok((catalog, catalog_result_rx))
}
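spawn_catalog_upload couples the returned CatalogWriter to a background upload task and hands the caller a oneshot receiver; create_backup later drops the writer to close the stream and awaits that receiver for the final BackupStats (see the upload_catalog block further down). A reduced sketch of this spawn-and-report pattern, with illustrative names and a tokio runtime assumed:

use tokio::sync::oneshot;

fn spawn_upload() -> oneshot::Receiver<Result<u64, String>> {
    let (tx, rx) = oneshot::channel();
    tokio::spawn(async move {
        // stand-in for client.upload_stream(...): pretend we uploaded 313 bytes
        let result: Result<u64, String> = Ok(313);
        let _ = tx.send(result); // ignore the error if the receiver was dropped
    });
    rx
}

#[tokio::main]
async fn main() {
    let result_rx = spawn_upload(); // the upload runs concurrently from here on
    // ... stream file contents while the catalog upload runs ...
    match result_rx.await {
        Ok(Ok(bytes)) => println!("catalog upload finished ({} bytes)", bytes),
        Ok(Err(err)) => eprintln!("catalog upload error - {}", err),
        Err(_) => eprintln!("upload task ended without reporting"),
    }
}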
fn create_backup(
param: Value,
_info: &ApiMethod,
@@ -637,6 +689,8 @@ fn create_backup(
enum BackupType { PXAR, IMAGE, CONFIG, LOGFILE };
let mut upload_catalog = false;
for backupspec in backupspec_list {
let (target, filename) = parse_backupspec(backupspec.as_str().unwrap())?;
@@ -655,6 +709,7 @@ fn create_backup(
bail!("got unexpected file type (expected directory)");
}
upload_list.push((BackupType::PXAR, filename.to_owned(), format!("{}.didx", target), 0));
upload_catalog = true;
}
"img" => {
@@ -731,15 +786,7 @@ fn create_backup(
let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp());
let mut manifest = BackupManifest::new(snapshot);
// fixme: encrypt/sign catalog?
let catalog_file = std::fs::OpenOptions::new()
.write(true)
.read(true)
.custom_flags(libc::O_TMPFILE)
.open("/tmp")?;
let catalog = Arc::new(Mutex::new(CatalogBlobWriter::new_compressed(catalog_file)?));
let mut upload_catalog = false;
let (catalog, catalog_result_rx) = spawn_catalog_upload(client.clone(), crypt_config.clone())?;
for (backup_type, filename, target, size) in upload_list {
match backup_type {
@@ -758,7 +805,6 @@ fn create_backup(
manifest.add_file(target, stats.size, stats.csum);
}
BackupType::PXAR => {
upload_catalog = true;
println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target);
catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
let stats = backup_directory(
@@ -795,14 +841,15 @@ fn create_backup(
if upload_catalog {
let mutex = Arc::try_unwrap(catalog)
.map_err(|_| format_err!("unable to get catalog (still used)"))?;
let mut catalog_file = mutex.into_inner().unwrap().finish()?;
let mut catalog = mutex.into_inner().unwrap();
let target = CATALOG_BLOB_NAME;
catalog.finish()?;
catalog_file.seek(SeekFrom::Start(0))?;
drop(catalog); // close upload stream
let stats = client.upload_blob(catalog_file, target).await?;
manifest.add_file(target.to_owned(), stats.size, stats.csum);
let stats = catalog_result_rx.await??;
manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum);
}
if let Some(rsa_encrypted_key) = rsa_encrypted_key {