src/api2/backup/upload_chunk.rs: switch to async
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
parent 59b2baa0f6
commit 7622005574
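The core of the change is porting a hand-written future from the futures 0.1 trait to std::future. For orientation, a simplified comparison of the two trait shapes (a sketch only; the two definitions are obviously not meant to compile in one module):

// futures 0.1 (before): success and error types are separate, poll takes
// a plain &mut self, and Poll<T, E> is an alias for Result<Async<T>, E>.
pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}

// std::future (after): a single Output type carries the Result, and poll
// takes Pin<&mut Self> plus a task Context used to register wakeups.
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}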
@@ -1,16 +1,18 @@
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
 use failure::*;
 use futures::*;
-use std::sync::Arc;
-
-use hyper::http::request::Parts;
 use hyper::Body;
+use hyper::http::request::Parts;
 use serde_json::{json, Value};
 
-use crate::tools;
-use crate::backup::*;
+use crate::api2::types::*;
 use crate::api_schema::*;
 use crate::api_schema::router::*;
-use crate::api2::types::*;
+use crate::backup::*;
+use crate::tools;
 
 use super::environment::*;
 
@@ -24,51 +26,58 @@ pub struct UploadChunk {
 }
 
 impl UploadChunk {
 
     pub fn new(stream: Body, store: Arc<DataStore>, digest: [u8; 32], size: u32, encoded_size: u32) -> Self {
         Self { stream, store, size, encoded_size, raw_data: Some(vec![]), digest }
     }
 }
 
 impl Future for UploadChunk {
-    type Item = ([u8; 32], u32, u32, bool);
-    type Error = failure::Error;
+    type Output = Result<([u8; 32], u32, u32, bool), Error>;
 
-    fn poll(&mut self) -> Poll<([u8; 32], u32, u32, bool), failure::Error> {
-        loop {
-            match try_ready!(self.stream.poll()) {
-                Some(input) => {
-                    if let Some(ref mut raw_data) = self.raw_data {
-                        if (raw_data.len() + input.len()) > (self.encoded_size as usize) {
-                            bail!("uploaded chunk is larger than announced.");
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let this = self.get_mut();
+        let err: Error = loop {
+            match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
+                Some(Err(err)) => return Poll::Ready(Err(Error::from(err))),
+                Some(Ok(input)) => {
+                    if let Some(ref mut raw_data) = this.raw_data {
+                        if (raw_data.len() + input.len()) > (this.encoded_size as usize) {
+                            break format_err!("uploaded chunk is larger than announced.");
                         }
                         raw_data.extend_from_slice(&input);
                     } else {
-                        bail!("poll upload chunk stream failed - already finished.");
+                        break format_err!("poll upload chunk stream failed - already finished.");
                     }
                 }
                 None => {
-                    if let Some(raw_data) = self.raw_data.take() {
-                        if raw_data.len() != (self.encoded_size as usize) {
-                            bail!("uploaded chunk has unexpected size.");
+                    if let Some(raw_data) = this.raw_data.take() {
+                        if raw_data.len() != (this.encoded_size as usize) {
+                            break format_err!("uploaded chunk has unexpected size.");
                         }
 
-                        let mut chunk = DataChunk::from_raw(raw_data, self.digest)?;
+                        let (is_duplicate, compressed_size) = match proxmox::tools::try_block! {
+                        let mut chunk = DataChunk::from_raw(raw_data, this.digest)?;
 
-                        chunk.verify_unencrypted(self.size as usize)?;
+                        chunk.verify_unencrypted(this.size as usize)?;
 
                         // always comput CRC at server side
                         chunk.set_crc(chunk.compute_crc());
 
-                        let (is_duplicate, compressed_size) = self.store.insert_chunk(&chunk)?;
+                        this.store.insert_chunk(&chunk)
+                        } {
+                            Ok(res) => res,
+                            Err(err) => break err,
+                        };
 
-                        return Ok(Async::Ready((self.digest, self.size, compressed_size as u32, is_duplicate)))
+                        return Poll::Ready(Ok((this.digest, this.size, compressed_size as u32, is_duplicate)))
                     } else {
-                        bail!("poll upload chunk stream failed - already finished.");
+                        break format_err!("poll upload chunk stream failed - already finished.");
                     }
                 }
            }
-        }
+        };
+        Poll::Ready(Err(err))
    }
 }
 
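The same polling pattern in isolation: a minimal sketch of a hand-written future driving an inner stream under the std::future API. The SumLen type and its stream bound are hypothetical (modeled on a hyper Body, which yields Result chunks); the point is that the Unpin bound allows self.get_mut(), Pin::new(...).poll_next(cx) replaces stream.poll(), and ready! replaces try_ready! because errors now travel inside the item type.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::ready;
use futures::stream::Stream;

// Hypothetical example: drain a byte-chunk stream and yield the total
// number of bytes, mirroring the structure of UploadChunk::poll above.
struct SumLen<S> {
    stream: S,
    total: usize,
}

impl<S, E> std::future::Future for SumLen<S>
where
    S: Stream<Item = Result<Vec<u8>, E>> + Unpin,
{
    type Output = Result<usize, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // S: Unpin makes SumLen<S>: Unpin, so get_mut() hands out &mut Self.
        let this = self.get_mut();
        loop {
            // ready! short-circuits with Poll::Pending; stream errors are
            // ordinary items now rather than a separate error channel.
            match ready!(Pin::new(&mut this.stream).poll_next(cx)) {
                Some(Err(err)) => return Poll::Ready(Err(err)),
                Some(Ok(chunk)) => this.total += chunk.len(),
                None => return Poll::Ready(Ok(this.total)),
            }
        }
    }
}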
@@ -115,14 +124,14 @@ fn upload_fixed_chunk(
         .then(move |result| {
             let env: &BackupEnvironment = rpcenv.as_ref();
 
             let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
                 env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
                 let digest_str = proxmox::tools::digest_to_hex(&digest);
                 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
                 Ok(json!(digest_str))
             });
 
-            Ok(env.format_response(result))
+            future::ok(env.format_response(result))
         });
 
     Ok(Box::new(resp))
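Why Ok(...) becomes future::ok(...): with futures 0.3, FutureExt::then expects the closure to return another future rather than a plain Result, and future::ok wraps the value in an immediately-ready future. A reduced sketch (the names and types below are illustrative stand-ins, not the handler's real signature):

use failure::Error;
use futures::future::{self, FutureExt};
use serde_json::{json, Value};

// Illustrative stand-in for the upload future; the real handler polls the
// request body and registers the chunk in the backup environment.
async fn respond() -> Result<Value, Error> {
    let upload = future::ok::<(u64, u64), Error>((123, 456));

    let resp = upload.then(move |result| {
        let result = result.map(|(size, _compressed)| json!(size));
        // futures 0.1 accepted a plain `Ok(result)` here; 0.3 wants a future.
        future::ok::<_, Error>(result)
    });

    resp.await?
}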
@@ -171,14 +180,14 @@ fn upload_dynamic_chunk(
         .then(move |result| {
             let env: &BackupEnvironment = rpcenv.as_ref();
 
             let result = result.and_then(|(digest, size, compressed_size, is_duplicate)| {
                 env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
                 let digest_str = proxmox::tools::digest_to_hex(&digest);
                 env.debug(format!("upload_chunk done: {} bytes, {}", size, digest_str));
                 Ok(json!(digest_str))
             });
 
-            Ok(env.format_response(result))
+            future::ok(env.format_response(result))
         });
 
     Ok(Box::new(resp))
@@ -201,10 +210,10 @@ fn upload_speedtest(
 
     let resp = req_body
         .map_err(Error::from)
-        .fold(0, |size: usize, chunk| -> Result<usize, Error> {
+        .try_fold(0, |size: usize, chunk| {
             let sum = size + chunk.len();
             //println!("UPLOAD {} bytes, sum {}", chunk.len(), sum);
-            Ok(sum)
+            future::ok::<usize, Error>(sum)
         })
         .then(move |result| {
             match result {
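The speedtest handler's fold over the request body becomes TryStreamExt::try_fold, whose accumulator closure likewise returns a future instead of a Result. A small sketch with a stand-in stream in place of the hyper request body:

use failure::Error;
use futures::future;
use futures::stream::{self, TryStreamExt};

// Stand-in stream of body chunks; the real handler folds over req_body.
async fn total_len() -> Result<usize, Error> {
    let body = stream::iter(vec![
        Ok::<_, Error>(vec![0u8; 10]),
        Ok(vec![0u8; 32]),
    ]);

    body.try_fold(0usize, |size, chunk| {
        // the accumulator closure now returns a future, not a Result
        future::ok::<usize, Error>(size + chunk.len())
    })
    .await
}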
@@ -216,7 +225,7 @@ fn upload_speedtest(
                 }
             }
             let env: &BackupEnvironment = rpcenv.as_ref();
-            Ok(env.format_response(Ok(Value::Null)))
+            future::ok(env.format_response(Ok(Value::Null)))
         });
 
     Ok(Box::new(resp))
@@ -257,11 +266,11 @@ fn upload_blob(
 
     let resp = req_body
         .map_err(Error::from)
-        .fold(Vec::new(), |mut acc, chunk| {
+        .try_fold(Vec::new(), |mut acc, chunk| {
             acc.extend_from_slice(&*chunk);
-            Ok::<_, Error>(acc)
+            future::ok::<_, Error>(acc)
         })
-        .and_then(move |data| {
+        .and_then(move |data| async move {
             if encoded_size != data.len() {
                 bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
             }
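In upload_blob, the and_then closure now returns an async move block; the block is itself a future whose output is a Result, so bail! and ? keep working inside it. A reduced sketch (the collected-body future here is a stand-in for the try_fold above):

use failure::*;
use futures::future::{self, TryFutureExt};

// Stand-in for the collected request body; the real code try_folds the
// hyper Body into a Vec<u8> first.
async fn check_blob_len(encoded_size: usize) -> Result<(), Error> {
    let collected = future::ok::<Vec<u8>, Error>(vec![0u8; encoded_size]);

    collected
        .and_then(move |data| async move {
            if encoded_size != data.len() {
                bail!("got blob with unexpected length ({} != {})", encoded_size, data.len());
            }
            Ok(())
        })
        .await
}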
@@ -271,7 +280,7 @@ fn upload_blob(
             Ok(())
         })
         .and_then(move |_| {
-            Ok(env3.format_response(Ok(Value::Null)))
+            future::ok(env3.format_response(Ok(Value::Null)))
         })
         ;
 