add incremental backup support
To support incremental backups (where not all chunks are sent to the server), a new parameter "reuse-csum" is introduced on the "create_fixed_index" API call. When set and equal to last backups' checksum, the backup writer clones the data from the last index of this archive file, and only updates chunks it actually receives. In incremental mode some checks usually done on closing an index cannot be made, since they would be inaccurate. Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
This commit is contained in:
parent
21302088de
commit
facd9801cf
@ -284,6 +284,8 @@ pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
|
|||||||
.minimum(1)
|
.minimum(1)
|
||||||
.schema()
|
.schema()
|
||||||
),
|
),
|
||||||
|
("reuse-csum", true, &StringSchema::new("If set, compare last backup's \
|
||||||
|
csum and reuse index for incremental backup if it matches.").schema()),
|
||||||
]),
|
]),
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@ -298,6 +300,7 @@ fn create_fixed_index(
|
|||||||
|
|
||||||
let name = tools::required_string_param(¶m, "archive-name")?.to_owned();
|
let name = tools::required_string_param(¶m, "archive-name")?.to_owned();
|
||||||
let size = tools::required_integer_param(¶m, "size")? as usize;
|
let size = tools::required_integer_param(¶m, "size")? as usize;
|
||||||
|
let reuse_csum = param["reuse-csum"].as_str();
|
||||||
|
|
||||||
let archive_name = name.clone();
|
let archive_name = name.clone();
|
||||||
if !archive_name.ends_with(".fidx") {
|
if !archive_name.ends_with(".fidx") {
|
||||||
@ -305,12 +308,49 @@ fn create_fixed_index(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut path = env.backup_dir.relative_path();
|
let mut path = env.backup_dir.relative_path();
|
||||||
path.push(archive_name);
|
path.push(&archive_name);
|
||||||
|
|
||||||
let chunk_size = 4096*1024; // todo: ??
|
let chunk_size = 4096*1024; // todo: ??
|
||||||
|
|
||||||
let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
|
// do incremental backup if csum is set
|
||||||
let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;
|
let mut reader = None;
|
||||||
|
let mut incremental = false;
|
||||||
|
if let Some(csum) = reuse_csum {
|
||||||
|
incremental = true;
|
||||||
|
let last_backup = match &env.last_backup {
|
||||||
|
Some(info) => info,
|
||||||
|
None => {
|
||||||
|
bail!("cannot reuse index - no previous backup exists");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut last_path = last_backup.backup_dir.relative_path();
|
||||||
|
last_path.push(&archive_name);
|
||||||
|
|
||||||
|
let index = match env.datastore.open_fixed_reader(last_path) {
|
||||||
|
Ok(index) => index,
|
||||||
|
Err(_) => {
|
||||||
|
bail!("cannot reuse index - no previous backup exists for archive");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (old_csum, _) = index.compute_csum();
|
||||||
|
let old_csum = proxmox::tools::digest_to_hex(&old_csum);
|
||||||
|
if old_csum != csum {
|
||||||
|
bail!("expected csum ({}) doesn't match last backup's ({}), cannot do incremental backup",
|
||||||
|
csum, old_csum);
|
||||||
|
}
|
||||||
|
|
||||||
|
reader = Some(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut writer = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
|
||||||
|
|
||||||
|
if let Some(reader) = reader {
|
||||||
|
writer.clone_data_from(&reader)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let wid = env.register_fixed_writer(writer, name, size, chunk_size as u32, incremental)?;
|
||||||
|
|
||||||
env.log(format!("created new fixed index {} ({:?})", wid, path));
|
env.log(format!("created new fixed index {} ({:?})", wid, path));
|
||||||
|
|
||||||
@ -518,15 +558,15 @@ pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
|
|||||||
(
|
(
|
||||||
"chunk-count",
|
"chunk-count",
|
||||||
false,
|
false,
|
||||||
&IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
|
&IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks. Ignored for incremental backups.")
|
||||||
.minimum(1)
|
.minimum(0)
|
||||||
.schema()
|
.schema()
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"size",
|
"size",
|
||||||
false,
|
false,
|
||||||
&IntegerSchema::new("File size. This is used to verify that the server got all data.")
|
&IntegerSchema::new("File size. This is used to verify that the server got all data. Ignored for incremental backups.")
|
||||||
.minimum(1)
|
.minimum(0)
|
||||||
.schema()
|
.schema()
|
||||||
),
|
),
|
||||||
("csum", false, &StringSchema::new("Digest list checksum.").schema()),
|
("csum", false, &StringSchema::new("Digest list checksum.").schema()),
|
||||||
|
@ -47,6 +47,7 @@ struct FixedWriterState {
|
|||||||
chunk_count: u64,
|
chunk_count: u64,
|
||||||
small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
|
small_chunk_count: usize, // allow 0..1 small chunks (last chunk may be smaller)
|
||||||
upload_stat: UploadStatistic,
|
upload_stat: UploadStatistic,
|
||||||
|
incremental: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SharedBackupState {
|
struct SharedBackupState {
|
||||||
@ -237,7 +238,7 @@ impl BackupEnvironment {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Store the writer with an unique ID
|
/// Store the writer with an unique ID
|
||||||
pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32) -> Result<usize, Error> {
|
pub fn register_fixed_writer(&self, index: FixedIndexWriter, name: String, size: usize, chunk_size: u32, incremental: bool) -> Result<usize, Error> {
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
state.ensure_unfinished()?;
|
state.ensure_unfinished()?;
|
||||||
@ -245,7 +246,7 @@ impl BackupEnvironment {
|
|||||||
let uid = state.next_uid();
|
let uid = state.next_uid();
|
||||||
|
|
||||||
state.fixed_writers.insert(uid, FixedWriterState {
|
state.fixed_writers.insert(uid, FixedWriterState {
|
||||||
index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(),
|
index, name, chunk_count: 0, size, chunk_size, small_chunk_count: 0, upload_stat: UploadStatistic::new(), incremental,
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(uid)
|
Ok(uid)
|
||||||
@ -379,6 +380,7 @@ impl BackupEnvironment {
|
|||||||
bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
|
bail!("fixed writer '{}' close failed - received wrong number of chunk ({} != {})", data.name, data.chunk_count, chunk_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !data.incremental {
|
||||||
let expected_count = data.index.index_length();
|
let expected_count = data.index.index_length();
|
||||||
|
|
||||||
if chunk_count != (expected_count as u64) {
|
if chunk_count != (expected_count as u64) {
|
||||||
@ -388,12 +390,12 @@ impl BackupEnvironment {
|
|||||||
if size != (data.size as u64) {
|
if size != (data.size as u64) {
|
||||||
bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
|
bail!("fixed writer '{}' close failed - unexpected file size ({} != {})", data.name, data.size, size);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let uuid = data.index.uuid;
|
let uuid = data.index.uuid;
|
||||||
|
|
||||||
let expected_csum = data.index.close()?;
|
let expected_csum = data.index.close()?;
|
||||||
|
|
||||||
println!("server checksum {:?} client: {:?}", expected_csum, csum);
|
println!("server checksum: {:?} client: {:?} (incremental: {})", expected_csum, csum, data.incremental);
|
||||||
if csum != expected_csum {
|
if csum != expected_csum {
|
||||||
bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
|
bail!("fixed writer '{}' close failed - got unexpected checksum", data.name);
|
||||||
}
|
}
|
||||||
|
@ -467,6 +467,18 @@ impl FixedIndexWriter {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn clone_data_from(&mut self, reader: &FixedIndexReader) -> Result<(), Error> {
|
||||||
|
if self.index_length != reader.index_count() {
|
||||||
|
bail!("clone_data_from failed - index sizes not equal");
|
||||||
|
}
|
||||||
|
|
||||||
|
for i in 0..self.index_length {
|
||||||
|
self.add_digest(i, reader.index_digest(i).unwrap())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct BufferedFixedReader<S> {
|
pub struct BufferedFixedReader<S> {
|
||||||
|
Loading…
Reference in New Issue
Block a user