src/client/pull.rs: decode, verify and write in separate threads

To maximize throughput.
This commit is contained in:
Dietmar Maurer 2020-09-24 12:58:53 +02:00
parent c0fa14d94a
commit 998db63933

View File

@ -6,6 +6,7 @@ use std::convert::TryFrom;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::collections::{HashSet, HashMap}; use std::collections::{HashSet, HashMap};
use std::io::{Seek, SeekFrom}; use std::io::{Seek, SeekFrom};
use std::time::SystemTime;
use proxmox::api::error::{StatusCode, HttpError}; use proxmox::api::error::{StatusCode, HttpError};
use crate::{ use crate::{
@ -21,8 +22,113 @@ use crate::{
// fixme: delete vanished groups // fixme: delete vanished groups
// Todo: correctly lock backup groups // Todo: correctly lock backup groups
fn chunk_writer_pipeline(
target: Arc<DataStore>,
) -> (
std::sync::mpsc::SyncSender<(DataBlob, [u8;32], u64)>,
std::sync::mpsc::Receiver<Result<usize, Error>>,
) {
let (writer_tx, writer_rx) = std::sync::mpsc::sync_channel(1);
let (result_tx, result_rx) = std::sync::mpsc::channel();
let (pipe1_tx, pipe1_rx) = std::sync::mpsc::sync_channel(1);
let (pipe2_tx, pipe2_rx) = std::sync::mpsc::sync_channel(5);
let pipe2_copy_tx = pipe2_tx.clone();
let result_tx_copy1 = result_tx.clone();
let result_tx_copy2 = result_tx.clone();
let thread1 = std::thread::Builder::new()
.name(String::from("sync chunk decompressor"))
.spawn(move|| {
let result: Result<(), Error> = proxmox::try_block!({
loop {
let (chunk, digest, size): (DataBlob, [u8;32], u64) = match writer_rx.recv() {
Ok(input) => input,
Err(_err) => return Ok(()),
};
if chunk.is_encrypted() {
pipe2_copy_tx.send((chunk, digest)).unwrap();
continue;
}
let data = chunk.decode(None, None)?;
if data.len() as u64 != size {
bail!("detected chunk with wrong length ({} != {})", size, data.len());
}
pipe1_tx.send((chunk, data, digest)).unwrap();
}
});
if let Err(err) = result {
result_tx_copy1.send(Err(err)).unwrap();
}
}).unwrap();
let thread2 = std::thread::Builder::new()
.name(String::from("sync chunk verifier"))
.spawn(move|| {
let result: Result<(), Error> = proxmox::try_block!({
loop {
let (chunk, data, digest): (DataBlob, Vec<u8>, [u8;32]) = match pipe1_rx.recv() {
Ok(input) => input,
Err(_err) => return Ok(()),
};
let data_digest = openssl::sha::sha256(&data);
if digest != data_digest {
bail!("detected chunk with wrong digest.");
}
pipe2_tx.send((chunk, digest)).unwrap();
}
});
if let Err(err) = result {
result_tx_copy2.send(Err(err)).unwrap();
}
}).unwrap();
std::thread::Builder::new()
.name(String::from("sync chunk writer"))
.spawn(move|| {
let mut bytes = 0;
let result: Result<(), Error> = proxmox::try_block!({
loop {
let (chunk, digest): (DataBlob, [u8;32]) = match pipe2_rx.recv() {
Ok(input) => input,
Err(_err) => break,
};
bytes += chunk.raw_size() as usize;
target.insert_chunk(&chunk, &digest)?;
}
if let Err(panic) = thread2.join() {
match panic.downcast::<&str>() {
Ok(panic_msg) => bail!("verification thread failed: {}", panic_msg),
Err(_) => bail!("verification thread failed"),
}
}
if let Err(panic) = thread1.join() {
match panic.downcast::<&str>() {
Ok(panic_msg) => bail!("decompressor thread failed: {}", panic_msg),
Err(_) => bail!("decompressor thread failed"),
}
}
Ok(())
});
match result {
Ok(()) => result_tx.send(Ok(bytes)).unwrap(),
Err(err) => result_tx.send(Err(err)).unwrap(),
}
}).unwrap();
(writer_tx, result_rx)
}
async fn pull_index_chunks<I: IndexFile>( async fn pull_index_chunks<I: IndexFile>(
_worker: &WorkerTask, worker: &WorkerTask,
chunk_reader: RemoteChunkReader, chunk_reader: RemoteChunkReader,
target: Arc<DataStore>, target: Arc<DataStore>,
index: I, index: I,
@ -31,6 +137,8 @@ async fn pull_index_chunks<I: IndexFile>(
use futures::stream::{self, StreamExt, TryStreamExt}; use futures::stream::{self, StreamExt, TryStreamExt};
let start_time = SystemTime::now();
let stream = stream::iter( let stream = stream::iter(
(0..index.index_count()) (0..index.index_count())
.map(|pos| index.chunk_info(pos).unwrap()) .map(|pos| index.chunk_info(pos).unwrap())
@ -46,11 +154,14 @@ async fn pull_index_chunks<I: IndexFile>(
}) })
); );
stream let (write_channel, write_result_rx) = chunk_writer_pipeline(target.clone());
let result = stream
.map(|info| { .map(|info| {
let target = Arc::clone(&target); let target = Arc::clone(&target);
let chunk_reader = chunk_reader.clone(); let chunk_reader = chunk_reader.clone();
let write_channel = write_channel.clone();
Ok::<_, Error>(async move { Ok::<_, Error>(async move {
let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?; let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
@ -61,16 +172,31 @@ async fn pull_index_chunks<I: IndexFile>(
//worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest))); //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
crate::tools::runtime::block_in_place(|| { // decode, verify and write in a separate threads to maximize throughput
chunk.verify_unencrypted(info.size() as usize, &info.digest)?; crate::tools::runtime::block_in_place(|| write_channel.send((chunk, info.digest, info.size())))?;
target.insert_chunk(&chunk, &info.digest)?;
Ok(()) Ok(())
}) })
})
}) })
.try_buffer_unordered(20) .try_buffer_unordered(20)
.try_for_each(|_res| futures::future::ok(())) .try_for_each(|_res| futures::future::ok(()))
.await?; .await;
drop(write_channel);
// check errors from result channel first
let bytes = match write_result_rx.recv() {
Err(_) => bail!("result channel closed early."),
Ok(Ok(bytes)) => bytes,
Ok(Err(err)) => bail!("write chnunk failed - {}", err),
};
// then check result
result?;
let elapsed = start_time.elapsed()?.as_secs_f64();
worker.log(format!("downloaded {} bytes ({} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));
Ok(()) Ok(())
} }