src/api2/reader.rs: implement backup reader protocol
This commit is contained in:
parent
42a87f7b96
commit
dd066d28e2
|
@ -2,6 +2,7 @@ pub mod types;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod admin;
|
pub mod admin;
|
||||||
pub mod backup;
|
pub mod backup;
|
||||||
|
pub mod reader;
|
||||||
pub mod node;
|
pub mod node;
|
||||||
mod version;
|
mod version;
|
||||||
mod subscription;
|
mod subscription;
|
||||||
|
@ -18,6 +19,7 @@ pub fn router() -> Router {
|
||||||
.subdir("access", access::router())
|
.subdir("access", access::router())
|
||||||
.subdir("admin", admin::router())
|
.subdir("admin", admin::router())
|
||||||
.subdir("backup", backup::router())
|
.subdir("backup", backup::router())
|
||||||
|
.subdir("reader", reader::router())
|
||||||
.subdir("config", config::router())
|
.subdir("config", config::router())
|
||||||
.subdir("nodes", nodes)
|
.subdir("nodes", nodes)
|
||||||
.subdir("subscription", subscription::router())
|
.subdir("subscription", subscription::router())
|
||||||
|
|
|
@ -0,0 +1,193 @@
|
||||||
|
use failure::*;
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use futures::*;
|
||||||
|
use hyper::header::{self, HeaderValue, UPGRADE};
|
||||||
|
use hyper::{Body, Response, StatusCode};
|
||||||
|
use hyper::http::request::Parts;
|
||||||
|
//use chrono::{Local, TimeZone};
|
||||||
|
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
|
use crate::tools;
|
||||||
|
use crate::api_schema::router::*;
|
||||||
|
use crate::api_schema::*;
|
||||||
|
use crate::server::{WorkerTask, H2Service};
|
||||||
|
use crate::backup::*;
|
||||||
|
//use crate::api2::types::*;
|
||||||
|
|
||||||
|
mod environment;
|
||||||
|
use environment::*;
|
||||||
|
|
||||||
|
pub fn router() -> Router {
|
||||||
|
Router::new()
|
||||||
|
.upgrade(api_method_upgrade_backup())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
|
||||||
|
ApiAsyncMethod::new(
|
||||||
|
upgrade_to_backup_reader_protocol,
|
||||||
|
ObjectSchema::new(concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!(), "')."))
|
||||||
|
.required("store", StringSchema::new("Datastore name."))
|
||||||
|
.required("backup-type", StringSchema::new("Backup type.")
|
||||||
|
.format(Arc::new(ApiStringFormat::Enum(&["vm", "ct", "host"]))))
|
||||||
|
.required("backup-id", StringSchema::new("Backup ID."))
|
||||||
|
.required("backup-time", IntegerSchema::new("Backup time (Unix epoch.)")
|
||||||
|
.minimum(1547797308))
|
||||||
|
.optional("debug", BooleanSchema::new("Enable verbose debug logging."))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn upgrade_to_backup_reader_protocol(
|
||||||
|
parts: Parts,
|
||||||
|
req_body: Body,
|
||||||
|
param: Value,
|
||||||
|
_info: &ApiAsyncMethod,
|
||||||
|
rpcenv: Box<dyn RpcEnvironment>,
|
||||||
|
) -> Result<BoxFut, Error> {
|
||||||
|
|
||||||
|
let debug = param["debug"].as_bool().unwrap_or(false);
|
||||||
|
|
||||||
|
let store = tools::required_string_param(¶m, "store")?.to_owned();
|
||||||
|
let datastore = DataStore::lookup_datastore(&store)?;
|
||||||
|
|
||||||
|
let backup_type = tools::required_string_param(¶m, "backup-type")?;
|
||||||
|
let backup_id = tools::required_string_param(¶m, "backup-id")?;
|
||||||
|
let backup_time = tools::required_integer_param(¶m, "backup-time")?;
|
||||||
|
|
||||||
|
let protocols = parts
|
||||||
|
.headers
|
||||||
|
.get("UPGRADE")
|
||||||
|
.ok_or_else(|| format_err!("missing Upgrade header"))?
|
||||||
|
.to_str()?;
|
||||||
|
|
||||||
|
if protocols != PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!() {
|
||||||
|
bail!("invalid protocol name");
|
||||||
|
}
|
||||||
|
|
||||||
|
if parts.version >= http::version::Version::HTTP_2 {
|
||||||
|
bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
|
||||||
|
}
|
||||||
|
|
||||||
|
let username = rpcenv.get_user().unwrap();
|
||||||
|
let env_type = rpcenv.env_type();
|
||||||
|
|
||||||
|
let backup_dir = BackupDir::new(backup_type, backup_id, backup_time);
|
||||||
|
let path = datastore.base_path();
|
||||||
|
|
||||||
|
//let files = BackupInfo::list_files(&path, &backup_dir)?;
|
||||||
|
|
||||||
|
let worker_id = format!("{}_{}_{}_{:08X}", store, backup_type, backup_id, backup_dir.backup_time().timestamp());
|
||||||
|
|
||||||
|
WorkerTask::spawn("reader", Some(worker_id), &username.clone(), true, move |worker| {
|
||||||
|
let mut env = ReaderEnvironment::new(
|
||||||
|
env_type, username.clone(), worker.clone(), datastore, backup_dir);
|
||||||
|
|
||||||
|
env.debug = debug;
|
||||||
|
|
||||||
|
env.log(format!("starting new backup reader datastore '{}': {:?}", store, path));
|
||||||
|
|
||||||
|
let service = H2Service::new(env.clone(), worker.clone(), &READER_ROUTER, debug);
|
||||||
|
|
||||||
|
let abort_future = worker.abort_future();
|
||||||
|
|
||||||
|
let env3 = env.clone();
|
||||||
|
|
||||||
|
req_body
|
||||||
|
.on_upgrade()
|
||||||
|
.map_err(Error::from)
|
||||||
|
.and_then(move |conn| {
|
||||||
|
env3.debug("protocol upgrade done");
|
||||||
|
|
||||||
|
let mut http = hyper::server::conn::Http::new();
|
||||||
|
http.http2_only(true);
|
||||||
|
// increase window size: todo - find optiomal size
|
||||||
|
let window_size = 32*1024*1024; // max = (1 << 31) - 2
|
||||||
|
http.http2_initial_stream_window_size(window_size);
|
||||||
|
http.http2_initial_connection_window_size(window_size);
|
||||||
|
|
||||||
|
http.serve_connection(conn, service)
|
||||||
|
.map_err(Error::from)
|
||||||
|
})
|
||||||
|
.select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); }))
|
||||||
|
.map_err(|(err, _)| err)
|
||||||
|
.and_then(move |(_result, _)| {
|
||||||
|
env.log("reader finished sucessfully");
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let response = Response::builder()
|
||||||
|
.status(StatusCode::SWITCHING_PROTOCOLS)
|
||||||
|
.header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
|
||||||
|
.body(Body::empty())?;
|
||||||
|
|
||||||
|
Ok(Box::new(futures::future::ok(response)))
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy_static!{
|
||||||
|
static ref READER_ROUTER: Router = reader_api();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reader_api() -> Router {
|
||||||
|
|
||||||
|
let router = Router::new()
|
||||||
|
.subdir(
|
||||||
|
"download", Router::new()
|
||||||
|
.download(api_method_download_file())
|
||||||
|
);
|
||||||
|
|
||||||
|
router
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn api_method_download_file() -> ApiAsyncMethod {
|
||||||
|
ApiAsyncMethod::new(
|
||||||
|
download_file,
|
||||||
|
ObjectSchema::new("Download specified file.")
|
||||||
|
.required("file-name", crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA.clone())
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn download_file(
|
||||||
|
_parts: Parts,
|
||||||
|
_req_body: Body,
|
||||||
|
param: Value,
|
||||||
|
_info: &ApiAsyncMethod,
|
||||||
|
rpcenv: Box<dyn RpcEnvironment>,
|
||||||
|
) -> Result<BoxFut, Error> {
|
||||||
|
|
||||||
|
let env: &ReaderEnvironment = rpcenv.as_ref();
|
||||||
|
let env2 = env.clone();
|
||||||
|
|
||||||
|
let file_name = tools::required_string_param(¶m, "file-name")?.to_owned();
|
||||||
|
|
||||||
|
let mut path = env.datastore.base_path();
|
||||||
|
path.push(env.backup_dir.relative_path());
|
||||||
|
path.push(&file_name);
|
||||||
|
|
||||||
|
let path2 = path.clone();
|
||||||
|
let path3 = path.clone();
|
||||||
|
|
||||||
|
let response_future = tokio::fs::File::open(path)
|
||||||
|
.map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err)))
|
||||||
|
.and_then(move |file| {
|
||||||
|
env2.log(format!("download {:?}", path3));
|
||||||
|
let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()).
|
||||||
|
map(|bytes| {
|
||||||
|
//sigh - howto avoid copy here? or the whole map() ??
|
||||||
|
hyper::Chunk::from(bytes.to_vec())
|
||||||
|
});
|
||||||
|
let body = Body::wrap_stream(payload);
|
||||||
|
|
||||||
|
// fixme: set other headers ?
|
||||||
|
Ok(Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||||
|
.body(body)
|
||||||
|
.unwrap())
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(Box::new(response_future))
|
||||||
|
}
|
|
@ -0,0 +1,94 @@
|
||||||
|
//use failure::*;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
|
use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
|
||||||
|
use crate::server::WorkerTask;
|
||||||
|
use crate::backup::*;
|
||||||
|
use crate::server::formatter::*;
|
||||||
|
|
||||||
|
//use proxmox::tools;
|
||||||
|
|
||||||
|
/// `RpcEnvironmet` implementation for backup reader service
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ReaderEnvironment {
|
||||||
|
env_type: RpcEnvironmentType,
|
||||||
|
result_attributes: HashMap<String, Value>,
|
||||||
|
user: String,
|
||||||
|
pub debug: bool,
|
||||||
|
pub formatter: &'static OutputFormatter,
|
||||||
|
pub worker: Arc<WorkerTask>,
|
||||||
|
pub datastore: Arc<DataStore>,
|
||||||
|
pub backup_dir: BackupDir,
|
||||||
|
// state: Arc<Mutex<SharedBackupState>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReaderEnvironment {
|
||||||
|
pub fn new(
|
||||||
|
env_type: RpcEnvironmentType,
|
||||||
|
user: String,
|
||||||
|
worker: Arc<WorkerTask>,
|
||||||
|
datastore: Arc<DataStore>,
|
||||||
|
backup_dir: BackupDir,
|
||||||
|
) -> Self {
|
||||||
|
|
||||||
|
|
||||||
|
Self {
|
||||||
|
result_attributes: HashMap::new(),
|
||||||
|
env_type,
|
||||||
|
user,
|
||||||
|
worker,
|
||||||
|
datastore,
|
||||||
|
debug: false,
|
||||||
|
formatter: &JSON_FORMATTER,
|
||||||
|
backup_dir,
|
||||||
|
//state: Arc::new(Mutex::new(state)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn log<S: AsRef<str>>(&self, msg: S) {
|
||||||
|
self.worker.log(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn debug<S: AsRef<str>>(&self, msg: S) {
|
||||||
|
if self.debug { self.worker.log(msg); }
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RpcEnvironment for ReaderEnvironment {
|
||||||
|
|
||||||
|
fn set_result_attrib(&mut self, name: &str, value: Value) {
|
||||||
|
self.result_attributes.insert(name.into(), value);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_result_attrib(&self, name: &str) -> Option<&Value> {
|
||||||
|
self.result_attributes.get(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn env_type(&self) -> RpcEnvironmentType {
|
||||||
|
self.env_type
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_user(&mut self, _user: Option<String>) {
|
||||||
|
panic!("unable to change user");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_user(&self) -> Option<String> {
|
||||||
|
Some(self.user.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsRef<ReaderEnvironment> for dyn RpcEnvironment {
|
||||||
|
fn as_ref(&self) -> &ReaderEnvironment {
|
||||||
|
self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsRef<ReaderEnvironment> for Box<dyn RpcEnvironment> {
|
||||||
|
fn as_ref(&self) -> &ReaderEnvironment {
|
||||||
|
self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
|
||||||
|
}
|
||||||
|
}
|
|
@ -107,6 +107,11 @@ macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
|
||||||
() => { "proxmox-backup-protocol-v1" }
|
() => { "proxmox-backup-protocol-v1" }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! PROXMOX_BACKUP_READER_PROTOCOL_ID_V1 {
|
||||||
|
() => { "proxmox-backup-reader-protocol-v1" }
|
||||||
|
}
|
||||||
|
|
||||||
mod file_formats;
|
mod file_formats;
|
||||||
pub use file_formats::*;
|
pub use file_formats::*;
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ use http::Uri;
|
||||||
use hyper::Body;
|
use hyper::Body;
|
||||||
use hyper::client::Client;
|
use hyper::client::Client;
|
||||||
use xdg::BaseDirectories;
|
use xdg::BaseDirectories;
|
||||||
use chrono::Utc;
|
use chrono::{DateTime, Local, Utc};
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
@ -275,6 +275,28 @@ impl HttpClient {
|
||||||
.map(|(h2, canceller)| BackupClient::new(h2, canceller))
|
.map(|(h2, canceller)| BackupClient::new(h2, canceller))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn start_backup_reader(
|
||||||
|
&self,
|
||||||
|
datastore: &str,
|
||||||
|
backup_type: &str,
|
||||||
|
backup_id: &str,
|
||||||
|
backup_time: DateTime<Local>,
|
||||||
|
debug: bool,
|
||||||
|
) -> impl Future<Item=BackupReader, Error=Error> {
|
||||||
|
|
||||||
|
let param = json!({
|
||||||
|
"backup-type": backup_type,
|
||||||
|
"backup-id": backup_id,
|
||||||
|
"backup-time": backup_time.timestamp(),
|
||||||
|
"store": datastore,
|
||||||
|
"debug": debug,
|
||||||
|
});
|
||||||
|
let req = Self::request_builder(&self.server, "GET", "/api2/json/reader", Some(param)).unwrap();
|
||||||
|
|
||||||
|
self.start_h2_connection(req, String::from(PROXMOX_BACKUP_READER_PROTOCOL_ID_V1!()))
|
||||||
|
.map(|(h2, canceller)| BackupReader::new(h2, canceller))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn start_h2_connection(
|
pub fn start_h2_connection(
|
||||||
&self,
|
&self,
|
||||||
mut req: Request<Body>,
|
mut req: Request<Body>,
|
||||||
|
@ -428,12 +450,69 @@ impl HttpClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//#[derive(Clone)]
|
|
||||||
|
pub struct BackupReader {
|
||||||
|
h2: H2Client,
|
||||||
|
canceller: Option<Canceller>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for BackupReader {
|
||||||
|
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(canceller) = self.canceller.take() {
|
||||||
|
canceller.cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BackupReader {
|
||||||
|
|
||||||
|
pub fn new(h2: H2Client, canceller: Canceller) -> Self {
|
||||||
|
Self { h2, canceller: Some(canceller) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
self.h2.get(path, param)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn put(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
self.h2.put(path, param)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn post(&self, path: &str, param: Option<Value>) -> impl Future<Item=Value, Error=Error> {
|
||||||
|
self.h2.post(path, param)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn download<W: Write>(
|
||||||
|
&self,
|
||||||
|
file_name: &str,
|
||||||
|
output: W,
|
||||||
|
) -> impl Future<Item=W, Error=Error> {
|
||||||
|
let path = "download";
|
||||||
|
let param = json!({ "file-name": file_name });
|
||||||
|
self.h2.download(path, Some(param), output)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn force_close(mut self) {
|
||||||
|
if let Some(canceller) = self.canceller.take() {
|
||||||
|
canceller.cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct BackupClient {
|
pub struct BackupClient {
|
||||||
h2: H2Client,
|
h2: H2Client,
|
||||||
canceller: Option<Canceller>,
|
canceller: Option<Canceller>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for BackupClient {
|
||||||
|
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(canceller) = self.canceller.take() {
|
||||||
|
canceller.cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl BackupClient {
|
impl BackupClient {
|
||||||
|
|
||||||
|
@ -462,7 +541,9 @@ impl BackupClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn force_close(mut self) {
|
pub fn force_close(mut self) {
|
||||||
self.canceller.take().unwrap().cancel();
|
if let Some(canceller) = self.canceller.take() {
|
||||||
|
canceller.cancel();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn upload_blob_from_data(
|
pub fn upload_blob_from_data(
|
||||||
|
@ -905,6 +986,34 @@ impl H2Client {
|
||||||
self.request(req)
|
self.request(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn download<W: Write>(&self, path: &str, param: Option<Value>, output: W) -> impl Future<Item=W, Error=Error> {
|
||||||
|
let request = Self::request_builder("localhost", "GET", path, param).unwrap();
|
||||||
|
|
||||||
|
self.send_request(request, None)
|
||||||
|
.and_then(move |response| {
|
||||||
|
response
|
||||||
|
.map_err(Error::from)
|
||||||
|
.and_then(move |resp| {
|
||||||
|
let status = resp.status();
|
||||||
|
if !status.is_success() {
|
||||||
|
future::Either::A(
|
||||||
|
H2Client::h2api_response(resp)
|
||||||
|
.and_then(|_| { bail!("unknown error"); })
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
future::Either::B(
|
||||||
|
resp.into_body()
|
||||||
|
.map_err(Error::from)
|
||||||
|
.fold(output, move |mut acc, chunk| {
|
||||||
|
acc.write_all(&chunk)?;
|
||||||
|
Ok::<_, Error>(acc)
|
||||||
|
})
|
||||||
|
)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
|
pub fn upload(&self, path: &str, param: Option<Value>, data: Vec<u8>) -> impl Future<Item=Value, Error=Error> {
|
||||||
let request = Self::request_builder("localhost", "POST", path, param).unwrap();
|
let request = Self::request_builder("localhost", "POST", path, param).unwrap();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue