diff --git a/src/api3/admin/datastore.rs b/src/api3/admin/datastore.rs
index a3c865c1..175a9b8b 100644
--- a/src/api3/admin/datastore.rs
+++ b/src/api3/admin/datastore.rs
@@ -12,6 +12,8 @@ use crate::config::datastore;
 
 use crate::backup::datastore::*;
 
+mod upload_catar;
+
 // this is just a test for mutability/mutex handling - will remove later
 fn start_garbage_collection(param: Value, _info: &ApiMethod) -> Result<Value, Error> {
 
@@ -52,39 +54,6 @@ pub fn api_method_garbage_collection_status() -> ApiMethod {
     )
 }
 
-fn upload_catar(req_body: hyper::Body, param: Value, _info: &ApiUploadMethod) -> BoxFut {
-
-    let name = param["name"].as_str().unwrap();
-
-    println!("Upload .catar to {}", name);
-
-    let resp = req_body
-        .map_err(|err| http_err!(BAD_REQUEST, format!("Promlems reading request body: {}", err)))
-        .for_each(|chunk| {
-            println!("UPLOAD Chunk {}", chunk.len());
-            Ok(())
-        })
-        .and_then(|()| {
-            println!("UPLOAD DATA Sucessful");
-
-            let response = http::Response::builder()
-                .status(200)
-                .body(hyper::Body::empty())
-                .unwrap();
-
-            Ok(response)
-        });
-
-    Box::new(resp)
-}
-
-fn api_method_upload_catar() -> ApiUploadMethod {
-    ApiUploadMethod::new(
-        upload_catar,
-        ObjectSchema::new("Upload .catar backup file.")
-            .required("name", StringSchema::new("Datastore name."))
-    )
-}
 
 fn get_datastore_list(_param: Value, _info: &ApiMethod) -> Result<Value, Error> {
 
@@ -105,7 +74,7 @@ pub fn router() -> Router {
                 ObjectSchema::new("Directory index.")
                     .required("name", StringSchema::new("Datastore name.")))
         )
-        .upload(api_method_upload_catar())
+        .upload(upload_catar::api_method_upload_catar())
         .subdir(
             "gc",
             Router::new()
diff --git a/src/api3/admin/datastore/upload_catar.rs b/src/api3/admin/datastore/upload_catar.rs
new file mode 100644
index 00000000..0b8422c1
--- /dev/null
+++ b/src/api3/admin/datastore/upload_catar.rs
@@ -0,0 +1,81 @@
+use failure::*;
+
+use crate::backup::datastore::*;
+use crate::backup::archive_index::*;
+use crate::server::rest::*;
+use crate::api::schema::*;
+use crate::api::router::*;
+
+use serde_json::{json, Value};
+
+use std::sync::Arc;
+use futures::*;
+
+pub struct UploadCaTar {
+    stream: hyper::Body,
+    index: ArchiveIndexWriter,
+    count: usize,
+}
+
+impl Future for UploadCaTar {
+    type Item = ();
+    type Error = failure::Error;
+
+    fn poll(&mut self) -> Poll<(), failure::Error> {
+        loop {
+            use std::io::Write;
+
+            match try_ready!(self.stream.poll()) {
+                Some(chunk) => {
+                    self.count += chunk.len();
+                    println!("UPLOAD Chunk {} {}", chunk.len(), self.count);
+                    if let Err(err) = self.index.write(&chunk) {
+                        bail!("writing chunk failed - {}", err);
+                    }
+                    // fall through and poll the stream again; returning
+                    // NotReady here would stall, since no wakeup is scheduled
+                }
+                None => {
+                    println!("UPLOAD Close Index {}", self.count);
+                    // end of stream: close the index so it gets flushed
+                    self.index.close()?;
+                    return Ok(Async::Ready(()))
+                }
+            }
+        }
+    }
+}
+
+fn upload_catar(req_body: hyper::Body, param: Value, _info: &ApiUploadMethod) -> BoxFut {
+
+    let store = param["name"].as_str().unwrap();
+
+    println!("Upload .catar to {}", store);
+
+    let chunk_size = 4*1024*1024;
+    let datastore = DataStore::lookup_datastore(store).unwrap().clone();
+
+    let index = datastore.create_archive_writer("upload.aidx", chunk_size).unwrap();
+
+    let upload = UploadCaTar { stream: req_body, index, count: 0 };
+
+    let resp = upload.and_then(|_| {
+
+        let response = http::Response::builder()
+            .status(200)
+            .body(hyper::Body::empty())
+            .unwrap();
+
+        Ok(response)
+    });
+
+    Box::new(resp)
+}
+
+pub fn api_method_upload_catar() -> ApiUploadMethod {
+    ApiUploadMethod::new(
+        upload_catar,
+        ObjectSchema::new("Upload .catar backup file.")
+            .required("name", StringSchema::new("Datastore name."))
+    )
+}
diff --git a/src/backup/archive_index.rs b/src/backup/archive_index.rs
index 1e8918e5..02c162f5 100644
--- a/src/backup/archive_index.rs
+++ b/src/backup/archive_index.rs
@@ -3,6 +3,7 @@ use failure::*;
 use super::chunk_store::*;
 use super::chunker::*;
 
+use std::sync::Arc;
 use std::io::{Read, Write, BufWriter};
 use std::fs::File;
 use std::path::{Path, PathBuf};
@@ -312,8 +313,8 @@ impl <'a> std::io::Seek for BufferedArchiveReader<'a> {
     }
 }
 
