From cab681696988b430396adca060fdec769af7968d Mon Sep 17 00:00:00 2001
From: Wolfgang Bumiller
Date: Thu, 29 Aug 2019 09:55:49 +0200
Subject: [PATCH] src/bin/test_chunk_speed2.rs: switch to async

Signed-off-by: Wolfgang Bumiller
---
 src/bin/test_chunk_speed2.rs | 78 ++++++++++++++++++------------------
 1 file changed, 39 insertions(+), 39 deletions(-)

diff --git a/src/bin/test_chunk_speed2.rs b/src/bin/test_chunk_speed2.rs
index d9ed3488..28de6a95 100644
--- a/src/bin/test_chunk_speed2.rs
+++ b/src/bin/test_chunk_speed2.rs
@@ -1,6 +1,5 @@
 use failure::*;
 use futures::*;
-use std::sync::atomic::{AtomicUsize, Ordering};
 
 extern crate proxmox_backup;
 
@@ -13,42 +12,43 @@ use proxmox_backup::backup::*;
 //
 // Note: I can currently get about 830MB/s
 
-fn main() {
-
-    let repeat = std::sync::Arc::new(AtomicUsize::new(0));
-    let repeat2 = repeat.clone();
-
-    let stream_len = std::sync::Arc::new(AtomicUsize::new(0));
-    let stream_len2 = stream_len.clone();
-
-    let task = tokio::fs::File::open("random-test.dat")
-        .map_err(Error::from)
-        .and_then(move |file| {
-            let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
-                .map(|bytes| bytes.to_vec()).map_err(Error::from);
-            //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
-            let chunk_stream = ChunkStream::new(stream, None);
-
-            let start_time = std::time::Instant::now();
-
-            chunk_stream
-                .for_each(move |chunk| {
-                    if chunk.len() > 16*1024*1024 { panic!("Chunk too large {}", chunk.len()); }
-                    repeat.fetch_add(1, Ordering::SeqCst);
-                    stream_len.fetch_add(chunk.len(), Ordering::SeqCst);
-                    println!("Got chunk {}", chunk.len());
-                    Ok(())
-                })
-                .and_then(move |_result| {
-                    let repeat = repeat2.load(Ordering::SeqCst);
-                    let stream_len = stream_len2.load(Ordering::SeqCst);
-                    let speed = ((stream_len*1000000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
-                    println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed);
-                    println!("Average chunk size was {} bytes.", stream_len/repeat);
-                    println!("time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
-                    Ok(())
-                })
-        });
-
-    tokio::run(task.map_err(|err| { panic!("ERROR: {}", err); }));
+#[tokio::main]
+async fn main() {
+    if let Err(err) = run().await {
+        panic!("ERROR: {}", err);
+    }
+}
+
+async fn run() -> Result<(), Error> {
+
+    let file = tokio::fs::File::open("random-test.dat").await?;
+
+    let stream = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new())
+        .map_ok(|bytes| bytes.to_vec())
+        .map_err(Error::from);
+
+    //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
+    let mut chunk_stream = ChunkStream::new(stream, None);
+
+    let start_time = std::time::Instant::now();
+
+    let mut repeat = 0;
+    let mut stream_len = 0;
+    while let Some(chunk) = chunk_stream.try_next().await? {
+        if chunk.len() > 16*1024*1024 {
+            panic!("Chunk too large {}", chunk.len());
+        }
+
+        repeat += 1;
+        stream_len += chunk.len();
+
+        println!("Got chunk {}", chunk.len());
+    }
+
+    let speed = ((stream_len*1000000)/(1024*1024))/(start_time.elapsed().as_micros() as usize);
+    println!("Uploaded {} chunks in {} seconds ({} MB/s).", repeat, start_time.elapsed().as_secs(), speed);
+    println!("Average chunk size was {} bytes.", stream_len/repeat);
+    println!("time per request: {} microseconds.", (start_time.elapsed().as_micros())/(repeat as u128));
+
+    Ok(())
 }