To prevent a race with a background GC operation, do not allow deletion of backups whose index might currently be referenced as the "known chunk list" for successive backups. Otherwise the GC could delete chunks it thinks are no longer referenced, while at the same time the server tells the client that it doesn't need to upload said chunks because they already exist. Additionally, prevent deletion of whole backup groups if they contain snapshots that appear to be currently in progress. This is currently unlikely to trigger, as that function is only used for sync jobs, but it's a useful safeguard either way. Deleting a single snapshot has a 'force' parameter, which is necessary to allow deleting incomplete snapshots of an aborted backup. Pruning also sets force=true to skip the check, since it calculates which snapshots to keep on its own. To avoid code duplication, the is_finished method is factored out. Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
522 lines
15 KiB
522 lines
15 KiB
use anyhow::{bail, Error};
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use serde_json::{json, Value};
use proxmox::tools::digest_to_hex;
use proxmox::tools::fs::{replace_file, CreateOptions};
use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
use crate::server::WorkerTask;
use crate::backup::*;
use crate::server::formatter::*;
use hyper::{Body, Response};
/// Per-writer statistics about the chunks uploaded by the client.
#[derive(Debug, Default)]
struct UploadStatistic {
    /// Number of chunks uploaded.
    count: u64,
    /// Sum of the (uncompressed) sizes of the uploaded chunks.
    size: u64,
    /// Sum of the compressed sizes of the uploaded chunks.
    compressed_size: u64,
    /// Number of uploaded chunks registered as duplicates (server-side duplicates).
    duplicates: u64,
}

