use anyhow::{bail, Error}; use std::sync::{Arc, Mutex}; use std::collections::HashMap; use serde_json::{json, Value}; use proxmox::tools::digest_to_hex; use proxmox::tools::fs::{replace_file, CreateOptions}; use proxmox::api::{RpcEnvironment, RpcEnvironmentType}; use crate::server::WorkerTask; use crate::backup::*; use crate::server::formatter::*; use hyper::{Body, Response}; struct UploadStatistic { count: u64, size: u64, compressed_size: u64, duplicates: u64, } impl UploadStatistic { fn new() -> Self { Self { count: 0, size: 0, compressed_size: 0, duplicates: 0, } } } struct DynamicWriterState { name: String, index: DynamicIndexWriter, offset: u64, chunk_count: u64, upload_stat: UploadStatistic, } struct FixedWriterState { name: String, index: FixedIndexWriter, size: usize, chunk_size: u32, chunk_count: u64, small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller) upload_stat: UploadStatistic, } struct SharedBackupState { finished: bool, uid_counter: usize, file_counter: usize, // successfully uploaded files dynamic_writers: HashMap, fixed_writers: HashMap, known_chunks: HashMap<[u8;32], u32>, } impl SharedBackupState { // Raise error if finished flag is set fn ensure_unfinished(&self) -> Result<(), Error> { if self.finished { bail!("backup already marked as finished."); } Ok(()) } // Get an unique integer ID pub fn next_uid(&mut self) -> usize { self.uid_counter += 1; self.uid_counter } } /// `RpcEnvironmet` implementation for backup service #[derive(Clone)] pub struct BackupEnvironment { env_type: RpcEnvironmentType, result_attributes: Value, user: String, pub debug: bool, pub formatter: &'static OutputFormatter, pub worker: Arc, pub datastore: Arc, pub backup_dir: BackupDir, pub last_backup: Option, state: Arc> } impl BackupEnvironment { pub fn new( env_type: RpcEnvironmentType, user: String, worker: Arc, datastore: Arc, backup_dir: BackupDir, ) -> Self { let state = SharedBackupState { finished: false, uid_counter: 0, file_counter: 0, dynamic_writers: HashMap::new(), fixed_writers: HashMap::new(), known_chunks: HashMap::new(), }; Self { result_attributes: json!({}), env_type, user, worker, datastore, debug: false, formatter: &JSON_FORMATTER, backup_dir, last_backup: None, state: Arc::new(Mutex::new(state)), } } /// Register a Chunk with associated length. /// /// We do not fully trust clients, so a client may only use registered /// chunks. Please use this method to register chunks from previous backups. pub fn register_chunk(&self, digest: [u8; 32], length: u32) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; state.known_chunks.insert(digest, length); Ok(()) } /// Register fixed length chunks after upload. /// /// Like `register_chunk()`, but additionally record statistics for /// the fixed index writer. pub fn register_fixed_chunk( &self, wid: usize, digest: [u8; 32], size: u32, compressed_size: u32, is_duplicate: bool, ) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let mut data = match state.fixed_writers.get_mut(&wid) { Some(data) => data, None => bail!("fixed writer '{}' not registered", wid), }; if size > data.chunk_size { bail!("fixed writer '{}' - got large chunk ({} > {}", data.name, size, data.chunk_size); } else if size < data.chunk_size { data.small_chunk_count += 1; if data.small_chunk_count > 1 { bail!("fixed writer '{}' - detected multiple end chunks (chunk size too small)"); } } // record statistics data.upload_stat.count += 1; data.upload_stat.size += size as u64; data.upload_stat.compressed_size += compressed_size as u64; if is_duplicate { data.upload_stat.duplicates += 1; } // register chunk state.known_chunks.insert(digest, size); Ok(()) } /// Register dynamic length chunks after upload. /// /// Like `register_chunk()`, but additionally record statistics for /// the dynamic index writer. pub fn register_dynamic_chunk( &self, wid: usize, digest: [u8; 32], size: u32, compressed_size: u32, is_duplicate: bool, ) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let mut data = match state.dynamic_writers.get_mut(&wid) { Some(data) => data, None => bail!("dynamic writer '{}' not registered", wid), }; // record statistics data.upload_stat.count += 1; data.upload_stat.size += size as u64; data.upload_stat.compressed_size += compressed_size as u64; if is_duplicate { data.upload_stat.duplicates += 1; } // register chunk state.known_chunks.insert(digest, size); Ok(()) } pub fn lookup_chunk(&self, digest: &[u8; 32]) -> Option { let state = self.state.lock().unwrap(); match state.known_chunks.get(digest) { Some(len) => Some(*len), None => None, } } /// Store the writer with an unique ID pub fn register_dynamic_writer(&self, index: DynamicIndexWriter, name: String) -> Result { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let uid = state.next_uid(); state.dynamic_writers.insert(uid, DynamicWriterState { index, name, offset: 0, chunk_count: 0, upload_stat: UploadStatistic::new(), }); Ok(uid) } /// Store the writer with an unique ID pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let uid = state.next_uid(); state.fixed_writers.insert(uid, FixedWriterState { index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(), }); Ok(uid) } /// Append chunk to dynamic writer pub fn dynamic_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let mut data = match state.dynamic_writers.get_mut(&wid) { Some(data) => data, None => bail!("dynamic writer '{}' not registered", wid), }; if data.offset != offset { bail!("dynamic writer '{}' append chunk failed - got strange chunk offset ({} != {})", data.name, data.offset, offset); } data.offset += size as u64; data.chunk_count += 1; data.index.add_chunk(data.offset, digest)?; Ok(()) } /// Append chunk to fixed writer pub fn fixed_writer_append_chunk(&self, wid: usize, offset: u64, size: u32, digest: &[u8; 32]) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let mut data = match state.fixed_writers.get_mut(&wid) { Some(data) => data, None => bail!("fixed writer '{}' not registered", wid), }; let end = (offset as usize) + (size as usize); let idx = data.index.check_chunk_alignment(end, size as usize)?; data.chunk_count += 1; data.index.add_digest(idx, digest)?; Ok(()) } fn log_upload_stat(&self, archive_name: &str, csum: &[u8; 32], uuid: &[u8; 16], size: u64, chunk_count: u64, upload_stat: &UploadStatistic) { self.log(format!("Upload statistics for '{}'", archive_name)); self.log(format!("UUID: {}", digest_to_hex(uuid))); self.log(format!("Checksum: {}", digest_to_hex(csum))); self.log(format!("Size: {}", size)); self.log(format!("Chunk count: {}", chunk_count)); if size == 0 || chunk_count == 0 { return; } self.log(format!("Upload size: {} ({}%)", upload_stat.size, (upload_stat.size*100)/size)); // account for zero chunk, which might be uploaded but never used let client_side_duplicates = if chunk_count < upload_stat.count { 0 } else { chunk_count - upload_stat.count }; let server_side_duplicates = upload_stat.duplicates; if (client_side_duplicates + server_side_duplicates) > 0 { let per = (client_side_duplicates + server_side_duplicates)*100/chunk_count; self.log(format!("Duplicates: {}+{} ({}%)", client_side_duplicates, server_side_duplicates, per)); } if upload_stat.size > 0 { self.log(format!("Compression: {}%", (upload_stat.compressed_size*100)/upload_stat.size)); } } /// Close dynamic writer pub fn dynamic_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let mut data = match state.dynamic_writers.remove(&wid) { Some(data) => data, None => bail!("dynamic writer '{}' not registered", wid), }; if data.chunk_count != chunk_count { bail!("dynamic writer '{}' close failed - unexpected chunk count ({} != {})", data.name, data.chunk_count, chunk_count); } if data.offset != size { bail!("dynamic writer '{}' close failed - unexpected file size ({} != {})", data.name, data.offset, size); } let uuid = data.index.uuid; let expected_csum = data.index.close()?; println!("server checksum {:?} client: {:?}", expected_csum, csum); if csum != expected_csum { bail!("dynamic writer '{}' close failed - got unexpected checksum", data.name); } self.log_upload_stat(&data.name, &csum, &uuid, size, chunk_count, &data.upload_stat); state.file_counter += 1; Ok(()) } /// Close fixed writer pub fn fixed_writer_close(&self, wid: usize, chunk_count: u64, size: u64, csum: [u8; 32]) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.ensure_unfinished()?; let mut data = match state.fixed_writers.remove(&wid) { Some(data) => data, None => bail!("fixed writer '{}' not registered", wid), }; if data.chunk_count != chunk_count { bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count); } let expected_count = data.index.index_length(); if chunk_count != (expected_count as u64) { bail!("fixed writer '{}' close failed - unexpected chunk count ({} != {})", data.name, expected_count, chunk_count); } if size != (data.size as u64) { bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size); } let uuid = data.index.uuid; let expected_csum = data.index.close()?; println!("server checksum {:?} client: {:?}", expected_csum, csum); if csum != expected_csum { bail!("fixed writer '{}' close failed - got unexpected checksum", data.name); } self.log_upload_stat(&data.name, &expected_csum, &uuid, size, chunk_count, &data.upload_stat); state.file_counter += 1; Ok(()) } pub fn add_blob(&self, file_name: &str, data: Vec) -> Result<(), Error> { let mut path = self.datastore.base_path(); path.push(self.backup_dir.relative_path()); path.push(file_name); let blob_len = data.len(); let orig_len = data.len(); // fixme: let blob = DataBlob::from_raw(data)?; // always verify CRC at server side blob.verify_crc()?; let raw_data = blob.raw_data(); replace_file(&path, raw_data, CreateOptions::new())?; self.log(format!("add blob {:?} ({} bytes, comp: {})", path, orig_len, blob_len)); let mut state = self.state.lock().unwrap(); state.file_counter += 1; Ok(()) } /// Mark backup as finished pub fn finish_backup(&self) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); // test if all writer are correctly closed state.ensure_unfinished()?; if state.dynamic_writers.len() != 0 { bail!("found open index writer - unable to finish backup"); } if state.file_counter == 0 { bail!("backup does not contain valid files (file count == 0)"); } state.finished = true; Ok(()) } pub fn log>(&self, msg: S) { self.worker.log(msg); } pub fn debug>(&self, msg: S) { if self.debug { self.worker.log(msg); } } pub fn format_response(&self, result: Result) -> Response { match result { Ok(data) => (self.formatter.format_data)(data, self), Err(err) => (self.formatter.format_error)(err), } } /// Raise error if finished flag is not set pub fn ensure_finished(&self) -> Result<(), Error> { let state = self.state.lock().unwrap(); if !state.finished { bail!("backup ended but finished flag is not set."); } Ok(()) } /// Remove complete backup pub fn remove_backup(&self) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); state.finished = true; self.datastore.remove_backup_dir(&self.backup_dir)?; Ok(()) } } impl RpcEnvironment for BackupEnvironment { fn result_attrib_mut(&mut self) -> &mut Value { &mut self.result_attributes } fn result_attrib(&self) -> &Value { &self.result_attributes } fn env_type(&self) -> RpcEnvironmentType { self.env_type } fn set_user(&mut self, _user: Option) { panic!("unable to change user"); } fn get_user(&self) -> Option { Some(self.user.clone()) } } impl AsRef for dyn RpcEnvironment { fn as_ref(&self) -> &BackupEnvironment { self.as_any().downcast_ref::().unwrap() } } impl AsRef for Box { fn as_ref(&self) -> &BackupEnvironment { self.as_any().downcast_ref::().unwrap() } }