diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index 7f97c08f..23795592 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -19,7 +19,6 @@ use crate::backup::*; use crate::server::WorkerTask; mod pxar; -mod upload; fn group_backups(backup_list: Vec) -> HashMap> { diff --git a/src/api2/admin/datastore/upload.rs b/src/api2/admin/datastore/upload.rs deleted file mode 100644 index 58bf26bb..00000000 --- a/src/api2/admin/datastore/upload.rs +++ /dev/null @@ -1,252 +0,0 @@ -use std::path::PathBuf; -use std::sync::Arc; - -use failure::*; -use futures::future::{ok, poll_fn}; -use futures::{Async, Future}; -use hyper::header::{HeaderValue, UPGRADE}; -use hyper::http::request::Parts; -use hyper::rt; -use hyper::{Body, Response, StatusCode}; -use serde_json::Value; - -use proxmox_protocol::protocol::DynamicChunk; -use proxmox_protocol::server as pmx_server; -use proxmox_protocol::{ChunkEntry, FixedChunk}; - -use crate::api_schema::router::*; -use crate::api_schema::*; -use crate::backup::{BackupDir, DataStore, DynamicIndexWriter, FixedIndexWriter, IndexFile}; -use crate::tools; - -type Result = std::result::Result; - -pub fn api_method_upgrade_upload() -> ApiAsyncMethod { - ApiAsyncMethod::new( - upgrade_upload, - ObjectSchema::new("Download .pxar backup file.") - .required("store", StringSchema::new("Datastore name.")), - ) -} - -fn upgrade_upload( - parts: Parts, - req_body: Body, - param: Value, - _info: &ApiAsyncMethod, - _rpcenv: Box, -) -> Result { - let store = tools::required_string_param(¶m, "store")?.to_string(); - let expected_protocol: &'static str = "proxmox-backup-protocol-1"; - - let protocols = parts - .headers - .get("UPGRADE") - .ok_or_else(|| format_err!("missing Upgrade header"))? - .to_str()?; - - if protocols != expected_protocol { - bail!("invalid protocol name"); - } - - rt::spawn( - req_body - .on_upgrade() - .map_err(|e| Error::from(e)) - .and_then(move |conn| backup_protocol_handler(conn, &store)) - .map_err(|e| eprintln!("error during upgrade: {}", e)) - .flatten(), - ); - - Ok(Box::new(ok(Response::builder() - .status(StatusCode::SWITCHING_PROTOCOLS) - .header(UPGRADE, HeaderValue::from_static(expected_protocol)) - .body(Body::empty()) - .unwrap()))) -} - -struct BackupClientHandler { - store: Arc, -} - -struct ChunkLister(Box, usize); - -impl pmx_server::ChunkList for ChunkLister { - fn next(&mut self) -> Result> { - if self.1 == self.0.index_count() { - Ok(None) - } else { - let chunk = self.0.index_digest(self.1); - self.1 += 1; - Ok(chunk) - } - } -} - -impl pmx_server::HandleClient for BackupClientHandler { - fn error(&self) { - eprintln!("There was an error!"); - } - - fn get_chunk_list( - &self, - backup_name: &str, - ) -> Result> { - Ok(Box::new(ChunkLister(self.store.open_index(backup_name)?, 0))) - } - - fn upload_chunk(&self, chunk: &ChunkEntry, data: &[u8]) -> Result { - let (new, _csize) = self.store.insert_chunk_noverify(&chunk.hash, data)?; - Ok(new) - } - - fn create_backup( - &self, - backup_type: &str, - backup_id: &str, - backup_timestamp: i64, - new: bool, - ) -> Result> { - let (path, is_new) = self.store.create_backup_dir( - &BackupDir::new(backup_type, backup_id, backup_timestamp) - )?; - - if new && !is_new { - bail!("client requested to create a new backup, but it already existed"); - } - - Ok(Box::new(BackupHandler { - store: Arc::clone(&self.store), - path, - })) - } -} - -struct BackupHandler { - store: Arc, - path: PathBuf, -} - -impl pmx_server::HandleBackup for BackupHandler { - fn finish(&mut self) -> Result<()> { - bail!("TODO: finish"); - } - - fn create_file( - &self, - name: &str, - fixed_size: Option, - chunk_size: usize, - ) -> Result> { - if name.find('/').is_some() { - bail!("invalid file name"); - } - - let mut path_str = self.path - .to_str() - .ok_or_else(|| format_err!("generated non-utf8 path"))? - .to_string(); - path_str.push('/'); - path_str.push_str(name); - - match fixed_size { - None => { - path_str.push_str(".didx"); - let path = PathBuf::from(path_str.as_str()); - let writer = self.store.create_dynamic_writer(path)?; - Ok(Box::new(DynamicFile { - writer: Some(writer), - path: path_str, - })) - } - Some(file_size) => { - path_str.push_str(".fidx"); - let path = PathBuf::from(path_str.as_str()); - let writer = self.store.create_fixed_writer(path, file_size as usize, chunk_size)?; - Ok(Box::new(FixedFile { - writer: Some(writer), - path: path_str, - })) - } - } - } -} - -struct DynamicFile { - writer: Option, - path: String, -} - -impl pmx_server::BackupFile for DynamicFile { - fn relative_path(&self) -> &str { - self.path.as_str() - } - - fn add_fixed_data(&mut self, _index: u64, _hash: &FixedChunk) -> Result<()> { - bail!("add_fixed_data data on dynamic index writer!"); - } - - fn add_dynamic_data(&mut self, chunk: &DynamicChunk) -> Result<()> { - self.writer.as_mut().unwrap() - .add_chunk(chunk.offset, &chunk.digest) - .map_err(Error::from) - } - - fn finish(&mut self) -> Result<()> { - self.writer.take().unwrap().close() - } -} - -struct FixedFile { - writer: Option, - path: String, -} - -impl pmx_server::BackupFile for FixedFile { - fn relative_path(&self) -> &str { - self.path.as_str() - } - - fn add_fixed_data(&mut self, index: u64, hash: &FixedChunk) -> Result<()> { - self.writer.as_mut().unwrap() - .add_digest(index as usize, &hash.0) - } - - fn add_dynamic_data(&mut self, _chunk: &DynamicChunk) -> Result<()> { - bail!("add_dynamic_data data on fixed index writer!"); - } - - fn finish(&mut self) -> Result<()> { - self.writer.take().unwrap().close() - } -} - -fn backup_protocol_handler( - conn: hyper::upgrade::Upgraded, - store_name: &str, -) -> Result + Send>> { - let store = DataStore::lookup_datastore(store_name)?; - let handler = BackupClientHandler { store }; - let mut protocol = pmx_server::Connection::new(conn, handler)?; - Ok(Box::new(poll_fn(move || { - match protocol.main() { - Ok(_) => { - if protocol.eof() { - eprintln!("is eof!"); - } - Ok(Async::NotReady) - } - Err(e) => { - if let Some(e) = e.downcast_ref::() { - if e.kind() == std::io::ErrorKind::WouldBlock { - eprintln!("Got EWOULDBLOCK"); - return Ok(Async::NotReady); - } - } - // end the future - eprintln!("Backup protocol error: {}", e); - Err(()) - } - } - }))) -}