src/bin/test_chunk_speed2.rs: switch to async

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-08-29 09:55:49 +02:00
parent fda5797b8a
commit cab6816969

View File

@ -1,6 +1,5 @@
use failure::*; use failure::*;
use futures::*; use futures::*;
use std::sync::atomic::{AtomicUsize, Ordering};
extern crate proxmox_backup; extern crate proxmox_backup;
@ -13,42 +12,43 @@ use proxmox_backup::backup::*;
// //
// Note: I can currently get about 830MB/s // Note: I can currently get about 830MB/s
fn main() { #[tokio::main]
async fn main() {
let repeat = std::sync::Arc::new(AtomicUsize::new(0)); if let Err(err) = run().await {
let repeat2 = repeat.clone(); panic!("ERROR: {}", err);
}
let stream_len = std::sync::Arc::new(AtomicUsize::new(0)); }
let stream_len2 = stream_len.clone();
async fn run() -> Result<(), Error> {
let task = tokio::fs::File::open("random-test.dat")
.map_err(Error::from) let file = tokio::fs::File::open("random-test.dat").await?;
.and_then(move |file| {
let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
.map(|bytes| bytes.to_vec()).map_err(Error::from); .map_ok(|bytes| bytes.to_vec())
//let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024); .map_err(Error::from);
let chunk_stream = ChunkStream::new(stream, None);
//let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
let start_time = std::time::Instant::now(); let mut chunk_stream = ChunkStream::new(stream, None);
chunk_stream let start_time = std::time::Instant::now();
.for_each(move |chunk| {
if chunk.len() > 16*1024*1024 { panic!("Chunk too large {}", chunk.len()); } let mut repeat = 0;
repeat.fetch_add(1, Ordering::SeqCst); let mut stream_len = 0;
stream_len.fetch_add(chunk.len(), Ordering::SeqCst); while let Some(chunk) = chunk_stream.try_next().await? {
println!("Got chunk {}", chunk.len()); if chunk.len() > 16*1024*1024 {
Ok(()) panic!("Chunk too large {}", chunk.len());
}) }
.and_then(move |_result| {
let repeat = repeat2.load(Ordering::SeqCst); repeat += 1;
let stream_len = stream_len2.load(Ordering::SeqCst); stream_len += chunk.len();
let speed = ((stream_len*1000000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed); println!("Got chunk {}", chunk.len());
println!("Average chunk size was {} bytes.", stream_len/repeat); }
println!("time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
Ok(()) let speed = ((stream_len*1000000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
}) println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed);
}); println!("Average chunk size was {} bytes.", stream_len/repeat);
println!("time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
tokio::run(task.map_err(|err| { panic!("ERROR: {}", err); }));
Ok(())
} }