-pub struct ArchiveIndexWriter<'a> {
-    store: &'a ChunkStore,
+pub struct ArchiveIndexWriter {
+    store: Arc<ChunkStore>,
     chunker: Chunker,
     writer: BufWriter<File>,
     closed: bool,
@@ -327,9 +328,16 @@ pub struct ArchiveIndexWriter<'a> {
     chunk_buffer: Vec<u8>,
 }
 
-impl <'a> ArchiveIndexWriter<'a> {
+impl Drop for ArchiveIndexWriter {
 
-    pub fn create(store: &'a ChunkStore, path: &Path, chunk_size: usize) -> Result<Self, Error> {
+    fn drop(&mut self) {
+        let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
+    }
+}
+
+impl ArchiveIndexWriter {
+
+    pub fn create(store: Arc<ChunkStore>, path: &Path, chunk_size: usize) -> Result<Self, Error> {
 
         let full_path = store.relative_path(path);
 
         let mut tmp_path = full_path.clone();
@@ -433,7 +441,7 @@ impl <'a> ArchiveIndexWriter<'a> {
     }
 }
 
-impl <'a> Write for ArchiveIndexWriter<'a> {
+impl Write for ArchiveIndexWriter {
 
     fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
 
diff --git a/src/backup/datastore.rs b/src/backup/datastore.rs
index 823391eb..6548abb5 100644
--- a/src/backup/datastore.rs
+++ b/src/backup/datastore.rs
@@ -11,7 +11,7 @@ use super::image_index::*;
 use super::archive_index::*;
 
 pub struct DataStore {
-    chunk_store: ChunkStore,
+    chunk_store: Arc<ChunkStore>,
     gc_mutex: Mutex<bool>,
 }
 
@@ -58,7 +58,7 @@ impl DataStore {
         let chunk_store = ChunkStore::open(store_name, path)?;
 
         Ok(Self {
-            chunk_store: chunk_store,
+            chunk_store: Arc::new(chunk_store),
             gc_mutex: Mutex::new(false),
         })
     }
@@ -82,11 +82,12 @@ impl DataStore {
         chunk_size: usize
     ) -> Result<ArchiveIndexWriter, Error> {
 
-        let index = ArchiveIndexWriter::create(&self.chunk_store, filename.as_ref(), chunk_size)?;
+        let index = ArchiveIndexWriter::create(
+            self.chunk_store.clone(), filename.as_ref(), chunk_size)?;
 
         Ok(index)
     }
-    
+
     pub fn open_archive_reader<P: AsRef<Path>>(&self, filename: P) -> Result<ArchiveIndexReader, Error> {
 
         let index = ArchiveIndexReader::open(&self.chunk_store, filename.as_ref())?;
diff --git a/src/server/rest.rs b/src/server/rest.rs
index 5d3f23c3..d119658d 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -180,8 +180,8 @@ fn handle_sync_api_request(
 
 fn handle_upload_api_request(
     info: &'static ApiUploadMethod,
-    formatter: &'static OutputFormatter,
-    parts: Parts,
+    _formatter: &'static OutputFormatter,
+    _parts: Parts,
     req_body: Body,
     uri_param: HashMap<String, String>,
 ) -> BoxFut