src/api2/admin/datastore/backup.rs: implement upload chunk

This commit is contained in:
Dietmar Maurer 2019-05-09 13:06:09 +02:00
parent 4ebf0eabb0
commit 21ee7912fa
4 changed files with 123 additions and 11 deletions

View File

@ -14,6 +14,7 @@ use crate::tools;
use crate::api_schema::router::*;
use crate::api_schema::*;
use crate::server::WorkerTask;
use crate::backup::*;
mod environment;
use environment::*;
@ -21,6 +22,10 @@ use environment::*;
mod service;
use service::*;
mod upload_chunk;
use upload_chunk::*;
pub fn api_method_upgrade_backup() -> ApiAsyncMethod {
ApiAsyncMethod::new(
upgrade_to_backup_protocol,
@ -43,6 +48,8 @@ fn upgrade_to_backup_protocol(
static PROXMOX_BACKUP_PROTOCOL_ID: &str = "proxmox-backup-protocol-h2";
let store = tools::required_string_param(&param, "store")?;
let datastore = DataStore::lookup_datastore(store)?;
let backup_type = tools::required_string_param(&param, "backup-type")?;
let backup_id = tools::required_string_param(&param, "backup-id")?;
@ -68,7 +75,7 @@ fn upgrade_to_backup_protocol(
let env_type = rpcenv.env_type();
WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
let backup_env = BackupEnvironment::new(env_type, username.clone(), worker.clone());
let backup_env = BackupEnvironment::new(env_type, username.clone(), worker.clone(), datastore);
let service = BackupService::new(backup_env, worker.clone());
let abort_future = worker.abort_future();
@ -116,7 +123,11 @@ fn backup_api() -> Router {
)
);
let chunks = Router::new()
.upload(api_method_upload_chunk());
let router = Router::new()
.subdir("chunks", chunks)
.subdir("test1", test1)
.subdir("test2", test2)
.list_subdirs();
@ -124,10 +135,6 @@ fn backup_api() -> Router {
router
}
// Downcast the generic RpcEnvironment back to the concrete BackupEnvironment.
// Panics (unwrap) if the environment is of a different type, i.e. if called
// outside the backup protocol handler. NOTE(review): this helper is being
// removed by this commit in favor of an `AsRef<BackupEnvironment>` impl.
fn get_backup_environment(rpcenv: &mut RpcEnvironment) -> &BackupEnvironment {
rpcenv.as_any().downcast_ref::<BackupEnvironment>().unwrap()
}
fn test1_get (
_param: Value,
_info: &ApiMethod,
@ -136,7 +143,7 @@ fn test1_get (
println!("TYPEID {:?}", (*rpcenv).type_id());
let env = get_backup_environment(rpcenv);
let env: &BackupEnvironment = rpcenv.as_ref();
env.log("Inside test1_get()");

View File

@ -5,6 +5,7 @@ use serde_json::Value;
use crate::api_schema::router::{RpcEnvironment, RpcEnvironmentType};
use crate::server::WorkerTask;
use crate::backup::*;
/// `RpcEnvironmet` implementation for backup service
#[derive(Clone)]
@ -12,17 +13,18 @@ pub struct BackupEnvironment {
env_type: RpcEnvironmentType,
result_attributes: HashMap<String, Value>,
user: String,
worker: Arc<WorkerTask>,
pub worker: Arc<WorkerTask>,
pub datastore: Arc<DataStore>,
}
impl BackupEnvironment {
pub fn new(env_type: RpcEnvironmentType, user: String, worker: Arc<WorkerTask>) -> Self {
pub fn new(env_type: RpcEnvironmentType, user: String, worker: Arc<WorkerTask>, datastore: Arc<DataStore>) -> Self {
Self {
result_attributes: HashMap::new(),
env_type,
user,
worker,
datastore,
}
}
@ -53,3 +55,9 @@ impl RpcEnvironment for BackupEnvironment {
Some(self.user.clone())
}
}
/// Recover the concrete `BackupEnvironment` from a `RpcEnvironment`
/// trait object via `rpcenv.as_ref()`.
///
/// Panics if the environment is not actually a `BackupEnvironment`
/// (only valid inside the backup protocol handler, which installs one).
impl AsRef<BackupEnvironment> for RpcEnvironment {
fn as_ref(&self) -> &BackupEnvironment {
self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
}
}

View File

@ -0,0 +1,97 @@
use failure::*;
use futures::*;
use std::sync::Arc;
use hyper::http::request::Parts;
use hyper::Body;
use serde_json::{json, Value};
use crate::tools;
use crate::backup::*;
use crate::api_schema::*;
use crate::api_schema::router::*;
use super::environment::*;
/// Future that accumulates the HTTP request body of one chunk upload in
/// memory and inserts the finished chunk into the datastore.
pub struct UploadChunk {
// incoming request body (raw chunk data)
stream: Body,
// target datastore for insert_chunk()
store: Arc<DataStore>,
// announced chunk size in bytes; poll() rejects bodies larger than this
size: u64,
// accumulation buffer for body data received so far
chunk: Vec<u8>,
}
impl UploadChunk {
/// Create an upload future that will read (at most) `size` bytes from
/// `stream` and insert the resulting chunk into `store`.
///
/// The buffer is preallocated from the announced size so the
/// `extend_from_slice` calls in `poll()` never reallocate (`size` is
/// bounded by the API schema — 16 KiB max — so this cannot over-allocate).
pub fn new(stream: Body, store: Arc<DataStore>, size: u64) -> Self {
Self { stream, store, size, chunk: Vec::with_capacity(size as usize) }
}
}
impl Future for UploadChunk {
type Item = Value;
type Error = failure::Error;
/// Drain the body stream into `self.chunk`; at end-of-stream, verify the
/// announced size and insert the chunk into the datastore.
///
/// Resolves to a JSON object `{ "digest": <hex>, "duplicate": <bool> }`.
fn poll(&mut self) -> Poll<Value, failure::Error> {
loop {
match try_ready!(self.stream.poll()) {
Some(chunk) => {
// Reject as soon as the client sends more data than announced.
if (self.chunk.len() + chunk.len()) > (self.size as usize) {
bail!("uploaded chunk is larger than announced.");
}
self.chunk.extend_from_slice(&chunk);
}
None => {
// Fix: the size parameter was only enforced as an upper bound,
// so a truncated upload was silently accepted and stored.
// Require the body to match the announced size exactly.
if self.chunk.len() != (self.size as usize) {
bail!("uploaded chunk is smaller than announced.");
}
let (is_duplicate, digest, _compressed_size) = self.store.insert_chunk(&self.chunk)?;
let result = json!({
"digest": tools::digest_to_hex(&digest),
"duplicate": is_duplicate,
});
return Ok(Async::Ready(result))
}
}
}
}
}
/// API method definition for the chunk upload endpoint.
pub fn api_method_upload_chunk() -> ApiAsyncMethod {
// Parameter schema: one required integer "size" — the announced chunk
// length in bytes, limited to 16 KiB.
let size_schema = IntegerSchema::new("Chunk size.")
.minimum(1)
.maximum(1024*16);
let parameters = ObjectSchema::new("Upload chunk.")
.required("size", size_schema);
ApiAsyncMethod::new(upload_chunk, parameters)
}
// Async API handler: read one chunk from the request body and store it in
// the session's datastore. Returns a future resolving to the HTTP response.
fn upload_chunk(
_parts: Parts,
req_body: Body,
param: Value,
_info: &ApiAsyncMethod,
rpcenv: &mut RpcEnvironment,
) -> Result<BoxFut, Error> {
let size = tools::required_integer_param(&param, "size")?;
// The backup protocol handler installs a BackupEnvironment as the
// RpcEnvironment; the AsRef impl downcasts (and panics if this handler
// is somehow reached outside a backup session).
let env: &BackupEnvironment = rpcenv.as_ref();
let upload = UploadChunk::new(req_body, env.datastore.clone(), size as u64);
// Future that resolves (with a Null value) when the worker task is
// aborted, so the select below cancels a pending upload.
let abort_future = env.worker.abort_future().then(|_| Ok(Value::Null));
// Race upload completion against task abort; whichever finishes first is
// formatted as an HTTP response. Note: errors are converted into JSON
// error responses (Ok), so the returned future itself never fails.
let resp = upload.select(abort_future)
.then(move |result| {
use crate::server::formatter::*;
match result {
Ok((result,_)) => Ok(json_response(result)),
Err((err, _)) => Ok(json_format_error(err)),
}
});
Ok(Box::new(resp))
}

View File

@ -19,7 +19,7 @@ pub struct OutputFormatter {
static JSON_CONTENT_TYPE: &str = "application/json;charset=UTF-8";
fn json_response(result: Value) -> Response<Body> {
pub fn json_response(result: Value) -> Response<Body> {
let json_str = result.to_string();
@ -50,7 +50,7 @@ fn json_format_result(data: Value, rpcenv: &RpcEnvironment) -> Response<Body> {
json_response(result)
}
fn json_format_error(err: Error) -> Response<Body> {
pub fn json_format_error(err: Error) -> Response<Body> {
let mut response = if let Some(apierr) = err.downcast_ref::<HttpError>() {
let mut resp = Response::new(Body::from(apierr.message.clone()));