diff --git a/src/client/pull.rs b/src/client/pull.rs
index f6f77c3d..0e8c2ace 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -6,6 +6,7 @@ use std::convert::TryFrom;
 use std::sync::{Arc, Mutex};
 use std::collections::{HashSet, HashMap};
 use std::io::{Seek, SeekFrom};
+use std::time::SystemTime;
 use proxmox::api::error::{StatusCode, HttpError};
 
 use crate::{
@@ -21,8 +22,113 @@ use crate::{
 // fixme: delete vanished groups
 // Todo: correctly lock backup groups
 
+fn chunk_writer_pipeline(
+    target: Arc<DataStore>,
+) -> (
+    std::sync::mpsc::SyncSender<(DataBlob, [u8;32], u64)>,
+    std::sync::mpsc::Receiver<Result<usize, Error>>,
+) {
+    let (writer_tx, writer_rx) = std::sync::mpsc::sync_channel(1);
+    let (result_tx, result_rx) = std::sync::mpsc::channel();
+    let (pipe1_tx, pipe1_rx) = std::sync::mpsc::sync_channel(1);
+    let (pipe2_tx, pipe2_rx) = std::sync::mpsc::sync_channel(5);
+
+    let pipe2_copy_tx = pipe2_tx.clone();
+    let result_tx_copy1 = result_tx.clone();
+    let result_tx_copy2 = result_tx.clone();
+
+    let thread1 = std::thread::Builder::new()
+        .name(String::from("sync chunk decompressor"))
+        .spawn(move || {
+            let result: Result<(), Error> = proxmox::try_block!({
+                loop {
+                    let (chunk, digest, size): (DataBlob, [u8;32], u64) = match writer_rx.recv() {
+                        Ok(input) => input,
+                        Err(_err) => return Ok(()),
+                    };
+
+                    if chunk.is_encrypted() {
+                        pipe2_copy_tx.send((chunk, digest)).unwrap();
+                        continue;
+                    }
+
+                    let data = chunk.decode(None, None)?;
+
+                    if data.len() as u64 != size {
+                        bail!("detected chunk with wrong length ({} != {})", size, data.len());
+                    }
+
+                    pipe1_tx.send((chunk, data, digest)).unwrap();
+                }
+            });
+            if let Err(err) = result {
+                result_tx_copy1.send(Err(err)).unwrap();
+            }
+        }).unwrap();
+
+    let thread2 = std::thread::Builder::new()
+        .name(String::from("sync chunk verifier"))
+        .spawn(move || {
+            let result: Result<(), Error> = proxmox::try_block!({
+                loop {
+                    let (chunk, data, digest): (DataBlob, Vec<u8>, [u8;32]) = match pipe1_rx.recv() {
+                        Ok(input) => input,
+                        Err(_err) => return Ok(()),
+                    };
+
+                    let data_digest = openssl::sha::sha256(&data);
+                    if digest != data_digest {
+                        bail!("detected chunk with wrong digest.");
+                    }
+                    pipe2_tx.send((chunk, digest)).unwrap();
+                }
+            });
+            if let Err(err) = result {
+                result_tx_copy2.send(Err(err)).unwrap();
+            }
+        }).unwrap();
+
+    std::thread::Builder::new()
+        .name(String::from("sync chunk writer"))
+        .spawn(move || {
+            let mut bytes = 0;
+            let result: Result<(), Error> = proxmox::try_block!({
+                loop {
+                    let (chunk, digest): (DataBlob, [u8;32]) = match pipe2_rx.recv() {
+                        Ok(input) => input,
+                        Err(_err) => break,
+                    };
+                    bytes += chunk.raw_size() as usize;
+                    target.insert_chunk(&chunk, &digest)?;
+                }
+
+                if let Err(panic) = thread2.join() {
+                    match panic.downcast::<&str>() {
+                        Ok(panic_msg) => bail!("verification thread failed: {}", panic_msg),
+                        Err(_) => bail!("verification thread failed"),
+                    }
+                }
+
+                if let Err(panic) = thread1.join() {
+                    match panic.downcast::<&str>() {
+                        Ok(panic_msg) => bail!("decompressor thread failed: {}", panic_msg),
+                        Err(_) => bail!("decompressor thread failed"),
+                    }
+                }
+
+                Ok(())
+            });
+            match result {
+                Ok(()) => result_tx.send(Ok(bytes)).unwrap(),
+                Err(err) => result_tx.send(Err(err)).unwrap(),
+            }
+        }).unwrap();
+
+    (writer_tx, result_rx)
+}
+
 async fn pull_index_chunks<I: IndexFile>(
-    _worker: &WorkerTask,
+    worker: &WorkerTask,
     chunk_reader: RemoteChunkReader,
     target: Arc<DataStore>,
     index: I,
@@ -31,6 +137,8 @@ async fn pull_index_chunks<I: IndexFile>(
 
     use futures::stream::{self, StreamExt, TryStreamExt};
 
+    let start_time = SystemTime::now();
+
     let stream = stream::iter(
         (0..index.index_count())
             .map(|pos| index.chunk_info(pos).unwrap())
             .filter(|info| {
                 let target = Arc::clone(&target);
                 crate::tools::runtime::block_in_place(move || {
                     target.cond_touch_chunk(&info.digest, false).unwrap_or(false) == false
                 })
             })
     );
@@ -46,11 +154,14 @@ async fn pull_index_chunks<I: IndexFile>(
-    stream
+    let (write_channel, write_result_rx) = chunk_writer_pipeline(target.clone());
+
+    let result = stream
         .map(|info| {
             let target = Arc::clone(&target);
             let chunk_reader = chunk_reader.clone();
+            let write_channel = write_channel.clone();
 
             Ok::<_, Error>(async move {
                 let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
@@ -61,16 +172,31 @@ async fn pull_index_chunks<I: IndexFile>(
                 if chunk_exists {
                     //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
                     return Ok::<_, Error>(());
                 }
                 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
                 let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
 
-                crate::tools::runtime::block_in_place(|| {
-                    chunk.verify_unencrypted(info.size() as usize, &info.digest)?;
-                    target.insert_chunk(&chunk, &info.digest)?;
-                    Ok(())
-                })
-            })
+                // decode, verify and write in separate threads to maximize throughput
+                crate::tools::runtime::block_in_place(|| write_channel.send((chunk, info.digest, info.size())))?;
+
+                Ok(())
+            })
         })
         .try_buffer_unordered(20)
         .try_for_each(|_res| futures::future::ok(()))
-        .await?;
+        .await;
+
+    drop(write_channel);
+
+    // check errors from the result channel first
+    let bytes = match write_result_rx.recv() {
+        Err(_) => bail!("result channel closed early."),
+        Ok(Ok(bytes)) => bytes,
+        Ok(Err(err)) => bail!("write chunk failed - {}", err),
+    };
+
+    // then check the stream result
+    result?;
+
+    let elapsed = start_time.elapsed()?.as_secs_f64();
+
+    worker.log(format!("downloaded {} bytes ({} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));
 
     Ok(())
 }
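---

The new chunk_writer_pipeline() is easiest to read as three worker threads chained by
bounded channels: a decode stage, a verify stage, and a write stage, with a separate
unbounded channel carrying the final byte count or the first error back to the caller.
Below is a minimal, self-contained sketch of that same staged-channel pattern using only
the standard library. It is illustrative, not the patch's code: Chunk, Digest,
toy_digest() and the byte-counting "writer" are stand-ins for DataBlob, sha256 and
DataStore::insert_chunk(). The bounded capacities mirror the patch's sync_channel(1) and
sync_channel(5), which provide backpressure so a slow stage throttles its producer
instead of buffering without limit.

use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

// Stand-ins for the real types: a "chunk" is raw bytes, and the digest is a
// toy checksum (the patch uses DataBlob and openssl's sha256).
type Chunk = Vec<u8>;
type Digest = u64;

fn toy_digest(data: &[u8]) -> Digest {
    data.iter().map(|&b| b as u64).sum()
}

// Build a decode -> verify -> write pipeline and return the input sender plus
// a receiver for the final result, mirroring the shape of chunk_writer_pipeline().
fn pipeline() -> (SyncSender<(Chunk, Digest)>, Receiver<Result<usize, String>>) {
    let (in_tx, in_rx) = sync_channel::<(Chunk, Digest)>(1);
    let (mid_tx, mid_rx) = sync_channel::<(Chunk, Digest)>(1);
    let (out_tx, out_rx) = sync_channel::<Chunk>(5);
    let (res_tx, res_rx) = std::sync::mpsc::channel();

    // Stage 1: "decode" (a no-op pass-through standing in for decompression).
    thread::spawn(move || {
        while let Ok(item) = in_rx.recv() {
            if mid_tx.send(item).is_err() {
                return; // downstream stage is gone
            }
        }
        // input sender dropped: returning drops mid_tx and closes the next stage
    });

    // Stage 2: verify the digest before the chunk may be persisted.
    let res_tx2 = res_tx.clone();
    thread::spawn(move || {
        while let Ok((chunk, digest)) = mid_rx.recv() {
            if toy_digest(&chunk) != digest {
                let _ = res_tx2.send(Err(String::from("chunk digest mismatch")));
                return; // dropping out_tx unblocks the writer below
            }
            if out_tx.send(chunk).is_err() {
                return;
            }
        }
    });

    // Stage 3: "write" (here: count bytes) and report the total at the end.
    thread::spawn(move || {
        let mut bytes = 0;
        while let Ok(chunk) = out_rx.recv() {
            bytes += chunk.len(); // a real writer would call insert_chunk() here
        }
        let _ = res_tx.send(Ok(bytes));
    });

    (in_tx, res_rx)
}

fn main() {
    let (tx, results) = pipeline();
    for data in [vec![1u8, 2, 3], vec![4u8; 1024]] {
        let digest = toy_digest(&data);
        tx.send((data, digest)).unwrap();
    }
    drop(tx); // closing the input lets every stage drain and exit in turn
    println!("pipeline result: {:?}", results.recv().unwrap());
}

As in the patch, dropping the input sender is the shutdown signal: each stage's recv()
then fails in turn, the stages drain, and the writer reports its total, which is exactly
what drop(write_channel) followed by write_result_rx.recv() does in pull_index_chunks().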