api3/admin/datastore/upload_catar.rs: implement upload future

This commit is contained in:
Dietmar Maurer 2019-01-15 11:38:26 +01:00
parent 7e21da6e23
commit 1629d2ad7b
5 changed files with 104 additions and 45 deletions

View File

@ -12,6 +12,8 @@ use crate::config::datastore;
use crate::backup::datastore::*; use crate::backup::datastore::*;
mod upload_catar;
// this is just a test for mutability/mutex handling - will remove later // this is just a test for mutability/mutex handling - will remove later
fn start_garbage_collection(param: Value, _info: &ApiMethod) -> Result<Value, Error> { fn start_garbage_collection(param: Value, _info: &ApiMethod) -> Result<Value, Error> {
@ -52,39 +54,6 @@ pub fn api_method_garbage_collection_status() -> ApiMethod {
) )
} }
fn upload_catar(req_body: hyper::Body, param: Value, _info: &ApiUploadMethod) -> BoxFut {
let name = param["name"].as_str().unwrap();
println!("Upload .catar to {}", name);
let resp = req_body
.map_err(|err| http_err!(BAD_REQUEST, format!("Promlems reading request body: {}", err)))
.for_each(|chunk| {
println!("UPLOAD Chunk {}", chunk.len());
Ok(())
})
.and_then(|()| {
println!("UPLOAD DATA Sucessful");
let response = http::Response::builder()
.status(200)
.body(hyper::Body::empty())
.unwrap();
Ok(response)
});
Box::new(resp)
}
fn api_method_upload_catar() -> ApiUploadMethod {
ApiUploadMethod::new(
upload_catar,
ObjectSchema::new("Upload .catar backup file.")
.required("name", StringSchema::new("Datastore name."))
)
}
fn get_datastore_list(_param: Value, _info: &ApiMethod) -> Result<Value, Error> { fn get_datastore_list(_param: Value, _info: &ApiMethod) -> Result<Value, Error> {
@ -105,7 +74,7 @@ pub fn router() -> Router {
ObjectSchema::new("Directory index.") ObjectSchema::new("Directory index.")
.required("name", StringSchema::new("Datastore name."))) .required("name", StringSchema::new("Datastore name.")))
) )
.upload(api_method_upload_catar()) .upload(upload_catar::api_method_upload_catar())
.subdir( .subdir(
"gc", "gc",
Router::new() Router::new()

View File

@ -0,0 +1,81 @@
use failure::*;
use crate::backup::datastore::*;
use crate::backup::archive_index::*;
use crate::server::rest::*;
use crate::api::schema::*;
use crate::api::router::*;
use serde_json::{json, Value};
use std::sync::Arc;
use futures::*;
pub struct UploadCaTar {
stream: hyper::Body,
index: ArchiveIndexWriter,
count: usize,
}
impl Future for UploadCaTar {
type Item = ();
type Error = failure::Error;
fn poll(&mut self) -> Poll<(), failure::Error> {
loop {
use std::io::Write;
match try_ready!(self.stream.poll()) {
Some(chunk) => {
self.count += chunk.len();
println!("UPLOAD Chunk {} {}", chunk.len(), self.count);
if let Err(err) = self.index.write(&chunk) {
bail!("writing chunk failed - {}", err);
}
return Ok(Async::NotReady);
}
None => {
println!("UPLOAD Close Index {}", self.count);
self.index.close();
return Ok(Async::Ready(()))
}
}
}
}
}
fn upload_catar(req_body: hyper::Body, param: Value, _info: &ApiUploadMethod) -> BoxFut {
let store = param["name"].as_str().unwrap();
use std::io::Write;
println!("Upload .catar to {}", store);
let chunk_size = 4*1024*1024;
let datastore = DataStore::lookup_datastore(store).unwrap().clone();
let mut index = datastore.create_archive_writer("upload.aidx", chunk_size).unwrap();
let upload = UploadCaTar { stream: req_body, index, count: 0};
let resp = upload.and_then(|v| {
let response = http::Response::builder()
.status(200)
.body(hyper::Body::empty())
.unwrap();
Ok(response)
});
Box::new(resp)
}
pub fn api_method_upload_catar() -> ApiUploadMethod {
ApiUploadMethod::new(
upload_catar,
ObjectSchema::new("Upload .catar backup file.")
.required("name", StringSchema::new("Datastore name."))
)
}

View File

@ -3,6 +3,7 @@ use failure::*;
use super::chunk_store::*; use super::chunk_store::*;
use super::chunker::*; use super::chunker::*;
use std::sync::Arc;
use std::io::{Read, Write, BufWriter}; use std::io::{Read, Write, BufWriter};
use std::fs::File; use std::fs::File;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -312,8 +313,8 @@ impl <'a> std::io::Seek for BufferedArchiveReader<'a> {
} }
} }
pub struct ArchiveIndexWriter<'a> { pub struct ArchiveIndexWriter {
store: &'a ChunkStore, store: Arc<ChunkStore>,
chunker: Chunker, chunker: Chunker,
writer: BufWriter<File>, writer: BufWriter<File>,
closed: bool, closed: bool,
@ -327,9 +328,16 @@ pub struct ArchiveIndexWriter<'a> {
chunk_buffer: Vec<u8>, chunk_buffer: Vec<u8>,
} }
impl <'a> ArchiveIndexWriter<'a> { impl Drop for ArchiveIndexWriter {
pub fn create(store: &'a ChunkStore, path: &Path, chunk_size: usize) -> Result<Self, Error> { fn drop(&mut self) {
let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
}
}
impl ArchiveIndexWriter {
pub fn create(store: Arc<ChunkStore>, path: &Path, chunk_size: usize) -> Result<Self, Error> {
let full_path = store.relative_path(path); let full_path = store.relative_path(path);
let mut tmp_path = full_path.clone(); let mut tmp_path = full_path.clone();
@ -433,7 +441,7 @@ impl <'a> ArchiveIndexWriter<'a> {
} }
} }
impl <'a> Write for ArchiveIndexWriter<'a> { impl Write for ArchiveIndexWriter {
fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> { fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {

View File

@ -11,7 +11,7 @@ use super::image_index::*;
use super::archive_index::*; use super::archive_index::*;
pub struct DataStore { pub struct DataStore {
chunk_store: ChunkStore, chunk_store: Arc<ChunkStore>,
gc_mutex: Mutex<bool>, gc_mutex: Mutex<bool>,
} }
@ -58,7 +58,7 @@ impl DataStore {
let chunk_store = ChunkStore::open(store_name, path)?; let chunk_store = ChunkStore::open(store_name, path)?;
Ok(Self { Ok(Self {
chunk_store: chunk_store, chunk_store: Arc::new(chunk_store),
gc_mutex: Mutex::new(false), gc_mutex: Mutex::new(false),
}) })
} }
@ -82,11 +82,12 @@ impl DataStore {
chunk_size: usize chunk_size: usize
) -> Result<ArchiveIndexWriter, Error> { ) -> Result<ArchiveIndexWriter, Error> {
let index = ArchiveIndexWriter::create(&self.chunk_store, filename.as_ref(), chunk_size)?; let index = ArchiveIndexWriter::create(
self.chunk_store.clone(), filename.as_ref(), chunk_size)?;
Ok(index) Ok(index)
} }
pub fn open_archive_reader<P: AsRef<Path>>(&self, filename: P) -> Result<ArchiveIndexReader, Error> { pub fn open_archive_reader<P: AsRef<Path>>(&self, filename: P) -> Result<ArchiveIndexReader, Error> {
let index = ArchiveIndexReader::open(&self.chunk_store, filename.as_ref())?; let index = ArchiveIndexReader::open(&self.chunk_store, filename.as_ref())?;

View File

@ -180,8 +180,8 @@ fn handle_sync_api_request(
fn handle_upload_api_request( fn handle_upload_api_request(
info: &'static ApiUploadMethod, info: &'static ApiUploadMethod,
formatter: &'static OutputFormatter, _formatter: &'static OutputFormatter,
parts: Parts, _parts: Parts,
req_body: Body, req_body: Body,
uri_param: HashMap<String, String>, uri_param: HashMap<String, String>,
) -> BoxFut ) -> BoxFut