Compare commits


43 Commits

Author SHA1 Message Date
e9764238df make ReadChunk not require mutable self.
That way we can reduce lock contention because we hold locks for much
shorter times.
2020-07-03 07:37:29 +02:00
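
For illustration only (not code from this repository): with &self instead of &mut self, a single reader can be shared behind an Arc and used from several threads without an exclusive lock, which is where the reduced contention comes from. The DummyReader type and String error below are invented for the sketch; only the read_chunk(&self, ...) shape mirrors the trait change shown further down in src/backup/read_chunk.rs.

use std::sync::Arc;
use std::thread;

// Sketch of the trait shape after the change: note &self, not &mut self.
trait ReadChunk: Send + Sync {
    fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, String>;
}

struct DummyReader;

impl ReadChunk for DummyReader {
    fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, String> {
        // A real reader would fetch and decode the chunk from the chunk store.
        Ok(digest.to_vec())
    }
}

fn main() {
    // One shared reader, no Mutex around it: concurrent reads need no write lock.
    let reader: Arc<dyn ReadChunk> = Arc::new(DummyReader);
    let handles: Vec<_> = (0u8..4)
        .map(|i| {
            let r = Arc::clone(&reader);
            thread::spawn(move || r.read_chunk(&[i; 32]).unwrap().len())
        })
        .collect();
    for h in handles {
        println!("read {} bytes", h.join().unwrap());
    }
}
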
26f499b17b ui: increase timeout for snapshot listing
the API call can take a very long time (for now); until we can
improve that, increase the timeout from the default of 30s

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2020-07-03 06:14:21 +02:00
cc7995ac40 src/bin/proxmox_backup_client/task.rs: split out task command 2020-07-02 18:04:29 +02:00
43abba4b4f src/bin/proxmox_backup_client/mount.rs: split out mount code 2020-07-02 17:49:59 +02:00
58f950c546 ui: consistently spell Datastore without space between words
No hard feelings on 'Datastore' vs. 'Data Store', but consistency
is desired in such names.
Talked shortly with Dominik, who also slightly favored the one
without the space - so just go for that one.

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
2020-07-02 17:20:41 +02:00
c426e65893 ui: disk create: sync and improve 'add-datastore' checkbox label
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
2020-07-02 17:06:37 +02:00
caea8d611f proxmox-backup-client: add benchmark command
This is just a start; we need to add more useful things here...
2020-07-02 14:01:57 +02:00
7d0754a6d2 pxar: fixup 'vanished-file' logic a bit
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
2020-06-30 14:41:42 +02:00
5afa0755ea pxar: fix missing newlines in warnings
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
2020-06-30 14:37:20 +02:00
40b63186a6 DataStoreConfig.js: add verify button 2020-06-30 13:28:42 +02:00
8f6088c130 DataStoreContent.js: add verify button 2020-06-30 13:22:02 +02:00
2162e2c15d src/api2/admin/datastore.rs: avoid slash in UPID strings 2020-06-30 13:11:22 +02:00
0d5ab04a90 bump version to 0.5.0-1 2020-06-29 13:01:11 +02:00
4059285649 fix typo 2020-06-29 12:59:25 +02:00
2e079b8bf2 partially revert commit 1f82f9b7b5
Do it in a backwards-compatible way. Also, the code was wrong because FixedIndexWriter
still computed old-style csums...
2020-06-29 12:44:45 +02:00
4ff2c9b832 ui: allow to Forget (delete) backup snapshots. 2020-06-26 15:58:06 +02:00
a8e2940ff3 pxar: deal with files changing size during archiving
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
2020-06-26 11:49:51 +02:00
d5d5f2174e bump version to 0.4.0-1 2020-06-26 10:43:52 +02:00
2311238450 depend on proxmox 0.1.41 2020-06-26 10:40:47 +02:00
2ea501ffdf ui: add ZFS management
adds a ZFSList and ZFSCreate class, modeled after the one in pve

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2020-06-26 10:33:23 +02:00
4eb4e94918 fix test output
field separator for pools is always a tab when using -H

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2020-06-26 10:31:11 +02:00
817bcda848 src/backup/verify.rs: do not stop on server shutdown
This is a read-only task, so there is no need to stop.
2020-06-26 09:45:59 +02:00
f6de2c7359 WorkerTask: add warnings and count them
so that we have one more level between errors and OK

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2020-06-26 09:42:11 +02:00
3f0b9c10ec ui: dashboard: remove 'wobbling' of tasks that have the same duration
by sorting them by upid after sorting by duration

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2020-06-26 09:13:33 +02:00
2b66abbfab ui: dashboard: use last value for holes in history graph
it is only designed to be a quick overview, so having holes there
is not really pretty; since we do not even show any date
for the points, we can simply reuse the last value for holes

the 'real' graph with holes is still available on the
DataStoreStatistics panel

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2020-06-26 09:13:16 +02:00
402c8861d8 fix typo
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2020-06-26 09:12:29 +02:00
3f683799a8 improve 'debug' parameter
instead of checking for '1' or 'true', check that it is present and not
'0' or 'false'. This allows simply using

https://foo:8007/?debug

instead of

https://foo:8007/?debug=1

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2020-06-26 09:12:14 +02:00
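
A hypothetical restatement of that rule in Rust (debug_enabled is an invented helper; the server's actual parsing code is not part of this diff):

fn debug_enabled(query_value: Option<&str>) -> bool {
    match query_value {
        None => false,                          // ?debug not given at all
        Some(v) => !matches!(v, "0" | "false"), // given: enabled unless disabled
    }
}

fn main() {
    assert!(debug_enabled(Some("")));   // https://foo:8007/?debug
    assert!(debug_enabled(Some("1")));  // https://foo:8007/?debug=1
    assert!(!debug_enabled(Some("0")));
    assert!(!debug_enabled(None));
}
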
573bcd9a92 ui: automatically add 'localhost' as nodename for all panels
this will make refactoring easier for panels that are reused from pve
(where we always have a hostname)

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2020-06-26 09:11:36 +02:00
90779237ae ui: show proper loadMask for DataStoreContent
we have to use the correct store, and we have to manually show the
error (since monStoreErrors only works for Proxmox Proxies)

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
2020-06-26 09:11:10 +02:00
1f82f9b7b5 src/backup/index.rs: add compute_csum
And use it for fixed and dynamic index. Please note that this
changes checksums for fixed indexes, so restoring older backups
will fail now (not backward compatible).
2020-06-26 09:00:34 +02:00
19b5c3c43e examples/upload-speed.rs: fix compile error 2020-06-26 08:59:51 +02:00
fe3e65c3ea src/api2/backup.rs: call register_chunk in previous download api 2020-06-26 08:22:46 +02:00
fdaab0df4e src/backup/index.rs: add chunk_info method 2020-06-26 08:14:45 +02:00
b957aa81bd update backup api for incremental backup
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
2020-06-26 07:17:08 +02:00
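
Conceptually, the incremental mode lets the client reuse chunks already known from the previous backup and only upload unknown ones (see the previous-manifest handling and the 'previous' download endpoint further down in this diff). A rough, self-contained sketch of that idea; chunks_to_upload and the digest set are invented for illustration and are not the client library's API:

use std::collections::HashSet;

fn chunks_to_upload(previous: &HashSet<[u8; 32]>, new_index: &[[u8; 32]]) -> Vec<[u8; 32]> {
    new_index
        .iter()
        .copied()
        .filter(|digest| !previous.contains(digest)) // skip chunks the server already has
        .collect()
}

fn main() {
    let mut previous = HashSet::new();
    previous.insert([1u8; 32]); // digest already present in the last snapshot
    let upload = chunks_to_upload(&previous, &[[1u8; 32], [2u8; 32]]);
    assert_eq!(upload, vec![[2u8; 32]]); // only the new chunk goes over the wire
    println!("{} chunk(s) to upload", upload.len());
}
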
8ea00f6e49 allow to abort verify jobs
And improve job description rendering in the GUI.
2020-06-25 12:56:36 +02:00
4bd789b0fa ui: file browser: expand child node if only one archive present
Get the first visible node through the Ext.data.NodeInterface defined
"firstChild" element and expand that if there's only one archive
present.

Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
2020-06-25 12:52:42 +02:00
2f050cf2ed ui: file browser: adapt height for 4:3 instead of weird 2:1 ratio
Signed-off-by: Thomas Lamprecht <t.lamprecht@proxmox.com>
2020-06-25 11:59:23 +02:00
e22f4882e7 extract create_download_response API helper
and put it into a new "api2::helpers" module.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
2020-06-25 11:57:37 +02:00
c65bc99a41 [chore] bump to using pxar 0.2.0
This breaks all previously created pxar archives!

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
2020-06-25 09:46:56 +02:00
355c055e81 src/bin/proxmox-backup-manager.rs: implement verify 2020-06-24 13:35:21 +02:00
c2009e5309 src/api2/admin/datastore.rs: add verify api 2020-06-24 13:35:21 +02:00
23f74c190e src/backup/backup_info.rs: impl Display for BackupGroup 2020-06-24 13:35:21 +02:00
a6f8728339 update to pxar 0.1.9, update ReadAt implementations
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
2020-06-24 11:57:12 +02:00
45 changed files with 1630 additions and 810 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "proxmox-backup"
version = "0.3.0"
version = "0.5.0"
authors = ["Dietmar Maurer <dietmar@proxmox.com>"]
edition = "2018"
license = "AGPL-3"
@ -38,11 +38,11 @@ pam-sys = "0.5"
percent-encoding = "2.1"
pin-utils = "0.1.0"
pathpatterns = "0.1.1"
proxmox = { version = "0.1.40", features = [ "sortable-macro", "api-macro" ] }
proxmox = { version = "0.1.41", features = [ "sortable-macro", "api-macro" ] }
#proxmox = { git = "ssh://gitolite3@proxdev.maurer-it.com/rust/proxmox", version = "0.1.2", features = [ "sortable-macro", "api-macro" ] }
#proxmox = { path = "../proxmox/proxmox", features = [ "sortable-macro", "api-macro" ] }
proxmox-fuse = "0.1.0"
pxar = { version = "0.1.8", features = [ "tokio-io", "futures-io" ] }
pxar = { version = "0.2.0", features = [ "tokio-io", "futures-io" ] }
#pxar = { path = "../pxar", features = [ "tokio-io", "futures-io" ] }
regex = "1.2"
rustyline = "6"

View File

@ -30,8 +30,6 @@ Chores:
* move tools/xattr.rs and tools/acl.rs to proxmox/sys/linux/
* recompute PXAR_ header types from strings: avoid using numbers from casync
* remove pbs-* systemd timers and services on package purge

20
debian/changelog vendored
View File

@ -1,6 +1,24 @@
rust-proxmox-backup (0.5.0-1) unstable; urgency=medium
* partially revert commit 1f82f9b7b5d231da22a541432d5617cb303c0000
* ui: allow to Forget (delete) backup snapshots
* pxar: deal with files changing size during archiving
-- Proxmox Support Team <support@proxmox.com> Mon, 29 Jun 2020 13:00:54 +0200
rust-proxmox-backup (0.4.0-1) unstable; urgency=medium
* change api for incremental backups mode
* zfs disk management gui
-- Proxmox Support Team <support@proxmox.com> Fri, 26 Jun 2020 10:43:27 +0200
rust-proxmox-backup (0.3.0-1) unstable; urgency=medium
* support incrtemental backups mode
* support incremental backups mode
* new disk management

View File

@ -17,7 +17,7 @@ async fn upload_speed() -> Result<usize, Error> {
let backup_time = chrono::Utc::now();
let client = BackupWriter::start(client, datastore, "host", "speedtest", backup_time, false).await?;
let client = BackupWriter::start(client, None, datastore, "host", "speedtest", backup_time, false).await?;
println!("start upload speed test");
let res = client.upload_speedtest().await?;

View File

@ -9,6 +9,7 @@ pub mod status;
pub mod types;
pub mod version;
pub mod pull;
mod helpers;
use proxmox::api::router::SubdirMap;
use proxmox::api::Router;

View File

@ -394,6 +394,90 @@ pub fn status(
crate::tools::disks::disk_usage(&datastore.base_path())
}
#[api(
input: {
properties: {
store: {
schema: DATASTORE_SCHEMA,
},
"backup-type": {
schema: BACKUP_TYPE_SCHEMA,
optional: true,
},
"backup-id": {
schema: BACKUP_ID_SCHEMA,
optional: true,
},
"backup-time": {
schema: BACKUP_TIME_SCHEMA,
optional: true,
},
},
},
returns: {
schema: UPID_SCHEMA,
},
access: {
permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP, true), // fixme
},
)]
/// Verify backups.
///
/// This function can verify a single backup snapshot, all backups from a backup group,
/// or all backups in the datastore.
pub fn verify(
store: String,
backup_type: Option<String>,
backup_id: Option<String>,
backup_time: Option<i64>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
let datastore = DataStore::lookup_datastore(&store)?;
let worker_id;
let mut backup_dir = None;
let mut backup_group = None;
match (backup_type, backup_id, backup_time) {
(Some(backup_type), Some(backup_id), Some(backup_time)) => {
worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_time);
let dir = BackupDir::new(backup_type, backup_id, backup_time);
backup_dir = Some(dir);
}
(Some(backup_type), Some(backup_id), None) => {
worker_id = format!("{}_{}_{}", store, backup_type, backup_id);
let group = BackupGroup::new(backup_type, backup_id);
backup_group = Some(group);
}
(None, None, None) => {
worker_id = store.clone();
}
_ => bail!("parameters do not specify a backup group or snapshot"),
}
let username = rpcenv.get_user().unwrap();
let to_stdout = if rpcenv.env_type() == RpcEnvironmentType::CLI { true } else { false };
let upid_str = WorkerTask::new_thread(
"verify", Some(worker_id.clone()), &username, to_stdout, move |worker|
{
let success = if let Some(backup_dir) = backup_dir {
verify_backup_dir(&datastore, &backup_dir, &worker)?
} else if let Some(backup_group) = backup_group {
verify_backup_group(&datastore, &backup_group, &worker)?
} else {
verify_all_backups(&datastore, &worker)?
};
if !success {
bail!("verfication failed - please check the log for details");
}
Ok(())
})?;
Ok(json!(upid_str))
}
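
For reference, the three parameter shapes this endpoint accepts could look as follows (a sketch using serde_json; store and backup names are made-up example values, the keys follow the #[api] input schema above):

use serde_json::json;

fn main() {
    // verify the whole datastore
    let all = json!({ "store": "store2" });
    // verify one backup group
    let group = json!({
        "store": "store2",
        "backup-type": "host",
        "backup-id": "elsa",
    });
    // verify a single snapshot
    let snapshot = json!({
        "store": "store2",
        "backup-type": "host",
        "backup-id": "elsa",
        "backup-time": 1593599196,
    });
    for params in vec![all, group, snapshot] {
        println!("{}", params);
    }
}
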
#[macro_export]
macro_rules! add_common_prune_prameters {
( [ $( $list1:tt )* ] ) => {
@ -1261,6 +1345,11 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
&Router::new()
.upload(&API_METHOD_UPLOAD_BACKUP_LOG)
),
(
"verify",
&Router::new()
.post(&API_METHOD_VERIFY)
),
];
const DATASTORE_INFO_ROUTER: Router = Router::new()

View File

@ -10,7 +10,7 @@ use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironm
use proxmox::api::router::SubdirMap;
use proxmox::api::schema::*;
use crate::tools::{self, WrappedReaderStream};
use crate::tools;
use crate::server::{WorkerTask, H2Service};
use crate::backup::*;
use crate::api2::types::*;
@ -199,7 +199,6 @@ pub const BACKUP_API_SUBDIRS: SubdirMap = &[
),
(
"dynamic_index", &Router::new()
.download(&API_METHOD_DYNAMIC_CHUNK_INDEX)
.post(&API_METHOD_CREATE_DYNAMIC_INDEX)
.put(&API_METHOD_DYNAMIC_APPEND)
),
@ -222,10 +221,13 @@ pub const BACKUP_API_SUBDIRS: SubdirMap = &[
),
(
"fixed_index", &Router::new()
.download(&API_METHOD_FIXED_CHUNK_INDEX)
.post(&API_METHOD_CREATE_FIXED_INDEX)
.put(&API_METHOD_FIXED_APPEND)
),
(
"previous", &Router::new()
.download(&API_METHOD_DOWNLOAD_PREVIOUS)
),
(
"speedtest", &Router::new()
.upload(&API_METHOD_UPLOAD_SPEEDTEST)
@ -610,20 +612,17 @@ fn finish_backup (
}
#[sortable]
pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
&ApiHandler::AsyncHttp(&dynamic_chunk_index),
pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
&ApiHandler::AsyncHttp(&download_previous),
&ObjectSchema::new(
r###"
Download the dynamic chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"### ,
"Download archive from previous backup.",
&sorted!([
("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
]),
)
);
fn dynamic_chunk_index(
fn download_previous(
_parts: Parts,
_req_body: Body,
param: Value,
@ -636,130 +635,38 @@ fn dynamic_chunk_index(
let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
if !archive_name.ends_with(".didx") {
bail!("wrong archive extension: '{}'", archive_name);
}
let empty_response = {
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?
};
let last_backup = match &env.last_backup {
Some(info) => info,
None => return Ok(empty_response),
None => bail!("no previous backup"),
};
let mut path = last_backup.backup_dir.relative_path();
let mut path = env.datastore.snapshot_path(&last_backup.backup_dir);
path.push(&archive_name);
let index = match env.datastore.open_dynamic_reader(path) {
Ok(index) => index,
Err(_) => {
env.log(format!("there is no last backup for archive '{}'", archive_name));
return Ok(empty_response);
{
let index: Option<Box<dyn IndexFile>> = match archive_type(&archive_name)? {
ArchiveType::FixedIndex => {
let index = env.datastore.open_fixed_reader(&path)?;
Some(Box::new(index))
}
ArchiveType::DynamicIndex => {
let index = env.datastore.open_dynamic_reader(&path)?;
Some(Box::new(index))
}
_ => { None }
};
if let Some(index) = index {
env.log(format!("register chunks in '{}' from previous backup.", archive_name));
env.log(format!("download last backup index for archive '{}'", archive_name));
let count = index.index_count();
for pos in 0..count {
let info = index.chunk_info(pos)?;
let size = info.size() as u32;
env.register_chunk(info.digest, size)?;
for pos in 0..index.index_count() {
let info = index.chunk_info(pos).unwrap();
let size = info.range.end - info.range.start;
env.register_chunk(info.digest, size as u32)?;
}
}
}
let reader = DigestListEncoder::new(Box::new(index));
let stream = WrappedReaderStream::new(reader);
// fixme: set size, content type?
let response = http::Response::builder()
.status(200)
.body(Body::wrap_stream(stream))?;
Ok(response)
}.boxed()
}
#[sortable]
pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
&ApiHandler::AsyncHttp(&fixed_chunk_index),
&ObjectSchema::new(
r###"
Download the fixed chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"### ,
&sorted!([
("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
]),
)
);
fn fixed_chunk_index(
_parts: Parts,
_req_body: Body,
param: Value,
_info: &ApiMethod,
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
let env: &BackupEnvironment = rpcenv.as_ref();
let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
if !archive_name.ends_with(".fidx") {
bail!("wrong archive extension: '{}'", archive_name);
}
let empty_response = {
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?
};
let last_backup = match &env.last_backup {
Some(info) => info,
None => return Ok(empty_response),
};
let mut path = last_backup.backup_dir.relative_path();
path.push(&archive_name);
let index = match env.datastore.open_fixed_reader(path) {
Ok(index) => index,
Err(_) => {
env.log(format!("there is no last backup for archive '{}'", archive_name));
return Ok(empty_response);
}
};
env.log(format!("download last backup index for archive '{}'", archive_name));
let count = index.index_count();
let image_size = index.index_bytes();
for pos in 0..count {
let digest = index.index_digest(pos).unwrap();
// Note: last chunk can be smaller
let start = (pos*index.chunk_size) as u64;
let mut end = start + index.chunk_size as u64;
if end > image_size { end = image_size; }
let size = (end - start) as u32;
env.register_chunk(*digest, size)?;
}
let reader = DigestListEncoder::new(Box::new(index));
let stream = WrappedReaderStream::new(reader);
// fixme: set size, content type?
let response = http::Response::builder()
.status(200)
.body(Body::wrap_stream(stream))?;
Ok(response)
env.log(format!("download '{}' from previous backup.", archive_name));
crate::api2::helpers::create_download_response(path).await
}.boxed()
}

23
src/api2/helpers.rs Normal file
View File

@ -0,0 +1,23 @@
use std::path::PathBuf;
use anyhow::Error;
use futures::*;
use hyper::{Body, Response, StatusCode, header};
use proxmox::http_err;
pub async fn create_download_response(path: PathBuf) -> Result<Response<Body>, Error> {
let file = tokio::fs::File::open(path.clone())
.map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path.clone(), err)))
.await?;
let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
.map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
let body = Body::wrap_stream(payload);
// fixme: set other headers ?
Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(body)
.unwrap())
}

View File

@ -17,6 +17,7 @@ use crate::server::{WorkerTask, H2Service};
use crate::tools;
use crate::config::acl::PRIV_DATASTORE_READ;
use crate::config::cached_user_info::CachedUserInfo;
use crate::api2::helpers;
mod environment;
use environment::*;
@ -187,26 +188,9 @@ fn download_file(
path.push(env.backup_dir.relative_path());
path.push(&file_name);
let path2 = path.clone();
let path3 = path.clone();
env.log(format!("download {:?}", path.clone()));
let file = tokio::fs::File::open(path)
.map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
.await?;
env.log(format!("download {:?}", path3));
let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
.map_ok(|bytes| hyper::body::Bytes::from(bytes.freeze()));
let body = Body::wrap_stream(payload);
// fixme: set other headers ?
Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body(body)
.unwrap())
helpers::create_download_response(path).await
}.boxed()
}

View File

@ -198,6 +198,9 @@ pub use prune::*;
mod datastore;
pub use datastore::*;
mod verify;
pub use verify::*;
mod catalog_shell;
pub use catalog_shell::*;

View File

@ -45,7 +45,7 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
}
impl<S, I> AsyncRead for AsyncIndexReader<S, I> where
S: AsyncReadChunk + Unpin + 'static,
S: AsyncReadChunk + Unpin + Sync + 'static,
I: IndexFile + Unpin
{
fn poll_read(
@ -74,7 +74,7 @@ I: IndexFile + Unpin
this.current_chunk_digest = digest;
let mut store = match this.store.take() {
let store = match this.store.take() {
Some(store) => store,
None => {
return Poll::Ready(Err(io_format_err!("could not find store")));

View File

@ -141,6 +141,14 @@ impl BackupGroup {
}
}
impl std::fmt::Display for BackupGroup {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let backup_type = self.backup_type();
let id = self.backup_id();
write!(f, "{}/{}", backup_type, id)
}
}
impl std::str::FromStr for BackupGroup {
type Err = Error;

View File

@ -4,7 +4,7 @@ use std::ops::Range;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::task::Context;
use std::pin::Pin;
use anyhow::{bail, format_err, Error};
@ -13,6 +13,7 @@ use proxmox::tools::io::ReadExt;
use proxmox::tools::uuid::Uuid;
use proxmox::tools::vec;
use proxmox::tools::mmap::Mmap;
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
use super::chunk_stat::ChunkStat;
use super::chunk_store::ChunkStore;
@ -123,25 +124,6 @@ impl DynamicIndexReader {
})
}
#[allow(clippy::cast_ptr_alignment)]
pub fn chunk_info(&self, pos: usize) -> Result<ChunkReadInfo, Error> {
if pos >= self.index.len() {
bail!("chunk index out of range");
}
let start = if pos == 0 {
0
} else {
self.index[pos - 1].end()
};
let end = self.index[pos].end();
Ok(ChunkReadInfo {
range: start..end,
digest: self.index[pos].digest.clone(),
})
}
#[inline]
#[allow(clippy::cast_ptr_alignment)]
fn chunk_end(&self, pos: usize) -> u64 {
@ -159,24 +141,6 @@ impl DynamicIndexReader {
&self.index[pos].digest
}
/// Compute checksum and data size
pub fn compute_csum(&self) -> ([u8; 32], u64) {
let mut csum = openssl::sha::Sha256::new();
for entry in &self.index {
csum.update(&entry.end_le.to_ne_bytes());
csum.update(&entry.digest);
}
let csum = csum.finish();
(
csum,
self.index
.last()
.map(|entry| entry.end())
.unwrap_or(0)
)
}
// TODO: can we use std::slice::binary_search with Mmap now?
fn binary_search(
&self,
@ -224,6 +188,34 @@ impl IndexFile for DynamicIndexReader {
self.chunk_end(self.index.len() - 1)
}
}
fn compute_csum(&self) -> ([u8; 32], u64) {
let mut csum = openssl::sha::Sha256::new();
let mut chunk_end = 0;
for pos in 0..self.index_count() {
let info = self.chunk_info(pos).unwrap();
chunk_end = info.range.end;
csum.update(&chunk_end.to_le_bytes());
csum.update(&info.digest);
}
let csum = csum.finish();
(csum, chunk_end)
}
#[allow(clippy::cast_ptr_alignment)]
fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo> {
if pos >= self.index.len() {
return None;
}
let start = if pos == 0 { 0 } else { self.index[pos - 1].end() };
let end = self.index[pos].end();
Some(ChunkReadInfo {
range: start..end,
digest: self.index[pos].digest.clone(),
})
}
}
struct CachedChunk {
@ -263,7 +255,10 @@ struct ChunkCacher<'a, S> {
impl<'a, S: ReadChunk> crate::tools::lru_cache::Cacher<usize, CachedChunk> for ChunkCacher<'a, S> {
fn fetch(&mut self, index: usize) -> Result<Option<CachedChunk>, Error> {
let info = self.index.chunk_info(index)?;
let info = match self.index.chunk_info(index) {
Some(info) => info,
None => bail!("chunk index out of range"),
};
let range = info.range;
let data = self.store.read_chunk(&info.digest)?;
CachedChunk::new(range, data).map(Some)
@ -416,19 +411,26 @@ impl<R: ReadChunk> LocalDynamicReadAt<R> {
}
}
impl<R: ReadChunk> pxar::accessor::ReadAt for LocalDynamicReadAt<R> {
fn poll_read_at(
self: Pin<&Self>,
impl<R: ReadChunk> ReadAt for LocalDynamicReadAt<R> {
fn start_read_at<'a>(
self: Pin<&'a Self>,
_cx: &mut Context,
buf: &mut [u8],
buf: &'a mut [u8],
offset: u64,
) -> Poll<io::Result<usize>> {
) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
use std::io::Read;
tokio::task::block_in_place(move || {
MaybeReady::Ready(tokio::task::block_in_place(move || {
let mut reader = self.inner.lock().unwrap();
reader.seek(SeekFrom::Start(offset))?;
Poll::Ready(Ok(reader.read(buf)?))
})
Ok(reader.read(buf)?)
}))
}
fn poll_complete<'a>(
self: Pin<&'a Self>,
_op: ReadAtOperation<'a>,
) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
panic!("LocalDynamicReadAt::start_read_at returned Pending");
}
}

View File

@ -1,10 +1,9 @@
use anyhow::{bail, format_err, Error};
use std::convert::TryInto;
use std::io::{Seek, SeekFrom};
use super::chunk_stat::*;
use super::chunk_store::*;
use super::IndexFile;
use super::{IndexFile, ChunkReadInfo};
use crate::tools::{self, epoch_now_u64};
use chrono::{Local, TimeZone};
@ -147,38 +146,6 @@ impl FixedIndexReader {
Ok(())
}
pub fn chunk_info(&self, pos: usize) -> Result<(u64, u64, [u8; 32]), Error> {
if pos >= self.index_length {
bail!("chunk index out of range");
}
let start = (pos * self.chunk_size) as u64;
let mut end = start + self.chunk_size as u64;
if end > self.size {
end = self.size;
}
let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
unsafe {
std::ptr::copy_nonoverlapping(
self.index.add(pos * 32),
(*digest.as_mut_ptr()).as_mut_ptr(),
32,
);
}
Ok((start, end, unsafe { digest.assume_init() }))
}
#[inline]
fn chunk_digest(&self, pos: usize) -> &[u8; 32] {
if pos >= self.index_length {
panic!("chunk index out of range");
}
let slice = unsafe { std::slice::from_raw_parts(self.index.add(pos * 32), 32) };
slice.try_into().unwrap()
}
#[inline]
fn chunk_end(&self, pos: usize) -> u64 {
if pos >= self.index_length {
@ -193,20 +160,6 @@ impl FixedIndexReader {
}
}
/// Compute checksum and data size
pub fn compute_csum(&self) -> ([u8; 32], u64) {
let mut csum = openssl::sha::Sha256::new();
let mut chunk_end = 0;
for pos in 0..self.index_length {
chunk_end = self.chunk_end(pos);
let digest = self.chunk_digest(pos);
csum.update(digest);
}
let csum = csum.finish();
(csum, chunk_end)
}
pub fn print_info(&self) {
println!("Size: {}", self.size);
println!("ChunkSize: {}", self.chunk_size);
@ -234,6 +187,38 @@ impl IndexFile for FixedIndexReader {
fn index_bytes(&self) -> u64 {
self.size
}
fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo> {
if pos >= self.index_length {
return None;
}
let start = (pos * self.chunk_size) as u64;
let mut end = start + self.chunk_size as u64;
if end > self.size {
end = self.size;
}
let digest = self.index_digest(pos).unwrap();
Some(ChunkReadInfo {
range: start..end,
digest: *digest,
})
}
fn compute_csum(&self) -> ([u8; 32], u64) {
let mut csum = openssl::sha::Sha256::new();
let mut chunk_end = 0;
for pos in 0..self.index_count() {
let info = self.chunk_info(pos).unwrap();
chunk_end = info.range.end;
csum.update(&info.digest);
}
let csum = csum.finish();
(csum, chunk_end)
}
}
pub struct FixedIndexWriter {
@ -511,18 +496,17 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
let index = &self.index;
let (start, end, digest) = index.chunk_info(idx)?;
let info = match index.chunk_info(idx) {
Some(info) => info,
None => bail!("chunk index out of range"),
};
// fixme: avoid copy
let data = self.store.read_chunk(&digest)?;
if (end - start) != data.len() as u64 {
bail!(
"read chunk with wrong size ({} != {}",
(end - start),
data.len()
);
let data = self.store.read_chunk(&info.digest)?;
let size = info.range.end - info.range.start;
if size != data.len() as u64 {
bail!("read chunk with wrong size ({} != {}", size, data.len());
}
self.read_buffer.clear();
@ -530,8 +514,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
self.buffered_chunk_idx = idx;
self.buffered_chunk_start = start as u64;
//println!("BUFFER {} {}", self.buffered_chunk_start, end);
self.buffered_chunk_start = info.range.start as u64;
Ok(())
}
}

View File

@ -1,11 +1,5 @@
use std::collections::HashMap;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Bytes, BytesMut};
use anyhow::{format_err, Error};
use futures::*;
pub struct ChunkReadInfo {
pub range: Range<u64>,
@ -26,6 +20,10 @@ pub trait IndexFile {
fn index_count(&self) -> usize;
fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>;
fn index_bytes(&self) -> u64;
fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
/// Compute index checksum and size
fn compute_csum(&self) -> ([u8; 32], u64);
/// Returns most often used chunks
fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> {
@ -59,111 +57,3 @@ pub trait IndexFile {
map
}
}
/// Encode digest list from an `IndexFile` into a binary stream
///
/// The reader simply returns a binary stream of 32 byte digest values.
pub struct DigestListEncoder {
index: Box<dyn IndexFile + Send + Sync>,
pos: usize,
count: usize,
}
impl DigestListEncoder {
pub fn new(index: Box<dyn IndexFile + Send + Sync>) -> Self {
let count = index.index_count();
Self { index, pos: 0, count }
}
}
impl std::io::Read for DigestListEncoder {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
if buf.len() < 32 {
panic!("read buffer too small");
}
if self.pos < self.count {
let mut written = 0;
loop {
let digest = self.index.index_digest(self.pos).unwrap();
buf[written..(written + 32)].copy_from_slice(digest);
self.pos += 1;
written += 32;
if self.pos >= self.count {
break;
}
if (written + 32) >= buf.len() {
break;
}
}
Ok(written)
} else {
Ok(0)
}
}
}
/// Decodes a Stream<Item=Bytes> into a Stream<Item=[u8;32]>
///
/// The reader simply returns a binary stream of 32 byte digest values.
pub struct DigestListDecoder<S: Unpin> {
input: S,
buffer: BytesMut,
}
impl<S: Unpin> DigestListDecoder<S> {
pub fn new(input: S) -> Self {
Self { input, buffer: BytesMut::new() }
}
}
impl<S: Unpin> Unpin for DigestListDecoder<S> {}
impl<S: Unpin, E> Stream for DigestListDecoder<S>
where
S: Stream<Item=Result<Bytes, E>>,
E: Into<Error>,
{
type Item = Result<[u8; 32], Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
if this.buffer.len() >= 32 {
let left = this.buffer.split_to(32);
let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
unsafe {
(*digest.as_mut_ptr()).copy_from_slice(&left[..]);
return Poll::Ready(Some(Ok(digest.assume_init())));
}
}
match Pin::new(&mut this.input).poll_next(cx) {
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(Some(Err(err.into())));
}
Poll::Ready(Some(Ok(data))) => {
this.buffer.extend_from_slice(&data);
// continue
}
Poll::Ready(None) => {
let rest = this.buffer.len();
if rest == 0 {
return Poll::Ready(None);
}
return Poll::Ready(Some(Err(format_err!(
"got small digest ({} != 32).",
rest,
))));
}
}
}
}
}

View File

@ -11,10 +11,10 @@ use super::datastore::DataStore;
/// The ReadChunk trait allows reading backup data chunks (local or remote)
pub trait ReadChunk {
/// Returns the encoded chunk data
fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error>;
fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error>;
/// Returns the decoded chunk data
fn read_chunk(&mut self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
}
#[derive(Clone)]
@ -33,7 +33,7 @@ impl LocalChunkReader {
}
impl ReadChunk for LocalChunkReader {
fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
let (path, _) = self.store.chunk_path(digest);
let raw_data = proxmox::tools::fs::file_get_contents(&path)?;
let chunk = DataBlob::from_raw(raw_data)?;
@ -42,7 +42,7 @@ impl ReadChunk for LocalChunkReader {
Ok(chunk)
}
fn read_chunk(&mut self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
let chunk = ReadChunk::read_raw_chunk(self, digest)?;
let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
@ -56,20 +56,20 @@ impl ReadChunk for LocalChunkReader {
pub trait AsyncReadChunk: Send {
/// Returns the encoded chunk data
fn read_raw_chunk<'a>(
&'a mut self,
&'a self,
digest: &'a [u8; 32],
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>>;
/// Returns the decoded chunk data
fn read_chunk<'a>(
&'a mut self,
&'a self,
digest: &'a [u8; 32],
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>>;
}
impl AsyncReadChunk for LocalChunkReader {
fn read_raw_chunk<'a>(
&'a mut self,
&'a self,
digest: &'a [u8; 32],
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
Box::pin(async move{
@ -84,7 +84,7 @@ impl AsyncReadChunk for LocalChunkReader {
}
fn read_chunk<'a>(
&'a mut self,
&'a self,
digest: &'a [u8; 32],
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
Box::pin(async move {

196
src/backup/verify.rs Normal file
View File

@ -0,0 +1,196 @@
use anyhow::{bail, Error};
use crate::server::WorkerTask;
use super::{
DataStore, BackupGroup, BackupDir, BackupInfo, IndexFile,
ENCR_COMPR_BLOB_MAGIC_1_0, ENCRYPTED_BLOB_MAGIC_1_0,
FileInfo, ArchiveType, archive_type,
};
fn verify_blob(datastore: &DataStore, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
let (blob, raw_size) = datastore.load_blob(backup_dir, &info.filename)?;
let csum = openssl::sha::sha256(blob.raw_data());
if raw_size != info.size {
bail!("wrong size ({} != {})", info.size, raw_size);
}
if csum != info.csum {
bail!("wrong index checksum");
}
blob.verify_crc()?;
let magic = blob.magic();
if magic == &ENCR_COMPR_BLOB_MAGIC_1_0 || magic == &ENCRYPTED_BLOB_MAGIC_1_0 {
return Ok(());
}
blob.decode(None)?;
Ok(())
}
fn verify_index_chunks(
datastore: &DataStore,
index: Box<dyn IndexFile>,
worker: &WorkerTask,
) -> Result<(), Error> {
for pos in 0..index.index_count() {
worker.fail_on_abort()?;
let info = index.chunk_info(pos).unwrap();
let size = info.range.end - info.range.start;
datastore.verify_stored_chunk(&info.digest, size)?;
}
Ok(())
}
fn verify_fixed_index(datastore: &DataStore, backup_dir: &BackupDir, info: &FileInfo, worker: &WorkerTask) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
path.push(&info.filename);
let index = datastore.open_fixed_reader(&path)?;
let (csum, size) = index.compute_csum();
if size != info.size {
bail!("wrong size ({} != {})", info.size, size);
}
if csum != info.csum {
bail!("wrong index checksum");
}
verify_index_chunks(datastore, Box::new(index), worker)
}
fn verify_dynamic_index(datastore: &DataStore, backup_dir: &BackupDir, info: &FileInfo, worker: &WorkerTask) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
path.push(&info.filename);
let index = datastore.open_dynamic_reader(&path)?;
let (csum, size) = index.compute_csum();
if size != info.size {
bail!("wrong size ({} != {})", info.size, size);
}
if csum != info.csum {
bail!("wrong index checksum");
}
verify_index_chunks(datastore, Box::new(index), worker)
}
/// Verify a single backup snapshot
///
/// This checks all archives inside a backup snapshot.
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_dir(datastore: &DataStore, backup_dir: &BackupDir, worker: &WorkerTask) -> Result<bool, Error> {
let manifest = match datastore.load_manifest(&backup_dir) {
Ok((manifest, _)) => manifest,
Err(err) => {
worker.log(format!("verify {}:{} - manifest load error: {}", datastore.name(), backup_dir, err));
return Ok(false);
}
};
worker.log(format!("verify {}:{}", datastore.name(), backup_dir));
let mut error_count = 0;
for info in manifest.files() {
let result = proxmox::try_block!({
worker.log(format!(" check {}", info.filename));
match archive_type(&info.filename)? {
ArchiveType::FixedIndex => verify_fixed_index(&datastore, &backup_dir, info, worker),
ArchiveType::DynamicIndex => verify_dynamic_index(&datastore, &backup_dir, info, worker),
ArchiveType::Blob => verify_blob(&datastore, &backup_dir, info),
}
});
worker.fail_on_abort()?;
if let Err(err) = result {
worker.log(format!("verify {}:{}/{} failed: {}", datastore.name(), backup_dir, info.filename, err));
error_count += 1;
}
}
Ok(error_count == 0)
}
/// Verify all backups inside a backup group
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_group(datastore: &DataStore, group: &BackupGroup, worker: &WorkerTask) -> Result<bool, Error> {
let mut list = match group.list_backups(&datastore.base_path()) {
Ok(list) => list,
Err(err) => {
worker.log(format!("verify group {}:{} - unable to list backups: {}", datastore.name(), group, err));
return Ok(false);
}
};
worker.log(format!("verify group {}:{}", datastore.name(), group));
let mut error_count = 0;
BackupInfo::sort_list(&mut list, false); // newest first
for info in list {
if !verify_backup_dir(datastore, &info.backup_dir, worker)? {
error_count += 1;
}
}
Ok(error_count == 0)
}
/// Verify all backups inside a datastore
///
/// Errors are logged to the worker log.
///
/// Returns
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_all_backups(datastore: &DataStore, worker: &WorkerTask) -> Result<bool, Error> {
let list = match BackupGroup::list_groups(&datastore.base_path()) {
Ok(list) => list,
Err(err) => {
worker.log(format!("verify datastore {} - unable to list backups: {}", datastore.name(), err));
return Ok(false);
}
};
worker.log(format!("verify datastore {}", datastore.name()));
let mut error_count = 0;
for group in list {
if !verify_backup_group(datastore, &group, worker)? {
error_count += 1;
}
}
Ok(error_count == 0)
}

View File

@ -1,21 +1,16 @@
use std::collections::{HashSet, HashMap};
use std::ffi::OsStr;
use std::io::{self, Write, Seek, SeekFrom};
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::RawFd;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::task::Context;
use anyhow::{bail, format_err, Error};
use chrono::{Local, DateTime, Utc, TimeZone};
use futures::future::FutureExt;
use futures::select;
use futures::stream::{StreamExt, TryStreamExt};
use nix::unistd::{fork, ForkResult, pipe};
use serde_json::{json, Value};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
use xdg::BaseDirectories;
@ -27,6 +22,7 @@ use proxmox::api::{ApiHandler, ApiMethod, RpcEnvironment};
use proxmox::api::schema::*;
use proxmox::api::cli::*;
use proxmox::api::api;
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
use proxmox_backup::tools;
use proxmox_backup::api2::types::*;
@ -59,16 +55,19 @@ use proxmox_backup::backup::{
Shell,
};
mod proxmox_backup_client;
use proxmox_backup_client::*;
const ENV_VAR_PBS_FINGERPRINT: &str = "PBS_FINGERPRINT";
const ENV_VAR_PBS_PASSWORD: &str = "PBS_PASSWORD";
const REPO_URL_SCHEMA: Schema = StringSchema::new("Repository URL.")
pub const REPO_URL_SCHEMA: Schema = StringSchema::new("Repository URL.")
.format(&BACKUP_REPO_URL)
.max_length(256)
.schema();
const KEYFILE_SCHEMA: Schema = StringSchema::new(
pub const KEYFILE_SCHEMA: Schema = StringSchema::new(
"Path to encryption key. All data will be encrypted using this key.")
.schema();
@ -83,7 +82,7 @@ fn get_default_repository() -> Option<String> {
std::env::var("PBS_REPOSITORY").ok()
}
fn extract_repository_from_value(
pub fn extract_repository_from_value(
param: &Value,
) -> Result<BackupRepository, Error> {
@ -156,7 +155,7 @@ fn record_repository(repo: &BackupRepository) {
let _ = replace_file(path, new_data.to_string().as_bytes(), CreateOptions::new());
}
fn complete_repository(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> {
pub fn complete_repository(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> {
let mut result = vec![];
@ -240,7 +239,7 @@ async fn api_datastore_list_snapshots(
Ok(result["data"].take())
}
async fn api_datastore_latest_snapshot(
pub async fn api_datastore_latest_snapshot(
client: &HttpClient,
store: &str,
group: BackupGroup,
@ -260,16 +259,15 @@ async fn api_datastore_latest_snapshot(
Ok((group.backup_type().to_owned(), group.backup_id().to_owned(), backup_time))
}
async fn backup_directory<P: AsRef<Path>>(
client: &BackupWriter,
previous_manifest: Option<Arc<BackupManifest>>,
dir_path: P,
archive_name: &str,
chunk_size: Option<usize>,
device_set: Option<HashSet<u64>>,
verbose: bool,
skip_lost_and_found: bool,
crypt_config: Option<Arc<CryptConfig>>,
catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
exclude_pattern: Vec<MatchEntry>,
entries_max: usize,
@ -299,7 +297,7 @@ async fn backup_directory<P: AsRef<Path>>(
});
let stats = client
.upload_stream(archive_name, stream, "dynamic", None, crypt_config)
.upload_stream(previous_manifest, archive_name, stream, "dynamic", None)
.await?;
Ok(stats)
@ -307,12 +305,12 @@ async fn backup_directory<P: AsRef<Path>>(
async fn backup_image<P: AsRef<Path>>(
client: &BackupWriter,
previous_manifest: Option<Arc<BackupManifest>>,
image_path: P,
archive_name: &str,
image_size: u64,
chunk_size: Option<usize>,
_verbose: bool,
crypt_config: Option<Arc<CryptConfig>>,
) -> Result<BackupStats, Error> {
let path = image_path.as_ref().to_owned();
@ -325,7 +323,7 @@ async fn backup_image<P: AsRef<Path>>(
let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
let stats = client
.upload_stream(archive_name, stream, "fixed", Some(image_size), crypt_config)
.upload_stream(previous_manifest, archive_name, stream, "fixed", Some(image_size))
.await?;
Ok(stats)
@ -708,8 +706,7 @@ async fn start_garbage_collection(param: Value) -> Result<Value, Error> {
}
fn spawn_catalog_upload(
client: Arc<BackupWriter>,
crypt_config: Option<Arc<CryptConfig>>,
client: Arc<BackupWriter>
) -> Result<
(
Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
@ -727,7 +724,7 @@ fn spawn_catalog_upload(
tokio::spawn(async move {
let catalog_upload_result = client
.upload_stream(CATALOG_NAME, catalog_chunk_stream, "dynamic", None, crypt_config)
.upload_stream(None, CATALOG_NAME, catalog_chunk_stream, "dynamic", None)
.await;
if let Err(ref err) = catalog_upload_result {
@ -958,6 +955,7 @@ async fn create_backup(
let client = BackupWriter::start(
client,
crypt_config.clone(),
repo.store(),
backup_type,
&backup_id,
@ -965,6 +963,12 @@ async fn create_backup(
verbose,
).await?;
let previous_manifest = if let Ok(previous_manifest) = client.download_previous_manifest().await {
Some(Arc::new(previous_manifest))
} else {
None
};
let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp());
let mut manifest = BackupManifest::new(snapshot);
@ -976,21 +980,21 @@ async fn create_backup(
BackupSpecificationType::CONFIG => {
println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target);
let stats = client
.upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
.upload_blob_from_file(&filename, &target, true, Some(true))
.await?;
manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
}
BackupSpecificationType::LOGFILE => { // fixme: remove - not needed anymore ?
println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target);
let stats = client
.upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
.upload_blob_from_file(&filename, &target, true, Some(true))
.await?;
manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
}
BackupSpecificationType::PXAR => {
// start catalog upload on first use
if catalog.is_none() {
let (cat, res) = spawn_catalog_upload(client.clone(), crypt_config.clone())?;
let (cat, res) = spawn_catalog_upload(client.clone())?;
catalog = Some(cat);
catalog_result_tx = Some(res);
}
@ -1000,13 +1004,13 @@ async fn create_backup(
catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
let stats = backup_directory(
&client,
previous_manifest.clone(),
&filename,
&target,
chunk_size_opt,
devices.clone(),
verbose,
skip_lost_and_found,
crypt_config.clone(),
catalog.clone(),
pattern_list.clone(),
entries_max as usize,
@ -1018,12 +1022,12 @@ async fn create_backup(
println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
let stats = backup_image(
&client,
previous_manifest.clone(),
&filename,
&target,
size,
chunk_size_opt,
verbose,
crypt_config.clone(),
).await?;
manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
}
@ -1050,7 +1054,7 @@ async fn create_backup(
let target = "rsa-encrypted.key";
println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
let stats = client
.upload_blob_from_data(rsa_encrypted_key, target, None, false, false)
.upload_blob_from_data(rsa_encrypted_key, target, false, None)
.await?;
manifest.add_file(format!("{}.blob", target), stats.size, stats.csum, is_encrypted)?;
@ -1070,7 +1074,7 @@ async fn create_backup(
println!("Upload index.json to '{:?}'", repo);
let manifest = serde_json::to_string_pretty(&manifest)?.into();
client
.upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, crypt_config.clone(), true, true)
.upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, true, Some(true))
.await?;
client.finish().await?;
@ -1115,7 +1119,7 @@ async fn dump_image<W: Write>(
let most_used = index.find_most_used_chunks(8);
let mut chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
// Note: we avoid using BufferedFixedReader, because that adds an additional buffer/copy
// and thus slows down reading. Instead, directly use RemoteChunkReader
@ -1609,7 +1613,7 @@ async fn complete_backup_group_do(param: &HashMap<String, String>) -> Vec<String
result
}
fn complete_group_or_snapshot(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
pub fn complete_group_or_snapshot(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
proxmox_backup::tools::runtime::main(async { complete_group_or_snapshot_do(arg, param).await })
}
@ -1710,7 +1714,7 @@ fn complete_archive_name(arg: &str, param: &HashMap<String, String>) -> Vec<Stri
.collect()
}
fn complete_pxar_archive_name(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
pub fn complete_pxar_archive_name(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
complete_server_file_name(arg, param)
.iter()
.filter_map(|v| {
@ -1979,36 +1983,6 @@ fn key_mgmt_cli() -> CliCommandMap {
.insert("change-passphrase", key_change_passphrase_cmd_def)
}
fn mount(
param: Value,
_info: &ApiMethod,
_rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
let verbose = param["verbose"].as_bool().unwrap_or(false);
if verbose {
// This will stay in foreground with debug output enabled as None is
// passed for the RawFd.
return proxmox_backup::tools::runtime::main(mount_do(param, None));
}
// Process should be deamonized.
// Make sure to fork before the async runtime is instantiated to avoid troubles.
let pipe = pipe()?;
match fork() {
Ok(ForkResult::Parent { .. }) => {
nix::unistd::close(pipe.1).unwrap();
// Blocks the parent process until we are ready to go in the child
let _res = nix::unistd::read(pipe.0, &mut [0]).unwrap();
Ok(Value::Null)
}
Ok(ForkResult::Child) => {
nix::unistd::close(pipe.0).unwrap();
nix::unistd::setsid().unwrap();
proxmox_backup::tools::runtime::main(mount_do(param, Some(pipe.1)))
}
Err(_) => bail!("failed to daemonize process"),
}
}
use proxmox_backup::client::RemoteChunkReader;
/// This is a workaround until we have cleaned up the chunk/reader/... infrastructure for better
@ -2017,7 +1991,7 @@ use proxmox_backup::client::RemoteChunkReader;
/// Ideally BufferedDynamicReader gets replaced so the LruCache maps to `BroadcastFuture<Chunk>`,
/// so that we can properly access it from multiple threads simultaneously while not issuing
/// duplicate simultaneous reads over http.
struct BufferedDynamicReadAt {
pub struct BufferedDynamicReadAt {
inner: Mutex<BufferedDynamicReader<RemoteChunkReader>>,
}
@ -2029,118 +2003,29 @@ impl BufferedDynamicReadAt {
}
}
impl pxar::accessor::ReadAt for BufferedDynamicReadAt {
fn poll_read_at(
self: Pin<&Self>,
impl ReadAt for BufferedDynamicReadAt {
fn start_read_at<'a>(
self: Pin<&'a Self>,
_cx: &mut Context,
buf: &mut [u8],
buf: &'a mut [u8],
offset: u64,
) -> Poll<io::Result<usize>> {
) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
use std::io::Read;
tokio::task::block_in_place(move || {
MaybeReady::Ready(tokio::task::block_in_place(move || {
let mut reader = self.inner.lock().unwrap();
reader.seek(SeekFrom::Start(offset))?;
Poll::Ready(Ok(reader.read(buf)?))
})
Ok(reader.read(buf)?)
}))
}
fn poll_complete<'a>(
self: Pin<&'a Self>,
_op: ReadAtOperation<'a>,
) -> MaybeReady<io::Result<usize>, ReadAtOperation<'a>> {
panic!("LocalDynamicReadAt::start_read_at returned Pending");
}
}
async fn mount_do(param: Value, pipe: Option<RawFd>) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let archive_name = tools::required_string_param(&param, "archive-name")?;
let target = tools::required_string_param(&param, "target")?;
let client = connect(repo.host(), repo.user())?;
record_repository(&repo);
let path = tools::required_string_param(&param, "snapshot")?;
let (backup_type, backup_id, backup_time) = if path.matches('/').count() == 1 {
let group: BackupGroup = path.parse()?;
api_datastore_latest_snapshot(&client, repo.store(), group).await?
} else {
let snapshot: BackupDir = path.parse()?;
(snapshot.group().backup_type().to_owned(), snapshot.group().backup_id().to_owned(), snapshot.backup_time())
};
let keyfile = param["keyfile"].as_str().map(PathBuf::from);
let crypt_config = match keyfile {
None => None,
Some(path) => {
let (key, _) = load_and_decrypt_key(&path, &get_encryption_key_password)?;
Some(Arc::new(CryptConfig::new(key)?))
}
};
let server_archive_name = if archive_name.ends_with(".pxar") {
format!("{}.didx", archive_name)
} else {
bail!("Can only mount pxar archives.");
};
let client = BackupReader::start(
client,
crypt_config.clone(),
repo.store(),
&backup_type,
&backup_id,
backup_time,
true,
).await?;
let manifest = client.download_manifest().await?;
if server_archive_name.ends_with(".didx") {
let index = client.download_dynamic_index(&manifest, &server_archive_name).await?;
let most_used = index.find_most_used_chunks(8);
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
let reader = BufferedDynamicReader::new(index, chunk_reader);
let archive_size = reader.archive_size();
let reader: proxmox_backup::pxar::fuse::Reader =
Arc::new(BufferedDynamicReadAt::new(reader));
let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?;
let options = OsStr::new("ro,default_permissions");
let session = proxmox_backup::pxar::fuse::Session::mount(
decoder,
&options,
false,
Path::new(target),
)
.map_err(|err| format_err!("pxar mount failed: {}", err))?;
if let Some(pipe) = pipe {
nix::unistd::chdir(Path::new("/")).unwrap();
// Finish creation of daemon by redirecting filedescriptors.
let nullfd = nix::fcntl::open(
"/dev/null",
nix::fcntl::OFlag::O_RDWR,
nix::sys::stat::Mode::empty(),
).unwrap();
nix::unistd::dup2(nullfd, 0).unwrap();
nix::unistd::dup2(nullfd, 1).unwrap();
nix::unistd::dup2(nullfd, 2).unwrap();
if nullfd > 2 {
nix::unistd::close(nullfd).unwrap();
}
// Signal the parent process that we are done with the setup and it can
// terminate.
nix::unistd::write(pipe, &[0u8])?;
nix::unistd::close(pipe).unwrap();
}
let mut interrupt = signal(SignalKind::interrupt())?;
select! {
res = session.fuse() => res?,
_ = interrupt.recv().fuse() => {
// exit on interrupted
}
}
} else {
bail!("unknown archive file extension (expected .pxar)");
}
Ok(Value::Null)
}
#[api(
input: {
@ -2275,138 +2160,6 @@ fn catalog_mgmt_cli() -> CliCommandMap {
.insert("shell", catalog_shell_cmd_def)
}
#[api(
input: {
properties: {
repository: {
schema: REPO_URL_SCHEMA,
optional: true,
},
limit: {
description: "The maximal number of tasks to list.",
type: Integer,
optional: true,
minimum: 1,
maximum: 1000,
default: 50,
},
"output-format": {
schema: OUTPUT_FORMAT,
optional: true,
},
all: {
type: Boolean,
description: "Also list stopped tasks.",
optional: true,
},
}
}
)]
/// List running server tasks for this repo user
async fn task_list(param: Value) -> Result<Value, Error> {
let output_format = get_output_format(&param);
let repo = extract_repository_from_value(&param)?;
let client = connect(repo.host(), repo.user())?;
let limit = param["limit"].as_u64().unwrap_or(50) as usize;
let running = !param["all"].as_bool().unwrap_or(false);
let args = json!({
"running": running,
"start": 0,
"limit": limit,
"userfilter": repo.user(),
"store": repo.store(),
});
let mut result = client.get("api2/json/nodes/localhost/tasks", Some(args)).await?;
let mut data = result["data"].take();
let schema = &proxmox_backup::api2::node::tasks::API_RETURN_SCHEMA_LIST_TASKS;
let options = default_table_format_options()
.column(ColumnConfig::new("starttime").right_align(false).renderer(tools::format::render_epoch))
.column(ColumnConfig::new("endtime").right_align(false).renderer(tools::format::render_epoch))
.column(ColumnConfig::new("upid"))
.column(ColumnConfig::new("status").renderer(tools::format::render_task_status));
format_and_print_result_full(&mut data, schema, &output_format, &options);
Ok(Value::Null)
}
#[api(
input: {
properties: {
repository: {
schema: REPO_URL_SCHEMA,
optional: true,
},
upid: {
schema: UPID_SCHEMA,
},
}
}
)]
/// Display the task log.
async fn task_log(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let upid = tools::required_string_param(&param, "upid")?;
let client = connect(repo.host(), repo.user())?;
display_task_log(client, upid, true).await?;
Ok(Value::Null)
}
#[api(
input: {
properties: {
repository: {
schema: REPO_URL_SCHEMA,
optional: true,
},
upid: {
schema: UPID_SCHEMA,
},
}
}
)]
/// Try to stop a specific task.
async fn task_stop(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let upid_str = tools::required_string_param(&param, "upid")?;
let mut client = connect(repo.host(), repo.user())?;
let path = format!("api2/json/nodes/localhost/tasks/{}", upid_str);
let _ = client.delete(&path, None).await?;
Ok(Value::Null)
}
fn task_mgmt_cli() -> CliCommandMap {
let task_list_cmd_def = CliCommand::new(&API_METHOD_TASK_LIST)
.completion_cb("repository", complete_repository);
let task_log_cmd_def = CliCommand::new(&API_METHOD_TASK_LOG)
.arg_param(&["upid"]);
let task_stop_cmd_def = CliCommand::new(&API_METHOD_TASK_STOP)
.arg_param(&["upid"]);
CliCommandMap::new()
.insert("log", task_log_cmd_def)
.insert("list", task_list_cmd_def)
.insert("stop", task_stop_cmd_def)
}
fn main() {
let backup_cmd_def = CliCommand::new(&API_METHOD_CREATE_BACKUP)
@ -2416,6 +2169,10 @@ fn main() {
.completion_cb("keyfile", tools::complete_file_name)
.completion_cb("chunk-size", complete_chunk_size);
let benchmark_cmd_def = CliCommand::new(&API_METHOD_BENCHMARK)
.completion_cb("repository", complete_repository)
.completion_cb("keyfile", tools::complete_file_name);
let upload_log_cmd_def = CliCommand::new(&API_METHOD_UPLOAD_LOG)
.arg_param(&["snapshot", "logfile"])
.completion_cb("snapshot", complete_backup_snapshot)
@ -2465,30 +2222,6 @@ fn main() {
let logout_cmd_def = CliCommand::new(&API_METHOD_API_LOGOUT)
.completion_cb("repository", complete_repository);
#[sortable]
const API_METHOD_MOUNT: ApiMethod = ApiMethod::new(
&ApiHandler::Sync(&mount),
&ObjectSchema::new(
"Mount pxar archive.",
&sorted!([
("snapshot", false, &StringSchema::new("Group/Snapshot path.").schema()),
("archive-name", false, &StringSchema::new("Backup archive name.").schema()),
("target", false, &StringSchema::new("Target directory path.").schema()),
("repository", true, &REPO_URL_SCHEMA),
("keyfile", true, &StringSchema::new("Path to encryption key.").schema()),
("verbose", true, &BooleanSchema::new("Verbose output.").default(false).schema()),
]),
)
);
let mount_cmd_def = CliCommand::new(&API_METHOD_MOUNT)
.arg_param(&["snapshot", "archive-name", "target"])
.completion_cb("repository", complete_repository)
.completion_cb("snapshot", complete_group_or_snapshot)
.completion_cb("archive-name", complete_pxar_archive_name)
.completion_cb("target", tools::complete_file_name);
let cmd_def = CliCommandMap::new()
.insert("backup", backup_cmd_def)
.insert("upload-log", upload_log_cmd_def)
@ -2503,9 +2236,10 @@ fn main() {
.insert("files", files_cmd_def)
.insert("status", status_cmd_def)
.insert("key", key_mgmt_cli())
.insert("mount", mount_cmd_def)
.insert("mount", mount_cmd_def())
.insert("catalog", catalog_mgmt_cli())
.insert("task", task_mgmt_cli());
.insert("task", task_mgmt_cli())
.insert("benchmark", benchmark_cmd_def);
let rpcenv = CliEnvironment::new();
run_cli_command(cmd_def, rpcenv, Some(|future| {

View File

@ -319,6 +319,40 @@ async fn pull_datastore(
Ok(Value::Null)
}
#[api(
input: {
properties: {
"store": {
schema: DATASTORE_SCHEMA,
},
"output-format": {
schema: OUTPUT_FORMAT,
optional: true,
},
}
}
)]
/// Verify backups
async fn verify(
store: String,
param: Value,
) -> Result<Value, Error> {
let output_format = get_output_format(&param);
let mut client = connect()?;
let args = json!({});
let path = format!("api2/json/admin/datastore/{}/verify", store);
let result = client.post(&path, Some(args)).await?;
view_task_result(client, result, &output_format).await?;
Ok(Value::Null)
}
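The command above always verifies the whole datastore. The same endpoint also accepts a narrower scope, which is what the GUI handlers further down use; a hedged sketch of such a call, reusing the names from the function above (the group values are placeholders):

// Sketch: verify a single backup group instead of the whole datastore.
// Parameter names mirror the ones the DataStoreContent.js handler sends below.
let args = json!({
    "backup-type": "host",
    "backup-id": "backup-client-1",
});
let path = format!("api2/json/admin/datastore/{}/verify", store);
let result = client.post(&path, Some(args)).await?;
view_task_result(client, result, &output_format).await?;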
fn main() {
proxmox_backup::tools::setup_safe_path_env();
@ -342,8 +376,16 @@ fn main() {
.completion_cb("local-store", config::datastore::complete_datastore_name)
.completion_cb("remote", config::remote::complete_remote_name)
.completion_cb("remote-store", complete_remote_datastore_name)
)
.insert(
"verify",
CliCommand::new(&API_METHOD_VERIFY)
.arg_param(&["store"])
.completion_cb("store", config::datastore::complete_datastore_name)
);
let mut rpcenv = CliEnvironment::new();
rpcenv.set_user(Some(String::from("root@pam")));


@ -0,0 +1,82 @@
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Error};
use serde_json::Value;
use chrono::{TimeZone, Utc};
use proxmox::api::{ApiMethod, RpcEnvironment};
use proxmox::api::api;
use proxmox_backup::backup::{
load_and_decrypt_key,
CryptConfig,
};
use proxmox_backup::client::*;
use crate::{
KEYFILE_SCHEMA, REPO_URL_SCHEMA,
extract_repository_from_value,
get_encryption_key_password,
record_repository,
connect,
};
#[api(
input: {
properties: {
repository: {
schema: REPO_URL_SCHEMA,
optional: true,
},
keyfile: {
schema: KEYFILE_SCHEMA,
optional: true,
},
}
}
)]
/// Run benchmark tests
pub async fn benchmark(
param: Value,
_info: &ApiMethod,
_rpcenv: &mut dyn RpcEnvironment,
) -> Result<(), Error> {
let repo = extract_repository_from_value(&param)?;
let keyfile = param["keyfile"].as_str().map(PathBuf::from);
let crypt_config = match keyfile {
None => None,
Some(path) => {
let (key, _) = load_and_decrypt_key(&path, &get_encryption_key_password)?;
let crypt_config = CryptConfig::new(key)?;
Some(Arc::new(crypt_config))
}
};
let backup_time = Utc.timestamp(Utc::now().timestamp(), 0);
let client = connect(repo.host(), repo.user())?;
record_repository(&repo);
let client = BackupWriter::start(
client,
crypt_config.clone(),
repo.store(),
"host",
"benchmark",
backup_time,
false,
).await?;
println!("Start upload speed test");
let speed = client.upload_speedtest().await?;
println!("Upload speed: {} MiB/s", speed);
Ok(())
}
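The heavy lifting is done by upload_speedtest(); conceptually the measurement is just timing a known amount of uploaded data. A minimal stand-alone sketch of that idea (hypothetical helper, not the client's actual implementation):

use std::time::Instant;

// Hypothetical sketch: time an async upload closure and return MiB/s.
async fn measure_upload_mib_per_s<F, Fut>(payload_len: usize, upload: F) -> Result<f64, anyhow::Error>
where
    F: FnOnce(Vec<u8>) -> Fut,
    Fut: std::future::Future<Output = Result<(), anyhow::Error>>,
{
    let data = vec![0u8; payload_len];   // dummy payload
    let start = Instant::now();
    upload(data).await?;                 // push it through the real writer
    let elapsed = start.elapsed().as_secs_f64();
    Ok((payload_len as f64 / (1024.0 * 1024.0)) / elapsed)
}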


@ -0,0 +1,6 @@
mod benchmark;
pub use benchmark::*;
mod mount;
pub use mount::*;
mod task;
pub use task::*;
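For orientation, the client binary only has to declare this directory as a module and re-export its contents; a hedged sketch of that wiring (the exact lines in the binary are an assumption):

// In src/bin/proxmox-backup-client.rs (sketch):
mod proxmox_backup_client;      // picks up the mod.rs shown above
use proxmox_backup_client::*;   // brings the benchmark/mount/task helpers into scope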


@ -0,0 +1,196 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::os::unix::io::RawFd;
use std::path::Path;
use std::ffi::OsStr;
use anyhow::{bail, format_err, Error};
use serde_json::Value;
use tokio::signal::unix::{signal, SignalKind};
use nix::unistd::{fork, ForkResult, pipe};
use futures::select;
use futures::future::FutureExt;
use proxmox::{sortable, identity};
use proxmox::api::{ApiHandler, ApiMethod, RpcEnvironment, schema::*, cli::*};
use proxmox_backup::tools;
use proxmox_backup::backup::{
load_and_decrypt_key,
CryptConfig,
IndexFile,
BackupDir,
BackupGroup,
BufferedDynamicReader,
};
use proxmox_backup::client::*;
use crate::{
REPO_URL_SCHEMA,
extract_repository_from_value,
get_encryption_key_password,
complete_pxar_archive_name,
complete_group_or_snapshot,
complete_repository,
record_repository,
connect,
api_datastore_latest_snapshot,
BufferedDynamicReadAt,
};
#[sortable]
const API_METHOD_MOUNT: ApiMethod = ApiMethod::new(
&ApiHandler::Sync(&mount),
&ObjectSchema::new(
"Mount pxar archive.",
&sorted!([
("snapshot", false, &StringSchema::new("Group/Snapshot path.").schema()),
("archive-name", false, &StringSchema::new("Backup archive name.").schema()),
("target", false, &StringSchema::new("Target directory path.").schema()),
("repository", true, &REPO_URL_SCHEMA),
("keyfile", true, &StringSchema::new("Path to encryption key.").schema()),
("verbose", true, &BooleanSchema::new("Verbose output.").default(false).schema()),
]),
)
);
pub fn mount_cmd_def() -> CliCommand {
CliCommand::new(&API_METHOD_MOUNT)
.arg_param(&["snapshot", "archive-name", "target"])
.completion_cb("repository", complete_repository)
.completion_cb("snapshot", complete_group_or_snapshot)
.completion_cb("archive-name", complete_pxar_archive_name)
.completion_cb("target", tools::complete_file_name)
}
fn mount(
param: Value,
_info: &ApiMethod,
_rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
let verbose = param["verbose"].as_bool().unwrap_or(false);
if verbose {
// This will stay in foreground with debug output enabled as None is
// passed for the RawFd.
return proxmox_backup::tools::runtime::main(mount_do(param, None));
}
// Process should be daemonized.
// Make sure to fork before the async runtime is instantiated to avoid troubles.
let pipe = pipe()?;
match fork() {
Ok(ForkResult::Parent { .. }) => {
nix::unistd::close(pipe.1).unwrap();
// Blocks the parent process until we are ready to go in the child
let _res = nix::unistd::read(pipe.0, &mut [0]).unwrap();
Ok(Value::Null)
}
Ok(ForkResult::Child) => {
nix::unistd::close(pipe.0).unwrap();
nix::unistd::setsid().unwrap();
proxmox_backup::tools::runtime::main(mount_do(param, Some(pipe.1)))
}
Err(_) => bail!("failed to daemonize process"),
}
}
async fn mount_do(param: Value, pipe: Option<RawFd>) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let archive_name = tools::required_string_param(&param, "archive-name")?;
let target = tools::required_string_param(&param, "target")?;
let client = connect(repo.host(), repo.user())?;
record_repository(&repo);
let path = tools::required_string_param(&param, "snapshot")?;
let (backup_type, backup_id, backup_time) = if path.matches('/').count() == 1 {
let group: BackupGroup = path.parse()?;
api_datastore_latest_snapshot(&client, repo.store(), group).await?
} else {
let snapshot: BackupDir = path.parse()?;
(snapshot.group().backup_type().to_owned(), snapshot.group().backup_id().to_owned(), snapshot.backup_time())
};
let keyfile = param["keyfile"].as_str().map(PathBuf::from);
let crypt_config = match keyfile {
None => None,
Some(path) => {
let (key, _) = load_and_decrypt_key(&path, &get_encryption_key_password)?;
Some(Arc::new(CryptConfig::new(key)?))
}
};
let server_archive_name = if archive_name.ends_with(".pxar") {
format!("{}.didx", archive_name)
} else {
bail!("Can only mount pxar archives.");
};
let client = BackupReader::start(
client,
crypt_config.clone(),
repo.store(),
&backup_type,
&backup_id,
backup_time,
true,
).await?;
let manifest = client.download_manifest().await?;
if server_archive_name.ends_with(".didx") {
let index = client.download_dynamic_index(&manifest, &server_archive_name).await?;
let most_used = index.find_most_used_chunks(8);
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used);
let reader = BufferedDynamicReader::new(index, chunk_reader);
let archive_size = reader.archive_size();
let reader: proxmox_backup::pxar::fuse::Reader =
Arc::new(BufferedDynamicReadAt::new(reader));
let decoder = proxmox_backup::pxar::fuse::Accessor::new(reader, archive_size).await?;
let options = OsStr::new("ro,default_permissions");
let session = proxmox_backup::pxar::fuse::Session::mount(
decoder,
&options,
false,
Path::new(target),
)
.map_err(|err| format_err!("pxar mount failed: {}", err))?;
if let Some(pipe) = pipe {
nix::unistd::chdir(Path::new("/")).unwrap();
// Finish creation of daemon by redirecting file descriptors.
let nullfd = nix::fcntl::open(
"/dev/null",
nix::fcntl::OFlag::O_RDWR,
nix::sys::stat::Mode::empty(),
).unwrap();
nix::unistd::dup2(nullfd, 0).unwrap();
nix::unistd::dup2(nullfd, 1).unwrap();
nix::unistd::dup2(nullfd, 2).unwrap();
if nullfd > 2 {
nix::unistd::close(nullfd).unwrap();
}
// Signal the parent process that we are done with the setup and it can
// terminate.
nix::unistd::write(pipe, &[0u8])?;
nix::unistd::close(pipe).unwrap();
}
let mut interrupt = signal(SignalKind::interrupt())?;
select! {
res = session.fuse() => res?,
_ = interrupt.recv().fuse() => {
// exit on interrupt
}
}
} else {
bail!("unknown archive file extension (expected .pxar)");
}
Ok(Value::Null)
}
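The daemonizing handshake above is easy to lose in the larger function: fork before starting the async runtime, then block the parent on a pipe read until the child has finished setting up the FUSE session. A condensed sketch of just that pattern, assuming the same nix API used above:

use anyhow::{bail, Error};
use nix::unistd::{close, fork, pipe, read, write, ForkResult};

// Condensed sketch of the parent/child handshake used by mount():
// the parent only returns once the child has written one byte into the pipe.
fn daemonize_with_handshake(child_setup: impl FnOnce() -> Result<(), Error>) -> Result<(), Error> {
    let (rx, tx) = pipe()?;
    match fork() {
        Ok(ForkResult::Parent { .. }) => {
            close(tx)?;
            let _ = read(rx, &mut [0u8; 1])?; // blocks until the child signals (or exits)
            Ok(())
        }
        Ok(ForkResult::Child) => {
            close(rx)?;
            child_setup()?;                   // setsid, mount, redirect fds, ...
            write(tx, &[0u8])?;               // tell the parent we are ready
            close(tx)?;
            Ok(())
        }
        Err(_) => bail!("failed to daemonize process"),
    }
}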


@ -0,0 +1,148 @@
use anyhow::{Error};
use serde_json::{json, Value};
use proxmox::api::{api, cli::*};
use proxmox_backup::tools;
use proxmox_backup::client::*;
use proxmox_backup::api2::types::UPID_SCHEMA;
use crate::{
REPO_URL_SCHEMA,
extract_repository_from_value,
complete_repository,
connect,
};
#[api(
input: {
properties: {
repository: {
schema: REPO_URL_SCHEMA,
optional: true,
},
limit: {
description: "The maximal number of tasks to list.",
type: Integer,
optional: true,
minimum: 1,
maximum: 1000,
default: 50,
},
"output-format": {
schema: OUTPUT_FORMAT,
optional: true,
},
all: {
type: Boolean,
description: "Also list stopped tasks.",
optional: true,
},
}
}
)]
/// List running server tasks for this repo user
async fn task_list(param: Value) -> Result<Value, Error> {
let output_format = get_output_format(&param);
let repo = extract_repository_from_value(&param)?;
let client = connect(repo.host(), repo.user())?;
let limit = param["limit"].as_u64().unwrap_or(50) as usize;
let running = !param["all"].as_bool().unwrap_or(false);
let args = json!({
"running": running,
"start": 0,
"limit": limit,
"userfilter": repo.user(),
"store": repo.store(),
});
let mut result = client.get("api2/json/nodes/localhost/tasks", Some(args)).await?;
let mut data = result["data"].take();
let schema = &proxmox_backup::api2::node::tasks::API_RETURN_SCHEMA_LIST_TASKS;
let options = default_table_format_options()
.column(ColumnConfig::new("starttime").right_align(false).renderer(tools::format::render_epoch))
.column(ColumnConfig::new("endtime").right_align(false).renderer(tools::format::render_epoch))
.column(ColumnConfig::new("upid"))
.column(ColumnConfig::new("status").renderer(tools::format::render_task_status));
format_and_print_result_full(&mut data, schema, &output_format, &options);
Ok(Value::Null)
}
#[api(
input: {
properties: {
repository: {
schema: REPO_URL_SCHEMA,
optional: true,
},
upid: {
schema: UPID_SCHEMA,
},
}
}
)]
/// Display the task log.
async fn task_log(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let upid = tools::required_string_param(&param, "upid")?;
let client = connect(repo.host(), repo.user())?;
display_task_log(client, upid, true).await?;
Ok(Value::Null)
}
#[api(
input: {
properties: {
repository: {
schema: REPO_URL_SCHEMA,
optional: true,
},
upid: {
schema: UPID_SCHEMA,
},
}
}
)]
/// Try to stop a specific task.
async fn task_stop(param: Value) -> Result<Value, Error> {
let repo = extract_repository_from_value(&param)?;
let upid_str = tools::required_string_param(&param, "upid")?;
let mut client = connect(repo.host(), repo.user())?;
let path = format!("api2/json/nodes/localhost/tasks/{}", upid_str);
let _ = client.delete(&path, None).await?;
Ok(Value::Null)
}
pub fn task_mgmt_cli() -> CliCommandMap {
let task_list_cmd_def = CliCommand::new(&API_METHOD_TASK_LIST)
.completion_cb("repository", complete_repository);
let task_log_cmd_def = CliCommand::new(&API_METHOD_TASK_LOG)
.arg_param(&["upid"]);
let task_stop_cmd_def = CliCommand::new(&API_METHOD_TASK_STOP)
.arg_param(&["upid"]);
CliCommandMap::new()
.insert("log", task_log_cmd_def)
.insert("list", task_list_cmd_def)
.insert("stop", task_stop_cmd_def)
}


@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
@ -22,6 +23,7 @@ pub struct BackupWriter {
h2: H2Client,
abort: AbortHandle,
verbose: bool,
crypt_config: Option<Arc<CryptConfig>>,
}
impl Drop for BackupWriter {
@ -38,12 +40,13 @@ pub struct BackupStats {
impl BackupWriter {
fn new(h2: H2Client, abort: AbortHandle, verbose: bool) -> Arc<Self> {
Arc::new(Self { h2, abort, verbose })
fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
Arc::new(Self { h2, abort, crypt_config, verbose })
}
pub async fn start(
client: HttpClient,
crypt_config: Option<Arc<CryptConfig>>,
datastore: &str,
backup_type: &str,
backup_id: &str,
@ -64,7 +67,7 @@ impl BackupWriter {
let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
Ok(BackupWriter::new(h2, abort, debug))
Ok(BackupWriter::new(h2, abort, crypt_config, debug))
}
pub async fn get(
@ -159,16 +162,19 @@ impl BackupWriter {
&self,
data: Vec<u8>,
file_name: &str,
crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
sign_only: bool,
crypt_or_sign: Option<bool>,
) -> Result<BackupStats, Error> {
let blob = if let Some(ref crypt_config) = crypt_config {
if sign_only {
DataBlob::create_signed(&data, crypt_config, compress)?
} else {
let blob = if let Some(ref crypt_config) = self.crypt_config {
if let Some(encrypt) = crypt_or_sign {
if encrypt {
DataBlob::encode(&data, Some(crypt_config), compress)?
} else {
DataBlob::create_signed(&data, crypt_config, compress)?
}
} else {
DataBlob::encode(&data, None, compress)?
}
} else {
DataBlob::encode(&data, None, compress)?
@ -187,8 +193,8 @@ impl BackupWriter {
&self,
src_path: P,
file_name: &str,
crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
crypt_or_sign: Option<bool>,
) -> Result<BackupStats, Error> {
let src_path = src_path.as_ref();
@ -203,25 +209,16 @@ impl BackupWriter {
.await
.map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
let blob = DataBlob::encode(&contents, crypt_config.as_ref().map(AsRef::as_ref), compress)?;
let raw_data = blob.into_inner();
let size = raw_data.len() as u64;
let csum = openssl::sha::sha256(&raw_data);
let param = json!({
"encoded-size": size,
"file-name": file_name,
});
self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
Ok(BackupStats { size, csum })
self.upload_blob_from_data(contents, file_name, compress, crypt_or_sign).await
}
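With crypt_config now stored on the writer, the former per-call crypt_config/sign_only pair collapses into a single crypt_or_sign flag. A hedged usage sketch (writer and manifest_data are placeholder names):

// crypt_or_sign, interpreted against the writer's own crypt_config:
//   None        -> store the blob plain
//   Some(true)  -> encrypt it
//   Some(false) -> only sign it
let _stats = writer
    .upload_blob_from_data(manifest_data, "index.json.blob", true, Some(false))
    .await?;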
pub async fn upload_stream(
&self,
previous_manifest: Option<Arc<BackupManifest>>,
archive_name: &str,
stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
prefix: &str,
fixed_size: Option<u64>,
crypt_config: Option<Arc<CryptConfig>>,
) -> Result<BackupStats, Error> {
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
@ -233,7 +230,18 @@ impl BackupWriter {
let index_path = format!("{}_index", prefix);
let close_path = format!("{}_close", prefix);
self.download_chunk_list(&index_path, archive_name, known_chunks.clone()).await?;
if let Some(manifest) = previous_manifest {
// try, but ignore errors
match archive_type(archive_name) {
Ok(ArchiveType::FixedIndex) => {
let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
}
Ok(ArchiveType::DynamicIndex) => {
let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
}
_ => { /* do nothing */ }
}
}
let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
@ -244,7 +252,7 @@ impl BackupWriter {
stream,
&prefix,
known_chunks.clone(),
crypt_config,
self.crypt_config.clone(),
self.verbose,
)
.await?;
@ -374,41 +382,93 @@ impl BackupWriter {
(verify_queue_tx, verify_result_rx)
}
pub async fn download_chunk_list(
pub async fn download_previous_fixed_index(
&self,
path: &str,
archive_name: &str,
manifest: &BackupManifest,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<(), Error> {
) -> Result<FixedIndexReader, Error> {
let mut tmpfile = std::fs::OpenOptions::new()
.write(true)
.read(true)
.custom_flags(libc::O_TMPFILE)
.open("/tmp")?;
let param = json!({ "archive-name": archive_name });
let request = H2Client::request_builder("localhost", "GET", path, Some(param), None).unwrap();
self.h2.download("previous", Some(param), &mut tmpfile).await?;
let h2request = self.h2.send_request(request, None).await?;
let resp = h2request.await?;
let index = FixedIndexReader::new(tmpfile)
.map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
// Note: do not use values stored in index (not trusted) - instead, compute them again
let (csum, size) = index.compute_csum();
manifest.verify_file(archive_name, &csum, size)?;
let status = resp.status();
if !status.is_success() {
H2Client::h2api_response(resp).await?; // raise error
unreachable!();
}
let mut body = resp.into_body();
let mut flow_control = body.flow_control().clone();
let mut stream = DigestListDecoder::new(body.map_err(Error::from));
while let Some(chunk) = stream.try_next().await? {
let _ = flow_control.release_capacity(chunk.len());
known_chunks.lock().unwrap().insert(chunk);
// add index chunks to known chunks
let mut known_chunks = known_chunks.lock().unwrap();
for i in 0..index.index_count() {
known_chunks.insert(*index.index_digest(i).unwrap());
}
if self.verbose {
println!("{}: known chunks list length is {}", archive_name, known_chunks.lock().unwrap().len());
println!("{}: known chunks list length is {}", archive_name, index.index_count());
}
Ok(())
Ok(index)
}
pub async fn download_previous_dynamic_index(
&self,
archive_name: &str,
manifest: &BackupManifest,
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
) -> Result<DynamicIndexReader, Error> {
let mut tmpfile = std::fs::OpenOptions::new()
.write(true)
.read(true)
.custom_flags(libc::O_TMPFILE)
.open("/tmp")?;
let param = json!({ "archive-name": archive_name });
self.h2.download("previous", Some(param), &mut tmpfile).await?;
let index = DynamicIndexReader::new(tmpfile)
.map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?;
// Note: do not use values stored in index (not trusted) - instead, compute them again
let (csum, size) = index.compute_csum();
manifest.verify_file(archive_name, &csum, size)?;
// add index chunks to known chunks
let mut known_chunks = known_chunks.lock().unwrap();
for i in 0..index.index_count() {
known_chunks.insert(*index.index_digest(i).unwrap());
}
if self.verbose {
println!("{}: known chunks list length is {}", archive_name, index.index_count());
}
Ok(index)
}
/// Download backup manifest (index.json) of last backup
pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
use std::convert::TryFrom;
let mut raw_data = Vec::with_capacity(64 * 1024);
let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
self.h2.download("previous", Some(param), &mut raw_data).await?;
let blob = DataBlob::from_raw(raw_data)?;
blob.verify_crc()?;
let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
let json: Value = serde_json::from_slice(&data[..])?;
let manifest = BackupManifest::try_from(json)?;
Ok(manifest)
}
fn upload_chunk_info_stream(
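Together with the new previous_manifest parameter of upload_stream(), this lets a client seed its known-chunk set from the last snapshot before streaming. A hedged usage sketch (the writer and stream variables, and the "dynamic" prefix, are assumptions based on the code above):

// Sketch: reuse chunks of the previous snapshot for an incremental upload.
let previous_manifest = match writer.download_previous_manifest().await {
    Ok(manifest) => Some(Arc::new(manifest)), // a previous snapshot exists
    Err(_) => None,                           // first backup of this group
};
let _stats = writer
    .upload_stream(previous_manifest, "root.pxar.didx", stream, "dynamic", None)
    .await?;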


@ -1,7 +1,7 @@
use std::future::Future;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use anyhow::Error;
@ -14,7 +14,7 @@ pub struct RemoteChunkReader {
client: Arc<BackupReader>,
crypt_config: Option<Arc<CryptConfig>>,
cache_hint: HashMap<[u8; 32], usize>,
cache: HashMap<[u8; 32], Vec<u8>>,
cache: Mutex<HashMap<[u8; 32], Vec<u8>>>,
}
impl RemoteChunkReader {
@ -30,11 +30,11 @@ impl RemoteChunkReader {
client,
crypt_config,
cache_hint,
cache: HashMap::new(),
cache: Mutex::new(HashMap::new()),
}
}
pub async fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
let mut chunk_data = Vec::with_capacity(4 * 1024 * 1024);
self.client
@ -49,12 +49,12 @@ impl RemoteChunkReader {
}
impl ReadChunk for RemoteChunkReader {
fn read_raw_chunk(&mut self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
block_on(Self::read_raw_chunk(self, digest))
}
fn read_chunk(&mut self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
if let Some(raw_data) = self.cache.get(digest) {
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
return Ok(raw_data.to_vec());
}
@ -66,7 +66,7 @@ impl ReadChunk for RemoteChunkReader {
let use_cache = self.cache_hint.contains_key(digest);
if use_cache {
self.cache.insert(*digest, raw_data.to_vec());
(*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
}
Ok(raw_data)
@ -75,18 +75,18 @@ impl ReadChunk for RemoteChunkReader {
impl AsyncReadChunk for RemoteChunkReader {
fn read_raw_chunk<'a>(
&'a mut self,
&'a self,
digest: &'a [u8; 32],
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
Box::pin(Self::read_raw_chunk(self, digest))
}
fn read_chunk<'a>(
&'a mut self,
&'a self,
digest: &'a [u8; 32],
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
Box::pin(async move {
if let Some(raw_data) = self.cache.get(digest) {
if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
return Ok(raw_data.to_vec());
}
@ -98,7 +98,7 @@ impl AsyncReadChunk for RemoteChunkReader {
let use_cache = self.cache_hint.contains_key(digest);
if use_cache {
self.cache.insert(*digest, raw_data.to_vec());
(*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
}
Ok(raw_data)
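Because read_chunk() now takes &self and the cache sits behind a Mutex, a single RemoteChunkReader can be shared across threads without an outer lock. A minimal sketch under that assumption (the ReadChunk import path is assumed):

use std::sync::Arc;
use proxmox_backup::backup::ReadChunk;

// Sketch: several worker threads reading chunks through one shared reader.
fn read_in_parallel<R: ReadChunk + Send + Sync + 'static>(
    reader: Arc<R>,
    digests: Vec<[u8; 32]>,
) -> anyhow::Result<()> {
    let handles: Vec<_> = digests
        .into_iter()
        .map(|digest| {
            let reader = Arc::clone(&reader);
            std::thread::spawn(move || reader.read_chunk(&digest))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap()?; // propagate read errors
    }
    Ok(())
}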


@ -2,7 +2,7 @@ use std::collections::{HashSet, HashMap};
use std::convert::TryFrom;
use std::ffi::{CStr, CString, OsStr};
use std::fmt;
use std::io::{self, Write};
use std::io::{self, Read, Write};
use std::os::unix::ffi::OsStrExt;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::path::{Path, PathBuf};
@ -20,6 +20,7 @@ use pxar::encoder::LinkOffset;
use proxmox::c_str;
use proxmox::sys::error::SysError;
use proxmox::tools::fd::RawFdNum;
use proxmox::tools::vec;
use crate::pxar::catalog::BackupCatalogWriter;
use crate::pxar::Flags;
@ -35,6 +36,7 @@ fn detect_fs_type(fd: RawFd) -> Result<i64, Error> {
Ok(fs_stat.f_type)
}
#[rustfmt::skip]
pub fn is_virtual_file_system(magic: i64) -> bool {
use proxmox::sys::linux::magic::*;
@ -114,6 +116,7 @@ struct Archiver<'a, 'b> {
device_set: Option<HashSet<u64>>,
hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
errors: ErrorReporter,
file_copy_buffer: Vec<u8>,
}
type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>;
@ -178,6 +181,7 @@ where
device_set,
hardlinks: HashMap::new(),
errors: ErrorReporter,
file_copy_buffer: vec::undefined(4 * 1024 * 1024),
};
archiver.archive_dir_contents(&mut encoder, source_dir, true)?;
@ -244,11 +248,15 @@ impl<'a, 'b> Archiver<'a, 'b> {
}
/// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`.
///
/// The `existed` flag is set when iterating through a directory to note that we know the file
/// is supposed to exist and we should warn if it doesn't.
fn open_file(
&mut self,
parent: RawFd,
file_name: &CStr,
oflags: OFlag,
existed: bool,
) -> Result<Option<Fd>, Error> {
match Fd::openat(
&unsafe { RawFdNum::from_raw_fd(parent) },
@ -257,9 +265,14 @@ impl<'a, 'b> Archiver<'a, 'b> {
Mode::empty(),
) {
Ok(fd) => Ok(Some(fd)),
Err(nix::Error::Sys(Errno::ENOENT)) => Ok(None),
Err(nix::Error::Sys(Errno::ENOENT)) => {
if existed {
self.report_vanished_file()?;
}
Ok(None)
}
Err(nix::Error::Sys(Errno::EACCES)) => {
write!(self.errors, "failed to open file: {:?}: access denied", file_name)?;
writeln!(self.errors, "failed to open file: {:?}: access denied", file_name)?;
Ok(None)
}
Err(other) => Err(Error::from(other)),
@ -271,6 +284,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
parent,
c_str!(".pxarexclude"),
OFlag::O_RDONLY | OFlag::O_CLOEXEC | OFlag::O_NOCTTY,
false,
)?;
let old_pattern_count = self.patterns.len();
@ -283,7 +297,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
let line = match line {
Ok(line) => line,
Err(err) => {
let _ = write!(
let _ = writeln!(
self.errors,
"ignoring .pxarexclude after read error in {:?}: {}",
self.path,
@ -303,7 +317,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
match MatchEntry::parse_pattern(line, PatternFlag::PATH_NAME, MatchType::Exclude) {
Ok(pattern) => self.patterns.push(pattern),
Err(err) => {
let _ = write!(self.errors, "bad pattern in {:?}: {}", self.path, err);
let _ = writeln!(self.errors, "bad pattern in {:?}: {}", self.path, err);
}
}
}
@ -406,7 +420,25 @@ impl<'a, 'b> Archiver<'a, 'b> {
}
fn report_vanished_file(&mut self) -> Result<(), Error> {
write!(self.errors, "warning: file vanished while reading: {:?}", self.path)?;
writeln!(self.errors, "warning: file vanished while reading: {:?}", self.path)?;
Ok(())
}
fn report_file_shrunk_while_reading(&mut self) -> Result<(), Error> {
writeln!(
self.errors,
"warning: file size shrunk while reading: {:?}, file will be padded with zeros!",
self.path,
)?;
Ok(())
}
fn report_file_grew_while_reading(&mut self) -> Result<(), Error> {
writeln!(
self.errors,
"warning: file size increased while reading: {:?}, file will be truncated!",
self.path,
)?;
Ok(())
}
@ -430,14 +462,12 @@ impl<'a, 'b> Archiver<'a, 'b> {
parent,
c_file_name,
open_mode | OFlag::O_RDONLY | OFlag::O_NOFOLLOW | OFlag::O_CLOEXEC | OFlag::O_NOCTTY,
true,
)?;
let fd = match fd {
Some(fd) => fd,
None => {
self.report_vanished_file()?;
return Ok(());
}
None => return Ok(()),
};
let metadata = get_metadata(fd.as_raw_fd(), &stat, self.flags(), self.fs_magic)?;
@ -591,8 +621,29 @@ impl<'a, 'b> Archiver<'a, 'b> {
file_size: u64,
) -> Result<LinkOffset, Error> {
let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
let offset = encoder.add_file(metadata, file_name, file_size, &mut file)?;
Ok(offset)
let mut remaining = file_size;
let mut out = encoder.create_file(metadata, file_name, file_size)?;
while remaining != 0 {
let mut got = file.read(&mut self.file_copy_buffer[..])?;
if got == 0 {
    // EOF before file_size bytes were read: the file shrank, pad with zeros below
    break;
}
if got as u64 > remaining {
self.report_file_grew_while_reading()?;
got = remaining as usize;
}
out.write_all(&self.file_copy_buffer[..got])?;
remaining -= got as u64;
}
if remaining > 0 {
self.report_file_shrunk_while_reading()?;
let to_zero = remaining.min(self.file_copy_buffer.len() as u64) as usize;
vec::clear(&mut self.file_copy_buffer[..to_zero]);
while remaining != 0 {
let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize;
out.write_all(&self.file_copy_buffer[..fill])?;
remaining -= fill as u64;
}
}
Ok(out.file_offset())
}
fn add_symlink(


@ -323,7 +323,7 @@ fn get_index(username: Option<String>, token: Option<String>, template: &Handleb
if let Some(query_str) = parts.uri.query() {
for (k, v) in form_urlencoded::parse(query_str.as_bytes()).into_owned() {
if k == "debug" && v == "1" || v == "true" {
if k == "debug" && v != "0" && v != "false" {
debug = true;
}
}


@ -213,6 +213,8 @@ pub fn upid_read_status(upid: &UPID) -> Result<String, Error> {
Some(rest) => {
if rest == "OK" {
status = String::from(rest);
} else if rest.starts_with("WARNINGS: ") {
status = String::from(rest);
} else if rest.starts_with("ERROR: ") {
status = String::from(&rest[7..]);
}
@ -234,7 +236,7 @@ pub struct TaskListInfo {
pub upid_str: String,
/// Task `(endtime, status)` if already finished
///
/// The `status` ise iether `unknown`, `OK`, or `ERROR: ...`
/// The `status` is either `unknown`, `OK`, `WARN`, or `ERROR: ...`
pub state: Option<(i64, String)>, // endtime, status
}
@ -385,6 +387,7 @@ impl std::fmt::Display for WorkerTask {
struct WorkerTaskData {
logger: FileLogger,
progress: f64, // 0..1
warn_count: u64,
pub abort_listeners: Vec<oneshot::Sender<()>>,
}
@ -424,6 +427,7 @@ impl WorkerTask {
data: Mutex::new(WorkerTaskData {
logger,
progress: 0.0,
warn_count: 0,
abort_listeners: vec![],
}),
});
@ -507,8 +511,11 @@ impl WorkerTask {
/// Log task result, remove task from running list
pub fn log_result(&self, result: &Result<(), Error>) {
let warn_count = self.data.lock().unwrap().warn_count;
if let Err(err) = result {
self.log(&format!("TASK ERROR: {}", err));
} else if warn_count > 0 {
self.log(format!("TASK WARNINGS: {}", warn_count));
} else {
self.log("TASK OK");
}
@ -524,6 +531,13 @@ impl WorkerTask {
data.logger.log(msg);
}
/// Log a message as warning.
pub fn warn<S: AsRef<str>>(&self, msg: S) {
let mut data = self.data.lock().unwrap();
data.logger.log(format!("WARN: {}", msg.as_ref()));
data.warn_count += 1;
}
/// Set progress indicator
pub fn progress(&self, progress: f64) {
if progress >= 0.0 && progress <= 1.0 {
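A short, hedged sketch of how a worker would use the new warning counter (the helper and its messages are made up; only log() and warn() come from the code above, and the sketch assumes it lives next to WorkerTask):

// Sketch: count non-fatal problems as warnings so the task ends with
// "TASK WARNINGS: <n>" instead of "TASK OK" or a hard "TASK ERROR: ...".
fn check_snapshot(worker: &WorkerTask, name: &str, checksum_ok: bool) {
    if checksum_ok {
        worker.log(format!("{}: OK", name));
    } else {
        worker.warn(format!("{}: checksum mismatch", name)); // bumps warn_count
    }
}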


@ -41,7 +41,7 @@ pub fn parse_u64(i: &str) -> IResult<&str, u64> {
map_res(recognize(digit1), str::parse)(i)
}
/// Parse complete input, generate vervose error message with line numbers
/// Parse complete input, generate verbose error message with line numbers
pub fn parse_complete<'a, F, O>(what: &str, i: &'a str, parser: F) -> Result<O, Error>
where F: Fn(&'a str) -> IResult<&'a str, O>,
{


@ -64,7 +64,7 @@ Ext.define('PBS.DataStoreContent', {
'text',
'backup-time'
]);
Proxmox.Utils.monStoreErrors(view, view.store, true);
Proxmox.Utils.monStoreErrors(view, this.store);
this.reload(); // initial load
},
@ -79,6 +79,7 @@ Ext.define('PBS.DataStoreContent', {
let url = `/api2/json/admin/datastore/${view.datastore}/snapshots`;
this.store.setProxy({
type: 'proxmox',
timeout: 300*1000, // 5 minutes, we should make that api call faster
url: url
});
@ -122,10 +123,11 @@ Ext.define('PBS.DataStoreContent', {
return groups;
},
onLoad: function(store, records, success) {
onLoad: function(store, records, success, operation) {
let view = this.getView();
if (!success) {
Proxmox.Utils.setErrorMask(view, Proxmox.Utils.getResponseErrorMessage(operation.getError()));
return;
}
@ -176,6 +178,7 @@ Ext.define('PBS.DataStoreContent', {
expanded: true,
children: children
});
Proxmox.Utils.setErrorMask(view, false);
},
onPrune: function() {
@ -197,6 +200,73 @@ Ext.define('PBS.DataStoreContent', {
win.show();
},
onVerify: function() {
var view = this.getView();
if (!view.datastore) return;
let rec = view.selModel.getSelection()[0];
if (!(rec && rec.data)) return;
let data = rec.data;
let params;
if (data.leaf) {
params = {
"backup-type": data["backup-type"],
"backup-id": data["backup-id"],
"backup-time": (data['backup-time'].getTime()/1000).toFixed(0),
};
} else {
params = {
"backup-type": data.backup_type,
"backup-id": data.backup_id,
};
}
Proxmox.Utils.API2Request({
params: params,
url: `/admin/datastore/${view.datastore}/verify`,
method: 'POST',
failure: function(response) {
Ext.Msg.alert(gettext('Error'), response.htmlStatus);
},
success: function(response, options) {
Ext.create('Proxmox.window.TaskViewer', {
upid: response.result.data,
}).show();
},
});
},
onForget: function() {
var view = this.getView();
let rec = view.selModel.getSelection()[0];
if (!(rec && rec.data)) return;
let data = rec.data;
if (!data.leaf) return;
if (!view.datastore) return;
console.log(data);
Proxmox.Utils.API2Request({
params: {
"backup-type": data["backup-type"],
"backup-id": data["backup-id"],
"backup-time": (data['backup-time'].getTime()/1000).toFixed(0),
},
url: `/admin/datastore/${view.datastore}/snapshots`,
method: 'DELETE',
waitMsgTarget: view,
failure: function(response, opts) {
Ext.Msg.alert(gettext('Error'), response.htmlStatus);
},
callback: this.reload.bind(this),
});
},
openBackupFileDownloader: function() {
let me = this;
let view = me.getView();
@ -326,6 +396,14 @@ Ext.define('PBS.DataStoreContent', {
iconCls: 'fa fa-refresh',
handler: 'reload',
},
{
xtype: 'proxmoxButton',
text: gettext('Verify'),
disabled: true,
parentXType: 'pbsDataStoreContent',
enableFn: function(record) { return !!record.data; },
handler: 'onVerify',
},
{
xtype: 'proxmoxButton',
text: gettext('Prune'),
@ -334,6 +412,21 @@ Ext.define('PBS.DataStoreContent', {
enableFn: function(record) { return !record.data.leaf; },
handler: 'onPrune',
},
{
xtype: 'proxmoxButton',
text: gettext('Forget'),
disabled: true,
parentXType: 'pbsDataStoreContent',
handler: 'onForget',
confirmMsg: function(record) {
console.log(record);
let name = record.data.text;
return Ext.String.format(gettext('Are you sure you want to remove snapshot {0}'), `'${name}'`);
},
enableFn: function(record) {
return !!record.data.leaf;
},
},
{
xtype: 'proxmoxButton',
text: gettext('Download Files'),


@ -40,7 +40,7 @@ Ext.define('PBS.DataStorePanel', {
initComponent: function() {
let me = this;
me.title = `${gettext("Data Store")}: ${me.datastore}`;
me.title = `${gettext("Datastore")}: ${me.datastore}`;
me.callParent();
},
});


@ -72,10 +72,15 @@ Ext.define('PBS.MainView', {
let datastore = PBS.Utils.getDataStoreFromPath(path);
obj = contentpanel.add({
xtype: 'pbsDataStorePanel',
nodename: 'localhost',
datastore,
});
} else {
obj = contentpanel.add({ xtype: path, border: false });
obj = contentpanel.add({
xtype: path,
nodename: 'localhost',
border: false
});
}
var treelist = me.lookupReference('navtree');


@ -26,6 +26,7 @@ JSSRC= \
dashboard/RunningTasks.js \
dashboard/TaskSummary.js \
Utils.js \
ZFSList.js \
DirectoryList.js \
LoginView.js \
VersionInfo.js \


@ -69,12 +69,18 @@ Ext.define('PBS.store.NavigationStore', {
path: 'pbsDirectoryList',
leaf: true,
},
{
text: "ZFS",
iconCls: 'fa fa-th-large',
path: 'pbsZFSList',
leaf: true,
},
]
}
]
},
{
text: gettext('Data Store'),
text: gettext('Datastore'),
iconCls: 'fa fa-archive',
path: 'pbsDataStoreConfig',
expanded: true,


@ -33,22 +33,18 @@ Ext.define('PBS.Utils', {
},
render_datastore_worker_id: function(id, what) {
const result = id.match(/^(\S+)_([^_\s]+)_([^_\s]+)$/);
if (result) {
let datastore = result[1], type = result[2], id = result[3];
return `Datastore ${datastore} - ${what} ${type}/${id}`;
}
return what;
},
render_datastore_time_worker_id: function(id, what) {
const res = id.match(/^(\S+)_([^_\s]+)_([^_\s]+)_([^_\s]+)$/);
const res = id.match(/^(\S+?)_(\S+?)_(\S+?)(_(.+))?$/);
if (res) {
let datastore = res[1], type = res[2], id = res[3];
let datetime = Ext.Date.parse(parseInt(res[4], 16), 'U');
if (res[4] !== undefined) {
let datetime = Ext.Date.parse(parseInt(res[5], 16), 'U');
let utctime = PBS.Utils.render_datetime_utc(datetime);
return `Datastore ${datastore} - ${what} ${type}/${id}/${utctime}`;
return `Datastore ${datastore} ${what} ${type}/${id}/${utctime}`;
} else {
return `Datastore ${datastore} ${what} ${type}/${id}`;
}
return what;
}
return `Datastore ${what} ${id}`;
},
constructor: function() {
@ -62,11 +58,14 @@ Ext.define('PBS.Utils', {
prune: (type, id) => {
return PBS.Utils.render_datastore_worker_id(id, gettext('Prune'));
},
verify: (type, id) => {
return PBS.Utils.render_datastore_worker_id(id, gettext('Verify'));
},
backup: (type, id) => {
return PBS.Utils.render_datastore_worker_id(id, gettext('Backup'));
},
reader: (type, id) => {
return PBS.Utils.render_datastore_time_worker_id(id, gettext('Read objects'));
return PBS.Utils.render_datastore_worker_id(id, gettext('Read objects'));
},
});
}

www/ZFSList.js (new file, 137 lines)

@ -0,0 +1,137 @@
Ext.define('PBS.admin.ZFSList', {
extend: 'Ext.grid.Panel',
xtype: 'pbsZFSList',
stateful: true,
stateId: 'grid-node-zfs',
controller: {
xclass: 'Ext.app.ViewController',
openCreateWindow: function() {
let me = this;
Ext.create('PBS.window.CreateZFS', {
nodename: me.nodename,
listeners: {
destroy: function() { me.reload(); },
}
}).show();
},
openDetailWindow: function() {
let me = this;
let view = me.getView();
let selection = view.getSelection();
if (!selection || selection.length < 1) return;
let rec = selection[0];
let zpool = rec.get('name');
Ext.create('Proxmox.window.ZFSDetail', {
zpool,
nodename: view.nodename,
}).show();
},
reload: function() {
let me = this;
let view = me.getView();
let store = view.getStore();
store.load();
store.sort();
},
init: function(view) {
let me = this;
if (!view.nodename) {
throw "no nodename given";
}
let url = `/api2/json/nodes/${view.nodename}/disks/zfs`;
view.getStore().getProxy().setUrl(url)
Proxmox.Utils.monStoreErrors(view, view.getStore(), true);
me.reload();
},
},
columns: [
{
text: gettext('Name'),
dataIndex: 'name',
flex: 1
},
{
header: gettext('Size'),
renderer: Proxmox.Utils.format_size,
dataIndex: 'size'
},
{
header: gettext('Free'),
renderer: Proxmox.Utils.format_size,
dataIndex: 'free'
},
{
header: gettext('Allocated'),
renderer: Proxmox.Utils.format_size,
dataIndex: 'alloc'
},
{
header: gettext('Fragmentation'),
renderer: function(value) {
return value.toString() + '%';
},
dataIndex: 'frag'
},
{
header: gettext('Health'),
renderer: Proxmox.Utils.render_zfs_health,
dataIndex: 'health'
},
{
header: gettext('Deduplication'),
hidden: true,
renderer: function(value) {
return value.toFixed(2).toString() + 'x';
},
dataIndex: 'dedup'
}
],
rootVisible: false,
useArrows: true,
tbar: [
{
text: gettext('Reload'),
iconCls: 'fa fa-refresh',
handler: 'reload',
},
{
text: gettext('Create') + ': ZFS',
handler: 'openCreateWindow',
},
{
text: gettext('Detail'),
xtype: 'proxmoxButton',
disabled: true,
handler: 'openDetailWindow',
}
],
listeners: {
itemdblclick: 'openDetailWindow',
},
store: {
fields: ['name', 'size', 'free', 'alloc', 'dedup', 'frag', 'health'],
proxy: {
type: 'proxmox',
},
sorters: 'name'
},
});


@ -25,7 +25,7 @@ Ext.define('PBS.DataStoreConfig', {
extend: 'Ext.grid.GridPanel',
alias: 'widget.pbsDataStoreConfig',
title: gettext('Data Store Configuration'),
title: gettext('Datastore Configuration'),
controller: {
xclass: 'Ext.app.ViewController',
@ -58,6 +58,27 @@ Ext.define('PBS.DataStoreConfig', {
}).show();
},
onVerify: function() {
var view = this.getView();
let rec = view.selModel.getSelection()[0];
if (!(rec && rec.data)) return;
let data = rec.data;
Proxmox.Utils.API2Request({
url: `/admin/datastore/${data.name}/verify`,
method: 'POST',
failure: function(response) {
Ext.Msg.alert(gettext('Error'), response.htmlStatus);
},
success: function(response, options) {
Ext.create('Proxmox.window.TaskViewer', {
upid: response.result.data,
}).show();
},
});
},
garbageCollect: function() {
let me = this;
let view = me.getView();
@ -115,6 +136,12 @@ Ext.define('PBS.DataStoreConfig', {
},
// remove_btn
'-',
{
xtype: 'proxmoxButton',
text: gettext('Verify'),
disabled: true,
handler: 'onVerify',
},
{
xtype: 'proxmoxButton',
text: gettext('Start GC'),


@ -2,7 +2,19 @@ Ext.define('pbs-datastore-statistics', {
extend: 'Ext.data.Model',
fields: [
'store', 'total', 'used', 'avail', 'estimated-full-date', 'history',
'store', 'total', 'used', 'avail', 'estimated-full-date',
{
name: 'history',
convert: function(values) {
let last = null;
return values.map(v => {
if (v !== undefined && v !== null) {
last = v;
}
return last;
});
}
},
{
name: 'usage',
calculate: function(data) {


@ -56,10 +56,16 @@ Ext.define('PBS.LongestTasks', {
type: 'diff',
autoDestroy: true,
autoDestroyRstore: true,
sorters: {
sorters: [
{
property: 'duration',
direction: 'DESC',
},
{
property: 'upid',
direction: 'ASC',
},
],
rstore: {
storeid: 'proxmox-tasks-dash',
type: 'store',


@ -16,7 +16,7 @@ Ext.define('PBS.form.DataStoreSelector', {
listConfig: {
columns: [
{
header: gettext('DataStore'),
header: gettext('Datastore'),
sortable: true,
dataIndex: 'store',
renderer: Ext.String.htmlEncode,


@ -39,7 +39,7 @@ Ext.define('PBS.window.CreateDirectory', {
{
xtype: 'proxmoxcheckbox',
name: 'add-datastore',
fieldLabel: gettext('Add Data Store'),
fieldLabel: gettext('Add as Datastore'),
value: '1',
},
],


@ -52,7 +52,7 @@ Ext.define("PBS.window.FileBrowser", {
extend: "Ext.window.Window",
width: 800,
height: 400,
height: 600,
modal: true,
@ -142,8 +142,13 @@ Ext.define("PBS.window.FileBrowser", {
'backup-type': view['backup-type'],
'backup-time': view['backup-time'],
});
store.load();
store.getRoot().expand();
store.load(() => {
let root = store.getRoot();
root.expand(); // always expand invisible root node
if (root.childNodes.length === 1) {
root.firstChild.expand();
}
});
},
control: {

www/window/ZFSCreate.js (new file, 94 lines)

@ -0,0 +1,94 @@
Ext.define('PBS.window.CreateZFS', {
extend: 'Proxmox.window.Edit',
xtype: 'pbsCreateZFS',
subject: 'ZFS',
showProgress: true,
onlineHelp: 'chapter_zfs',
width: 800,
url: '/nodes/localhost/disks/zfs',
method: 'POST',
items: [
{
xtype: 'inputpanel',
onGetValues: function(values) {
return values;
},
column1: [
{
xtype: 'proxmoxtextfield',
name: 'name',
fieldLabel: gettext('Name'),
allowBlank: false
},
{
xtype: 'proxmoxcheckbox',
name: 'add-datastore',
fieldLabel: gettext('Add as Datastore'),
value: '1'
}
],
column2: [
{
xtype: 'proxmoxKVComboBox',
fieldLabel: gettext('RAID Level'),
name: 'raidlevel',
value: 'single',
comboItems: [
['single', gettext('Single Disk')],
['mirror', 'Mirror'],
['raid10', 'RAID10'],
['raidz', 'RAIDZ'],
['raidz2', 'RAIDZ2'],
['raidz3', 'RAIDZ3']
]
},
{
xtype: 'proxmoxKVComboBox',
fieldLabel: gettext('Compression'),
name: 'compression',
value: 'on',
comboItems: [
['on', 'on'],
['off', 'off'],
['gzip', 'gzip'],
['lz4', 'lz4'],
['lzjb', 'lzjb'],
['zle', 'zle']
]
},
{
xtype: 'proxmoxintegerfield',
fieldLabel: gettext('ashift'),
minValue: 9,
maxValue: 16,
value: '12',
name: 'ashift'
}
],
columnB: [
{
xtype: 'pmxMultiDiskSelector',
name: 'devices',
nodename: 'localhost',
typeParam: 'usage-type',
valueField: 'name',
height: 200,
emptyText: gettext('No Disks unused'),
}
]
},
{
xtype: 'displayfield',
padding: '5 0 0 0',
userCls: 'pmx-hint',
value: 'Note: ZFS is not compatible with disks backed by a hardware ' +
'RAID controller. For details see ' +
'<a target="_blank" href="' + Proxmox.Utils.get_help_link('chapter_zfs') + '">the reference documentation</a>.',
},
],
});