From e2d007f76e77500dd7bba5bf506750d46d70bd48 Mon Sep 17 00:00:00 2001
From: Wolfgang Bumiller <w.bumiller@proxmox.com>
Date: Wed, 6 Mar 2019 10:21:22 +0100
Subject: [PATCH] api2/admin/datastore: add a backup protocol test api path

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
---
 Cargo.toml                         |   1 +
 src/api2/admin/datastore.rs        |   5 +
 src/api2/admin/datastore/upload.rs | 252 +++++++++++++++++++++++++++++
 3 files changed, 258 insertions(+)
 create mode 100644 src/api2/admin/datastore/upload.rs

diff --git a/Cargo.toml b/Cargo.toml
index 9d5f03ec..7cd4b010 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ name = "proxmox_backup"
 path = "src/lib.rs"
 
 [dependencies]
+proxmox-protocol = { path = "proxmox-protocol" }
 log = "0.4"
 syslog = "4.0"
 failure = "0.1"
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 404e420a..81be0d8c 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -18,6 +18,7 @@
 use crate::config::datastore;
 use crate::backup::*;
 
 mod catar;
+mod upload;
 
 fn group_backups(backup_list: Vec<BackupInfo>) -> HashMap<String, Vec<BackupInfo>> {
@@ -381,6 +382,10 @@ pub fn router() -> Router {
             Router::new()
                 .download(catar::api_method_download_catar())
                 .upload(catar::api_method_upload_catar()))
+        .subdir(
+            "test-upload",
+            Router::new()
+                .upgrade(upload::api_method_upgrade_upload()))
         .subdir(
             "gc",
             Router::new()
diff --git a/src/api2/admin/datastore/upload.rs b/src/api2/admin/datastore/upload.rs
new file mode 100644
index 00000000..afe1c355
--- /dev/null
+++ b/src/api2/admin/datastore/upload.rs
@@ -0,0 +1,252 @@
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use failure::*;
+use futures::future::{ok, poll_fn};
+use futures::{Async, Future};
+use hyper::header::{HeaderValue, UPGRADE};
+use hyper::http::request::Parts;
+use hyper::rt;
+use hyper::{Body, Response, StatusCode};
+use serde_json::Value;
+
+use proxmox_protocol::protocol::DynamicChunk;
+use proxmox_protocol::server as pmx_server;
+use proxmox_protocol::{ChunkEntry, FixedChunk};
+
+use crate::api_schema::router::*;
+use crate::api_schema::*;
+use crate::backup::{BackupDir, DataStore, DynamicIndexWriter, FixedIndexWriter, IndexFile};
+use crate::tools;
+
+type Result<T> = std::result::Result<T, Error>;
+
+pub fn api_method_upgrade_upload() -> ApiAsyncMethod {
+    ApiAsyncMethod::new(
+        upgrade_upload,
+        ObjectSchema::new("Upload backup data via the proxmox backup protocol.")
+            .required("store", StringSchema::new("Datastore name.")),
+    )
+}
+
+fn upgrade_upload(
+    parts: Parts,
+    req_body: Body,
+    param: Value,
+    _info: &ApiAsyncMethod,
+    _rpcenv: &mut RpcEnvironment,
+) -> Result<BoxFut> {
+    let store = tools::required_string_param(&param, "store")?.to_string();
+    let expected_protocol: &'static str = "proxmox-backup-protocol-1";
+
+    let protocols = parts
+        .headers
+        .get("UPGRADE")
+        .ok_or_else(|| format_err!("missing Upgrade header"))?
+        .to_str()?;
+
+    if protocols != expected_protocol {
+        bail!("invalid protocol name");
+    }
+
+    rt::spawn(
+        req_body
+            .on_upgrade()
+            .map_err(|e| Error::from(e))
+            .and_then(move |conn| backup_protocol_handler(conn, &store))
+            .map_err(|e| eprintln!("error during upgrade: {}", e))
+            .flatten(),
+    );
+
+    Ok(Box::new(ok(Response::builder()
+        .status(StatusCode::SWITCHING_PROTOCOLS)
+        .header(UPGRADE, HeaderValue::from_static(expected_protocol))
+        .body(Body::empty())
+        .unwrap())))
+}
+
+struct BackupClientHandler {
+    store: Arc<DataStore>,
+}
+
+struct ChunkLister(Box<dyn IndexFile + Send>, usize);
+
+impl pmx_server::ChunkList for ChunkLister {
+    fn next(&mut self) -> Result<Option<&[u8; 32]>> {
+        if self.1 == self.0.index_count() {
+            Ok(None)
+        } else {
+            let chunk = self.0.index_digest(self.1);
+            self.1 += 1;
+            Ok(chunk)
+        }
+    }
+}
+
+impl pmx_server::HandleClient for BackupClientHandler {
+    fn error(&self) {
+        eprintln!("There was an error!");
+    }
+
+    fn get_chunk_list(
+        &self,
+        backup_name: &str,
+    ) -> Result<Box<dyn pmx_server::ChunkList + Send>> {
+        Ok(Box::new(ChunkLister(self.store.open_index(backup_name)?, 0)))
+    }
+
+    fn upload_chunk(&self, chunk: &ChunkEntry, data: &[u8]) -> Result<bool> {
+        let (new, _csize) = self.store.insert_chunk_noverify(&chunk.hash, data)?;
+        Ok(new)
+    }
+
+    fn create_backup(
+        &self,
+        backup_type: &str,
+        backup_id: &str,
+        backup_timestamp: i64,
+        new: bool,
+    ) -> Result<Box<dyn pmx_server::HandleBackup + Send>> {
+        let (path, is_new) = self.store.create_backup_dir(
+            &BackupDir::new(backup_type, backup_id, backup_timestamp)
+        )?;
+
+        if new && !is_new {
+            bail!("client requested to create a new backup, but it already existed");
+        }
+
+        Ok(Box::new(BackupHandler {
+            store: Arc::clone(&self.store),
+            path,
+        }))
+    }
+}
+
+struct BackupHandler {
+    store: Arc<DataStore>,
+    path: PathBuf,
+}
+
+impl pmx_server::HandleBackup for BackupHandler {
+    fn finish(&mut self) -> Result<()> {
+        bail!("TODO: finish");
+    }
+
+    fn create_file(
+        &self,
+        name: &str,
+        fixed_size: Option<u64>,
+        chunk_size: usize,
+    ) -> Result<Box<dyn pmx_server::BackupFile + Send>> {
+        if name.find('/').is_some() {
+            bail!("invalid file name");
+        }
+
+        let mut path_str = self.path
+            .to_str()
+            .ok_or_else(|| format_err!("generated non-utf8 path"))?
+            .to_string();
+        path_str.push('/');
+        path_str.push_str(name);
+
+        match fixed_size {
+            None => {
+                path_str.push_str(".didx");
+                let path = PathBuf::from(path_str.as_str());
+                let writer = self.store.create_dynamic_writer(path, chunk_size)?;
+                Ok(Box::new(DynamicFile {
+                    writer: Some(writer),
+                    path: path_str,
+                }))
+            }
+            Some(file_size) => {
+                path_str.push_str(".fidx");
+                let path = PathBuf::from(path_str.as_str());
+                let writer = self.store.create_fixed_writer(path, file_size as usize, chunk_size)?;
+                Ok(Box::new(FixedFile {
+                    writer: Some(writer),
+                    path: path_str,
+                }))
+            }
+        }
+    }
+}
+
+struct DynamicFile {
+    writer: Option<DynamicIndexWriter>,
+    path: String,
+}
+
+impl pmx_server::BackupFile for DynamicFile {
+    fn relative_path(&self) -> &str {
+        self.path.as_str()
+    }
+
+    fn add_fixed_data(&mut self, _index: u64, _hash: &FixedChunk) -> Result<()> {
+        bail!("add_fixed_data called on dynamic index writer!");
+    }
+
+    fn add_dynamic_data(&mut self, chunk: &DynamicChunk) -> Result<()> {
+        self.writer.as_mut().unwrap()
+            .add_chunk(chunk.offset, &chunk.digest)
+            .map_err(Error::from)
+    }
+
+    fn finish(&mut self) -> Result<()> {
+        self.writer.take().unwrap().close()
+    }
+}
+
+struct FixedFile {
+    writer: Option<FixedIndexWriter>,
+    path: String,
+}
+
+impl pmx_server::BackupFile for FixedFile {
+    fn relative_path(&self) -> &str {
+        self.path.as_str()
+    }
+
+    fn add_fixed_data(&mut self, index: u64, hash: &FixedChunk) -> Result<()> {
+        self.writer.as_mut().unwrap()
+            .add_digest(index as usize, &hash.0)
+    }
+
+    fn add_dynamic_data(&mut self, _chunk: &DynamicChunk) -> Result<()> {
+        bail!("add_dynamic_data called on fixed index writer!");
+    }
+
+    fn finish(&mut self) -> Result<()> {
+        self.writer.take().unwrap().close()
+    }
+}
+
+fn backup_protocol_handler(
+    conn: hyper::upgrade::Upgraded,
+    store_name: &str,
+) -> Result<Box<dyn Future<Item = (), Error = ()> + Send>> {
+    let store = DataStore::lookup_datastore(store_name)?;
+    let handler = BackupClientHandler { store };
+    let mut protocol = pmx_server::Connection::new(conn, handler)?;
+    Ok(Box::new(poll_fn(move || {
+        match protocol.main() {
+            Ok(_) => {
+                if protocol.eof() {
+                    eprintln!("is eof!");
+                }
+                Ok(Async::NotReady)
+            }
+            Err(e) => {
+                if let Some(e) = e.downcast_ref::<std::io::Error>() {
+                    if e.kind() == std::io::ErrorKind::WouldBlock {
+                        eprintln!("Got EWOULDBLOCK");
+                        return Ok(Async::NotReady);
+                    }
+                }
+                // end the future
+                eprintln!("Backup protocol error: {}", e);
+                Err(())
+            }
+        }
+    })))
+}
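
Usage note (not part of the patch): the new `test-upload` path only performs the
HTTP Upgrade handshake. A client sends `Upgrade: proxmox-backup-protocol-1`, the
server replies `101 Switching Protocols`, and the raw connection is then handed
to the proxmox-protocol server side via `backup_protocol_handler`. Below is a
rough client-side sketch of that handshake in hyper 0.12 / futures 0.1 style;
the URI (host, port, datastore name "store1") is a placeholder, and TLS plus
authentication are omitted, so treat it as an illustration of the Upgrade flow
rather than a working backup client.

    use futures::Future;
    use hyper::header::UPGRADE;
    use hyper::{Body, Client, Request, StatusCode};

    fn main() {
        // Placeholder URI; the real API path and port depend on the deployment.
        let req = Request::builder()
            .uri("http://127.0.0.1:8007/api2/json/admin/datastore/store1/test-upload")
            .header(UPGRADE, "proxmox-backup-protocol-1")
            .body(Body::empty())
            .unwrap();

        let fut = Client::new()
            .request(req)
            .and_then(|res| {
                // The server answers 101 and keeps the connection open.
                assert_eq!(res.status(), StatusCode::SWITCHING_PROTOCOLS);
                res.into_body().on_upgrade()
            })
            .map(|_upgraded| {
                // `_upgraded` is the raw bidirectional stream; a real client
                // would start speaking the backup protocol on it here.
            })
            .map_err(|e| eprintln!("upgrade failed: {}", e));

        hyper::rt::run(fut);
    }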