@ -1,13 +1,13 @@
|
||||
use std::sync::{Arc,RwLock};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
|
||||
|
||||
use pbs_api_types::Authid;
|
||||
use pbs_datastore::backup_info::BackupDir;
|
||||
use pbs_datastore::DataStore;
|
||||
use pbs_api_types::Authid;
|
||||
use proxmox_rest_server::formatter::*;
|
||||
use proxmox_rest_server::WorkerTask;
|
||||
|
||||
@ -22,7 +22,7 @@ pub struct ReaderEnvironment {
|
||||
pub worker: Arc<WorkerTask>,
|
||||
pub datastore: Arc<DataStore>,
|
||||
pub backup_dir: BackupDir,
|
||||
allowed_chunks: Arc<RwLock<HashSet<[u8;32]>>>,
|
||||
allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
|
||||
}
|
||||
|
||||
impl ReaderEnvironment {
|
||||
@ -33,8 +33,6 @@ impl ReaderEnvironment {
|
||||
datastore: Arc<DataStore>,
|
||||
backup_dir: BackupDir,
|
||||
) -> Self {
|
||||
|
||||
|
||||
Self {
|
||||
result_attributes: json!({}),
|
||||
env_type,
|
||||
@ -53,22 +51,22 @@ impl ReaderEnvironment {
|
||||
}
|
||||
|
||||
pub fn debug<S: AsRef<str>>(&self, msg: S) {
|
||||
if self.debug { self.worker.log_message(msg); }
|
||||
if self.debug {
|
||||
self.worker.log_message(msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub fn register_chunk(&self, digest: [u8;32]) {
|
||||
pub fn register_chunk(&self, digest: [u8; 32]) {
|
||||
let mut allowed_chunks = self.allowed_chunks.write().unwrap();
|
||||
allowed_chunks.insert(digest);
|
||||
}
|
||||
|
||||
pub fn check_chunk_access(&self, digest: [u8;32]) -> bool {
|
||||
self.allowed_chunks.read().unwrap().contains(&digest)
|
||||
pub fn check_chunk_access(&self, digest: [u8; 32]) -> bool {
|
||||
self.allowed_chunks.read().unwrap().contains(&digest)
|
||||
}
|
||||
}
|
||||
|
||||
impl RpcEnvironment for ReaderEnvironment {
|
||||
|
||||
fn result_attrib_mut(&mut self) -> &mut Value {
|
||||
&mut self.result_attributes
|
||||
}
|
||||
|
@ -2,58 +2,66 @@
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
use futures::*;
|
||||
use hex::FromHex;
|
||||
use hyper::header::{self, HeaderValue, UPGRADE};
|
||||
use hyper::http::request::Parts;
|
||||
use hyper::{Body, Response, Request, StatusCode};
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use serde_json::Value;
|
||||
use hex::FromHex;
|
||||
|
||||
use proxmox_sys::sortable;
|
||||
use proxmox_router::{
|
||||
http_err, list_subdirs_api_method, ApiHandler, ApiMethod, ApiResponseFuture, Permission,
|
||||
Router, RpcEnvironment, SubdirMap,
|
||||
};
|
||||
use proxmox_schema::{BooleanSchema, ObjectSchema};
|
||||
use proxmox_sys::sortable;
|
||||
|
||||
use pbs_api_types::{
|
||||
Authid, Operation, DATASTORE_SCHEMA, BACKUP_TYPE_SCHEMA, BACKUP_TIME_SCHEMA,
|
||||
BACKUP_ID_SCHEMA, CHUNK_DIGEST_SCHEMA, PRIV_DATASTORE_READ, PRIV_DATASTORE_BACKUP,
|
||||
BACKUP_ARCHIVE_NAME_SCHEMA,
|
||||
Authid, Operation, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA,
|
||||
BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA, DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP,
|
||||
PRIV_DATASTORE_READ,
|
||||
};
|
||||
use proxmox_sys::fs::lock_dir_noblock_shared;
|
||||
use pbs_tools::json::{required_integer_param, required_string_param};
|
||||
use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
|
||||
use pbs_config::CachedUserInfo;
|
||||
use pbs_datastore::backup_info::BackupDir;
|
||||
use pbs_datastore::index::IndexFile;
|
||||
use pbs_datastore::manifest::{archive_type, ArchiveType};
|
||||
use pbs_config::CachedUserInfo;
|
||||
use proxmox_rest_server::{WorkerTask, H2Service};
|
||||
use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
|
||||
use pbs_tools::json::{required_integer_param, required_string_param};
|
||||
use proxmox_rest_server::{H2Service, WorkerTask};
|
||||
use proxmox_sys::fs::lock_dir_noblock_shared;
|
||||
|
||||
use crate::api2::helpers;
|
||||
|
||||
mod environment;
|
||||
use environment::*;
|
||||
|
||||
pub const ROUTER: Router = Router::new()
|
||||
.upgrade(&API_METHOD_UPGRADE_BACKUP);
|
||||
pub const ROUTER: Router = Router::new().upgrade(&API_METHOD_UPGRADE_BACKUP);
|
||||
|
||||
#[sortable]
|
||||
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
|
||||
&ApiHandler::AsyncHttp(&upgrade_to_backup_reader_protocol),
|
||||
&ObjectSchema::new(
|
||||
concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."),
|
||||
concat!(
|
||||
"Upgraded to backup protocol ('",
|
||||
PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(),
|
||||
"')."
|
||||
),
|
||||
&sorted!([
|
||||
("store", false, &DATASTORE_SCHEMA),
|
||||
("backup-type", false, &BACKUP_TYPE_SCHEMA),
|
||||
("backup-id", false, &BACKUP_ID_SCHEMA),
|
||||
("backup-time", false, &BACKUP_TIME_SCHEMA),
|
||||
("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
|
||||
(
|
||||
"debug",
|
||||
true,
|
||||
&BooleanSchema::new("Enable verbose debug logging.").schema()
|
||||
),
|
||||
]),
|
||||
)
|
||||
).access(
|
||||
),
|
||||
)
|
||||
.access(
|
||||
// Note: parameter 'store' is no uri parameter, so we need to test inside function body
|
||||
Some("The user needs Datastore.Read privilege on /datastore/{store}."),
|
||||
&Permission::Anybody
|
||||
&Permission::Anybody,
|
||||
);
|
||||
|
||||
fn upgrade_to_backup_reader_protocol(
|
||||
@ -63,7 +71,6 @@ fn upgrade_to_backup_reader_protocol(
|
||||
_info: &ApiMethod,
|
||||
rpcenv: Box<dyn RpcEnvironment>,
|
||||
) -> ApiResponseFuture {
|
||||
|
||||
async move {
|
||||
let debug = param["debug"].as_bool().unwrap_or(false);
|
||||
|
||||
@ -91,14 +98,17 @@ fn upgrade_to_backup_reader_protocol(
|
||||
.headers
|
||||
.get("UPGRADE")
|
||||
.ok_or_else(|| format_err!("missing Upgrade header"))?
|
||||
.to_str()?;
|
||||
.to_str()?;
|
||||
|
||||
if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
|
||||
bail!("invalid protocol name");
|
||||
}
|
||||
|
||||
if parts.version >= http::version::Version::HTTP_2 {
|
||||
bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
|
||||
if parts.version >= http::version::Version::HTTP_2 {
|
||||
bail!(
|
||||
"unexpected http version '{:?}' (expected version < 2)",
|
||||
parts.version
|
||||
);
|
||||
}
|
||||
|
||||
let env_type = rpcenv.env_type();
|
||||
@ -107,8 +117,7 @@ fn upgrade_to_backup_reader_protocol(
|
||||
if !priv_read {
|
||||
let owner = datastore.get_owner(backup_dir.group())?;
|
||||
let correct_owner = owner == auth_id
|
||||
|| (owner.is_token()
|
||||
&& Authid::from(owner.user().clone()) == auth_id);
|
||||
|| (owner.is_token() && Authid::from(owner.user().clone()) == auth_id);
|
||||
if !correct_owner {
|
||||
bail!("backup owner check failed!");
|
||||
}
|
||||
@ -117,83 +126,100 @@ fn upgrade_to_backup_reader_protocol(
|
||||
let _guard = lock_dir_noblock_shared(
|
||||
&datastore.snapshot_path(&backup_dir),
|
||||
"snapshot",
|
||||
"locked by another operation")?;
|
||||
"locked by another operation",
|
||||
)?;
|
||||
|
||||
let path = datastore.base_path();
|
||||
|
||||
//let files = BackupInfo::list_files(&path, &backup_dir)?;
|
||||
|
||||
let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
|
||||
let worker_id = format!(
|
||||
"{}:{}/{}/{:08X}",
|
||||
store,
|
||||
backup_type,
|
||||
backup_id,
|
||||
backup_dir.backup_time()
|
||||
);
|
||||
|
||||
WorkerTask::spawn("reader", Some(worker_id), auth_id.to_string(), true, move |worker| async move {
|
||||
let _guard = _guard;
|
||||
WorkerTask::spawn(
|
||||
"reader",
|
||||
Some(worker_id),
|
||||
auth_id.to_string(),
|
||||
true,
|
||||
move |worker| async move {
|
||||
let _guard = _guard;
|
||||
|
||||
let mut env = ReaderEnvironment::new(
|
||||
env_type,
|
||||
auth_id,
|
||||
worker.clone(),
|
||||
datastore,
|
||||
backup_dir,
|
||||
);
|
||||
let mut env = ReaderEnvironment::new(
|
||||
env_type,
|
||||
auth_id,
|
||||
worker.clone(),
|
||||
datastore,
|
||||
backup_dir,
|
||||
);
|
||||
|
||||
env.debug = debug;
|
||||
env.debug = debug;
|
||||
|
||||
env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
|
||||
env.log(format!(
|
||||
"starting new backup reader datastore '{}': {:?}",
|
||||
store, path
|
||||
));
|
||||
|
||||
let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
|
||||
let service =
|
||||
H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
|
||||
|
||||
let mut abort_future = worker.abort_future()
|
||||
.map(|_| Err(format_err!("task aborted")));
|
||||
let mut abort_future = worker
|
||||
.abort_future()
|
||||
.map(|_| Err(format_err!("task aborted")));
|
||||
|
||||
let env2 = env.clone();
|
||||
let req_fut = async move {
|
||||
let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
|
||||
env2.debug("protocol upgrade done");
|
||||
let env2 = env.clone();
|
||||
let req_fut = async move {
|
||||
let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
|
||||
env2.debug("protocol upgrade done");
|
||||
|
||||
let mut http = hyper::server::conn::Http::new();
|
||||
http.http2_only(true);
|
||||
// increase window size: todo - find optiomal size
|
||||
let window_size = 32*1024*1024; // max = (1 << 31) - 2
|
||||
http.http2_initial_stream_window_size(window_size);
|
||||
http.http2_initial_connection_window_size(window_size);
|
||||
http.http2_max_frame_size(4*1024*1024);
|
||||
let mut http = hyper::server::conn::Http::new();
|
||||
http.http2_only(true);
|
||||
// increase window size: todo - find optiomal size
|
||||
let window_size = 32 * 1024 * 1024; // max = (1 << 31) - 2
|
||||
http.http2_initial_stream_window_size(window_size);
|
||||
http.http2_initial_connection_window_size(window_size);
|
||||
http.http2_max_frame_size(4 * 1024 * 1024);
|
||||
|
||||
http.serve_connection(conn, service)
|
||||
.map_err(Error::from).await
|
||||
};
|
||||
http.serve_connection(conn, service)
|
||||
.map_err(Error::from)
|
||||
.await
|
||||
};
|
||||
|
||||
futures::select!{
|
||||
req = req_fut.fuse() => req?,
|
||||
abort = abort_future => abort?,
|
||||
};
|
||||
futures::select! {
|
||||
req = req_fut.fuse() => req?,
|
||||
abort = abort_future => abort?,
|
||||
};
|
||||
|
||||
env.log("reader finished successfully");
|
||||
env.log("reader finished successfully");
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
|
||||
let response = Response::builder()
|
||||
.status(StatusCode::SWITCHING_PROTOCOLS)
|
||||
.header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
|
||||
.header(
|
||||
UPGRADE,
|
||||
HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()),
|
||||
)
|
||||
.body(Body::empty())?;
|
||||
|
||||
Ok(response)
|
||||
}.boxed()
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
const READER_API_SUBDIRS: SubdirMap = &[
|
||||
("chunk", &Router::new().download(&API_METHOD_DOWNLOAD_CHUNK)),
|
||||
(
|
||||
"chunk", &Router::new()
|
||||
.download(&API_METHOD_DOWNLOAD_CHUNK)
|
||||
),
|
||||
(
|
||||
"download", &Router::new()
|
||||
.download(&API_METHOD_DOWNLOAD_FILE)
|
||||
),
|
||||
(
|
||||
"speedtest", &Router::new()
|
||||
.download(&API_METHOD_SPEEDTEST)
|
||||
"download",
|
||||
&Router::new().download(&API_METHOD_DOWNLOAD_FILE),
|
||||
),
|
||||
("speedtest", &Router::new().download(&API_METHOD_SPEEDTEST)),
|
||||
];
|
||||
|
||||
pub const READER_API_ROUTER: Router = Router::new()
|
||||
@ -205,10 +231,8 @@ pub const API_METHOD_DOWNLOAD_FILE: ApiMethod = ApiMethod::new(
|
||||
&ApiHandler::AsyncHttp(&download_file),
|
||||
&ObjectSchema::new(
|
||||
"Download specified file.",
|
||||
&sorted!([
|
||||
("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),
|
||||
]),
|
||||
)
|
||||
&sorted!([("file-name", false, &BACKUP_ARCHIVE_NAME_SCHEMA),]),
|
||||
),
|
||||
);
|
||||
|
||||
fn download_file(
|
||||
@ -218,7 +242,6 @@ fn download_file(
|
||||
_info: &ApiMethod,
|
||||
rpcenv: Box<dyn RpcEnvironment>,
|
||||
) -> ApiResponseFuture {
|
||||
|
||||
async move {
|
||||
let env: &ReaderEnvironment = rpcenv.as_ref();
|
||||
|
||||
@ -239,11 +262,14 @@ fn download_file(
|
||||
let index = env.datastore.open_dynamic_reader(&path)?;
|
||||
Some(Box::new(index))
|
||||
}
|
||||
_ => { None }
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if let Some(index) = index {
|
||||
env.log(format!("register chunks in '{}' as downloadable.", file_name));
|
||||
env.log(format!(
|
||||
"register chunks in '{}' as downloadable.",
|
||||
file_name
|
||||
));
|
||||
|
||||
for pos in 0..index.index_count() {
|
||||
let info = index.chunk_info(pos).unwrap();
|
||||
@ -252,7 +278,8 @@ fn download_file(
|
||||
}
|
||||
|
||||
helpers::create_download_response(path).await
|
||||
}.boxed()
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
#[sortable]
|
||||
@ -260,10 +287,8 @@ pub const API_METHOD_DOWNLOAD_CHUNK: ApiMethod = ApiMethod::new(
|
||||
&ApiHandler::AsyncHttp(&download_chunk),
|
||||
&ObjectSchema::new(
|
||||
"Download specified chunk.",
|
||||
&sorted!([
|
||||
("digest", false, &CHUNK_DIGEST_SCHEMA),
|
||||
]),
|
||||
)
|
||||
&sorted!([("digest", false, &CHUNK_DIGEST_SCHEMA),]),
|
||||
),
|
||||
);
|
||||
|
||||
fn download_chunk(
|
||||
@ -273,7 +298,6 @@ fn download_chunk(
|
||||
_info: &ApiMethod,
|
||||
rpcenv: Box<dyn RpcEnvironment>,
|
||||
) -> ApiResponseFuture {
|
||||
|
||||
async move {
|
||||
let env: &ReaderEnvironment = rpcenv.as_ref();
|
||||
|
||||
@ -281,8 +305,15 @@ fn download_chunk(
|
||||
let digest = <[u8; 32]>::from_hex(digest_str)?;
|
||||
|
||||
if !env.check_chunk_access(digest) {
|
||||
env.log(format!("attempted to download chunk {} which is not in registered chunk list", digest_str));
|
||||
return Err(http_err!(UNAUTHORIZED, "download chunk {} not allowed", digest_str));
|
||||
env.log(format!(
|
||||
"attempted to download chunk {} which is not in registered chunk list",
|
||||
digest_str
|
||||
));
|
||||
return Err(http_err!(
|
||||
UNAUTHORIZED,
|
||||
"download chunk {} not allowed",
|
||||
digest_str
|
||||
));
|
||||
}
|
||||
|
||||
let (path, _) = env.datastore.chunk_path(&digest);
|
||||
@ -290,18 +321,21 @@ fn download_chunk(
|
||||
|
||||
env.debug(format!("download chunk {:?}", path));
|
||||
|
||||
let data = proxmox_async::runtime::block_in_place(|| std::fs::read(path))
|
||||
.map_err(move |err| http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err))?;
|
||||
let data =
|
||||
proxmox_async::runtime::block_in_place(|| std::fs::read(path)).map_err(move |err| {
|
||||
http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err)
|
||||
})?;
|
||||
|
||||
let body = Body::from(data);
|
||||
|
||||
// fixme: set other headers ?
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||
.body(body)
|
||||
.unwrap())
|
||||
}.boxed()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||
.body(body)
|
||||
.unwrap())
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
/* this is too slow
|
||||
@ -347,7 +381,7 @@ fn download_chunk_old(
|
||||
|
||||
pub const API_METHOD_SPEEDTEST: ApiMethod = ApiMethod::new(
|
||||
&ApiHandler::AsyncHttp(&speedtest),
|
||||
&ObjectSchema::new("Test 1M block download speed.", &[])
|
||||
&ObjectSchema::new("Test 1M block download speed.", &[]),
|
||||
);
|
||||
|
||||
fn speedtest(
|
||||
@ -357,8 +391,7 @@ fn speedtest(
|
||||
_info: &ApiMethod,
|
||||
_rpcenv: Box<dyn RpcEnvironment>,
|
||||
) -> ApiResponseFuture {
|
||||
|
||||
let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A...]
|
||||
let buffer = vec![65u8; 1024 * 1024]; // nonsense [A,A,A...]
|
||||
|
||||
let body = Body::from(buffer);
|
||||
|
||||
|
Reference in New Issue
Block a user