src/api2/admin/datastore/catar.rs: allow to configure chunk-size
This commit is contained in:
parent
c584aa21fd
commit
247cdbce72
|
@ -76,7 +76,8 @@ fn upload_catar(
|
||||||
bail!("got wrong content-type for catar archive upload");
|
bail!("got wrong content-type for catar archive upload");
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk_size = 4*1024*1024;
|
let chunk_size = param["chunk-size"].as_u64().unwrap_or(4096*1024);
|
||||||
|
verify_chunk_size(chunk_size)?;
|
||||||
|
|
||||||
let datastore = DataStore::lookup_datastore(store)?;
|
let datastore = DataStore::lookup_datastore(store)?;
|
||||||
|
|
||||||
|
@ -84,7 +85,7 @@ fn upload_catar(
|
||||||
|
|
||||||
path.push(archive_name);
|
path.push(archive_name);
|
||||||
|
|
||||||
let index = datastore.create_dynamic_writer(path, chunk_size)?;
|
let index = datastore.create_dynamic_writer(path, chunk_size as usize)?;
|
||||||
|
|
||||||
let upload = UploadCaTar { stream: req_body, index, count: 0};
|
let upload = UploadCaTar { stream: req_body, index, count: 0};
|
||||||
|
|
||||||
|
@ -112,7 +113,13 @@ pub fn api_method_upload_catar() -> ApiAsyncMethod {
|
||||||
.required("id", StringSchema::new("Backup ID."))
|
.required("id", StringSchema::new("Backup ID."))
|
||||||
.required("time", IntegerSchema::new("Backup time (Unix epoch.)")
|
.required("time", IntegerSchema::new("Backup time (Unix epoch.)")
|
||||||
.minimum(1547797308))
|
.minimum(1547797308))
|
||||||
|
.optional(
|
||||||
|
"chunk-size",
|
||||||
|
IntegerSchema::new("Chunk size in bytes. Must be a power of 2.")
|
||||||
|
.minimum(64*1024)
|
||||||
|
.maximum(4096*1024)
|
||||||
|
.default(4096*1024)
|
||||||
|
)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,16 @@ pub struct ChunkStore {
|
||||||
|
|
||||||
// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
|
// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
|
||||||
|
|
||||||
|
pub fn verify_chunk_size(size: u64) -> Result<(), Error> {
|
||||||
|
|
||||||
|
static SIZES: [u64; 7] = [64*1024, 128*1024, 256*1024, 512*1024, 1024*1024, 2048*1024, 4096*1024];
|
||||||
|
|
||||||
|
if !SIZES.contains(&size) {
|
||||||
|
bail!("Got unsupported chunk size '{}'", size);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn digest_to_prefix(digest: &[u8]) -> PathBuf {
|
fn digest_to_prefix(digest: &[u8]) -> PathBuf {
|
||||||
|
|
||||||
let mut buf = Vec::<u8>::with_capacity(2+1+2+1);
|
let mut buf = Vec::<u8>::with_capacity(2+1+2+1);
|
||||||
|
|
|
@ -329,6 +329,7 @@ pub struct DynamicIndexWriter {
|
||||||
pub uuid: [u8; 16],
|
pub uuid: [u8; 16],
|
||||||
pub ctime: u64,
|
pub ctime: u64,
|
||||||
|
|
||||||
|
chunk_count: usize,
|
||||||
chunk_offset: usize,
|
chunk_offset: usize,
|
||||||
last_chunk: usize,
|
last_chunk: usize,
|
||||||
chunk_buffer: Vec<u8>,
|
chunk_buffer: Vec<u8>,
|
||||||
|
@ -387,6 +388,7 @@ impl DynamicIndexWriter {
|
||||||
ctime,
|
ctime,
|
||||||
uuid: *uuid.as_bytes(),
|
uuid: *uuid.as_bytes(),
|
||||||
|
|
||||||
|
chunk_count: 0,
|
||||||
chunk_offset: 0,
|
chunk_offset: 0,
|
||||||
last_chunk: 0,
|
last_chunk: 0,
|
||||||
chunk_buffer: Vec::with_capacity(chunk_size*4),
|
chunk_buffer: Vec::with_capacity(chunk_size*4),
|
||||||
|
@ -405,6 +407,8 @@ impl DynamicIndexWriter {
|
||||||
|
|
||||||
self.writer.flush()?;
|
self.writer.flush()?;
|
||||||
|
|
||||||
|
let avg = ((self.chunk_offset as f64)/(self.chunk_count as f64)) as usize;
|
||||||
|
println!("Average chunk size {}", avg);
|
||||||
// fixme:
|
// fixme:
|
||||||
|
|
||||||
if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
|
if let Err(err) = std::fs::rename(&self.tmp_filename, &self.filename) {
|
||||||
|
@ -429,6 +433,8 @@ impl DynamicIndexWriter {
|
||||||
format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size)));
|
format!("wrong chunk size {} != {}", expected_chunk_size, chunk_size)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.chunk_count += 1;
|
||||||
|
|
||||||
self.last_chunk = self.chunk_offset;
|
self.last_chunk = self.chunk_offset;
|
||||||
|
|
||||||
match self.store.insert_chunk(&self.chunk_buffer) {
|
match self.store.insert_chunk(&self.chunk_buffer) {
|
||||||
|
|
|
@ -8,7 +8,7 @@ use proxmox_backup::cli::command::*;
|
||||||
use proxmox_backup::api_schema::*;
|
use proxmox_backup::api_schema::*;
|
||||||
use proxmox_backup::api_schema::router::*;
|
use proxmox_backup::api_schema::router::*;
|
||||||
use proxmox_backup::client::*;
|
use proxmox_backup::client::*;
|
||||||
//use proxmox_backup::backup::chunk_store::*;
|
use proxmox_backup::backup::*;
|
||||||
//use proxmox_backup::backup::image_index::*;
|
//use proxmox_backup::backup::image_index::*;
|
||||||
//use proxmox_backup::config::datastore;
|
//use proxmox_backup::config::datastore;
|
||||||
//use proxmox_backup::catar::encoder::*;
|
//use proxmox_backup::catar::encoder::*;
|
||||||
|
@ -18,19 +18,31 @@ use serde_json::{Value};
|
||||||
use hyper::Body;
|
use hyper::Body;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
fn backup_directory(repo: &BackupRepository, body: Body, archive_name: &str) -> Result<(), Error> {
|
fn backup_directory(
|
||||||
|
repo: &BackupRepository,
|
||||||
|
body: Body,
|
||||||
|
archive_name: &str,
|
||||||
|
chunk_size: Option<u64>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
|
||||||
let client = HttpClient::new(&repo.host, &repo.user);
|
let client = HttpClient::new(&repo.host, &repo.user);
|
||||||
|
|
||||||
let epoch = std::time::SystemTime::now().duration_since(
|
let epoch = std::time::SystemTime::now().duration_since(
|
||||||
std::time::SystemTime::UNIX_EPOCH)?.as_secs();
|
std::time::SystemTime::UNIX_EPOCH)?.as_secs();
|
||||||
|
|
||||||
let query = url::form_urlencoded::Serializer::new(String::new())
|
let mut query = url::form_urlencoded::Serializer::new(String::new());
|
||||||
|
|
||||||
|
query
|
||||||
.append_pair("archive_name", archive_name)
|
.append_pair("archive_name", archive_name)
|
||||||
.append_pair("type", "host")
|
.append_pair("type", "host")
|
||||||
.append_pair("id", &tools::nodename())
|
.append_pair("id", &tools::nodename())
|
||||||
.append_pair("time", &epoch.to_string())
|
.append_pair("time", &epoch.to_string());
|
||||||
.finish();
|
|
||||||
|
if let Some(size) = chunk_size {
|
||||||
|
query.append_pair("chunk-size", &size.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
let query = query.finish();
|
||||||
|
|
||||||
let path = format!("api2/json/admin/datastore/{}/catar?{}", repo.store, query);
|
let path = format!("api2/json/admin/datastore/{}/catar?{}", repo.store, query);
|
||||||
|
|
||||||
|
@ -96,16 +108,10 @@ fn create_backup(
|
||||||
|
|
||||||
let repo = BackupRepository::parse(repo_url)?;
|
let repo = BackupRepository::parse(repo_url)?;
|
||||||
|
|
||||||
let mut _chunk_size = 4*1024*1024;
|
let chunk_size_opt = param["chunk-size"].as_u64().map(|v| v*1024);
|
||||||
|
|
||||||
if let Some(size) = param["chunk-size"].as_u64() {
|
if let Some(size) = chunk_size_opt {
|
||||||
static SIZES: [u64; 7] = [64, 128, 256, 512, 1024, 2048, 4096];
|
verify_chunk_size(size)?;
|
||||||
|
|
||||||
if SIZES.contains(&size) {
|
|
||||||
_chunk_size = (size as usize) * 1024;
|
|
||||||
} else {
|
|
||||||
bail!("Got unsupported chunk size '{}'", size);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let stat = match nix::sys::stat::stat(filename) {
|
let stat = match nix::sys::stat::stat(filename) {
|
||||||
|
@ -120,7 +126,7 @@ fn create_backup(
|
||||||
|
|
||||||
let body = Body::wrap_stream(stream);
|
let body = Body::wrap_stream(stream);
|
||||||
|
|
||||||
backup_directory(&repo, body, target)?;
|
backup_directory(&repo, body, target, chunk_size_opt)?;
|
||||||
|
|
||||||
} else if (stat.st_mode & (libc::S_IFREG|libc::S_IFBLK)) != 0 {
|
} else if (stat.st_mode & (libc::S_IFREG|libc::S_IFBLK)) != 0 {
|
||||||
println!("Backup image '{}' to '{:?}'", filename, repo);
|
println!("Backup image '{}' to '{:?}'", filename, repo);
|
||||||
|
|
Loading…
Reference in New Issue