src/client/pull.rs: use new ParallelHandler

commit 5441708634
parent 3c9b370255
Author: Dietmar Maurer
Date:   2020-09-25 12:14:59 +02:00


@@ -7,10 +7,11 @@ use std::sync::{Arc, Mutex};
 use std::collections::{HashSet, HashMap};
 use std::io::{Seek, SeekFrom};
 use std::time::SystemTime;
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 use proxmox::api::error::{StatusCode, HttpError};
 use crate::{
-    tools::compute_file_csum,
+    tools::{ParallelHandler, compute_file_csum},
     server::WorkerTask,
     backup::*,
     api2::types::*,
@@ -22,111 +23,6 @@ use crate::{
 // fixme: delete vanished groups
 // Todo: correctly lock backup groups
 
-fn chunk_writer_pipeline(
-    target: Arc<DataStore>,
-) -> (
-    std::sync::mpsc::SyncSender<(DataBlob, [u8;32], u64)>,
-    std::sync::mpsc::Receiver<Result<usize, Error>>,
-) {
-    let (writer_tx, writer_rx) = std::sync::mpsc::sync_channel(1);
-    let (result_tx, result_rx) = std::sync::mpsc::channel();
-
-    let (pipe1_tx, pipe1_rx) = std::sync::mpsc::sync_channel(1);
-    let (pipe2_tx, pipe2_rx) = std::sync::mpsc::sync_channel(5);
-    let pipe2_copy_tx = pipe2_tx.clone();
-
-    let result_tx_copy1 = result_tx.clone();
-    let result_tx_copy2 = result_tx.clone();
-
-    let thread1 = std::thread::Builder::new()
-        .name(String::from("sync chunk decompressor"))
-        .spawn(move || {
-            let result: Result<(), Error> = proxmox::try_block!({
-                loop {
-                    let (chunk, digest, size): (DataBlob, [u8;32], u64) = match writer_rx.recv() {
-                        Ok(input) => input,
-                        Err(_err) => return Ok(()),
-                    };
-                    if chunk.is_encrypted() {
-                        pipe2_copy_tx.send((chunk, digest)).unwrap();
-                        continue;
-                    }
-                    let data = chunk.decode(None, None)?;
-                    if data.len() as u64 != size {
-                        bail!("detected chunk with wrong length ({} != {})", size, data.len());
-                    }
-                    pipe1_tx.send((chunk, data, digest)).unwrap();
-                }
-            });
-            if let Err(err) = result {
-                result_tx_copy1.send(Err(err)).unwrap();
-            }
-        }).unwrap();
-
-    let thread2 = std::thread::Builder::new()
-        .name(String::from("sync chunk verifier"))
-        .spawn(move || {
-            let result: Result<(), Error> = proxmox::try_block!({
-                loop {
-                    let (chunk, data, digest): (DataBlob, Vec<u8>, [u8;32]) = match pipe1_rx.recv() {
-                        Ok(input) => input,
-                        Err(_err) => return Ok(()),
-                    };
-                    let data_digest = openssl::sha::sha256(&data);
-                    if digest != data_digest {
-                        bail!("detected chunk with wrong digest.");
-                    }
-                    pipe2_tx.send((chunk, digest)).unwrap();
-                }
-            });
-            if let Err(err) = result {
-                result_tx_copy2.send(Err(err)).unwrap();
-            }
-        }).unwrap();
-
-    std::thread::Builder::new()
-        .name(String::from("sync chunk writer"))
-        .spawn(move || {
-            let mut bytes = 0;
-            let result: Result<(), Error> = proxmox::try_block!({
-                loop {
-                    let (chunk, digest): (DataBlob, [u8;32]) = match pipe2_rx.recv() {
-                        Ok(input) => input,
-                        Err(_err) => break,
-                    };
-                    bytes += chunk.raw_size() as usize;
-                    target.insert_chunk(&chunk, &digest)?;
-                }
-
-                if let Err(panic) = thread2.join() {
-                    match panic.downcast::<&str>() {
-                        Ok(panic_msg) => bail!("verification thread failed: {}", panic_msg),
-                        Err(_) => bail!("verification thread failed"),
-                    }
-                }
-
-                if let Err(panic) = thread1.join() {
-                    match panic.downcast::<&str>() {
-                        Ok(panic_msg) => bail!("decompressor thread failed: {}", panic_msg),
-                        Err(_) => bail!("decompressor thread failed"),
-                    }
-                }
-
-                Ok(())
-            });
-
-            match result {
-                Ok(()) => result_tx.send(Ok(bytes)).unwrap(),
-                Err(err) => result_tx.send(Err(err)).unwrap(),
-            }
-        }).unwrap();
-
-    (writer_tx, result_rx)
-}
-
 async fn pull_index_chunks<I: IndexFile>(
     worker: &WorkerTask,
     chunk_reader: RemoteChunkReader,
@@ -154,14 +50,28 @@ async fn pull_index_chunks<I: IndexFile>(
         })
     );
 
-    let (write_channel, write_result_rx) = chunk_writer_pipeline(target.clone());
+    let target2 = target.clone();
+
+    let verify_pool = ParallelHandler::new(
+        "sync chunk writer", 4,
+        move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
+            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
+            chunk.verify_unencrypted(size as usize, &digest)?;
+            target2.insert_chunk(&chunk, &digest)?;
+            Ok(())
+        }
+    );
+
+    let verify_and_write_channel = verify_pool.channel();
+
+    let bytes = Arc::new(AtomicUsize::new(0));
 
-    let result = stream
+    stream
         .map(|info| {
             let target = Arc::clone(&target);
             let chunk_reader = chunk_reader.clone();
-            let write_channel = write_channel.clone();
+            let bytes = Arc::clone(&bytes);
+            let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
                 let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
@@ -171,31 +81,28 @@ async fn pull_index_chunks<I: IndexFile>(
                 }
 
                 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
 
                 let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+                let raw_size = chunk.raw_size() as usize;
 
                 // decode, verify and write in separate threads to maximize throughput
-                crate::tools::runtime::block_in_place(|| write_channel.send((chunk, info.digest, info.size())))?;
+                crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?;
+
+                bytes.fetch_add(raw_size, Ordering::SeqCst);
 
                 Ok(())
             })
         })
         .try_buffer_unordered(20)
         .try_for_each(|_res| futures::future::ok(()))
-        .await;
+        .await?;
 
-    drop(write_channel);
+    drop(verify_and_write_channel);
 
-    // check errors from result channel first
-    let bytes = match write_result_rx.recv() {
-        Err(_) => bail!("result channel closed early."),
-        Ok(Ok(bytes)) => bytes,
-        Ok(Err(err)) => bail!("write chunk failed - {}", err),
-    };
-
-    // then check result
-    result?;
+    verify_pool.complete()?;
 
     let elapsed = start_time.elapsed()?.as_secs_f64();
 
+    let bytes = bytes.load(Ordering::SeqCst);
+
     worker.log(format!("downloaded {} bytes ({} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));
 
     Ok(())
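
In short, the commit replaces the hand-rolled chunk_writer_pipeline() above (three manually spawned threads for decompression, verification and writing, wired together with bounded mpsc channels plus a separate result channel) with a single ParallelHandler that runs one verify-and-write closure on four worker threads, and moves byte accounting into an AtomicUsize. For readers without the proxmox-backup source at hand, here is a minimal sketch of what such a worker pool can look like. This is not the real tools::ParallelHandler implementation; the struct name ParallelHandlerSketch and its internals are illustrative assumptions that only mirror the API surface the diff relies on: new(name, threads, handler_fn), channel() and complete().

// A minimal, hypothetical ParallelHandler-style worker pool built from std
// primitives. NOT the actual proxmox-backup tools::ParallelHandler.
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

use anyhow::{bail, Error};

pub struct ParallelHandlerSketch<I: Send + 'static> {
    input: Option<SyncSender<I>>,
    handles: Vec<JoinHandle<Result<(), Error>>>,
}

impl<I: Send + 'static> ParallelHandlerSketch<I> {
    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
    where
        F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
    {
        // Bounded channel, so producers block instead of buffering unboundedly.
        let (input_tx, input_rx) = sync_channel::<I>(threads);
        // All workers pull work items from one shared receiver.
        let input_rx = Arc::new(Mutex::new(input_rx));

        let mut handles = Vec::new();
        for i in 0..threads {
            let input_rx = Arc::clone(&input_rx);
            let handler_fn = handler_fn.clone();
            handles.push(
                std::thread::Builder::new()
                    .name(format!("{} ({})", name, i))
                    .spawn(move || loop {
                        // Fetch the next item; a closed channel means we are done.
                        let item = match input_rx.lock().unwrap().recv() {
                            Ok(item) => item,
                            Err(_) => return Ok(()),
                        };
                        handler_fn(item)?;
                    })
                    .unwrap(),
            );
        }

        Self { input: Some(input_tx), handles }
    }

    /// Cloneable sender for queueing work items from one or more producers.
    pub fn channel(&self) -> SyncSender<I> {
        self.input.clone().unwrap()
    }

    /// Close the input side, join all workers and return the first error.
    pub fn complete(mut self) -> Result<(), Error> {
        self.input.take(); // drop our sender so idle workers see a closed channel
        for handle in self.handles.drain(..) {
            match handle.join() {
                Ok(worker_result) => worker_result?,
                Err(_) => bail!("worker thread panicked"),
            }
        }
        Ok(())
    }
}

Mapped back onto the diff: the pool is created before the download stream starts; each fetched chunk is pushed through a cloned channel() sender from inside block_in_place(), so the blocking send does not stall the tokio executor; the sender clones are dropped once the stream finishes; and verify_pool.complete() joins the workers, so an error from verify_unencrypted() or insert_chunk() fails the whole pull, the same guarantee the old write_result_rx check provided.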