Compare commits
31 Commits
Author | SHA1 | Date | |
---|---|---|---|
0d5ab04a90 | |||
4059285649 | |||
2e079b8bf2 | |||
4ff2c9b832 | |||
a8e2940ff3 | |||
d5d5f2174e | |||
2311238450 | |||
2ea501ffdf | |||
4eb4e94918 | |||
817bcda848 | |||
f6de2c7359 | |||
3f0b9c10ec | |||
2b66abbfab | |||
402c8861d8 | |||
3f683799a8 | |||
573bcd9a92 | |||
90779237ae | |||
1f82f9b7b5 | |||
19b5c3c43e | |||
fe3e65c3ea | |||
fdaab0df4e | |||
b957aa81bd | |||
8ea00f6e49 | |||
4bd789b0fa | |||
2f050cf2ed | |||
e22f4882e7 | |||
c65bc99a41 | |||
355c055e81 | |||
c2009e5309 | |||
23f74c190e | |||
a6f8728339 |
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "proxmox-backup"
|
||||
version = "0.3.0"
|
||||
version = "0.5.0"
|
||||
authors = ["Dietmar Maurer <dietmar@proxmox.com>"]
|
||||
edition = "2018"
|
||||
license = "AGPL-3"
|
||||
@ -38,11 +38,11 @@ pam-sys = "0.5"
|
||||
percent-encoding = "2.1"
|
||||
pin-utils = "0.1.0"
|
||||
pathpatterns = "0.1.1"
|
||||
proxmox = { version = "0.1.40", features = [ "sortable-macro", "api-macro" ] }
|
||||
proxmox = { version = "0.1.41", features = [ "sortable-macro", "api-macro" ] }
|
||||
#proxmox = { git = "ssh://gitolite3@proxdev.maurer-it.com/rust/proxmox", version = "0.1.2", features = [ "sortable-macro", "api-macro" ] }
|
||||
#proxmox = { path = "../proxmox/proxmox", features = [ "sortable-macro", "api-macro" ] }
|
||||
proxmox-fuse = "0.1.0"
|
||||
pxar = { version = "0.1.8", features = [ "tokio-io", "futures-io" ] }
|
||||
pxar = { version = "0.2.0", features = [ "tokio-io", "futures-io" ] }
|
||||
#pxar = { path = "../pxar", features = [ "tokio-io", "futures-io" ] }
|
||||
regex = "1.2"
|
||||
rustyline = "6"
|
||||
|
2
TODO.rst
2
TODO.rst
@ -30,8 +30,6 @@ Chores:
|
||||
|
||||
* move tools/xattr.rs and tools/acl.rs to proxmox/sys/linux/
|
||||
|
||||
* recompute PXAR_ header types from strings: avoid using numbers from casync
|
||||
|
||||
* remove pbs-* systemd timers and services on package purge
|
||||
|
||||
|
||||
|
20
debian/changelog
vendored
20
debian/changelog
vendored
@ -1,6 +1,24 @@
|
||||
rust-proxmox-backup (0.5.0-1) unstable; urgency=medium
|
||||
|
||||
* partially revert commit 1f82f9b7b5d231da22a541432d5617cb303c0000
|
||||
|
||||
* ui: allow to Forget (delete) backup snapshots
|
||||
|
||||
* pxar: deal with files changing size during archiving
|
||||
|
||||
-- Proxmox Support Team <support@proxmox.com> Mon, 29 Jun 2020 13:00:54 +0200
|
||||
|
||||
rust-proxmox-backup (0.4.0-1) unstable; urgency=medium
|
||||
|
||||
* change api for incremental backups mode
|
||||
|
||||
* zfs disk management gui
|
||||
|
||||
-- Proxmox Support Team <support@proxmox.com> Fri, 26 Jun 2020 10:43:27 +0200
|
||||
|
||||
rust-proxmox-backup (0.3.0-1) unstable; urgency=medium
|
||||
|
||||
* support incrtemental backups mode
|
||||
* support incremental backups mode
|
||||
|
||||
* new disk management
|
||||
|
||||
|
@ -17,7 +17,7 @@ async fn upload_speed() -> Result<usize, Error> {
|
||||
|
||||
let backup_time = chrono::Utc::now();
|
||||
|
||||
let client = BackupWriter::start(client, datastore, "host", "speedtest", backup_time, false).await?;
|
||||
let client = BackupWriter::start(client, None, datastore, "host", "speedtest", backup_time, false).await?;
|
||||
|
||||
println!("start upload speed test");
|
||||
let res = client.upload_speedtest().await?;
|
||||
|
@ -9,6 +9,7 @@ pub mod status;
|
||||
pub mod types;
|
||||
pub mod version;
|
||||
pub mod pull;
|
||||
mod helpers;
|
||||
|
||||
use proxmox::api::router::SubdirMap;
|
||||
use proxmox::api::Router;
|
||||
|
@ -394,6 +394,90 @@ pub fn status(
|
||||
crate::tools::disks::disk_usage(&datastore.base_path())
|
||||
}
|
||||
|
||||
#[api(
|
||||
input: {
|
||||
properties: {
|
||||
store: {
|
||||
schema: DATASTORE_SCHEMA,
|
||||
},
|
||||
"backup-type": {
|
||||
schema: BACKUP_TYPE_SCHEMA,
|
||||
optional: true,
|
||||
},
|
||||
"backup-id": {
|
||||
schema: BACKUP_ID_SCHEMA,
|
||||
optional: true,
|
||||
},
|
||||
"backup-time": {
|
||||
schema: BACKUP_TIME_SCHEMA,
|
||||
optional: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
returns: {
|
||||
schema: UPID_SCHEMA,
|
||||
},
|
||||
access: {
|
||||
permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP, true), // fixme
|
||||
},
|
||||
)]
|
||||
/// Verify backups.
|
||||
///
|
||||
/// This function can verify a single backup snapshot, all backup from a backup group,
|
||||
/// or all backups in the datastore.
|
||||
pub fn verify(
|
||||
store: String,
|
||||
backup_type: Option<String>,
|
||||
backup_id: Option<String>,
|
||||
backup_time: Option<i64>,
|
||||
rpcenv: &mut dyn RpcEnvironment,
|
||||
) -> Result<Value, Error> {
|
||||
let datastore = DataStore::lookup_datastore(&store)?;
|
||||
|
||||
let worker_id;
|
||||
|
||||
let mut backup_dir = None;
|
||||
let mut backup_group = None;
|
||||
|
||||
match (backup_type, backup_id, backup_time) {
|
||||
(Some(backup_type), Some(backup_id), Some(backup_time)) => {
|
||||
let dir = BackupDir::new(backup_type, backup_id, backup_time);
|
||||
worker_id = format!("{}_{}", store, dir);
|
||||
backup_dir = Some(dir);
|
||||
}
|
||||
(Some(backup_type), Some(backup_id), None) => {
|
||||
let group = BackupGroup::new(backup_type, backup_id);
|
||||
worker_id = format!("{}_{}", store, group);
|
||||
backup_group = Some(group);
|
||||
}
|
||||
(None, None, None) => {
|
||||
worker_id = store.clone();
|
||||
}
|
||||
_ => bail!("parameters do not spefify a backup group or snapshot"),
|
||||
}
|
||||
|
||||
let username = rpcenv.get_user().unwrap();
|
||||
let to_stdout = if rpcenv.env_type() == RpcEnvironmentType::CLI { true } else { false };
|
||||
|
||||
let upid_str = WorkerTask::new_thread(
|
||||
"verify", Some(worker_id.clone()), &username, to_stdout, move |worker|
|
||||
{
|
||||
let success = if let Some(backup_dir) = backup_dir {
|
||||
verify_backup_dir(&datastore, &backup_dir, &worker)?
|
||||
} else if let Some(backup_group) = backup_group {
|
||||
verify_backup_group(&datastore, &backup_group, &worker)?
|
||||
} else {
|
||||
verify_all_backups(&datastore, &worker)?
|
||||
};
|
||||
if !success {
|
||||
bail!("verfication failed - please check the log for details");
|
||||
}
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
Ok(json!(upid_str))
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! add_common_prune_prameters {
|
||||
( [ $( $list1:tt )* ] ) => {
|
||||
@ -1261,6 +1345,11 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
|
||||
&Router::new()
|
||||
.upload(&API_METHOD_UPLOAD_BACKUP_LOG)
|
||||
),
|
||||
(
|
||||
"verify",
|
||||
&Router::new()
|
||||
.post(&API_METHOD_VERIFY)
|
||||
),
|
||||
];
|
||||
|
||||
const DATASTORE_INFO_ROUTER: Router = Router::new()
|
||||
|
@ -10,7 +10,7 @@ use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironm
|
||||
use proxmox::api::router::SubdirMap;
|
||||
use proxmox::api::schema::*;
|
||||
|
||||
use crate::tools::{self, WrappedReaderStream};
|
||||
use crate::tools;
|
||||
use crate::server::{WorkerTask, H2Service};
|
||||
use crate::backup::*;
|
||||
use crate::api2::types::*;
|
||||
@ -199,7 +199,6 @@ pub const BACKUP_API_SUBDIRS: SubdirMap = &[
|
||||
),
|
||||
(
|
||||
"dynamic_index", &Router::new()
|
||||
.download(&API_METHOD_DYNAMIC_CHUNK_INDEX)
|
||||
.post(&API_METHOD_CREATE_DYNAMIC_INDEX)
|
||||
.put(&API_METHOD_DYNAMIC_APPEND)
|
||||
),
|
||||
@ -222,10 +221,13 @@ pub const BACKUP_API_SUBDIRS: SubdirMap = &[
|
||||
),
|
||||
(
|
||||
"fixed_index", &Router::new()
|
||||
.download(&API_METHOD_FIXED_CHUNK_INDEX)
|
||||
.post(&API_METHOD_CREATE_FIXED_INDEX)
|
||||
.put(&API_METHOD_FIXED_APPEND)
|
||||
),
|
||||
(
|
||||
"previous", &Router::new()
|
||||
.download(&API_METHOD_DOWNLOAD_PREVIOUS)
|
||||
),
|
||||
(
|
||||
"speedtest", &Router::new()
|
||||
.upload(&API_METHOD_UPLOAD_SPEEDTEST)
|
||||
@ -610,20 +612,17 @@ fn finish_backup (
|
||||
}
|
||||
|
||||
#[sortable]
|
||||
pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
|
||||
&ApiHandler::AsyncHttp(&dynamic_chunk_index),
|
||||
pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
|
||||
&ApiHandler::AsyncHttp(&download_previous),
|
||||
&ObjectSchema::new(
|
||||
r###"
|
||||
Download the dynamic chunk index from the previous backup.
|
||||
Simply returns an empty list if this is the first backup.
|
||||
"### ,
|
||||
"Download archive from previous backup.",
|
||||
&sorted!([
|
||||
("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
|
||||
]),
|
||||
)
|
||||
);
|
||||
|
||||
fn dynamic_chunk_index(
|
||||
fn download_previous(
|
||||
_parts: Parts,
|
||||
_req_body: Body,
|
||||
param: Value,
|
||||
@ -636,130 +635,38 @@ fn dynamic_chunk_index(
|
||||
|
||||
let archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned();
|
||||
|
||||
if !archive_name.ends_with(".didx") {
|
||||
bail!("wrong archive extension: '{}'", archive_name);
|
||||
}
|
||||
|
||||
let empty_response = {
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::empty())?
|
||||
};
|
||||
|
||||
let last_backup = match &env.last_backup {
|
||||
Some(info) => info,
|
||||
None => return Ok(empty_response),
|
||||
None => bail!("no previous backup"),
|
||||
};
|
||||
|
||||
let mut path = last_backup.backup_dir.relative_path();
|
||||
let mut path = env.datastore.snapshot_path(&last_backup.backup_dir);
|
||||
path.push(&archive_name);
|
||||
|
||||
let index = match env.datastore.open_dynamic_reader(path) {
|
||||
Ok(index) => index,
|
||||
Err(_) => {
|
||||
env.log(format!("there is no last backup for archive '{}'", archive_name));
|
||||
return Ok(empty_response);
|
||||
{
|
||||
let index: Option<Box<dyn IndexFile>> = match archive_type(&archive_name)? {
|
||||
ArchiveType::FixedIndex => {
|
||||
let index = env.datastore.open_fixed_reader(&path)?;
|
||||
Some(Box::new(index))
|
||||
}
|
||||
ArchiveType::DynamicIndex => {
|
||||
let index = env.datastore.open_dynamic_reader(&path)?;
|
||||
Some(Box::new(index))
|
||||
}
|
||||
_ => { None }
|
||||
};
|
||||
if let Some(index) = index {
|
||||
env.log(format!("register chunks in '{}' from previous backup.", archive_name));
|
||||
|
||||
env.log(format!("download last backup index for archive '{}'", archive_name));
|
||||
|
||||
let count = index.index_count();
|
||||
for pos in 0..count {
|
||||
let info = index.chunk_info(pos)?;
|
||||
let size = info.size() as u32;
|
||||
env.register_chunk(info.digest, size)?;
|
||||
for pos in 0..index.index_count() {
|
||||
let info = index.chunk_info(pos).unwrap();
|
||||
let size = info.range.end - info.range.start;
|
||||
env.register_chunk(info.digest, size as u32)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let reader = DigestListEncoder::new(Box::new(index));
|
||||
|
||||
let stream = WrappedReaderStream::new(reader);
|
||||
|
||||
// fixme: set size, content type?
|
||||
let response = http::Response::builder()
|
||||
.status(200)
|
||||
.body(Body::wrap_stream(stream))?;
|
||||
|
||||
Ok(response)
|
||||
}.boxed()
|
||||
}
|
||||
|
||||
#[sortable]
|
||||
pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
|
||||
&ApiHandler::AsyncHttp(&fixed_chunk_index),
|
||||
&ObjectSchema::new(
|
||||
r###"
|
||||
Download the fixed chunk index from the previous backup.
|
||||
Simply returns an empty list if this is the first backup.
|
||||
"### ,
|
||||
&sorted!([
|
||||
("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
|
||||
]),
|
||||
)
|
||||
);
|
||||
|
||||
fn fixed_chunk_index(
|
||||
_parts: Parts,
|
||||
_req_body: Body,
|
||||
param: Value,
|
||||
_info: &ApiMethod,
|
||||
rpcenv: Box<dyn RpcEnvironment>,
|
||||
) -> ApiResponseFuture {
|
||||
|
||||
async move {
|
||||
let env: &BackupEnvironment = rpcenv.as_ref();
|
||||
|
||||
let archive_name = tools::required_string_param(¶m, "archive-name")?.to_owned();
|
||||
|
||||
if !archive_name.ends_with(".fidx") {
|
||||
bail!("wrong archive extension: '{}'", archive_name);
|
||||
}
|
||||
|
||||
let empty_response = {
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::empty())?
|
||||
};
|
||||
|
||||
let last_backup = match &env.last_backup {
|
||||
Some(info) => info,
|
||||
None => return Ok(empty_response),
|
||||
};
|
||||
|
||||
let mut path = last_backup.backup_dir.relative_path();
|
||||
path.push(&archive_name);
|
||||
|
||||
let index = match env.datastore.open_fixed_reader(path) {
|
||||
Ok(index) => index,
|
||||
Err(_) => {
|
||||
env.log(format!("there is no last backup for archive '{}'", archive_name));
|
||||
return Ok(empty_response);
|
||||
}
|
||||
};
|
||||
|
||||
env.log(format!("download last backup index for archive '{}'", archive_name));
|
||||
|
||||
let count = index.index_count();
|
||||
let image_size = index.index_bytes();
|
||||
for pos in 0..count {
|
||||
let digest = index.index_digest(pos).unwrap();
|
||||
// Note: last chunk can be smaller
|
||||
let start = (pos*index.chunk_size) as u64;
|
||||
let mut end = start + index.chunk_size as u64;
|
||||
if end > image_size { end = image_size; }
|
||||
let size = (end - start) as u32;
|
||||
env.register_chunk(*digest, size)?;
|
||||
}
|
||||
|
||||
let reader = DigestListEncoder::new(Box::new(index));
|
||||
|
||||
let stream = WrappedReaderStream::new(reader);
|
||||
|
||||
// fixme: set size, content type?
|
||||
let response = http::Response::builder()
|
||||
.status(200)
|
||||
.body(Body::wrap_stream(stream))?;
|
||||
|
||||
Ok(response)
|
||||
env.log(format!("download '{}' from previous backup.", archive_name));
|
||||
crate::api2::helpers::create_download_response(path).await
|
||||
}.boxed()
|
||||
}
|
||||
|
23
src/api2/helpers.rs
Normal file
23
src/api2/helpers.rs
Normal file
@ -0,0 +1,23 @@
|
||||
use std::path::PathBuf;
|
||||
use anyhow::Error;
|
||||
use futures::*;
|
||||
use hyper::{Body, Response, StatusCode, header};
|
||||
use proxmox::http_err;
|
||||
|
||||
pub async fn create_download_response(path: PathBuf) -> Result<Response<Body>, Error> {
|
||||
let file = tokio::fs::File::open(path.clone())
|
||||
.map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path.clone(), err)))
|
||||
.await?;
|
||||
|
||||
let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
|
||||
.map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
|
||||
|
||||
let body = Body::wrap_stream(payload);
|
||||
|
||||
// fixme: set other headers ?
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||
.body(body)
|
||||
.unwrap())
|
||||
}
|
@ -17,6 +17,7 @@ use crate::server::{WorkerTask, H2Service};
|
||||
use crate::tools;
|
||||
use crate::config::acl::PRIV_DATASTORE_READ;
|
||||
use crate::config::cached_user_info::CachedUserInfo;
|
||||
use crate::api2::helpers;
|
||||
|
||||
mod environment;
|
||||
use environment::*;
|
||||
@ -187,26 +188,9 @@ fn download_file(
|
||||
path.push(env.backup_dir.relative_path());
|
||||
path.push(&file_name);
|
||||
|
||||
let path2 = path.clone();
|
||||
let path3 = path.clone();
|
||||
env.log(format!("download {:?}", path.clone()));
|
||||
|
||||
let file = tokio::fs::File::open(path)
|
||||
.map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
|
||||
.await?;
|
||||
|
||||
env.log(format!("download {:?}", path3));
|
||||
|
||||
let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
|
||||
.map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
|
||||
|
||||
let body = Body::wrap_stream(payload);
|
||||
|
||||
// fixme: set other headers ?
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||
.body(body)
|
||||
.unwrap())
|
||||
helpers::create_download_response(path).await
|
||||
}.boxed()
|
||||
}
|
||||
|
||||
|
@ -198,6 +198,9 @@ pub use prune::*;
|
||||
mod datastore;
|
||||
pub use datastore::*;
|
||||
|
||||
mod verify;
|
||||
pub use verify::*;
|
||||
|
||||
mod catalog_shell;
|
||||
pub use catalog_shell::*;
|
||||
|
||||
|
@ -141,6 +141,14 @@ impl BackupGroup {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for BackupGroup {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let backup_type = self.backup_type();
|
||||
let id = self.backup_id();
|
||||
write!(f, "{}/{}", backup_type, id)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::str::FromStr for BackupGroup {
|
||||
type Err = Error;
|
||||
|
||||
|
@ -4,7 +4,7 @@ use std::ops::Range;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
use std::task::Context;
|
||||
use std::pin::Pin;
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
@ -13,6 +13,7 @@ use proxmox::tools::io::ReadExt;
|
||||
use proxmox::tools::uuid::Uuid;
|
||||
use proxmox::tools::vec;
|
||||
use proxmox::tools::mmap::Mmap;
|
||||
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
|
||||
|
||||
use super::chunk_stat::ChunkStat;
|
||||
use super::chunk_store::ChunkStore;
|
||||
@ -123,25 +124,6 @@ impl DynamicIndexReader {
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::cast_ptr_alignment)]
|
||||
pub fn chunk_info(&self, pos: usize) -> Result<ChunkReadInfo, Error> {
|
||||
if pos >= self.index.len() {
|
||||
bail!("chunk index out of range");
|
||||
}
|
||||
let start = if pos == 0 {
|
||||
0
|
||||
} else {
|
||||
self.index[pos - 1].end()
|
||||
};
|
||||
|
||||
let end = self.index[pos].end();
|
||||
|
||||
Ok(ChunkReadInfo {
|
||||
range: start..end,
|
||||
digest: self.index[pos].digest.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[allow(clippy::cast_ptr_alignment)]
|
||||
fn chunk_end(&self, pos: usize) -> u64 {
|
||||
@ -159,24 +141,6 @@ impl DynamicIndexReader {
|
||||
&self.index[pos].digest
|
||||
}
|
||||
|
||||
/// Compute checksum and data size
|
||||
pub fn compute_csum(&self) -> ([u8; 32], u64) {
|
||||
let mut csum = openssl::sha::Sha256::new();
|
||||
for entry in &self.index {
|
||||
csum.update(&entry.end_le.to_ne_bytes());
|
||||
csum.update(&entry.digest);
|
||||
}
|
||||
let csum = csum.finish();
|
||||
|
||||
(
|
||||
csum,
|
||||
self.index
|
||||
.last()
|
||||
.map(|entry| entry.end())
|
||||
.unwrap_or(0)
|
||||
)
|
||||
}
|
||||
|
||||
// TODO: can we use std::slice::binary_search with Mmap now?
|
||||
fn binary_search(
|
||||
&self,
|
||||
@ -224,6 +188,34 @@ impl IndexFile for DynamicIndexReader {
|
||||
self.chunk_end(self.index.len() - 1)
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_csum(&self) -> ([u8; 32], u64) {
|
||||
let mut csum = openssl::sha::Sha256::new();
|
||||
let mut chunk_end = 0;
|
||||
for pos in 0..self.index_count() {
|
||||
let info = self.chunk_info(pos).unwrap();
|
||||
chunk_end = info.range.end;
|
||||
csum.update(&chunk_end.to_le_bytes());
|
||||
csum.update(&info.digest);
|
||||
}
|
||||
let csum = csum.finish();
|
||||
(csum, chunk_end)
|
||||
}
|
||||
|
||||
#[allow(clippy::cast_ptr_alignment)]
|
||||
fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo> {
|
||||
if pos >= self.index.len() {
|
||||
return None;
|
||||
}
|
||||
let start = if pos == 0 { 0 } else { self.index[pos - 1].end() };
|
||||
|
||||
let end = self.index[pos].end();
|
||||
|
||||
Some(ChunkReadInfo {
|
||||
range: start..end,
|
||||
digest: self.index[pos].digest.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct CachedChunk {
|
||||
@ -263,7 +255,10 @@ struct ChunkCacher<'a, S> {
|
||||
|
||||
impl<'a, S: ReadChunk> crate::tools::lru_cache::Cacher<usize, CachedChunk> for ChunkCacher<'a, S> {
|
||||
fn fetch(&mut self, index: usize) -> Result<Option<CachedChunk>, Error> {
|
||||
let info = self.index.chunk_info(index)?;
|
||||
let info = match self.index.chunk_info(index) {
|
||||
Some(info) => info,
|
||||
None => bail!("chunk index out of range"),
|
||||
};
|
||||
let range = info.range;
|
||||
let data = self.store.read_chunk(&info.digest)?;
|
||||
CachedChunk::new(range, data).map(Some)
|
||||
@ -416,19 +411,26 @@ impl<R: ReadChunk> LocalDynamicReadAt<R> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: ReadChunk> pxar::accessor::ReadAt for LocalDynamicReadAt<R> {
|
||||
fn poll_read_at(
|
||||
self: Pin<&Self>,
|
||||
impl<R: ReadChunk> ReadAt for LocalDynamicReadAt<R> {
|
||||
fn start_read_at<'a>(
|
||||
self: Pin<&'a Self>,
|
||||
_cx: &mut Context,
|
||||
buf: &mut [u8],
|
||||
buf: &'a mut [u8],
|
||||
offset: u64,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
|
||||
use std::io::Read;
|
||||
tokio::task::block_in_place(move || {
|
||||
MaybeReady::Ready(tokio::task::block_in_place(move || {
|
||||
let mut reader = self.inner.lock().unwrap();
|
||||
reader.seek(SeekFrom::Start(offset))?;
|
||||
Poll::Ready(Ok(reader.read(buf)?))
|
||||
})
|
||||
Ok(reader.read(buf)?)
|
||||
}))
|
||||
}
|
||||
|
||||
fn poll_complete<'a>(
|
||||
self: Pin<&'a Self>,
|
||||
_op: ReadAtOperation<'a>,
|
||||
) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
|
||||
panic!("LocalDynamicReadAt::start_read_at returned Pending");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,10 +1,9 @@
|
||||
use anyhow::{bail, format_err, Error};
|
||||
use std::convert::TryInto;
|
||||
use std::io::{Seek, SeekFrom};
|
||||
|
||||
use super::chunk_stat::*;
|
||||
use super::chunk_store::*;
|
||||
use super::IndexFile;
|
||||
use super::{IndexFile, ChunkReadInfo};
|
||||
use crate::tools::{self, epoch_now_u64};
|
||||
|
||||
use chrono::{Local, TimeZone};
|
||||
@ -147,38 +146,6 @@ impl FixedIndexReader {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
|
||||
if pos >= self.index_length {
|
||||
bail!("chunk index out of range");
|
||||
}
|
||||
let start = (pos * self.chunk_size) as u64;
|
||||
let mut end = start + self.chunk_size as u64;
|
||||
|
||||
if end > self.size {
|
||||
end = self.size;
|
||||
}
|
||||
|
||||
let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
|
||||
unsafe {
|
||||
std::ptr::copy_nonoverlapping(
|
||||
self.index.add(pos * 32),
|
||||
(*digest.as_mut_ptr()).as_mut_ptr(),
|
||||
32,
|
||||
);
|
||||
}
|
||||
|
||||
Ok((start, end, unsafe { digest.assume_init() }))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
|
||||
if pos >= self.index_length {
|
||||
panic!("chunk index out of range");
|
||||
}
|
||||
let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos * 32), 32) };
|
||||
slice.try_into().unwrap()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn chunk_end(&self, pos: usize) -> u64 {
|
||||
if pos >= self.index_length {
|
||||
@ -193,20 +160,6 @@ impl FixedIndexReader {
|
||||
}
|
||||
}
|
||||
|
||||
/// Compute checksum and data size
|
||||
pub fn compute_csum(&self) -> ([u8; 32], u64) {
|
||||
let mut csum = openssl::sha::Sha256::new();
|
||||
let mut chunk_end = 0;
|
||||
for pos in 0..self.index_length {
|
||||
chunk_end = self.chunk_end(pos);
|
||||
let digest = self.chunk_digest(pos);
|
||||
csum.update(digest);
|
||||
}
|
||||
let csum = csum.finish();
|
||||
|
||||
(csum, chunk_end)
|
||||
}
|
||||
|
||||
pub fn print_info(&self) {
|
||||
println!("Size: {}", self.size);
|
||||
println!("ChunkSize: {}", self.chunk_size);
|
||||
@ -234,6 +187,38 @@ impl IndexFile for FixedIndexReader {
|
||||
fn index_bytes(&self) -> u64 {
|
||||
self.size
|
||||
}
|
||||
|
||||
fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo> {
|
||||
if pos >= self.index_length {
|
||||
return None;
|
||||
}
|
||||
|
||||
let start = (pos * self.chunk_size) as u64;
|
||||
let mut end = start + self.chunk_size as u64;
|
||||
|
||||
if end > self.size {
|
||||
end = self.size;
|
||||
}
|
||||
|
||||
let digest = self.index_digest(pos).unwrap();
|
||||
Some(ChunkReadInfo {
|
||||
range: start..end,
|
||||
digest: *digest,
|
||||
})
|
||||
}
|
||||
|
||||
fn compute_csum(&self) -> ([u8; 32], u64) {
|
||||
let mut csum = openssl::sha::Sha256::new();
|
||||
let mut chunk_end = 0;
|
||||
for pos in 0..self.index_count() {
|
||||
let info = self.chunk_info(pos).unwrap();
|
||||
chunk_end = info.range.end;
|
||||
csum.update(&info.digest);
|
||||
}
|
||||
let csum = csum.finish();
|
||||
|
||||
(csum, chunk_end)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FixedIndexWriter {
|
||||
@ -511,18 +496,17 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
|
||||
|
||||
fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
|
||||
let index = &self.index;
|
||||
let (start, end, digest) = index.chunk_info(idx)?;
|
||||
let info = match index.chunk_info(idx) {
|
||||
Some(info) => info,
|
||||
None => bail!("chunk index out of range"),
|
||||
};
|
||||
|
||||
// fixme: avoid copy
|
||||
|
||||
let data = self.store.read_chunk(&digest)?;
|
||||
|
||||
if (end - start) != data.len() as u64 {
|
||||
bail!(
|
||||
"read chunk with wrong size ({} != {}",
|
||||
(end - start),
|
||||
data.len()
|
||||
);
|
||||
let data = self.store.read_chunk(&info.digest)?;
|
||||
let size = info.range.end - info.range.start;
|
||||
if size != data.len() as u64 {
|
||||
bail!("read chunk with wrong size ({} != {}", size, data.len());
|
||||
}
|
||||
|
||||
self.read_buffer.clear();
|
||||
@ -530,8 +514,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
|
||||
|
||||
self.buffered_chunk_idx = idx;
|
||||
|
||||
self.buffered_chunk_start = start as u64;
|
||||
//println!("BUFFER {} {}", self.buffered_chunk_start, end);
|
||||
self.buffered_chunk_start = info.range.start as u64;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -1,11 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::ops::Range;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use anyhow::{format_err, Error};
|
||||
use futures::*;
|
||||
|
||||
pub struct ChunkReadInfo {
|
||||
pub range: Range<u64>,
|
||||
@ -26,6 +20,10 @@ pub trait IndexFile {
|
||||
fn index_count(&self) -> usize;
|
||||
fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;
|
||||
fn index_bytes(&self) -> u64;
|
||||
fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
|
||||
|
||||
/// Compute index checksum and size
|
||||
fn compute_csum(&self) -> ([u8; 32], u64);
|
||||
|
||||
/// Returns most often used chunks
|
||||
fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> {
|
||||
@ -59,111 +57,3 @@ pub trait IndexFile {
|
||||
map
|
||||
}
|
||||
}
|
||||
|
||||
/// Encode digest list from an `IndexFile` into a binary stream
|
||||
///
|
||||
/// The reader simply returns a birary stream of 32 byte digest values.
|
||||
pub struct DigestListEncoder {
|
||||
index: Box<dyn IndexFile + Send + Sync>,
|
||||
pos: usize,
|
||||
count: usize,
|
||||
}
|
||||
|
||||
impl DigestListEncoder {
|
||||
|
||||
pub fn new(index: Box<dyn IndexFile + Send + Sync>) -> Self {
|
||||
let count = index.index_count();
|
||||
Self { index, pos: 0, count }
|
||||
}
|
||||
}
|
||||
|
||||
impl std::io::Read for DigestListEncoder {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||
if buf.len() < 32 {
|
||||
panic!("read buffer too small");
|
||||
}
|
||||
|
||||
if self.pos < self.count {
|
||||
let mut written = 0;
|
||||
loop {
|
||||
let digest = self.index.index_digest(self.pos).unwrap();
|
||||
buf[written..(written + 32)].copy_from_slice(digest);
|
||||
self.pos += 1;
|
||||
written += 32;
|
||||
if self.pos >= self.count {
|
||||
break;
|
||||
}
|
||||
if (written + 32) >= buf.len() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(written)
|
||||
} else {
|
||||
Ok(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes a Stream<Item=Bytes> into Stream<Item=<[u8;32]>
|
||||
///
|
||||
/// The reader simply returns a birary stream of 32 byte digest values.
|
||||
|
||||
pub struct DigestListDecoder<S: Unpin> {
|
||||
input: S,
|
||||
buffer: BytesMut,
|
||||
}
|
||||
|
||||
impl<S: Unpin> DigestListDecoder<S> {
|
||||
pub fn new(input: S) -> Self {
|
||||
Self { input, buffer: BytesMut::new() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Unpin> Unpin for DigestListDecoder<S> {}
|
||||
|
||||
impl<S: Unpin, E> Stream for DigestListDecoder<S>
|
||||
where
|
||||
S: Stream<Item=Result<Bytes, E>>,
|
||||
E: Into<Error>,
|
||||
{
|
||||
type Item = Result<[u8; 32], Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
|
||||
loop {
|
||||
if this.buffer.len() >= 32 {
|
||||
let left = this.buffer.split_to(32);
|
||||
|
||||
let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
|
||||
unsafe {
|
||||
(*digest.as_mut_ptr()).copy_from_slice(&left[..]);
|
||||
return Poll::Ready(Some(Ok(digest.assume_init())));
|
||||
}
|
||||
}
|
||||
|
||||
match Pin::new(&mut this.input).poll_next(cx) {
|
||||
Poll::Pending => {
|
||||
return Poll::Pending;
|
||||
}
|
||||
Poll::Ready(Some(Err(err))) => {
|
||||
return Poll::Ready(Some(Err(err.into())));
|
||||
}
|
||||
Poll::Ready(Some(Ok(data))) => {
|
||||
this.buffer.extend_from_slice(&data);
|
||||
// continue
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
let rest = this.buffer.len();
|
||||
if rest == 0 {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
return Poll::Ready(Some(Err(format_err!(
|
||||
"got small digest ({} != 32).",
|
||||
rest,
|
||||
))));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
196
src/backup/verify.rs
Normal file
196
src/backup/verify.rs
Normal file
@ -0,0 +1,196 @@
|
||||
use anyhow::{bail, Error};
|
||||
|
||||
use crate::server::WorkerTask;
|
||||
|
||||
use super::{
|
||||
DataStore, BackupGroup, BackupDir, BackupInfo, IndexFile,
|
||||
ENCR_COMPR_BLOB_MAGIC_1_0, ENCRYPTED_BLOB_MAGIC_1_0,
|
||||
FileInfo, ArchiveType, archive_type,
|
||||
};
|
||||
|
||||
fn verify_blob(datastore: &DataStore, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
|
||||
|
||||
let (blob, raw_size) = datastore.load_blob(backup_dir, &info.filename)?;
|
||||
|
||||
let csum = openssl::sha::sha256(blob.raw_data());
|
||||
if raw_size != info.size {
|
||||
bail!("wrong size ({} != {})", info.size, raw_size);
|
||||
}
|
||||
|
||||
if csum != info.csum {
|
||||
bail!("wrong index checksum");
|
||||
}
|
||||
|
||||
blob.verify_crc()?;
|
||||
|
||||
let magic = blob.magic();
|
||||
|
||||
if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
blob.decode(None)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn verify_index_chunks(
|
||||
datastore: &DataStore,
|
||||
index: Box<dyn IndexFile>,
|
||||
worker: &WorkerTask,
|
||||
) -> Result<(), Error> {
|
||||
|
||||
for pos in 0..index.index_count() {
|
||||
|
||||
worker.fail_on_abort()?;
|
||||
|
||||
let info = index.chunk_info(pos).unwrap();
|
||||
let size = info.range.end - info.range.start;
|
||||
datastore.verify_stored_chunk(&info.digest, size)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn verify_fixed_index(datastore: &DataStore, backup_dir: &BackupDir, info: &FileInfo, worker: &WorkerTask) -> Result<(), Error> {
|
||||
|
||||
let mut path = backup_dir.relative_path();
|
||||
path.push(&info.filename);
|
||||
|
||||
let index = datastore.open_fixed_reader(&path)?;
|
||||
|
||||
let (csum, size) = index.compute_csum();
|
||||
if size != info.size {
|
||||
bail!("wrong size ({} != {})", info.size, size);
|
||||
}
|
||||
|
||||
if csum != info.csum {
|
||||
bail!("wrong index checksum");
|
||||
}
|
||||
|
||||
verify_index_chunks(datastore, Box::new(index), worker)
|
||||
}
|
||||
|
||||
fn verify_dynamic_index(datastore: &DataStore, backup_dir: &BackupDir, info: &FileInfo, worker: &WorkerTask) -> Result<(), Error> {
|
||||
let mut path = backup_dir.relative_path();
|
||||
path.push(&info.filename);
|
||||
|
||||
let index = datastore.open_dynamic_reader(&path)?;
|
||||
|
||||
let (csum, size) = index.compute_csum();
|
||||
if size != info.size {
|
||||
bail!("wrong size ({} != {})", info.size, size);
|
||||
}
|
||||
|
||||
if csum != info.csum {
|
||||
bail!("wrong index checksum");
|
||||
}
|
||||
|
||||
verify_index_chunks(datastore, Box::new(index), worker)
|
||||
}
|
||||
|
||||
/// Verify a single backup snapshot
|
||||
///
|
||||
/// This checks all archives inside a backup snapshot.
|
||||
/// Errors are logged to the worker log.
|
||||
///
|
||||
/// Returns
|
||||
/// - Ok(true) if verify is successful
|
||||
/// - Ok(false) if there were verification errors
|
||||
/// - Err(_) if task was aborted
|
||||
pub fn verify_backup_dir(datastore: &DataStore, backup_dir: &BackupDir, worker: &WorkerTask) -> Result<bool, Error> {
|
||||
|
||||
let manifest = match datastore.load_manifest(&backup_dir) {
|
||||
Ok((manifest, _)) => manifest,
|
||||
Err(err) => {
|
||||
worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
|
||||
return Ok(false);
|
||||
}
|
||||
};
|
||||
|
||||
worker.log(format!("verify {}:{}", datastore.name(), backup_dir));
|
||||
|
||||
let mut error_count = 0;
|
||||
|
||||
for info in manifest.files() {
|
||||
let result = proxmox::try_block!({
|
||||
worker.log(format!(" check {}", info.filename));
|
||||
match archive_type(&info.filename)? {
|
||||
ArchiveType::FixedIndex => verify_fixed_index(&datastore, &backup_dir, info, worker),
|
||||
ArchiveType::DynamicIndex => verify_dynamic_index(&datastore, &backup_dir, info, worker),
|
||||
ArchiveType::Blob => verify_blob(&datastore, &backup_dir, info),
|
||||
}
|
||||
});
|
||||
|
||||
worker.fail_on_abort()?;
|
||||
|
||||
if let Err(err) = result {
|
||||
worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
|
||||
error_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(error_count == 0)
|
||||
}
|
||||
|
||||
/// Verify all backups inside a backup group
|
||||
///
|
||||
/// Errors are logged to the worker log.
|
||||
///
|
||||
/// Returns
|
||||
/// - Ok(true) if verify is successful
|
||||
/// - Ok(false) if there were verification errors
|
||||
/// - Err(_) if task was aborted
|
||||
pub fn verify_backup_group(datastore: &DataStore, group: &BackupGroup, worker: &WorkerTask) -> Result<bool, Error> {
|
||||
|
||||
let mut list = match group.list_backups(&datastore.base_path()) {
|
||||
Ok(list) => list,
|
||||
Err(err) => {
|
||||
worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
|
||||
return Ok(false);
|
||||
}
|
||||
};
|
||||
|
||||
worker.log(format!("verify group {}:{}", datastore.name(), group));
|
||||
|
||||
let mut error_count = 0;
|
||||
|
||||
BackupInfo::sort_list(&mut list, false); // newest first
|
||||
for info in list {
|
||||
if !verify_backup_dir(datastore, &info.backup_dir, worker)? {
|
||||
error_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(error_count == 0)
|
||||
}
|
||||
|
||||
/// Verify all backups inside a datastore
|
||||
///
|
||||
/// Errors are logged to the worker log.
|
||||
///
|
||||
/// Returns
|
||||
/// - Ok(true) if verify is successful
|
||||
/// - Ok(false) if there were verification errors
|
||||
/// - Err(_) if task was aborted
|
||||
pub fn verify_all_backups(datastore: &DataStore, worker: &WorkerTask) -> Result<bool, Error> {
|
||||
|
||||
let list = match BackupGroup::list_groups(&datastore.base_path()) {
|
||||
Ok(list) => list,
|
||||
Err(err) => {
|
||||
worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
|
||||
return Ok(false);
|
||||
}
|
||||
};
|
||||
|
||||
worker.log(format!("verify datastore {}", datastore.name()));
|
||||
|
||||
let mut error_count = 0;
|
||||
for group in list {
|
||||
if !verify_backup_group(datastore, &group, worker)? {
|
||||
error_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(error_count == 0)
|
||||
}
|
@ -6,7 +6,7 @@ use std::os::unix::io::RawFd;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
use std::task::Context;
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
use chrono::{Local, DateTime, Utc, TimeZone};
|
||||
@ -27,6 +27,7 @@ use proxmox::api::{ApiHandler, ApiMethod, RpcEnvironment};
|
||||
use proxmox::api::schema::*;
|
||||
use proxmox::api::cli::*;
|
||||
use proxmox::api::api;
|
||||
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
|
||||
|
||||
use proxmox_backup::tools;
|
||||
use proxmox_backup::api2::types::*;
|
||||
@ -260,16 +261,15 @@ async fn api_datastore_latest_snapshot(
|
||||
Ok((group.backup_type().to_owned(), group.backup_id().to_owned(), backup_time))
|
||||
}
|
||||
|
||||
|
||||
async fn backup_directory<P: AsRef<Path>>(
|
||||
client: &BackupWriter,
|
||||
previous_manifest: Option<Arc<BackupManifest>>,
|
||||
dir_path: P,
|
||||
archive_name: &str,
|
||||
chunk_size: Option<usize>,
|
||||
device_set: Option<HashSet<u64>>,
|
||||
verbose: bool,
|
||||
skip_lost_and_found: bool,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
|
||||
exclude_pattern: Vec<MatchEntry>,
|
||||
entries_max: usize,
|
||||
@ -299,7 +299,7 @@ async fn backup_directory<P: AsRef<Path>>(
|
||||
});
|
||||
|
||||
let stats = client
|
||||
.upload_stream(archive_name, stream, "dynamic", None, crypt_config)
|
||||
.upload_stream(previous_manifest, archive_name, stream, "dynamic", None)
|
||||
.await?;
|
||||
|
||||
Ok(stats)
|
||||
@ -307,12 +307,12 @@ async fn backup_directory<P: AsRef<Path>>(
|
||||
|
||||
async fn backup_image<P: AsRef<Path>>(
|
||||
client: &BackupWriter,
|
||||
previous_manifest: Option<Arc<BackupManifest>>,
|
||||
image_path: P,
|
||||
archive_name: &str,
|
||||
image_size: u64,
|
||||
chunk_size: Option<usize>,
|
||||
_verbose: bool,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
) -> Result<BackupStats, Error> {
|
||||
|
||||
let path = image_path.as_ref().to_owned();
|
||||
@ -325,7 +325,7 @@ async fn backup_image<P: AsRef<Path>>(
|
||||
let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
|
||||
|
||||
let stats = client
|
||||
.upload_stream(archive_name, stream, "fixed", Some(image_size), crypt_config)
|
||||
.upload_stream(previous_manifest, archive_name, stream, "fixed", Some(image_size))
|
||||
.await?;
|
||||
|
||||
Ok(stats)
|
||||
@ -708,8 +708,7 @@ async fn start_garbage_collection(param: Value) -> Result<Value, Error> {
|
||||
}
|
||||
|
||||
fn spawn_catalog_upload(
|
||||
client: Arc<BackupWriter>,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
client: Arc<BackupWriter>
|
||||
) -> Result<
|
||||
(
|
||||
Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
|
||||
@ -727,7 +726,7 @@ fn spawn_catalog_upload(
|
||||
|
||||
tokio::spawn(async move {
|
||||
let catalog_upload_result = client
|
||||
.upload_stream(CATALOG_NAME, catalog_chunk_stream, "dynamic", None, crypt_config)
|
||||
.upload_stream(None, CATALOG_NAME, catalog_chunk_stream, "dynamic", None)
|
||||
.await;
|
||||
|
||||
if let Err(ref err) = catalog_upload_result {
|
||||
@ -958,6 +957,7 @@ async fn create_backup(
|
||||
|
||||
let client = BackupWriter::start(
|
||||
client,
|
||||
crypt_config.clone(),
|
||||
repo.store(),
|
||||
backup_type,
|
||||
&backup_id,
|
||||
@ -965,6 +965,12 @@ async fn create_backup(
|
||||
verbose,
|
||||
).await?;
|
||||
|
||||
let previous_manifest = if let Ok(previous_manifest) = client.download_previous_manifest().await {
|
||||
Some(Arc::new(previous_manifest))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp());
|
||||
let mut manifest = BackupManifest::new(snapshot);
|
||||
|
||||
@ -976,21 +982,21 @@ async fn create_backup(
|
||||
BackupSpecificationType::CONFIG => {
|
||||
println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target);
|
||||
let stats = client
|
||||
.upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
|
||||
.upload_blob_from_file(&filename, &target, true, Some(true))
|
||||
.await?;
|
||||
manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
|
||||
}
|
||||
BackupSpecificationType::LOGFILE => { // fixme: remove - not needed anymore ?
|
||||
println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target);
|
||||
let stats = client
|
||||
.upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
|
||||
.upload_blob_from_file(&filename, &target, true, Some(true))
|
||||
.await?;
|
||||
manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
|
||||
}
|
||||
BackupSpecificationType::PXAR => {
|
||||
// start catalog upload on first use
|
||||
if catalog.is_none() {
|
||||
let (cat, res) = spawn_catalog_upload(client.clone(), crypt_config.clone())?;
|
||||
let (cat, res) = spawn_catalog_upload(client.clone())?;
|
||||
catalog = Some(cat);
|
||||
catalog_result_tx = Some(res);
|
||||
}
|
||||
@ -1000,13 +1006,13 @@ async fn create_backup(
|
||||
catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
|
||||
let stats = backup_directory(
|
||||
&client,
|
||||
previous_manifest.clone(),
|
||||
&filename,
|
||||
&target,
|
||||
chunk_size_opt,
|
||||
devices.clone(),
|
||||
verbose,
|
||||
skip_lost_and_found,
|
||||
crypt_config.clone(),
|
||||
catalog.clone(),
|
||||
pattern_list.clone(),
|
||||
entries_max as usize,
|
||||
@ -1018,12 +1024,12 @@ async fn create_backup(
|
||||
println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
|
||||
let stats = backup_image(
|
||||
&client,
|
||||
previous_manifest.clone(),
|
||||
&filename,
|
||||
&target,
|
||||
size,
|
||||
chunk_size_opt,
|
||||
verbose,
|
||||
crypt_config.clone(),
|
||||
).await?;
|
||||
manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
|
||||
}
|
||||
@ -1050,7 +1056,7 @@ async fn create_backup(
|
||||
let target = "rsa-encrypted.key";
|
||||
println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
|
||||
let stats = client
|
||||
.upload_blob_from_data(rsa_encrypted_key, target, None, false, false)
|
||||
.upload_blob_from_data(rsa_encrypted_key, target, false, None)
|
||||
.await?;
|
||||
manifest.add_file(format!("{}.blob", target), stats.size, stats.csum, is_encrypted)?;
|
||||
|
||||
@ -1070,7 +1076,7 @@ async fn create_backup(
|
||||
println!("Upload index.json to '{:?}'", repo);
|
||||
let manifest = serde_json::to_string_pretty(&manifest)?.into();
|
||||
client
|
||||
.upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, crypt_config.clone(), true, true)
|
||||
.upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, true, Some(true))
|
||||
.await?;
|
||||
|
||||
client.finish().await?;
|
||||
@ -2029,19 +2035,26 @@ impl BufferedDynamicReadAt {
|
||||
}
|
||||
}
|
||||
|
||||
impl pxar::accessor::ReadAt for BufferedDynamicReadAt {
|
||||
fn poll_read_at(
|
||||
self: Pin<&Self>,
|
||||
impl ReadAt for BufferedDynamicReadAt {
|
||||
fn start_read_at<'a>(
|
||||
self: Pin<&'a Self>,
|
||||
_cx: &mut Context,
|
||||
buf: &mut [u8],
|
||||
buf: &'a mut [u8],
|
||||
offset: u64,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
|
||||
use std::io::Read;
|
||||
tokio::task::block_in_place(move || {
|
||||
MaybeReady::Ready(tokio::task::block_in_place(move || {
|
||||
let mut reader = self.inner.lock().unwrap();
|
||||
reader.seek(SeekFrom::Start(offset))?;
|
||||
Poll::Ready(Ok(reader.read(buf)?))
|
||||
})
|
||||
Ok(reader.read(buf)?)
|
||||
}))
|
||||
}
|
||||
|
||||
fn poll_complete<'a>(
|
||||
self: Pin<&'a Self>,
|
||||
_op: ReadAtOperation<'a>,
|
||||
) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
|
||||
panic!("LocalDynamicReadAt::start_read_at returned Pending");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -319,6 +319,40 @@ async fn pull_datastore(
|
||||
Ok(Value::Null)
|
||||
}
|
||||
|
||||
#[api(
|
||||
input: {
|
||||
properties: {
|
||||
"store": {
|
||||
schema: DATASTORE_SCHEMA,
|
||||
},
|
||||
"output-format": {
|
||||
schema: OUTPUT_FORMAT,
|
||||
optional: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
)]
|
||||
/// Verify backups
|
||||
async fn verify(
|
||||
store: String,
|
||||
param: Value,
|
||||
) -> Result<Value, Error> {
|
||||
|
||||
let output_format = get_output_format(¶m);
|
||||
|
||||
let mut client = connect()?;
|
||||
|
||||
let args = json!({});
|
||||
|
||||
let path = format!("api2/json/admin/datastore/{}/verify", store);
|
||||
|
||||
let result = client.post(&path, Some(args)).await?;
|
||||
|
||||
view_task_result(client, result, &output_format).await?;
|
||||
|
||||
Ok(Value::Null)
|
||||
}
|
||||
|
||||
fn main() {
|
||||
|
||||
proxmox_backup::tools::setup_safe_path_env();
|
||||
@ -342,8 +376,16 @@ fn main() {
|
||||
.completion_cb("local-store", config::datastore::complete_datastore_name)
|
||||
.completion_cb("remote", config::remote::complete_remote_name)
|
||||
.completion_cb("remote-store", complete_remote_datastore_name)
|
||||
)
|
||||
.insert(
|
||||
"verify",
|
||||
CliCommand::new(&API_METHOD_VERIFY)
|
||||
.arg_param(&["store"])
|
||||
.completion_cb("store", config::datastore::complete_datastore_name)
|
||||
);
|
||||
|
||||
|
||||
|
||||
let mut rpcenv = CliEnvironment::new();
|
||||
rpcenv.set_user(Some(String::from("root@pam")));
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
use std::collections::HashSet;
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
@ -22,6 +23,7 @@ pub struct BackupWriter {
|
||||
h2: H2Client,
|
||||
abort: AbortHandle,
|
||||
verbose: bool,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
}
|
||||
|
||||
impl Drop for BackupWriter {
|
||||
@ -38,12 +40,13 @@ pub struct BackupStats {
|
||||
|
||||
impl BackupWriter {
|
||||
|
||||
fn new(h2: H2Client, abort: AbortHandle, verbose: bool) -> Arc<Self> {
|
||||
Arc::new(Self { h2, abort, verbose })
|
||||
fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
|
||||
Arc::new(Self { h2, abort, crypt_config, verbose })
|
||||
}
|
||||
|
||||
pub async fn start(
|
||||
client: HttpClient,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
datastore: &str,
|
||||
backup_type: &str,
|
||||
backup_id: &str,
|
||||
@ -64,7 +67,7 @@ impl BackupWriter {
|
||||
|
||||
let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
|
||||
|
||||
Ok(BackupWriter::new(h2, abort, debug))
|
||||
Ok(BackupWriter::new(h2, abort, crypt_config, debug))
|
||||
}
|
||||
|
||||
pub async fn get(
|
||||
@ -159,16 +162,19 @@ impl BackupWriter {
|
||||
&self,
|
||||
data: Vec<u8>,
|
||||
file_name: &str,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
compress: bool,
|
||||
sign_only: bool,
|
||||
crypt_or_sign: Option<bool>,
|
||||
) -> Result<BackupStats, Error> {
|
||||
|
||||
let blob = if let Some(ref crypt_config) = crypt_config {
|
||||
if sign_only {
|
||||
DataBlob::create_signed(&data, crypt_config, compress)?
|
||||
} else {
|
||||
let blob = if let Some(ref crypt_config) = self.crypt_config {
|
||||
if let Some(encrypt) = crypt_or_sign {
|
||||
if encrypt {
|
||||
DataBlob::encode(&data, Some(crypt_config), compress)?
|
||||
} else {
|
||||
DataBlob::create_signed(&data, crypt_config, compress)?
|
||||
}
|
||||
} else {
|
||||
DataBlob::encode(&data, None, compress)?
|
||||
}
|
||||
} else {
|
||||
DataBlob::encode(&data, None, compress)?
|
||||
@ -187,8 +193,8 @@ impl BackupWriter {
|
||||
&self,
|
||||
src_path: P,
|
||||
file_name: &str,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
compress: bool,
|
||||
crypt_or_sign: Option<bool>,
|
||||
) -> Result<BackupStats, Error> {
|
||||
|
||||
let src_path = src_path.as_ref();
|
||||
@ -203,25 +209,16 @@ impl BackupWriter {
|
||||
.await
|
||||
.map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
|
||||
|
||||
let blob = DataBlob::encode(&contents, crypt_config.as_ref().map(AsRef::as_ref), compress)?;
|
||||
let raw_data = blob.into_inner();
|
||||
let size = raw_data.len() as u64;
|
||||
let csum = openssl::sha::sha256(&raw_data);
|
||||
let param = json!({
|
||||
"encoded-size": size,
|
||||
"file-name": file_name,
|
||||
});
|
||||
self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
|
||||
Ok(BackupStats { size, csum })
|
||||
self.upload_blob_from_data(contents, file_name, compress, crypt_or_sign).await
|
||||
}
|
||||
|
||||
pub async fn upload_stream(
|
||||
&self,
|
||||
previous_manifest: Option<Arc<BackupManifest>>,
|
||||
archive_name: &str,
|
||||
stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
|
||||
prefix: &str,
|
||||
fixed_size: Option<u64>,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
) -> Result<BackupStats, Error> {
|
||||
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
|
||||
|
||||
@ -233,7 +230,18 @@ impl BackupWriter {
|
||||
let index_path = format!("{}_index", prefix);
|
||||
let close_path = format!("{}_close", prefix);
|
||||
|
||||
self.download_chunk_list(&index_path, archive_name, known_chunks.clone()).await?;
|
||||
if let Some(manifest) = previous_manifest {
|
||||
// try, but ignore errors
|
||||
match archive_type(archive_name) {
|
||||
Ok(ArchiveType::FixedIndex) => {
|
||||
let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
|
||||
}
|
||||
Ok(ArchiveType::DynamicIndex) => {
|
||||
let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
|
||||
}
|
||||
_ => { /* do nothing */ }
|
||||
}
|
||||
}
|
||||
|
||||
let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
|
||||
|
||||
@ -244,7 +252,7 @@ impl BackupWriter {
|
||||
stream,
|
||||
&prefix,
|
||||
known_chunks.clone(),
|
||||
crypt_config,
|
||||
self.crypt_config.clone(),
|
||||
self.verbose,
|
||||
)
|
||||
.await?;
|
||||
@ -374,41 +382,93 @@ impl BackupWriter {
|
||||
(verify_queue_tx, verify_result_rx)
|
||||
}
|
||||
|
||||
pub async fn download_chunk_list(
|
||||
pub async fn download_previous_fixed_index(
|
||||
&self,
|
||||
path: &str,
|
||||
archive_name: &str,
|
||||
manifest: &BackupManifest,
|
||||
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<FixedIndexReader, Error> {
|
||||
|
||||
let mut tmpfile = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.read(true)
|
||||
.custom_flags(libc::O_TMPFILE)
|
||||
.open("/tmp")?;
|
||||
|
||||
let param = json!({ "archive-name": archive_name });
|
||||
let request = H2Client::request_builder("localhost", "GET", path, Some(param), None).unwrap();
|
||||
self.h2.download("previous", Some(param), &mut tmpfile).await?;
|
||||
|
||||
let h2request = self.h2.send_request(request, None).await?;
|
||||
let resp = h2request.await?;
|
||||
let index = FixedIndexReader::new(tmpfile)
|
||||
.map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
|
||||
// Note: do not use values stored in index (not trusted) - instead, computed them again
|
||||
let (csum, size) = index.compute_csum();
|
||||
manifest.verify_file(archive_name, &csum, size)?;
|
||||
|
||||
let status = resp.status();
|
||||
|
||||
if !status.is_success() {
|
||||
H2Client::h2api_response(resp).await?; // raise error
|
||||
unreachable!();
|
||||
}
|
||||
|
||||
let mut body = resp.into_body();
|
||||
let mut flow_control = body.flow_control().clone();
|
||||
|
||||
let mut stream = DigestListDecoder::new(body.map_err(Error::from));
|
||||
|
||||
while let Some(chunk) = stream.try_next().await? {
|
||||
let _ = flow_control.release_capacity(chunk.len());
|
||||
known_chunks.lock().unwrap().insert(chunk);
|
||||
// add index chunks to known chunks
|
||||
let mut known_chunks = known_chunks.lock().unwrap();
|
||||
for i in 0..index.index_count() {
|
||||
known_chunks.insert(*index.index_digest(i).unwrap());
|
||||
}
|
||||
|
||||
if self.verbose {
|
||||
println!("{}: known chunks list length is {}", archive_name, known_chunks.lock().unwrap().len());
|
||||
println!("{}: known chunks list length is {}", archive_name, index.index_count());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
pub async fn download_previous_dynamic_index(
|
||||
&self,
|
||||
archive_name: &str,
|
||||
manifest: &BackupManifest,
|
||||
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||
) -> Result<DynamicIndexReader, Error> {
|
||||
|
||||
let mut tmpfile = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.read(true)
|
||||
.custom_flags(libc::O_TMPFILE)
|
||||
.open("/tmp")?;
|
||||
|
||||
let param = json!({ "archive-name": archive_name });
|
||||
self.h2.download("previous", Some(param), &mut tmpfile).await?;
|
||||
|
||||
let index = DynamicIndexReader::new(tmpfile)
|
||||
.map_err(|err| format_err!("unable to read dynmamic index '{}' - {}", archive_name, err))?;
|
||||
// Note: do not use values stored in index (not trusted) - instead, computed them again
|
||||
let (csum, size) = index.compute_csum();
|
||||
manifest.verify_file(archive_name, &csum, size)?;
|
||||
|
||||
// add index chunks to known chunks
|
||||
let mut known_chunks = known_chunks.lock().unwrap();
|
||||
for i in 0..index.index_count() {
|
||||
known_chunks.insert(*index.index_digest(i).unwrap());
|
||||
}
|
||||
|
||||
if self.verbose {
|
||||
println!("{}: known chunks list length is {}", archive_name, index.index_count());
|
||||
}
|
||||
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
/// Download backup manifest (index.json) of last backup
|
||||
pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
|
||||
|
||||
use std::convert::TryFrom;
|
||||
|
||||
let mut raw_data = Vec::with_capacity(64 * 1024);
|
||||
|
||||
let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
|
||||
self.h2.download("previous", Some(param), &mut raw_data).await?;
|
||||
|
||||
let blob = DataBlob::from_raw(raw_data)?;
|
||||
blob.verify_crc()?;
|
||||
let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
|
||||
let json: Value = serde_json::from_slice(&data[..])?;
|
||||
let manifest = BackupManifest::try_from(json)?;
|
||||
|
||||
Ok(manifest)
|
||||
}
|
||||
|
||||
fn upload_chunk_info_stream(
|
||||
|
@ -2,7 +2,7 @@ use std::collections::{HashSet, HashMap};
|
||||
use std::convert::TryFrom;
|
||||
use std::ffi::{CStr, CString, OsStr};
|
||||
use std::fmt;
|
||||
use std::io::{self, Write};
|
||||
use std::io::{self, Read, Write};
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
use std::path::{Path, PathBuf};
|
||||
@ -20,6 +20,7 @@ use pxar::encoder::LinkOffset;
|
||||
use proxmox::c_str;
|
||||
use proxmox::sys::error::SysError;
|
||||
use proxmox::tools::fd::RawFdNum;
|
||||
use proxmox::tools::vec;
|
||||
|
||||
use crate::pxar::catalog::BackupCatalogWriter;
|
||||
use crate::pxar::Flags;
|
||||
@ -35,6 +36,7 @@ fn detect_fs_type(fd: RawFd) -> Result<i64, Error> {
|
||||
Ok(fs_stat.f_type)
|
||||
}
|
||||
|
||||
#[rustfmt::skip]
|
||||
pub fn is_virtual_file_system(magic: i64) -> bool {
|
||||
use proxmox::sys::linux::magic::*;
|
||||
|
||||
@ -114,6 +116,7 @@ struct Archiver<'a, 'b> {
|
||||
device_set: Option<HashSet<u64>>,
|
||||
hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
|
||||
errors: ErrorReporter,
|
||||
file_copy_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>;
|
||||
@ -178,6 +181,7 @@ where
|
||||
device_set,
|
||||
hardlinks: HashMap::new(),
|
||||
errors: ErrorReporter,
|
||||
file_copy_buffer: vec::undefined(4 * 1024 * 1024),
|
||||
};
|
||||
|
||||
archiver.archive_dir_contents(&mut encoder, source_dir, true)?;
|
||||
@ -410,6 +414,24 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn report_file_shrunk_while_reading(&mut self) -> Result<(), Error> {
|
||||
write!(
|
||||
self.errors,
|
||||
"warning: file size shrunk while reading: {:?}, file will be padded with zeros!",
|
||||
self.path,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn report_file_grew_while_reading(&mut self) -> Result<(), Error> {
|
||||
write!(
|
||||
self.errors,
|
||||
"warning: file size increased while reading: {:?}, file will be truncated!",
|
||||
self.path,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_entry(
|
||||
&mut self,
|
||||
encoder: &mut Encoder,
|
||||
@ -591,8 +613,29 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
file_size: u64,
|
||||
) -> Result<LinkOffset, Error> {
|
||||
let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
|
||||
let offset = encoder.add_file(metadata, file_name, file_size, &mut file)?;
|
||||
Ok(offset)
|
||||
let mut remaining = file_size;
|
||||
let mut out = encoder.create_file(metadata, file_name, file_size)?;
|
||||
while remaining != 0 {
|
||||
let mut got = file.read(&mut self.file_copy_buffer[..])?;
|
||||
if got as u64 > remaining {
|
||||
self.report_file_grew_while_reading()?;
|
||||
got = remaining as usize;
|
||||
}
|
||||
out.write_all(&self.file_copy_buffer[..got])?;
|
||||
remaining -= got as u64;
|
||||
}
|
||||
if remaining > 0 {
|
||||
self.report_file_shrunk_while_reading()?;
|
||||
let to_zero = remaining.min(self.file_copy_buffer.len() as u64) as usize;
|
||||
vec::clear(&mut self.file_copy_buffer[..to_zero]);
|
||||
while remaining != 0 {
|
||||
let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize;
|
||||
out.write_all(&self.file_copy_buffer[..fill])?;
|
||||
remaining -= fill as u64;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(out.file_offset())
|
||||
}
|
||||
|
||||
fn add_symlink(
|
||||
|
@ -323,7 +323,7 @@ fn get_index(username: Option<String>, token: Option<String>, template: &Handleb
|
||||
|
||||
if let Some(query_str) = parts.uri.query() {
|
||||
for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
|
||||
if k == "debug" && v == "1" || v == "true" {
|
||||
if k == "debug" && v != "0" && v != "false" {
|
||||
debug = true;
|
||||
}
|
||||
}
|
||||
|
@ -213,6 +213,8 @@ pub fn upid_read_status(upid: &UPID) -> Result<String, Error> {
|
||||
Some(rest) => {
|
||||
if rest == "OK" {
|
||||
status = String::from(rest);
|
||||
} else if rest.starts_with("WARNINGS: ") {
|
||||
status = String::from(rest);
|
||||
} else if rest.starts_with("ERROR: ") {
|
||||
status = String::from(&rest[7..]);
|
||||
}
|
||||
@ -234,7 +236,7 @@ pub struct TaskListInfo {
|
||||
pub upid_str: String,
|
||||
/// Task `(endtime, status)` if already finished
|
||||
///
|
||||
/// The `status` ise iether `unknown`, `OK`, or `ERROR: ...`
|
||||
/// The `status` is either `unknown`, `OK`, `WARN`, or `ERROR: ...`
|
||||
pub state: Option<(i64, String)>, // endtime, status
|
||||
}
|
||||
|
||||
@ -385,6 +387,7 @@ impl std::fmt::Display for WorkerTask {
|
||||
struct WorkerTaskData {
|
||||
logger: FileLogger,
|
||||
progress: f64, // 0..1
|
||||
warn_count: u64,
|
||||
pub abort_listeners: Vec<oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
@ -424,6 +427,7 @@ impl WorkerTask {
|
||||
data: Mutex::new(WorkerTaskData {
|
||||
logger,
|
||||
progress: 0.0,
|
||||
warn_count: 0,
|
||||
abort_listeners: vec![],
|
||||
}),
|
||||
});
|
||||
@ -507,8 +511,11 @@ impl WorkerTask {
|
||||
/// Log task result, remove task from running list
|
||||
pub fn log_result(&self, result: &Result<(), Error>) {
|
||||
|
||||
let warn_count = self.data.lock().unwrap().warn_count;
|
||||
if let Err(err) = result {
|
||||
self.log(&format!("TASK ERROR: {}", err));
|
||||
} else if warn_count > 0 {
|
||||
self.log(format!("TASK WARNINGS: {}", warn_count));
|
||||
} else {
|
||||
self.log("TASK OK");
|
||||
}
|
||||
@ -524,6 +531,13 @@ impl WorkerTask {
|
||||
data.logger.log(msg);
|
||||
}
|
||||
|
||||
/// Log a message as warning.
|
||||
pub fn warn<S: AsRef<str>>(&self, msg: S) {
|
||||
let mut data = self.data.lock().unwrap();
|
||||
data.logger.log(format!("WARN: {}", msg.as_ref()));
|
||||
data.warn_count += 1;
|
||||
}
|
||||
|
||||
/// Set progress indicator
|
||||
pub fn progress(&self, progress: f64) {
|
||||
if progress >= 0.0 && progress <= 1.0 {
|
||||
|
@ -41,7 +41,7 @@ pub fn parse_u64(i: &str) -> IResult<&str, u64> {
|
||||
map_res(recognize(digit1), str::parse)(i)
|
||||
}
|
||||
|
||||
/// Parse complete input, generate vervose error message with line numbers
|
||||
/// Parse complete input, generate verbose error message with line numbers
|
||||
pub fn parse_complete<'a, F, O>(what: &str, i: &'a str, parser: F) -> Result<O, Error>
|
||||
where F: Fn(&'a str) -> IResult<&'a str, O>,
|
||||
{
|
||||
|
@ -64,7 +64,7 @@ Ext.define('PBS.DataStoreContent', {
|
||||
'text',
|
||||
'backup-time'
|
||||
]);
|
||||
Proxmox.Utils.monStoreErrors(view, view.store, true);
|
||||
Proxmox.Utils.monStoreErrors(view, this.store);
|
||||
this.reload(); // initial load
|
||||
},
|
||||
|
||||
@ -122,10 +122,11 @@ Ext.define('PBS.DataStoreContent', {
|
||||
return groups;
|
||||
},
|
||||
|
||||
onLoad: function(store, records, success) {
|
||||
onLoad: function(store, records, success, operation) {
|
||||
let view = this.getView();
|
||||
|
||||
if (!success) {
|
||||
Proxmox.Utils.setErrorMask(view, Proxmox.Utils.getResponseErrorMessage(operation.getError()));
|
||||
return;
|
||||
}
|
||||
|
||||
@ -176,6 +177,7 @@ Ext.define('PBS.DataStoreContent', {
|
||||
expanded: true,
|
||||
children: children
|
||||
});
|
||||
Proxmox.Utils.setErrorMask(view, false);
|
||||
},
|
||||
|
||||
onPrune: function() {
|
||||
@ -197,6 +199,34 @@ Ext.define('PBS.DataStoreContent', {
|
||||
win.show();
|
||||
},
|
||||
|
||||
onForget: function() {
|
||||
var view = this.getView();
|
||||
|
||||
let rec = view.selModel.getSelection()[0];
|
||||
if (!(rec && rec.data)) return;
|
||||
let data = rec.data;
|
||||
if (!data.leaf) return;
|
||||
|
||||
if (!view.datastore) return;
|
||||
|
||||
console.log(data);
|
||||
|
||||
Proxmox.Utils.API2Request({
|
||||
params: {
|
||||
"backup-type": data["backup-type"],
|
||||
"backup-id": data["backup-id"],
|
||||
"backup-time": (data['backup-time'].getTime()/1000).toFixed(0),
|
||||
},
|
||||
url: `/admin/datastore/${view.datastore}/snapshots`,
|
||||
method: 'DELETE',
|
||||
waitMsgTarget: view,
|
||||
failure: function(response, opts) {
|
||||
Ext.Msg.alert(gettext('Error'), response.htmlStatus);
|
||||
},
|
||||
callback: this.reload.bind(this),
|
||||
});
|
||||
},
|
||||
|
||||
openBackupFileDownloader: function() {
|
||||
let me = this;
|
||||
let view = me.getView();
|
||||
@ -334,6 +364,21 @@ Ext.define('PBS.DataStoreContent', {
|
||||
enableFn: function(record) { return !record.data.leaf; },
|
||||
handler: 'onPrune',
|
||||
},
|
||||
{
|
||||
xtype: 'proxmoxButton',
|
||||
text: gettext('Forget'),
|
||||
disabled: true,
|
||||
parentXType: 'pbsDataStoreContent',
|
||||
handler: 'onForget',
|
||||
confirmMsg: function(record) {
|
||||
console.log(record);
|
||||
let name = record.data.text;
|
||||
return Ext.String.format(gettext('Are you sure you want to remove snapshot {0}'), `'${name}'`);
|
||||
},
|
||||
enableFn: function(record) {
|
||||
return !!record.data.leaf;
|
||||
},
|
||||
},
|
||||
{
|
||||
xtype: 'proxmoxButton',
|
||||
text: gettext('Download Files'),
|
||||
|
@ -72,10 +72,15 @@ Ext.define('PBS.MainView', {
|
||||
let datastore = PBS.Utils.getDataStoreFromPath(path);
|
||||
obj = contentpanel.add({
|
||||
xtype: 'pbsDataStorePanel',
|
||||
nodename: 'localhost',
|
||||
datastore,
|
||||
});
|
||||
} else {
|
||||
obj = contentpanel.add({ xtype: path, border: false });
|
||||
obj = contentpanel.add({
|
||||
xtype: path,
|
||||
nodename: 'localhost',
|
||||
border: false
|
||||
});
|
||||
}
|
||||
|
||||
var treelist = me.lookupReference('navtree');
|
||||
|
@ -26,6 +26,7 @@ JSSRC= \
|
||||
dashboard/RunningTasks.js \
|
||||
dashboard/TaskSummary.js \
|
||||
Utils.js \
|
||||
ZFSList.js \
|
||||
DirectoryList.js \
|
||||
LoginView.js \
|
||||
VersionInfo.js \
|
||||
|
@ -69,6 +69,12 @@ Ext.define('PBS.store.NavigationStore', {
|
||||
path: 'pbsDirectoryList',
|
||||
leaf: true,
|
||||
},
|
||||
{
|
||||
text: "ZFS",
|
||||
iconCls: 'fa fa-th-large',
|
||||
path: 'pbsZFSList',
|
||||
leaf: true,
|
||||
},
|
||||
]
|
||||
}
|
||||
]
|
||||
|
@ -38,8 +38,9 @@ Ext.define('PBS.Utils', {
|
||||
let datastore = result[1], type = result[2], id = result[3];
|
||||
return `Datastore ${datastore} - ${what} ${type}/${id}`;
|
||||
}
|
||||
return what;
|
||||
return `Datastore ${id} - ${what}`;
|
||||
},
|
||||
|
||||
render_datastore_time_worker_id: function(id, what) {
|
||||
const res = id.match(/^(\S+)_([^_\s]+)_([^_\s]+)_([^_\s]+)$/);
|
||||
if (res) {
|
||||
@ -62,6 +63,9 @@ Ext.define('PBS.Utils', {
|
||||
prune: (type, id) => {
|
||||
return PBS.Utils.render_datastore_worker_id(id, gettext('Prune'));
|
||||
},
|
||||
verify: (type, id) => {
|
||||
return PBS.Utils.render_datastore_worker_id(id, gettext('Verify'));
|
||||
},
|
||||
backup: (type, id) => {
|
||||
return PBS.Utils.render_datastore_worker_id(id, gettext('Backup'));
|
||||
},
|
||||
|
137
www/ZFSList.js
Normal file
137
www/ZFSList.js
Normal file
@ -0,0 +1,137 @@
|
||||
Ext.define('PBS.admin.ZFSList', {
|
||||
extend: 'Ext.grid.Panel',
|
||||
xtype: 'pbsZFSList',
|
||||
|
||||
stateful: true,
|
||||
stateId: 'grid-node-zfs',
|
||||
|
||||
controller: {
|
||||
xclass: 'Ext.app.ViewController',
|
||||
|
||||
openCreateWindow: function() {
|
||||
let me = this;
|
||||
Ext.create('PBS.window.CreateZFS', {
|
||||
nodename: me.nodename,
|
||||
listeners: {
|
||||
destroy: function() { me.reload(); },
|
||||
}
|
||||
}).show();
|
||||
},
|
||||
|
||||
openDetailWindow: function() {
|
||||
let me = this;
|
||||
let view = me.getView();
|
||||
let selection = view.getSelection();
|
||||
if (!selection || selection.length < 1) return;
|
||||
|
||||
let rec = selection[0];
|
||||
let zpool = rec.get('name');
|
||||
|
||||
Ext.create('Proxmox.window.ZFSDetail', {
|
||||
zpool,
|
||||
nodename: view.nodename,
|
||||
}).show();
|
||||
},
|
||||
|
||||
reload: function() {
|
||||
let me = this;
|
||||
let view = me.getView();
|
||||
let store = view.getStore();
|
||||
store.load();
|
||||
store.sort();
|
||||
},
|
||||
|
||||
init: function(view) {
|
||||
let me = this;
|
||||
|
||||
if (!view.nodename) {
|
||||
throw "no nodename given";
|
||||
}
|
||||
|
||||
let url = `/api2/json/nodes/${view.nodename}/disks/zfs`;
|
||||
view.getStore().getProxy().setUrl(url)
|
||||
|
||||
Proxmox.Utils.monStoreErrors(view, view.getStore(), true);
|
||||
|
||||
me.reload();
|
||||
},
|
||||
},
|
||||
|
||||
columns: [
|
||||
{
|
||||
text: gettext('Name'),
|
||||
dataIndex: 'name',
|
||||
flex: 1
|
||||
},
|
||||
{
|
||||
header: gettext('Size'),
|
||||
renderer: Proxmox.Utils.format_size,
|
||||
dataIndex: 'size'
|
||||
},
|
||||
{
|
||||
header: gettext('Free'),
|
||||
renderer: Proxmox.Utils.format_size,
|
||||
dataIndex: 'free'
|
||||
},
|
||||
{
|
||||
header: gettext('Allocated'),
|
||||
renderer: Proxmox.Utils.format_size,
|
||||
dataIndex: 'alloc'
|
||||
},
|
||||
{
|
||||
header: gettext('Fragmentation'),
|
||||
renderer: function(value) {
|
||||
return value.toString() + '%';
|
||||
},
|
||||
dataIndex: 'frag'
|
||||
},
|
||||
{
|
||||
header: gettext('Health'),
|
||||
renderer: Proxmox.Utils.render_zfs_health,
|
||||
dataIndex: 'health'
|
||||
},
|
||||
{
|
||||
header: gettext('Deduplication'),
|
||||
hidden: true,
|
||||
renderer: function(value) {
|
||||
return value.toFixed(2).toString() + 'x';
|
||||
},
|
||||
dataIndex: 'dedup'
|
||||
}
|
||||
],
|
||||
|
||||
rootVisible: false,
|
||||
useArrows: true,
|
||||
|
||||
tbar: [
|
||||
{
|
||||
text: gettext('Reload'),
|
||||
iconCls: 'fa fa-refresh',
|
||||
handler: 'reload',
|
||||
},
|
||||
{
|
||||
text: gettext('Create') + ': ZFS',
|
||||
handler: 'openCreateWindow',
|
||||
},
|
||||
{
|
||||
text: gettext('Detail'),
|
||||
xtype: 'proxmoxButton',
|
||||
disabled: true,
|
||||
handler: function() {
|
||||
}
|
||||
}
|
||||
],
|
||||
|
||||
listeners: {
|
||||
itemdblclick: 'openDetailWindow',
|
||||
},
|
||||
|
||||
store: {
|
||||
fields: ['name', 'size', 'free', 'alloc', 'dedup', 'frag', 'health'],
|
||||
proxy: {
|
||||
type: 'proxmox',
|
||||
},
|
||||
sorters: 'name'
|
||||
},
|
||||
});
|
||||
|
@ -2,7 +2,19 @@ Ext.define('pbs-datastore-statistics', {
|
||||
extend: 'Ext.data.Model',
|
||||
|
||||
fields: [
|
||||
'store', 'total', 'used', 'avail', 'estimated-full-date', 'history',
|
||||
'store', 'total', 'used', 'avail', 'estimated-full-date',
|
||||
{
|
||||
name: 'history',
|
||||
convert: function(values) {
|
||||
let last = null;
|
||||
return values.map(v => {
|
||||
if (v !== undefined && v !== null) {
|
||||
last = v;
|
||||
}
|
||||
return last;
|
||||
});
|
||||
}
|
||||
},
|
||||
{
|
||||
name: 'usage',
|
||||
calculate: function(data) {
|
||||
|
@ -56,10 +56,16 @@ Ext.define('PBS.LongestTasks', {
|
||||
type: 'diff',
|
||||
autoDestroy: true,
|
||||
autoDestroyRstore: true,
|
||||
sorters: {
|
||||
sorters: [
|
||||
{
|
||||
property: 'duration',
|
||||
direction: 'DESC',
|
||||
},
|
||||
{
|
||||
property: 'upid',
|
||||
direction: 'ASC',
|
||||
},
|
||||
],
|
||||
rstore: {
|
||||
storeid: 'proxmox-tasks-dash',
|
||||
type: 'store',
|
||||
|
@ -52,7 +52,7 @@ Ext.define("PBS.window.FileBrowser", {
|
||||
extend: "Ext.window.Window",
|
||||
|
||||
width: 800,
|
||||
height: 400,
|
||||
height: 600,
|
||||
|
||||
modal: true,
|
||||
|
||||
@ -142,8 +142,13 @@ Ext.define("PBS.window.FileBrowser", {
|
||||
'backup-type': view['backup-type'],
|
||||
'backup-time': view['backup-time'],
|
||||
});
|
||||
store.load();
|
||||
store.getRoot().expand();
|
||||
store.load(() => {
|
||||
let root = store.getRoot();
|
||||
root.expand(); // always expand invisible root node
|
||||
if (root.childNodes.length === 1) {
|
||||
root.firstChild.expand();
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
control: {
|
||||
|
94
www/window/ZFSCreate.js
Normal file
94
www/window/ZFSCreate.js
Normal file
@ -0,0 +1,94 @@
|
||||
Ext.define('PBS.window.CreateZFS', {
|
||||
extend: 'Proxmox.window.Edit',
|
||||
xtype: 'pbsCreateZFS',
|
||||
|
||||
subject: 'ZFS',
|
||||
|
||||
showProgress: true,
|
||||
|
||||
onlineHelp: 'chapter_zfs',
|
||||
|
||||
width: 800,
|
||||
|
||||
url: '/nodes/localhost/disks/zfs',
|
||||
method: 'POST',
|
||||
items: [
|
||||
{
|
||||
xtype: 'inputpanel',
|
||||
onGetValues: function(values) {
|
||||
return values;
|
||||
},
|
||||
column1: [
|
||||
{
|
||||
xtype: 'proxmoxtextfield',
|
||||
name: 'name',
|
||||
fieldLabel: gettext('Name'),
|
||||
allowBlank: false
|
||||
},
|
||||
{
|
||||
xtype: 'proxmoxcheckbox',
|
||||
name: 'add-datastore',
|
||||
fieldLabel: gettext('Add Datastore'),
|
||||
value: '1'
|
||||
}
|
||||
],
|
||||
column2: [
|
||||
{
|
||||
xtype: 'proxmoxKVComboBox',
|
||||
fieldLabel: gettext('RAID Level'),
|
||||
name: 'raidlevel',
|
||||
value: 'single',
|
||||
comboItems: [
|
||||
['single', gettext('Single Disk')],
|
||||
['mirror', 'Mirror'],
|
||||
['raid10', 'RAID10'],
|
||||
['raidz', 'RAIDZ'],
|
||||
['raidz2', 'RAIDZ2'],
|
||||
['raidz3', 'RAIDZ3']
|
||||
]
|
||||
},
|
||||
{
|
||||
xtype: 'proxmoxKVComboBox',
|
||||
fieldLabel: gettext('Compression'),
|
||||
name: 'compression',
|
||||
value: 'on',
|
||||
comboItems: [
|
||||
['on', 'on'],
|
||||
['off', 'off'],
|
||||
['gzip', 'gzip'],
|
||||
['lz4', 'lz4'],
|
||||
['lzjb', 'lzjb'],
|
||||
['zle', 'zle']
|
||||
]
|
||||
},
|
||||
{
|
||||
xtype: 'proxmoxintegerfield',
|
||||
fieldLabel: gettext('ashift'),
|
||||
minValue: 9,
|
||||
maxValue: 16,
|
||||
value: '12',
|
||||
name: 'ashift'
|
||||
}
|
||||
],
|
||||
columnB: [
|
||||
{
|
||||
xtype: 'pmxMultiDiskSelector',
|
||||
name: 'devices',
|
||||
nodename: 'localhost',
|
||||
typeParam: 'usage-type',
|
||||
valueField: 'name',
|
||||
height: 200,
|
||||
emptyText: gettext('No Disks unused'),
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
xtype: 'displayfield',
|
||||
padding: '5 0 0 0',
|
||||
userCls: 'pmx-hint',
|
||||
value: 'Note: ZFS is not compatible with disks backed by a hardware ' +
|
||||
'RAID controller. For details see ' +
|
||||
'<a target="_blank" href="' + Proxmox.Utils.get_help_link('chapter_zfs') + '">the reference documentation</a>.',
|
||||
},
|
||||
],
|
||||
});
|
Reference in New Issue
Block a user