impl UploadStatistic {
    /// Create an all-zero statistic.
    fn new() -> Self {
        Self::default()
    }
}
/// Bookkeeping for one open dynamic index writer.
struct DynamicWriterState {
/// Archive name, used in log and error messages.
name: String,
/// The index being written.
index: DynamicIndexWriter,
/// End offset of the data appended so far; the next appended chunk must start here.
offset: u64,
/// Number of chunks appended so far; compared with the client's count on close.
chunk_count: u64,
/// Statistics about the chunks uploaded for this writer.
upload_stat: UploadStatistic,
/// Bookkeeping for one open fixed index writer.
struct FixedWriterState {
/// Archive name, used in log and error messages.
name: String,
/// The index being written.
index: FixedIndexWriter,
/// Expected image size; compared with the size the client reports on close.
size: usize,
/// Nominal chunk size; uploaded chunks larger than this are rejected.
chunk_size: u32,
/// Number of chunks appended so far; compared with the client's count on close.
chunk_count: u64,
small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
/// Statistics about the chunks uploaded for this writer.
upload_stat: UploadStatistic,
/// When set, closing skips the check that the chunk count equals the index length.
incremental: bool,
/// Mutable state of a backup environment, kept behind `Arc<Mutex<..>>` in `BackupEnvironment`.
struct SharedBackupState {
/// Set by `finish_backup` and `remove_backup`.
finished: bool,
/// Last ID handed out by `next_uid`.
uid_counter: usize,
file_counter: usize, // successfully uploaded files
/// Open dynamic index writers keyed by writer ID; an entry is removed on close.
dynamic_writers: HashMap<usize, DynamicWriterState>,
/// Open fixed index writers keyed by writer ID; an entry is removed on close.
fixed_writers: HashMap<usize, FixedWriterState>,
/// Chunk digest -> chunk length for every chunk the client may reference.
known_chunks: HashMap<[u8;32], u32>,
impl SharedBackupState {
// Raise error if finished flag is set
fn ensure_unfinished(&self) -> Result<(), Error> {
if self.finished {
bail!("backup already marked as finished.");
// Get an unique integer ID
pub fn next_uid(&mut self) -> usize {
self.uid_counter += 1;
/// `RpcEnvironment` implementation for the backup service.
pub struct BackupEnvironment {
/// Environment type passed to `new`.
env_type: RpcEnvironmentType,
/// Result attributes, exposed through `result_attrib`/`result_attrib_mut`.
result_attributes: Value,
/// User passed to `new`; `set_user` refuses to change it.
user: String,
/// If set, `debug()` messages are written to the worker log.
pub debug: bool,
/// Formatter used by `format_response` (JSON after `new`).
pub formatter: &'static OutputFormatter,
/// Worker task whose log receives `log()`/`debug()` output.
pub worker: Arc<WorkerTask>,
/// Datastore the backup is stored in.
pub datastore: Arc<DataStore>,
/// Backup snapshot this environment operates on; removed by `remove_backup`.
pub backup_dir: BackupDir,
/// `None` after `new`. NOTE(review): presumably set by the caller to the previous snapshot of the group — confirm.
pub last_backup: Option<BackupInfo>,
/// Mutable session state, protected by a mutex.
state: Arc<Mutex<SharedBackupState>>
impl BackupEnvironment {
pub fn new(
env_type: RpcEnvironmentType,
user: String,
worker: Arc<WorkerTask>,
datastore: Arc<DataStore>,
backup_dir: BackupDir,
) -> Self {
let state = SharedBackupState {
finished: false,
uid_counter: 0,
file_counter: 0,
dynamic_writers: HashMap::new(),
fixed_writers: HashMap::new(),
known_chunks: HashMap::new(),
Self {
result_attributes: json!({}),
debug: false,
formatter: &JSON_FORMATTER,
last_backup: None,
state: Arc::new(Mutex::new(state)),
/// Register a Chunk with associated length.
/// We do not fully trust clients, so a client may only use registered
/// chunks. Please use this method to register chunks from previous backups.
pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.known_chunks.insert(digest, length);
/// Register fixed length chunks after upload.
/// Like `register_chunk()`, but additionally record statistics for
/// the fixed index writer.
pub fn register_fixed_chunk(
wid: usize,
digest: [u8; 32],
size: u32,
compressed_size: u32,
is_duplicate: bool,
) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
let mut data = match state.fixed_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("fixed writer '{}' not registered", wid),
if size > data.chunk_size {
bail!("fixed writer '{}' - got large chunk ({} > {}", data.name, size, data.chunk_size);
} else if size < data.chunk_size {
data.small_chunk_count += 1;
if data.small_chunk_count > 1 {
bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)");
// record statistics
data.upload_stat.count += 1;
data.upload_stat.size += size as u64;
data.upload_stat.compressed_size += compressed_size as u64;
if is_duplicate { data.upload_stat.duplicates += 1; }
// register chunk
state.known_chunks.insert(digest, size);
/// Register dynamic length chunks after upload.
/// Like `register_chunk()`, but additionally record statistics for
/// the dynamic index writer.
pub fn register_dynamic_chunk(
wid: usize,
digest: [u8; 32],
size: u32,
compressed_size: u32,
is_duplicate: bool,
) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
let mut data = match state.dynamic_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("dynamic writer '{}' not registered", wid),
// record statistics
data.upload_stat.count += 1;
data.upload_stat.size += size as u64;
data.upload_stat.compressed_size += compressed_size as u64;
if is_duplicate { data.upload_stat.duplicates += 1; }
// register chunk
state.known_chunks.insert(digest, size);
pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option<u32> {
let state = self.state.lock().unwrap();
match state.known_chunks.get(digest) {
Some(len) => Some(*len),
None => None,
/// Store the writer with an unique ID
pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
let uid = state.next_uid();
state.dynamic_writers.insert(uid, DynamicWriterState {
index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(),
/// Store the writer with an unique ID
pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32, incremental: bool) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
let uid = state.next_uid();
state.fixed_writers.insert(uid, FixedWriterState {
index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(), incremental,
/// Append chunk to dynamic writer
pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
let mut data = match state.dynamic_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("dynamic writer '{}' not registered", wid),
if data.offset != offset {
bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})",
data.name, data.offset, offset);
data.offset += size as u64;
data.chunk_count += 1;
data.index.add_chunk(data.offset, digest)?;
/// Append chunk to fixed writer
pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
let mut data = match state.fixed_writers.get_mut(&wid) {
Some(data) => data,
None => bail!("fixed writer '{}' not registered", wid),
let end = (offset as usize) + (size as usize);
let idx = data.index.check_chunk_alignment(end, size as usize)?;
data.chunk_count += 1;
data.index.add_digest(idx, digest)?;
fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) {
self.log(format!("Upload statistics for '{}'", archive_name));
self.log(format!("UUID: {}", digest_to_hex(uuid)));
self.log(format!("Checksum: {}", digest_to_hex(csum)));
self.log(format!("Size: {}", size));
self.log(format!("Chunk count: {}", chunk_count));
if size == 0 || chunk_count == 0 {
self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size));
// account for zero chunk, which might be uploaded but never used
let client_side_duplicates = if chunk_count < upload_stat.count {
} else {
chunk_count - upload_stat.count
let server_side_duplicates = upload_stat.duplicates;
if (client_side_duplicates + server_side_duplicates) > 0 {
let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count;
self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per));
if upload_stat.size > 0 {
self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size));
/// Close dynamic writer
pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
let mut data = match state.dynamic_writers.remove(&wid) {
Some(data) => data,
None => bail!("dynamic writer '{}' not registered", wid),
if data.chunk_count != chunk_count {
bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count);
if data.offset != size {
bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size);
let uuid = data.index.uuid;
let expected_csum = data.index.close()?;
println!("server checksum {:?} client: {:?}", expected_csum, csum);
if csum != expected_csum {
bail!("dynamic writer '{}' close failed - got unexpected checksum", data.name);
self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat);
state.file_counter += 1;
/// Close fixed writer
pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
let mut data = match state.fixed_writers.remove(&wid) {
Some(data) => data,
None => bail!("fixed writer '{}' not registered", wid),
if data.chunk_count != chunk_count {
bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
if !data.incremental {
let expected_count = data.index.index_length();
if chunk_count != (expected_count as u64) {
bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count);
if size != (data.size as u64) {
bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
let uuid = data.index.uuid;
let expected_csum = data.index.close()?;
println!("server checksum: {:?} client: {:?} (incremental: {})", expected_csum, csum, data.incremental);
if csum != expected_csum {
bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
self.log_upload_stat(&data.name, &expected_csum, &uuid, size, chunk_count, &data.upload_stat);
state.file_counter += 1;
pub fn add_blob(&self, file_name: &str, data: Vec<u8>) -> Result<(), Error> {
let mut path = self.datastore.base_path();
let blob_len = data.len();
let orig_len = data.len(); // fixme:
// always verify blob/CRC at server side
let blob = DataBlob::load_from_reader(&mut &data[..])?;
let raw_data = blob.raw_data();
replace_file(&path, raw_data, CreateOptions::new())?;
self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len));
let mut state = self.state.lock().unwrap();
state.file_counter += 1;
/// Mark backup as finished
pub fn finish_backup(&self) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
// test if all writer are correctly closed
if state.dynamic_writers.len() != 0 {
bail!("found open index writer - unable to finish backup");
if state.file_counter == 0 {
bail!("backup does not contain valid files (file count == 0)");
state.finished = true;
pub fn log<S: AsRef<str>>(&self, msg: S) {
pub fn debug<S: AsRef<str>>(&self, msg: S) {
if self.debug { self.worker.log(msg); }
pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
match result {
Ok(data) => (self.formatter.format_data)(data, self),
Err(err) => (self.formatter.format_error)(err),
/// Raise error if finished flag is not set
pub fn ensure_finished(&self) -> Result<(), Error> {
let state = self.state.lock().unwrap();
if !state.finished {
bail!("backup ended but finished flag is not set.");
/// Remove complete backup
pub fn remove_backup(&self) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
state.finished = true;
self.datastore.remove_backup_dir(&self.backup_dir, true)?;
/// Makes `BackupEnvironment` usable wherever a generic `RpcEnvironment` is expected.
impl RpcEnvironment for BackupEnvironment {
/// Mutable access to `result_attributes`.
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
/// Read-only access to the result attributes.
/// NOTE(review): body not visible here; presumably `&self.result_attributes` - confirm.
fn result_attrib(&self) -> &Value {
/// NOTE(review): body not visible here; presumably returns the `env_type` passed to `new` - confirm.
fn env_type(&self) -> RpcEnvironmentType {
/// The user is fixed when the environment is created; changing it is not supported.
fn set_user(&mut self, _user: Option<String>) {
panic!("unable to change user");
/// NOTE(review): body not visible here; presumably returns the user passed to `new` - confirm.
fn get_user(&self) -> Option<String> {
/// Lets code holding a `dyn RpcEnvironment` access it as a `BackupEnvironment`.
/// NOTE(review): body not visible here; presumably a downcast that panics for
/// other environment types - confirm.
impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
fn as_ref(&self) -> &BackupEnvironment {
impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
fn as_ref(&self) -> &BackupEnvironment {