use failure::*; use futures::*; use bytes::{Bytes, BytesMut}; use std::collections::HashMap; /// Trait to get digest list from index files /// /// To allow easy iteration over all used chunks. pub trait IndexFile: Send { fn index_count(&self) -> usize; fn index_digest(&self, pos: usize) -> Option<&[u8; 32]>; fn index_bytes(&self) -> u64; /// Returns most often used chunks fn find_most_used_chunks(&self, max: usize) -> HashMap<[u8; 32], usize> { let mut map = HashMap::new(); for pos in 0..self.index_count() { let digest = self.index_digest(pos).unwrap(); let count = map.entry(*digest).or_insert(0); *count += 1; } let mut most_used = Vec::new(); for (digest, count) in map { if count <= 1 { continue; } match most_used.binary_search_by_key(&count, |&(_digest, count)| count) { Ok(p) => most_used.insert(p, (digest, count)), Err(p) => most_used.insert(p, (digest, count)), } if most_used.len() > max { let _ = most_used.pop(); } } let mut map = HashMap::new(); for data in most_used { map.insert(data.0, data.1); } map } } /// Encode digest list from an `IndexFile` into a binary stream /// /// The reader simply returns a birary stream of 32 byte digest values. pub struct DigestListEncoder { index: Box, pos: usize, count: usize, } impl DigestListEncoder { pub fn new(index: Box) -> Self { let count = index.index_count(); Self { index, pos: 0, count } } } impl std::io::Read for DigestListEncoder { fn read(&mut self, buf: &mut [u8]) -> Result { if buf.len() < 32 { panic!("read buffer too small"); } if self.pos < self.count { let mut written = 0; loop { let digest = self.index.index_digest(self.pos).unwrap(); unsafe { std::ptr::copy_nonoverlapping(digest.as_ptr(), buf.as_mut_ptr().add(written), 32); } self.pos += 1; written += 32; if self.pos >= self.count { break; } if (written + 32) >= buf.len() { break; } } return Ok(written); } else { return Ok(0); } } } /// Decodes a Stream into Stream /// /// The reader simply returns a birary stream of 32 byte digest values. pub struct DigestListDecoder { input: S, buffer: BytesMut, } impl DigestListDecoder { pub fn new(input: S) -> Self { Self { input, buffer: BytesMut::new() } } } impl Stream for DigestListDecoder where S: Stream, S::Error: Into, { type Item = [u8; 32]; type Error = Error; fn poll(&mut self) -> Result>, Self::Error> { loop { if self.buffer.len() >= 32 { let left = self.buffer.split_to(32); let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() }; unsafe { std::ptr::copy_nonoverlapping(left.as_ptr(), digest.as_mut_ptr(), 32); } return Ok(Async::Ready(Some(digest))); } match self.input.poll() { Err(err) => { return Err(err.into()); } Ok(Async::NotReady) => { return Ok(Async::NotReady); } Ok(Async::Ready(None)) => { let rest = self.buffer.len(); if rest == 0 { return Ok(Async::Ready(None)); } return Err(format_err!("got small digest ({} != 32).", rest)); } Ok(Async::Ready(Some(data))) => { self.buffer.extend_from_slice(&data); // continue } } } } }