import proxmox-protocol crate

This is supposed to contain only the parts necessary to
communicate with the server via the proxmox backup protocol
(including the chunker, which is currently `include!()`d
from the main crate).

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-03-06 10:16:35 +01:00
parent e3062f87b1
commit ac4e349b5e
11 changed files with 2396 additions and 0 deletions

View File

@ -0,0 +1,14 @@
# Manifest for the `proxmox-protocol` crate: the client-side implementation
# of the proxmox backup protocol, split out of the main crate.
[package]
name = "proxmox-protocol"
version = "0.1.0"
authors = [
"Dietmar Maurer <dietmar@proxmox.com>",
"Wolfgang Bumiller <w.bumiller@proxmox.com>",
]
edition = "2018"
[dependencies]
chrono = "0.4"
# Byte-order conversion for the on-wire packet structs.
endian_trait = "0.6"
# Error handling (`Error`, `bail!`, `format_err!`).
failure = "0.1"
openssl = "0.10"

View File

@ -0,0 +1,403 @@
//! For the C API we need to provide a `Client` compatible with C. In rust `Client` takes a
//! `T: io::Read + io::Write`, so we need to provide a way for C to provide callbacks to us to
//! implement this.
use std::ffi::{CStr, CString};
use std::io;
use std::os::raw::{c_char, c_int, c_ulong, c_void};
use failure::{bail, format_err, Error};
/// Read callback. The first parameter is the `opaque` parameter passed to `proxmox_backup_new`,
/// the rest are the usual read function parameters. This should return the number of bytes
/// actually read, zero on EOF, or a negative `errno` value on error (eg. `-EAGAIN`).
pub type ReadFn = extern "C" fn(opaque: *mut c_void, buf: *mut u8, size: u64) -> i64;
/// Write callback. The first parameter is the `opaque` parameter passed to `proxmox_backup_new`,
/// the rest are the usual write function parameters. This should return the number of bytes
/// actually written, or a negative `errno` value on error (eg. `-EAGAIN`).
pub type WriteFn = extern "C" fn(opaque: *mut c_void, buf: *const u8, size: u64) -> i64;
/// Stores the external C callbacks for communicating with the protocol socket.
struct CApiSocket {
    /// Passed through verbatim as the first argument of both callbacks.
    opaque: *mut c_void,
    read: ReadFn,
    write: WriteFn,
}
/// A client instance using C callbacks for reading from and writing to the protocol socket.
pub struct CClient {
    client: crate::Client<CApiSocket>,
    /// Last error message; kept alive here so the pointer handed out by
    /// `proxmox_backup_get_error` stays valid.
    error: Option<CString>,
    /// Pointer and length of a chunk upload that could not complete without
    /// blocking; re-read by `proxmox_backup_continue_upload`.
    upload: Option<(*const u8, usize)>,
}
impl CClient {
    /// Stores `err`'s message for later retrieval via
    /// `proxmox_backup_get_error` and returns the C error code `-1`.
    fn set_error(&mut self, err: Error) -> c_int {
        self.error = Some(match CString::new(err.to_string()) {
            Ok(cs) => cs,
            // The message contained an interior NUL byte; substitute a placeholder.
            Err(_) => CString::new("<bad bytes in error string>").unwrap(),
        });
        -1
    }
    /// Maps `Ok(false)`/`Ok(true)` to `0`/`1` and errors to `-1` (storing
    /// the message for `proxmox_backup_get_error`).
    #[inline(always)]
    fn bool_result(&mut self, res: Result<bool, Error>) -> c_int {
        match res {
            Ok(false) => 0,
            Ok(true) => 1,
            Err(e) => self.set_error(e),
        }
    }
    /// Runs `func` on the inner client and converts its boolean result via
    /// `bool_result`.
    #[inline(always)]
    fn bool_call<F>(&mut self, func: F) -> c_int
    where
        F: FnOnce(&mut crate::Client<CApiSocket>) -> Result<bool, Error>,
    {
        let res = func(&mut self.client);
        self.bool_result(res)
    }
    /// Runs `func` on the inner client, passing its `c_int` result through
    /// and converting errors to `-1`.
    #[inline(always)]
    fn int_call<F>(&mut self, func: F) -> c_int
    where
        F: FnOnce(&mut crate::Client<CApiSocket>) -> Result<c_int, Error>,
    {
        match func(&mut self.client) {
            Ok(v) => v,
            Err(e) => self.set_error(e.into()),
        }
    }
}
impl io::Read for CApiSocket {
    /// Forwards the read to the C callback; negative return values are
    /// interpreted as `errno` codes.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match (self.read)(self.opaque, buf.as_mut_ptr(), buf.len() as u64) {
            rc if rc < 0 => Err(io::Error::from_raw_os_error((-rc) as i32)),
            rc => Ok(rc as usize),
        }
    }
}
impl io::Write for CApiSocket {
    /// Forwards the write to the C callback; negative return values are
    /// interpreted as `errno` codes.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match (self.write)(self.opaque, buf.as_ptr(), buf.len() as u64) {
            rc if rc < 0 => Err(io::Error::from_raw_os_error((-rc) as i32)),
            rc => Ok(rc as usize),
        }
    }

    /// Nothing is buffered on this side, so flushing is a no-op.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Creates a new instance of a backup protocol client.
///
/// # Arguments
///
/// * `opaque` - An opaque pointer passed to the two provided callback methods.
/// * `read` - The read callback.
/// * `write` - The write callback.
///
/// The returned pointer must be released via `proxmox_backup_done`.
#[no_mangle]
pub extern "C" fn proxmox_backup_new(
    opaque: *mut c_void,
    read: ReadFn,
    write: WriteFn,
) -> *mut CClient {
    // `Box::into_raw` is the canonical way to transfer ownership of a heap
    // allocation across an FFI boundary (the original `Box::leak` yields a
    // `&'static mut` and only coerces to a raw pointer, obscuring intent).
    Box::into_raw(Box::new(CClient {
        client: crate::Client::new(CApiSocket {
            opaque,
            read,
            write,
        }),
        error: None,
        upload: None,
    }))
}
/// Drops an instance of a backup protocol client. The pointer must be valid or `NULL`.
#[no_mangle]
pub extern "C" fn proxmox_backup_done(me: *mut CClient) {
    if me.is_null() {
        return;
    }
    // Reclaim ownership of the allocation and drop it explicitly.
    drop(unsafe { Box::from_raw(me) });
}
/// Returns a C String describing the last error or `NULL` if there was none.
#[no_mangle]
pub extern "C" fn proxmox_backup_get_error(me: *const CClient) -> *const c_char {
    // Tolerate a NULL handle (like `proxmox_backup_done` does) instead of
    // dereferencing it.
    if me.is_null() {
        return std::ptr::null();
    }
    let me = unsafe { &*me };
    match me.error {
        Some(ref e) => e.as_ptr(),
        None => std::ptr::null(),
    }
}
/// Returns true if the `read` callback had previously returned `EOF`.
///
/// The pointer must be a valid (non-`NULL`) client handle.
#[no_mangle]
pub extern "C" fn proxmox_backup_is_eof(me: *const CClient) -> bool {
    let me = unsafe { &*me };
    me.client.eof()
}
/// The data polling methods usually pass errors from the callbacks through to the original caller.
/// Since the protocol needs to be non-blocking-IO safe and therefore able to resume at any point
/// where `-EAGAIN` can be returned by the callbacks, it is up to the caller which errors are to be
/// considered fatal, but any error returned by callbacks which is not `-EAGAIN` will result in an
/// internal error flag to be set which has to be cleared before trying to resume normal
/// operations.
#[no_mangle]
pub extern "C" fn proxmox_backup_clear_err(me: *mut CClient) {
    let me = unsafe { &mut *me };
    // Clear both the protocol-level error flag and the stored message.
    me.client.clear_err();
    me.error = None;
}
/// Polls for data and checks whether the protocol handshake has been made successfully.
/// Returns `1` if the handshake was successful, `0` if it is not yet complete or `-1` on error.
#[no_mangle]
pub extern "C" fn proxmox_backup_wait_for_handshake(me: *mut CClient) -> c_int {
    let me = unsafe { &mut *me };
    me.bool_call(move |c| c.wait_for_handshake())
}
/// Validates a C string pointer: rejects `NULL` and non-UTF-8 contents.
// NOTE(review): the `&'static str` return borrows C-owned memory; the
// callers in this file only use the result transiently, but the lifetime is
// a lie — confirm no returned reference can outlive the C buffer.
fn check_string(s: *const c_char) -> Result<&'static str, Error> {
    if s.is_null() {
        bail!("NULL string");
    }
    Ok(std::str::from_utf8(unsafe {
        CStr::from_ptr(s).to_bytes()
    })?)
}
/// Request the list of hashes for a backup file in order to prevent duplicates from being sent to
/// the server. This simply causes an internal list to be filled. Only one such operation can be
/// performed simultaneously. To wait for its completion see `proxmox_backup_wait_for_hashes`.
///
/// If the file name is `NULL` or not a valid UTF-8 string, this function returns an error without
/// putting the protocol handler in an error state.
///
/// Returns `0` on success, `-1` otherwise.
#[no_mangle]
pub extern "C" fn proxmox_backup_query_hashes(me: *mut CClient, file_name: *const c_char) -> c_int {
    let me = unsafe { &mut *me };
    me.int_call(move |client| {
        // Validation errors surface via `int_call` without poisoning the client.
        let file_name = check_string(file_name)?;
        client.query_hashes(file_name)?;
        Ok(0)
    })
}
/// If there is an ongoing hash list request, this will poll the data stream.
///
/// Returns `1` if the transfer is complete (or there was no transfer to begin with), `0` if it is
/// incomplete, or `-1` if an error occurred.
#[no_mangle]
pub extern "C" fn proxmox_backup_wait_for_hashes(me: *mut CClient) -> c_int {
    let me = unsafe { &mut *me };
    me.bool_call(move |c| c.wait_for_hashes())
}
/// Check if a chunk of the provided digest is known to this client instance. Note that this
/// does not query the server for this information, and is only useful after a call to
/// `proxmox_backup_query_hashes` or after uploading something.
///
/// `digest` must point to a readable 32 byte buffer; it is reinterpreted as `[u8; 32]`.
#[no_mangle]
pub extern "C" fn proxmox_backup_is_chunk_available(me: *const CClient, digest: *const u8) -> bool {
    let me = unsafe { &*me };
    let digest = unsafe { &*(digest as *const [u8; 32]) };
    me.client.is_chunk_available(digest)
}
/// Begin uploading a chunk to the server. This attempts to upload the data right away, but if the
/// writer may fail due to non-blocking I/O in which case the `proxmox_backup_continue_upload`
/// function must be used.
///
/// Returns `0` if the upload is incomplete, a positive ID if the upload was completed immediately,
/// or `-1` on error.
///
/// The ID returned on success can be used to wait for the server to acknowledge that the chunk has
/// been written successfully. Use `proxmox_backup_wait_for_id` to do this. If confirmation is not
/// required, the ID should be released via `proxmox_backup_discard_id`.
///
/// While the upload is incomplete, `data` must stay valid: the raw pointer is
/// stored and re-read by `proxmox_backup_continue_upload`.
#[no_mangle]
pub extern "C" fn proxmox_backup_upload_chunk(
    me: *mut CClient,
    digest: *const u8,
    data: *const u8,
    size: u64,
) -> c_int {
    let me = unsafe { &mut *me };
    let digest: &[u8; 32] = unsafe { &*(digest as *const [u8; 32]) };
    let size = size as usize;
    let slice: &[u8] = unsafe { std::slice::from_raw_parts(data, size) };
    match me.client.upload_chunk(digest, slice) {
        Ok(Some(id)) => id.0 as c_int,
        Ok(None) => {
            // Stash pointer+length so `proxmox_backup_continue_upload` can resume.
            me.upload = Some((data, size));
            0
        }
        Err(e) => me.set_error(e),
    }
}
/// If an upload did not finish immediately (`proxmox_backup_upload_chunk` returned `0`), this
/// function must be used to retry sending the rest of the data.
///
/// Returns `0` if the upload is incomplete, a positive ID if the upload was completed immediately,
/// or `-1` on error.
// NOTE(review): `me.upload` is never reset to `None` once the upload
// completes — a stale continue call would re-enter with a possibly dangling
// pointer; confirm whether this should be cleared on success.
#[no_mangle]
pub extern "C" fn proxmox_backup_continue_upload(me: *mut CClient) -> c_int {
    let me = unsafe { &mut *me };
    match me.upload {
        Some((data, len)) => {
            // Re-materialize the slice stashed by `proxmox_backup_upload_chunk`;
            // the caller must have kept the buffer alive.
            let slice: &[u8] = unsafe { std::slice::from_raw_parts(data, len) };
            match me.client.continue_upload_chunk(slice) {
                Ok(Some(id)) => id.0 as c_int,
                Ok(None) => 0,
                Err(e) => me.set_error(e),
            }
        }
        None => me.set_error(format_err!("no upload currently running")),
    }
}
/// Run the main receive loop. Returns `0` on success, `-1` on error.
#[no_mangle]
pub extern "C" fn proxmox_backup_poll_read(me: *mut CClient) -> c_int {
    let me = unsafe { &mut *me };
    match me.client.poll_read() {
        Ok(_) => 0,
        Err(e) => me.set_error(e),
    }
}
/// Run the main send loop. If the `write` callback returned `-EAGAIN`, during an operation, the
/// protocol handler keeps the data to be sent in a write queue. This function will attempt to
/// continue writing out the remaining data. See individual function descriptions for when this is
/// necessary.
///
/// Returns `1` if the queue is now empty, `0` if there is still data in the queue, or `-1` on
/// error.
#[no_mangle]
pub extern "C" fn proxmox_backup_poll_send(me: *mut CClient) -> c_int {
    let me = unsafe { &mut *me };
    // `poll_send` returning `None` means nothing was queued — report that as "empty".
    me.bool_call(move |c| Ok(c.poll_send()?.unwrap_or(true)))
}
/// Run the main receive loop and check for confirmation of a stream with the specified ID.
///
/// Returns `1` if the transaction was confirmed, `0` if not, or `-1` on error.
///
/// Note that once this function returned `1` for an ID, the id is considered to be free for
/// recycling and should not be used for further calls.
#[no_mangle]
pub extern "C" fn proxmox_backup_wait_for_id(me: *mut CClient, id: c_int) -> c_int {
    let me = unsafe { &mut *me };
    me.bool_call(move |c| c.wait_for_id(crate::StreamId(id as u8)))
}
/// Notifies the protocol handler that we do not bother waiting for confirmation of an ID. The ID
/// may immediately be recycled for future transactions, thus the user should not use it for any
/// further function calls.
///
/// Returns `0` on success, `-1` on error.
#[no_mangle]
pub extern "C" fn proxmox_backup_discard_id(me: *mut CClient, id: c_int) -> c_int {
    let me = unsafe { &mut *me };
    match me.client.discard_id(crate::StreamId(id as u8)) {
        Ok(_) => 0,
        Err(e) => me.set_error(e),
    }
}
/// Create a new backup. The returned ID should be waited upon via `proxmox_backup_wait_for_id`,
/// which returns true once the server confirmed the creation of the backup.
///
/// A negative `file_size` encodes "no size" (required for dynamic streams).
/// Returns the stream ID on success, `-1` on error.
#[no_mangle]
pub extern "C" fn proxmox_backup_create(
    me: *mut CClient,
    dynamic: bool,
    backup_type: *const c_char, // "host", "ct", "vm"
    backup_id: *const c_char,
    time_epoch: i64,
    file_name: *const c_char,
    chunk_size: c_ulong,
    file_size: i64,
    is_new: bool,
) -> c_int {
    let me = unsafe { &mut *me };
    me.int_call(move |client| {
        // Plain `if`/`else` instead of the original `match` on a bool.
        let index_type = if dynamic {
            crate::IndexType::Dynamic
        } else {
            crate::IndexType::Fixed
        };
        let backup_type = check_string(backup_type)?;
        let backup_id = check_string(backup_id)?;
        let file_name = check_string(file_name)?;
        // Map the C sentinel (negative size) to `Option`.
        let file_size = if file_size < 0 {
            None
        } else {
            Some(file_size as u64)
        };
        Ok(client
            .create_backup(
                index_type,
                backup_type,
                backup_id,
                time_epoch,
                file_name,
                chunk_size as _,
                file_size,
                is_new,
            )?
            .0 as c_int)
    })
}
/// Send a dynamic chunk entry.
///
/// If the entry was sent out successfully this returns `1`. If the `write` callback returned
/// `-EAGAIN` this returns `0` and the data is queued, after which `proxmox_backup_poll_send`
/// should be used to continue sending the data.
/// On error `-1` is returned.
///
/// `digest` must point to a readable 32 byte buffer.
#[no_mangle]
pub extern "C" fn proxmox_backup_dynamic_data(
    me: *mut CClient,
    stream: c_int,
    digest: *const [u8; 32],
    size: u64,
) -> c_int {
    let me = unsafe { &mut *me };
    me.bool_call(move |client| {
        client.dynamic_data(crate::BackupStream(stream as u8), unsafe { &*digest }, size)
    })
}
/// Send a fixed chunk entry.
///
/// If the entry was sent out successfully this returns `1`. If the `write` callback returned
/// `-EAGAIN` this returns `0` and the data is queued, after which `proxmox_backup_poll_send`
/// should be used to continue sending the data.
/// On error `-1` is returned.
///
/// `digest` must point to a readable 32 byte buffer.
#[no_mangle]
pub extern "C" fn proxmox_backup_fixed_data(
    me: *mut CClient,
    stream: c_int,
    index: c_ulong,
    digest: *const [u8; 32],
) -> c_int {
    let me = unsafe { &mut *me };
    me.bool_call(move |client| {
        client.fixed_data(crate::BackupStream(stream as u8), index as usize, unsafe {
            &*digest
        })
    })
}

View File

@ -0,0 +1,118 @@
use std::io::Read;
use failure::Error;
use crate::Chunker;
/// Pull-based chunking adapter: reads from `input` and yields
/// chunker-delimited slices via `get()` / `next()`.
pub struct ChunkStream<T: Read> {
    input: T,
    /// Growable read buffer; only `..fill` holds valid data.
    buffer: Vec<u8>,
    /// Number of valid bytes in `buffer`.
    fill: usize,
    /// End of the current chunk (while `keep` is set), otherwise the scan position.
    pos: usize,
    /// True while `buffer[..pos]` holds a chunk not yet released via `next()`.
    keep: bool,
    /// Set once the input returned a zero-length read.
    eof: bool,
    chunker: Chunker,
}
impl<T: Read> ChunkStream<T> {
    /// Wraps `input` with an empty buffer and a chunker configured for
    /// 4 MiB chunks.
    pub fn new(input: T) -> Self {
        Self {
            input,
            buffer: Vec::new(),
            fill: 0,
            pos: 0,
            keep: false,
            eof: false,
            chunker: Chunker::new(4 * 1024 * 1024),
        }
    }

    pub fn stream(&mut self) -> &mut Self {
        self
    }

    /// Reads more data from the input into `self.buffer`, growing it first if
    /// it is full. Returns `Ok(false)` if the reader would block, `Ok(true)`
    /// otherwise (setting `self.eof` on a zero-length read).
    fn fill_buf(&mut self) -> Result<bool, Error> {
        if self.fill == self.buffer.len() {
            let mut more = self.buffer.len(); // just double it
            if more == 0 {
                more = 1024 * 1024; // at the start, make a 1M buffer
            }
            // we need more data:
            self.buffer.reserve(more);
            unsafe {
                // NOTE(review): this exposes uninitialized memory to `read()`
                // below, which is technically against the `Read` contract;
                // the bytes are never observed beyond `self.fill`, but a
                // zero-initializing resize would be strictly safe.
                self.buffer.set_len(self.buffer.capacity());
            }
        }
        match self.input.read(&mut self.buffer[self.fill..]) {
            Ok(more) => {
                if more == 0 {
                    self.eof = true;
                }
                self.fill += more;
                Ok(true)
            }
            Err(err) => {
                if err.kind() == std::io::ErrorKind::WouldBlock {
                    Ok(false)
                } else {
                    Err(err.into())
                }
            }
        }
    }

    /// Discards the consumed prefix (`..self.pos`) by moving the remaining
    /// `pos..fill` bytes to the front of the buffer.
    fn consume(&mut self) {
        assert!(self.fill >= self.pos);
        let remaining = self.fill - self.pos;
        // SAFETY: both ranges lie within the initialized part of the buffer.
        // The source (`pos..fill`) and destination (`0..remaining`) overlap
        // whenever `remaining > pos`, so this must be a memmove
        // (`ptr::copy`); the previous `copy_nonoverlapping` (memcpy) was
        // undefined behavior in that case.
        unsafe {
            std::ptr::copy(
                self.buffer.as_ptr().add(self.pos),
                self.buffer.as_mut_ptr(),
                remaining,
            );
        }
        self.fill = remaining;
        self.pos = 0;
    }

    /// Releases the current chunk so the next `get()` call produces a new one.
    pub fn next(&mut self) {
        self.keep = false;
    }

    // This crate should not depend on the futures crate, so we use another Option instead of
    // Async<T>.
    //
    // Returns:
    //   Ok(None)             - would block, no complete chunk available yet
    //   Ok(Some(None))       - end of stream
    //   Ok(Some(Some(data))) - the next chunk
    pub fn get(&mut self) -> Result<Option<Option<&[u8]>>, Error> {
        if self.keep {
            // The previous chunk has not been released via `next()` yet.
            return Ok(Some(Some(&self.buffer[0..self.pos])));
        }
        if self.eof {
            return Ok(Some(None));
        }
        if self.pos != 0 {
            self.consume();
        }
        loop {
            match self.fill_buf() {
                Ok(true) => (),
                Ok(false) => return Ok(None),
                Err(err) => return Err(err),
            }
            // Note that if we hit EOF we hit a hard boundary...
            let boundary = self.chunker.scan(&self.buffer[self.pos..self.fill]);
            if boundary == 0 && !self.eof {
                // No boundary in the scanned area yet; remember how far we
                // scanned and read more data.
                self.pos = self.fill;
                continue;
            }
            self.pos += boundary;
            self.keep = true;
            return Ok(Some(Some(&self.buffer[0..self.pos])));
        }
    }
}

View File

@ -0,0 +1,4 @@
// Pull the chunker implementation in from the main crate's source tree so
// this crate does not have to depend on the main crate itself.
include!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/../src/backup/chunker.rs"
));

View File

@ -0,0 +1,596 @@
use std::borrow::Borrow;
use std::collections::hash_map;
use std::collections::{HashMap, HashSet};
use std::io::{Read, Write};
use std::mem;
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use endian_trait::Endian;
use failure::*;
use crate::common;
use crate::protocol::*;
use crate::tools::swapped_data_to_buf;
use crate::{ChunkEntry, FixedChunk, IndexType};
/// Handle for an active backup stream (wraps the underlying packet id).
#[derive(Clone, Copy, Eq, PartialEq)]
#[repr(transparent)]
pub struct BackupStream(pub(crate) u8);
/// Generic transaction id that can be waited upon (`wait_for_id`) or
/// discarded (`discard_id`).
#[derive(Clone, Copy, Eq, PartialEq)]
#[repr(transparent)]
pub struct StreamId(pub(crate) u8);
impl From<BackupStream> for StreamId {
    fn from(v: BackupStream) -> Self {
        Self(v.0)
    }
}
/// Per-stream bookkeeping for an active backup upload.
struct BackupStreamData {
    /// The packet id used by this stream.
    id: u8,
    index_type: IndexType,
    /// Byte offset written so far (advanced by `dynamic_data`).
    pos: u64,
    /// Remote path, filled in once the server's `BackupCreated` arrives.
    path: Option<String>,
}
/// Acknowledgement state of an in-flight transaction id.
pub enum AckState {
    Waiting, // no ack received yet.
    Received, // already received an ack, but the user hasn't seen it yet.
    Ignore, // client doesn't care.
    AwaitingData, // waiting for something other than an 'Ok' packet
}
/// Client side of the backup protocol, generic over any blocking or
/// non-blocking `Read + Write` transport.
pub struct Client<S>
where
    S: Read + Write,
{
    /// Digests known to exist on the server (filled by hash queries/uploads).
    chunks: RwLock<HashSet<FixedChunk>>,
    /// Packet framing / send queue shared with the server implementation.
    common: common::Connection<S>,
    handshake_done: bool,
    /// Next never-used packet id (ids are u8; 0 is in use for the handshake).
    cur_id: u8,
    /// Previously released packet ids available for reuse.
    free_ids: Vec<u8>,
    /// Ack state per in-flight transaction id.
    waiting_ids: HashMap<u8, AckState>,
    /// Packet id of the running hash list download, if any.
    hash_download: Option<u8>,
    // State of the (single) in-progress chunk upload, see `continue_upload_chunk`.
    upload_chunk: Option<FixedChunk>,
    upload_id: u8,
    upload_pos: usize,
    upload_state: u8,
    /// Active backup streams keyed by packet id.
    streams: HashMap<u8, BackupStreamData>,
}
type Result<T> = std::result::Result<T, Error>;
impl<S> Client<S>
where
S: Read + Write,
{
/// Creates a client for the given transport, in pre-handshake state.
pub fn new(socket: S) -> Self {
    Self {
        chunks: RwLock::new(HashSet::new()),
        common: common::Connection::new(socket),
        handshake_done: false,
        // Packet id 0 is implicitly in use for the handshake; `recv_hello`
        // releases it via `free_id(0)`.
        cur_id: 1,
        free_ids: Vec::new(),
        waiting_ids: HashMap::new(),
        hash_download: None,
        upload_state: 0,
        upload_pos: 0,
        upload_id: 0,
        upload_chunk: None,
        streams: HashMap::new(),
    }
}
/// True once the peer closed the connection (a read returned 0 bytes).
pub fn eof(&self) -> bool {
    self.common.eof
}
/// True if the connection is in an error state.
pub fn error(&self) -> bool {
    self.common.error
}
/// It is safe to clear the error after an `io::ErrorKind::Interrupted`.
pub fn clear_err(&mut self) {
    self.common.clear_err()
}
/// Polls for data until the server's `Hello` packet has been processed.
/// Returns `Ok(true)` once the handshake is complete.
pub fn wait_for_handshake(&mut self) -> Result<bool> {
    if !self.handshake_done {
        self.poll_read()?;
    }
    Ok(self.handshake_done)
}
/// Requests the server's hash list for `file_name` (used to skip known
/// chunks). Completion is observed via `wait_for_hashes`; only one query
/// may be in flight at a time.
pub fn query_hashes(&mut self, file_name: &str) -> Result<()> {
    if self.hash_download.is_some() {
        bail!("hash query already in progress");
    }
    let id = self.next_id()?;
    let mut packet = Packet::builder(id, PacketType::GetHashList);
    packet
        .write_data(client::GetHashList {
            name_length: file_name.len() as u16,
        })
        .write_buf(file_name.as_bytes());
    self.common.queue_data(packet.finish())?;
    self.hash_download = Some(id);
    Ok(())
}
/// Polls while a hash list download is in progress. Returns `Ok(true)`
/// when no download is (or no longer is) running.
pub fn wait_for_hashes(&mut self) -> Result<bool> {
    if self.hash_download.is_some() {
        self.poll_read()?;
    }
    Ok(self.hash_download.is_none())
}
/// Read-locks the known-chunk set, turning lock poisoning into an `Error`.
fn chunk_read_lock(&self) -> Result<RwLockReadGuard<HashSet<FixedChunk>>> {
    self.chunks
        .read()
        .map_err(|_| format_err!("lock poisoned, disconnecting client..."))
}
/// Whether `chunk` is already known (from a hash query or prior upload).
// NOTE(review): this unwraps the very poisoned-lock error that
// `chunk_read_lock` converts, so it panics if the lock is poisoned.
pub fn is_chunk_available<T: Borrow<FixedChunk>>(&self, chunk: &T) -> bool {
    self.chunk_read_lock().unwrap().contains(chunk.borrow())
}
/// Attempts to upload a chunk. Returns an error state only on fatal errors. If the underlying
/// writer returns a `WouldBlock` error this returns `None` and `continue_upload_chunk` has to
/// be called until the chunk is uploaded completely. During this time no other operations
/// should be performed on this object!
/// See `continue_upload_chunk()` for a description of the returned value.
pub fn upload_chunk<T>(&mut self, info: &T, data: &[u8]) -> Result<Option<StreamId>>
where
    T: Borrow<FixedChunk>,
{
    if self.upload_chunk.is_some() {
        bail!("cannot concurrently upload multiple chunks");
    }
    self.upload_id = self.next_id()?;
    self.upload_chunk = Some(info.borrow().clone());
    // Start the upload state machine at state 0 (packet header).
    self.next_upload_state(0);
    self.continue_upload_chunk(data)
}
/// Switches the upload state machine to `state` and resets the write offset.
fn next_upload_state(&mut self, state: u8) {
    self.upload_state = state;
    self.upload_pos = 0;
}
// This is split into a static method not borrowing self so the buffer can point into self...
// Returns Ok(None) on WouldBlock, otherwise Ok(Some((bytes_written, finished))).
fn do_upload_write(
    writer: &mut common::Connection<S>,
    buf: &[u8],
) -> Result<Option<(usize, bool)>> {
    match writer.write_some(buf) {
        Ok(put) => Ok(Some((put, put == buf.len()))),
        Err(e) => {
            if e.kind() == std::io::ErrorKind::WouldBlock {
                Ok(None)
            } else {
                Err(e.into())
            }
        }
    }
}
/// Bookkeeping after a write attempt: advances `upload_pos` on a partial
/// write, or moves to `next_state` once `buf` was written completely.
/// Returns whether the current state finished.
fn after_upload_write(&mut self, put: Option<(usize, bool)>, next_state: u8) -> bool {
    match put {
        None => return false,
        Some((put, done)) => {
            if done {
                self.next_upload_state(next_state);
            } else {
                self.upload_pos += put;
            }
            done
        }
    }
}
/// Convenience wrapper combining `do_upload_write` and `after_upload_write`.
fn upload_write(&mut self, buf: &[u8], next_state: u8) -> Result<bool> {
    let wrote = Self::do_upload_write(&mut self.common, buf)?;
    Ok(self.after_upload_write(wrote, next_state))
}
/// If an `upload_chunk()` call returned `Ok(false)` this needs to be used to complete the
/// upload process as the chunk may have already been partially written to the socket.
/// This function will return `Ok(false)` on `WouldBlock` errors just like `upload_chunk()`
/// will, after which the caller should wait for the writer to become write-ready and then
/// call this method again.
/// Once the complete chunk packet has been written to the underlying socket, this returns a
/// packet ID which can be waited upon for completion.
///
/// State machine: 0 = packet header, 1 = digest, 2 = payload, 3 = done.
/// `upload_pos` tracks the write offset within the current state's buffer.
pub fn continue_upload_chunk(&mut self, data: &[u8]) -> Result<Option<StreamId>> {
    loop {
        match self.upload_state {
            // Writing the packet header:
            0 => {
                // Total on-wire length: header + UploadChunk (the digest) + payload.
                // NOTE(review): state 1 writes `FixedChunk` bytes — assumes
                // `client::UploadChunk` and `FixedChunk` have the same size.
                let len = mem::size_of::<Packet>()
                    + mem::size_of::<client::UploadChunk>()
                    + data.len();
                let packet = Packet {
                    id: self.upload_id,
                    pkttype: PacketType::UploadChunk as _,
                    length: len as _,
                }
                .to_le();
                let buf = unsafe { swapped_data_to_buf(&packet) };
                if !self.upload_write(&buf[self.upload_pos..], 1)? {
                    return Ok(None);
                }
            }
            // Writing the hash:
            1 => {
                let chunk = self.upload_chunk.as_ref().unwrap();
                let buf = &chunk.0[self.upload_pos..];
                // Static call so `buf` may borrow from `self.upload_chunk`.
                let wrote = Self::do_upload_write(&mut self.common, buf)?;
                if !self.after_upload_write(wrote, 2) {
                    return Ok(None);
                }
            }
            // Writing the data:
            2 => {
                if !self.upload_write(&data[self.upload_pos..], 3)? {
                    return Ok(None);
                }
            }
            // Done:
            3 => {
                self.upload_chunk = None;
                self.expect_ok_for_id(self.upload_id);
                return Ok(Some(StreamId(self.upload_id)));
            }
            n => bail!("bad chunk upload state: {}", n),
        }
    }
}
// generic data polling method
/// Reads and dispatches every complete packet currently available on the
/// socket. A disconnect is treated as a fatal error on the client side.
pub fn poll_read(&mut self) -> Result<()> {
    if self.common.eof {
        // polls after EOF are errors:
        bail!("server disconnected");
    }
    if !self.common.poll_read()? {
        // On the client side we do not expect a server-side disconnect, so error out!
        if self.common.eof {
            bail!("server disconnected");
        }
        return Ok(());
    }
    loop {
        // Dispatch on the packet type parsed by the framing layer.
        match self.common.current_packet_type {
            PacketType::Ok => self.recv_ok()?,
            PacketType::Hello => self.recv_hello()?,
            PacketType::HashListPart => self.recv_hash_list()?,
            PacketType::BackupCreated => self.backup_created()?,
            _ => bail!(
                "server sent an unexpected packet of type {}",
                self.common.current_packet_type as u32,
            ),
        }
        if !self.common.next()? {
            break;
        }
    }
    Ok(())
}
// None => nothing was queued
// Some(true) => queue finished
// Some(false) => queue not finished
pub fn poll_send(&mut self) -> Result<Option<bool>> {
    self.common.poll_send()
}
// private helper methods
/// Allocates a packet/transaction id: reuses a freed id if available,
/// otherwise takes the next unused one (ids are `u8`, so at most 255
/// concurrent transactions are possible).
fn next_id(&mut self) -> Result<u8> {
    if let Some(id) = self.free_ids.pop() {
        return Ok(id);
    }
    if self.cur_id < 0xff {
        self.cur_id += 1;
        return Ok(self.cur_id - 1);
    }
    bail!("too many concurrent transactions");
}
/// Returns an id to the reuse pool.
fn free_id(&mut self, id: u8) {
    self.free_ids.push(id);
}
/// Registers `id` as expecting a plain `Ok` acknowledgement.
fn expect_ok_for_id(&mut self, id: u8) {
    self.waiting_ids.insert(id, AckState::Waiting);
}
/// Registers `id` as expecting a data-carrying response packet.
fn expect_response_for_id(&mut self, id: u8) {
    self.waiting_ids.insert(id, AckState::AwaitingData);
}
// Methods handling received packets:
/// Validates the server `Hello` (magic and protocol version) and marks the
/// handshake complete.
fn recv_hello(&mut self) -> Result<()> {
    let hello = self.common.read_unaligned_data::<server::Hello>(0)?;
    if hello.magic != server::HELLO_MAGIC {
        bail!("received an invalid hello packet");
    }
    let ver = hello.version;
    if ver != server::PROTOCOL_VERSION {
        bail!(
            "hello packet contained incompatible protocol version: {} (!= {})",
            ver,
            server::PROTOCOL_VERSION,
        );
    }
    self.handshake_done = true;
    // The handshake occupied packet id 0; it may now be reused.
    self.free_id(0);
    Ok(())
}
/// Write-locks the known-chunk set, turning lock poisoning into an `Error`.
fn chunk_write_lock(&self) -> Result<RwLockWriteGuard<HashSet<FixedChunk>>> {
    self.chunks
        .write()
        .map_err(|_| format_err!("lock poisoned, disconnecting client..."))
}
/// Handles a `HashListPart` packet: an empty payload terminates the
/// download, otherwise the payload is a tightly packed array of digests
/// which are merged into the known-chunk set.
fn recv_hash_list(&mut self) -> Result<()> {
    let data = self.common.packet_data();
    if data.len() == 0 {
        // No more hashes, we're done
        self.hash_download = None;
        return Ok(());
    }
    if (data.len() % mem::size_of::<FixedChunk>()) != 0 {
        bail!("hash list contains invalid size");
    }
    // Reinterpret the payload bytes as digests; divisibility was checked
    // above. NOTE(review): assumes `FixedChunk` has alignment 1 (it appears
    // to be a plain byte-array wrapper) — confirm.
    let chunk_list: &[FixedChunk] = unsafe {
        std::slice::from_raw_parts(
            data.as_ptr() as *const FixedChunk,
            data.len() / mem::size_of::<FixedChunk>(),
        )
    };
    let mut my_chunks = self.chunk_write_lock()?;
    for c in chunk_list {
        // NOTE(review): debug leftover — prints every received digest to stderr.
        eprintln!("Got chunk '{}'", c.digest_to_hex());
        my_chunks.insert(c.clone());
    }
    Ok(())
}
/// Records an acknowledgement for transaction `id`. `data_packet` says
/// whether the ack carried a payload; this must match what the waiter
/// registered (`Waiting` expects a plain Ok, `AwaitingData` a payload).
fn ack_id(&mut self, id: u8, data_packet: bool) -> Result<()> {
    use hash_map::Entry::*;
    match self.waiting_ids.entry(id) {
        Vacant(_) => bail!("received unexpected packet for transaction id {}", id),
        Occupied(mut entry) => match entry.get() {
            AckState::Ignore => {
                // Nobody is interested anymore; drop the entry.
                entry.remove();
            }
            AckState::Received => bail!("duplicate Ack received for transaction id {}", id),
            AckState::Waiting => {
                if data_packet {
                    bail!("received data packet while expecting simple Ok for {}", id);
                }
                *entry.get_mut() = AckState::Received;
            }
            AckState::AwaitingData => {
                if !data_packet {
                    bail!(
                        "received empty Ok while waiting for data on stream id {}",
                        id
                    );
                }
                *entry.get_mut() = AckState::Received;
            }
        },
    }
    Ok(())
}
/// Handles a plain `Ok` packet for its transaction id.
fn recv_ok(&mut self) -> Result<()> {
    self.ack_id(self.common.current_packet.id, false)
}
/// Polls and checks whether transaction `id` has been acknowledged. Once
/// this returns `Ok(true)` the entry is removed and the id may be recycled.
// NOTE(review): acknowledged ids are removed from `waiting_ids` but never
// pushed back via `free_id` — confirm whether ids leak over long sessions.
pub fn wait_for_id(&mut self, id: StreamId) -> Result<bool> {
    if !self.waiting_ids.contains_key(&id.0) {
        bail!("wait_for_id() called on unexpected id {}", id.0);
    }
    self.poll_read()?;
    use hash_map::Entry::*;
    match self.waiting_ids.entry(id.0) {
        // Removed meanwhile (e.g. via the `Ignore` path) — treat as done.
        Vacant(_) => Ok(true),
        Occupied(entry) => match entry.get() {
            AckState::Received => {
                entry.remove();
                Ok(true)
            }
            _ => Ok(false),
        },
    }
}
/// Marks `id` as uninteresting: a pending entry is flipped to `Ignore`
/// (cleaned up when its ack arrives), an already-received ack is dropped.
pub fn discard_id(&mut self, id: StreamId) -> Result<()> {
    use hash_map::Entry::*;
    match self.waiting_ids.entry(id.0) {
        Vacant(_) => bail!("discard_id called with unknown id {}", id.0),
        Occupied(mut entry) => match entry.get() {
            AckState::Ignore => (),
            AckState::Received => {
                entry.remove();
            }
            AckState::Waiting | AckState::AwaitingData => {
                *entry.get_mut() = AckState::Ignore;
            }
        },
    }
    Ok(())
}
/// Starts a new backup stream: queues a `CreateBackup` packet and registers
/// local bookkeeping. The server's `BackupCreated` response (carrying the
/// remote path) is handled by `backup_created`.
///
/// `file_size` must be `None` for dynamic streams and `Some` for fixed ones.
pub fn create_backup(
    &mut self,
    index_type: IndexType,
    backup_type: &str,
    id: &str,
    timestamp: i64,
    file_name: &str,
    chunk_size: usize,
    file_size: Option<u64>,
    is_new: bool,
) -> Result<BackupStream> {
    let backup_type = backup_type::name_to_id(backup_type)?;
    // Both lengths are transmitted as single bytes.
    if id.len() > 0xff {
        bail!("id too long");
    }
    if file_name.len() > 0xff {
        bail!("file name too long");
    }
    let mut flags: backup_flags::Type = 0;
    if is_new {
        flags |= backup_flags::EXCL;
    }
    if index_type == IndexType::Dynamic {
        flags |= backup_flags::DYNAMIC_CHUNKS;
        if file_size.is_some() {
            bail!("file size must be None on dynamic backup streams");
        }
    } else if file_size.is_none() {
        bail!("file size is mandatory for fixed backup streams");
    }
    let packet_id = self.next_id()?;
    let mut packet = Packet::builder(packet_id, PacketType::CreateBackup);
    packet
        .write_data(client::CreateBackup {
            backup_type,
            id_length: id.len() as _,
            timestamp: timestamp as u64,
            flags,
            name_length: file_name.len() as _,
            chunk_size: chunk_size as _,
            file_size: file_size.unwrap_or(0) as u64,
        })
        .write_buf(id.as_bytes())
        .write_buf(file_name.as_bytes());
    self.streams.insert(
        packet_id,
        BackupStreamData {
            id: packet_id,
            index_type,
            pos: 0,
            path: None,
        },
    );
    // The server replies with a data-carrying packet (`BackupCreated`).
    self.expect_response_for_id(packet_id);
    self.common.queue_data(packet.finish())?;
    Ok(BackupStream(packet_id))
}
/// Handles the server's `BackupCreated` packet: validates the payload
/// length and stores the remote path on the matching stream.
fn backup_created(&mut self) -> Result<()> {
    let info = self
        .common
        .read_unaligned_data::<server::BackupCreated>(0)?;
    // The path follows the fixed-size header within the payload.
    let data = &self.common.packet_data()[mem::size_of_val(&info)..];
    if data.len() != info.path_length as usize {
        bail!("backup-created packet has invalid length");
    }
    let name = std::str::from_utf8(data)?;
    let pkt_id = self.common.current_packet.id;
    self.streams
        .get_mut(&pkt_id)
        .ok_or_else(|| format_err!("BackupCreated response for invalid stream: {}", pkt_id))?
        .path = Some(name.to_string());
    self.ack_id(pkt_id, true)?;
    Ok(())
}
/// Convenience wrapper around `dynamic_data` taking a `ChunkEntry`.
pub fn dynamic_chunk(&mut self, stream: BackupStream, entry: &ChunkEntry) -> Result<bool> {
    self.dynamic_data(stream, &entry.hash, entry.size)
}
/// Queues a dynamic index entry (current stream offset + digest) for
/// `stream` and advances the offset by `size`. Returns whether the packet
/// was flushed out immediately (`false` means it sits in the send queue).
pub fn dynamic_data<T: Borrow<FixedChunk>>(
    &mut self,
    stream: BackupStream,
    digest: &T,
    size: u64,
) -> Result<bool> {
    let data = self
        .streams
        .get_mut(&stream.0)
        .ok_or_else(|| format_err!("no such active backup stream"))?;
    if data.index_type != IndexType::Dynamic {
        bail!("dynamic_data called for stream of static chunks");
    }
    let mut packet = Packet::builder(data.id, PacketType::BackupDataDynamic);
    packet
        // The entry records the offset *before* this chunk.
        .write_data(data.pos as u64)
        .write_buf(&digest.borrow().0);
    data.pos += size;
    self.common.queue_data(packet.finish())
}
/// Queues a fixed index entry (chunk slot `index` + digest) for `stream`.
/// Returns whether the packet was flushed out immediately.
pub fn fixed_data<T: Borrow<FixedChunk>>(
    &mut self,
    stream: BackupStream,
    index: usize,
    digest: &T,
) -> Result<bool> {
    let data = self
        .streams
        .get_mut(&stream.0)
        .ok_or_else(|| format_err!("no such active backup stream"))?;
    if data.index_type != IndexType::Fixed {
        bail!("fixed_data called for stream of dynamic chunks");
    }
    let mut packet = Packet::builder(data.id, PacketType::BackupDataFixed);
    packet
        .write_data(index as u64)
        .write_buf(&digest.borrow().0);
    self.common.queue_data(packet.finish())
}
/// Finishes `stream`: removes its bookkeeping, queues a `BackupFinished`
/// packet and registers the id for a final Ok. Returns the id to wait on,
/// the remote path (or a placeholder if none was received), and whether the
/// packet was already flushed out.
pub fn finish_backup(&mut self, stream: BackupStream) -> Result<(StreamId, String, bool)> {
    let path = self
        .streams
        .remove(&stream.0)
        .ok_or_else(|| format_err!("no such active backup stream"))?
        .path
        .unwrap_or_else(|| "<no remote name received>".to_string());
    let ack = self
        .common
        .queue_data(Packet::simple(stream.0, PacketType::BackupFinished))?;
    self.expect_ok_for_id(stream.0);
    Ok((StreamId(stream.0), path, ack))
}
}

View File

@ -0,0 +1,282 @@
use std::io::{self, Read, Write};
use std::mem;
use std::ptr;
use failure::*;
use endian_trait::Endian;
use crate::protocol::*;
type Result<T> = std::result::Result<T, Error>;
/// Low-level packet framing shared by the client (and server) code: owns the
/// socket, an input buffer holding the current packet, and a single-slot
/// output queue.
pub(crate) struct Connection<S>
where
    S: Read + Write,
{
    socket: S,
    /// Input buffer; starts with the current (possibly still partial) packet.
    pub buffer: Vec<u8>,
    /// Header of the current packet (already converted to host byte order).
    pub current_packet: Packet,
    pub current_packet_type: PacketType,
    /// Set on fatal errors; must be cleared before reading again.
    pub error: bool,
    /// Set once the peer closed the stream (a read returned 0 bytes).
    pub eof: bool,
    /// Pending outgoing data together with the write offset into it.
    upload_queue: Option<(Vec<u8>, usize)>,
}
impl<S> Connection<S>
where
S: Read + Write,
{
/// Wraps a socket with empty buffers and no packet parsed yet.
pub fn new(socket: S) -> Self {
    Self {
        socket,
        buffer: Vec::new(),
        // NOTE(review): zeroed placeholder — fine as long as `Packet` stays
        // a plain-data wire struct with no invalid bit patterns; confirm.
        current_packet: unsafe { mem::zeroed() },
        current_packet_type: PacketType::Error,
        error: false,
        eof: false,
        upload_queue: None,
    }
}
/// Writes as much of `buf` as the socket accepts in a single `write` call.
pub fn write_some(&mut self, buf: &[u8]) -> std::io::Result<usize> {
    self.socket.write(buf)
}
/// It is safe to clear the error after an `io::ErrorKind::Interrupted`.
pub fn clear_err(&mut self) {
    self.error = false;
}
// None => nothing was queued
// Some(true) => queue finished
// Some(false) => queue not finished
pub fn poll_send(&mut self) -> Result<Option<bool>> {
    if let Some((ref data, ref mut pos)) = self.upload_queue {
        loop {
            match self.socket.write(&data[*pos..]) {
                Ok(put) => {
                    *pos += put;
                    if *pos == data.len() {
                        // Everything flushed; release the queue slot.
                        self.upload_queue = None;
                        return Ok(Some(true));
                    }
                    // Keep writing
                    continue;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    return Ok(Some(false));
                }
                Err(e) => return Err(e.into()),
            }
        }
    } else {
        Ok(None)
    }
}
// Returns true when the data was also sent out, false if the queue is now full.
// For now we only allow a single dataset to be queued at once.
pub fn queue_data(&mut self, buf: Vec<u8>) -> Result<bool> {
    if self.upload_queue.is_some() {
        bail!("upload queue clash");
    }
    self.upload_queue = Some((buf, 0));
    match self.poll_send()? {
        None => unreachable!(), // We literally just set self.upload_queue to Some(value)
        Some(v) => Ok(v),
    }
}
// Returns 'true' if there's data available, 'false' if there isn't (if the
// underlying reader returned `WouldBlock` or the `read()` was short).
// Other errors are propagated.
pub fn poll_read(&mut self) -> Result<bool> {
    if self.eof {
        return Ok(false);
    }
    if self.error {
        eprintln!("refusing to read from a client in error state");
        bail!("client is in error state");
    }
    match self.poll_data_do() {
        Ok(has_packet) => Ok(has_packet),
        Err(e) => {
            // To support AsyncRead/AsyncWrite we do not enter a failed
            // state when we read from a non-blocking source which fails
            // with WouldBlock.
            if let Some(ioe) = e.downcast_ref::<std::io::Error>() {
                if ioe.kind() == io::ErrorKind::WouldBlock {
                    return Ok(false);
                }
            }
            // Any other error is fatal for this connection.
            self.error = true;
            Err(e)
        }
    }
}
// Reads the packet header, sanity-checks its size, then reads the payload;
// returns true once a complete packet is buffered.
fn poll_data_do(&mut self) -> Result<bool> {
    if !self.read_packet()? {
        return Ok(false);
    }
    if self.current_packet.length > MAX_PACKET_SIZE {
        bail!("client tried to send a huge packet");
    }
    if !self.fill_packet()? {
        return Ok(false);
    }
    Ok(true)
}
/// Total length (header + payload) of the current packet in bytes.
pub fn packet_length(&self) -> usize {
    self.current_packet.length as usize
}
/// Payload of the current packet (everything after the header).
pub fn packet_data(&self) -> &[u8] {
    let beg = mem::size_of::<Packet>();
    let end = self.packet_length();
    &self.buffer[beg..end]
}
pub fn next(&mut self) -> Result<bool> {
let pktlen = self.packet_length();
unsafe {
if self.buffer.len() != pktlen {
std::ptr::copy_nonoverlapping(
&self.buffer[pktlen],
&mut self.buffer[0],
self.buffer.len() - pktlen,
);
}
self.buffer.set_len(self.buffer.len() - pktlen);
}
self.poll_data_do()
}
    // NOTE: After calling this you must `self.buffer.set_len()` when done!
    // Grows the buffer to at least `size` bytes *without* initializing the
    // newly exposed tail, and returns the previous length (i.e. the offset
    // from which the caller should start filling in data).
    // NOTE(review): the tail bytes are uninitialized memory until the caller
    // writes them — callers must only hand out `buffer[..filled]`.
    #[must_use]
    fn buffer_set_min_size(&mut self, size: usize) -> usize {
        if self.buffer.capacity() < size {
            // reserve() takes the *additional* amount relative to len().
            self.buffer.reserve(size - self.buffer.len());
        }
        let start = self.buffer.len();
        unsafe {
            self.buffer.set_len(size);
        }
        start
    }
    // Read from the socket until the buffer holds at least `size` bytes.
    // Returns `true` when enough data is buffered, `false` on EOF; a
    // `WouldBlock` error is propagated as `Err` (callers translate it).
    fn fill_buffer(&mut self, size: usize) -> Result<bool> {
        if self.buffer.len() >= size {
            return Ok(true);
        }
        let mut filled = self.buffer_set_min_size(size);
        loop {
            // We don't use read_exact to not block too long or busy-read on nonblocking sockets...
            match self.socket.read(&mut self.buffer[filled..]) {
                Ok(got) => {
                    if got == 0 {
                        // EOF: trim the buffer back to the initialized part.
                        self.eof = true;
                        unsafe {
                            self.buffer.set_len(filled);
                        }
                        return Ok(false);
                    }
                    filled += got;
                    if filled >= size {
                        unsafe {
                            self.buffer.set_len(filled);
                        }
                        return Ok(true);
                    }
                    // reloop
                }
                Err(e) => {
                    // Restore a valid length before bailing out so the
                    // buffer never exposes uninitialized bytes.
                    unsafe {
                        self.buffer.set_len(filled);
                    }
                    return Err(e.into());
                }
            }
        }
    }
fn read_packet_do(&mut self) -> Result<bool> {
if !self.fill_buffer(mem::size_of::<Packet>())? {
return Ok(false);
}
self.current_packet = self.read_unaligned::<Packet>(0)?.from_le();
self.current_packet_type = match PacketType::try_from(self.current_packet.pkttype) {
Some(t) => t,
None => bail!("unexpected packet type"),
};
let length = self.current_packet.length;
if (length as usize) < mem::size_of::<Packet>() {
bail!("received packet of bad length ({})", length);
}
Ok(true)
}
fn read_packet(&mut self) -> Result<bool> {
match self.read_packet_do() {
Ok(b) => Ok(b),
Err(e) => {
if let Some(ioe) = e.downcast_ref::<std::io::Error>() {
if ioe.kind() == io::ErrorKind::WouldBlock {
return Ok(false);
}
}
Err(e)
}
}
}
    // Read a `T` from the buffer at `offset` (no alignment requirement) and
    // convert it from little-endian wire order to host order.
    fn read_unaligned<T: Endian>(&self, offset: usize) -> Result<T> {
        if offset + mem::size_of::<T>() > self.buffer.len() {
            bail!("buffer underrun");
        }
        // Bounds were checked above; read_unaligned copies the bytes out.
        Ok(unsafe { ptr::read_unaligned(&self.buffer[offset] as *const _ as *const T) }.from_le())
    }
    // Same as `read_unaligned` but with `offset` relative to the current
    // packet's payload (i.e. skipping the `Packet` header).
    pub fn read_unaligned_data<T: Endian>(&self, offset: usize) -> Result<T> {
        self.read_unaligned(offset + mem::size_of::<Packet>())
    }
    // Buffer the current packet's complete payload.
    fn fill_packet(&mut self) -> Result<bool> {
        self.fill_buffer(self.current_packet.length as usize)
    }
// convenience helpers:
pub fn assert_size(&self, size: usize) -> Result<()> {
if self.packet_data().len() != size {
bail!(
"protocol error: invalid packet size (type {})",
self.current_packet.pkttype,
);
}
Ok(())
}
    // Bail out unless the current packet's payload is at least `size` bytes.
    // NOTE(review): the error text is identical to `assert_size`'s, which
    // makes the two failure modes indistinguishable in logs.
    pub fn assert_atleast(&self, size: usize) -> Result<()> {
        if self.packet_data().len() < size {
            bail!(
                "protocol error: invalid packet size (type {})",
                self.current_packet.pkttype,
            );
        }
        Ok(())
    }
}

View File

@ -0,0 +1,19 @@
// Shared packet-buffer plumbing used by both client and server:
pub(crate) mod common;
// Wire format: packet types and request/response structs:
pub mod protocol;
pub mod server;
pub mod tools;
// Chunking building blocks re-exported at the crate root:
mod chunk_stream;
pub use chunk_stream::*;
mod chunker;
pub use chunker::*;
mod client;
pub use client::*;
// Common value types (chunk digests, entries) re-exported at the crate root:
mod types;
pub use types::*;
// C-compatible wrapper API around the client:
pub mod c_client;

View File

@ -0,0 +1,313 @@
use std::mem;
use endian_trait::Endian;
// There's no reason to have more than that in a single packet...
pub const MAX_PACKET_SIZE: u32 = 16 * 1024 * 1024;
// Each packet has a transaction ID (eg. when uploading multiple disks each
// upload is a separate stream).
/// Fixed header preceding every packet on the wire (little-endian encoded).
#[derive(Endian)]
#[repr(C, packed)]
pub struct Packet {
    pub id: u8, // request/command id
    pub pkttype: u8, // packet type
    pub length: u32, // data length before the next packet; includes this header
    // content is attached directly afterwards
}
impl Packet {
    /// Start building a packet of `pkttype` for request/stream `id`.
    pub fn builder(id: u8, pkttype: PacketType) -> PacketBuilder {
        PacketBuilder::new(id, pkttype)
    }
    /// Build a packet with no payload (header only).
    pub fn simple(id: u8, pkttype: PacketType) -> Vec<u8> {
        Self::builder(id, pkttype).finish()
    }
}
/// Discriminates `Error` packets; sent as a single byte on the wire.
#[derive(Endian, Clone, Copy)]
#[repr(u8)]
pub enum ErrorId {
    Generic,
    Busy,
}
#[repr(u8)]
#[derive(Clone, Copy)]
pub enum PacketType {
    /// First packet sent by the server.
    Hello,
    /// Generic acknowledgement.
    Ok,
    /// Error packet sent by the server, this cancels the request for which it is produced.
    Error,
    /// The client wants the list of available hashes in order to know which ones require an
    /// upload.
    ///
    /// The body should contain a backup file name of which to retrieve the hash list.
    ///
    /// Server responds with a sequence of ``HashListPart`` packets.
    GetHashList,
    /// Array of hashes. The number of hashes in a packet is calculated from the packet length as
    /// provided in the ``Packet`` struct. An empty packet indicates the end of the list. We send a
    /// sequence of such packets because we don't know whether the server will be keeping the list
    /// in memory yet, so it might not know the number in advance and may be iterating through
    /// directories until it hits an end. It can produce the network packets asynchronously while
    /// walking the chunk dir.
    HashListPart,
    /// Client requests to download chunks via a hash list from the server. The number of chunks
    /// can be derived from the length of this request, so it works similar to ``HashListPart``,
    /// but there's only 1 chunk list per request ID.
    ///
    /// The server responds with a sequence of ``Chunk`` packets or ``Error``.
    DownloadChunks,
    /// The response to ``DownloadChunks``. One packet per requested chunk.
    Chunk,
    /// The upload of a chunk can happen independent from the ongoing backup
    /// streams. Server responds with an ``OK``.
    UploadChunk,
    /// Create a file in a new or existing backup. Contains all the metadata of
    /// a file.
    ///
    /// The server responds with ``BackupCreated`` or ``Error``. On ``BackupCreated`` the client
    /// may proceed to send as many ``BackupData...`` packets as necessary to fill the file.
    /// The sequence is finished by the client with a ``BackupFinished``.
    CreateBackup,
    /// Successful from the server to a client's ``CreateBackup`` packet. Contains the server side
    /// path relative to the store.
    BackupCreated,
    /// This packet contains an array of references to fixed sized chunks. Clients should upload
    /// chunks via ``UploadChunk`` packets before using them in this type of packet. A non-existent
    /// chunk is an error.
    ///
    /// The server produces an ``Error`` packet in case of an error.
    BackupDataFixed,
    /// This packet contains an array of references to dynamic sized chunks. Clients should upload
    /// chunks via ``UploadChunk`` packets before using them in this type of packet. A non-existent
    /// chunk is an error.
    ///
    /// The server produces an ``Error`` packet in case of an error.
    BackupDataDynamic,
    /// This ends a backup file. The server responds with an ``OK`` or an ``Error`` packet.
    BackupFinished,
}
// Nightly has a std::convert::TryFrom, actually...
impl PacketType {
    /// Convert a raw wire byte to a `PacketType`; `None` for unknown values.
    pub fn try_from(v: u8) -> Option<Self> {
        use PacketType::*;
        // An explicit match instead of a bounds check + `transmute`: this is
        // safe code, and adding a variant without updating the mapping now
        // fails to cover it here instead of silently transmuting.
        Some(match v {
            0 => Hello,
            1 => Ok,
            2 => Error,
            3 => GetHashList,
            4 => HashListPart,
            5 => DownloadChunks,
            6 => Chunk,
            7 => UploadChunk,
            8 => CreateBackup,
            9 => BackupCreated,
            10 => BackupDataFixed,
            11 => BackupDataDynamic,
            12 => BackupFinished,
            _ => return None,
        })
    }
}
// Not using bitflags! for Endian derive...
// Bit flags for `client::CreateBackup::flags`. `Type` is a u8 on the wire,
// so only the low 8 bits are usable.
pub mod backup_flags {
    pub type Type = u8;
    /// The backup must not exist yet.
    pub const EXCL: Type = 0x00000001;
    /// The data represents a raw file
    pub const RAW: Type = 0x00000002;
    /// The data uses dynamically sized chunks (catar file)
    pub const DYNAMIC_CHUNKS: Type = 0x00000004;
}
// Numeric backup type ids used on the wire plus name <-> id conversions.
pub mod backup_type {
    pub type Type = u8;
    pub const VM: Type = 0;
    pub const CT: Type = 1;
    pub const HOST: Type = 2;
    use failure::{bail, Error};
    // Map a wire id to its canonical name ("vm"/"ct"/"host").
    pub fn id_to_name(id: Type) -> Result<&'static str, Error> {
        Ok(match id {
            VM => "vm",
            CT => "ct",
            HOST => "host",
            n => bail!("unknown backup type id: {}", n),
        })
    }
    // Inverse of `id_to_name`.
    pub fn name_to_id(id: &str) -> Result<Type, Error> {
        Ok(match id {
            "vm" => VM,
            "ct" => CT,
            "host" => HOST,
            n => bail!("unknown backup type name: {}", n),
        })
    }
}
// A chunk reference within a dynamic-index stream: end offset + digest.
#[repr(C, packed)]
pub struct DynamicChunk {
    pub offset: u64,
    pub digest: [u8; 32],
}
// Wire structs for packets originating from the server.
pub mod server {
    use endian_trait::Endian;
    pub const PROTOCOL_VERSION: u32 = 1;
    // Magic bytes identifying the protocol in the initial Hello packet.
    #[derive(Eq, PartialEq)]
    #[repr(C, packed)]
    pub struct HelloMagic([u8; 8]);
    pub const HELLO_MAGIC: HelloMagic = HelloMagic(*b"PMXBCKUP");
    pub const HELLO_VERSION: u32 = 1; // the current version
    // Body of the initial `PacketType::Hello` packet.
    #[derive(Endian)]
    #[repr(C, packed)]
    pub struct Hello {
        pub magic: HelloMagic,
        pub version: u32,
    }
    // Body of a `PacketType::Error` packet (an `ErrorId` as raw byte).
    #[derive(Endian)]
    #[repr(C, packed)]
    pub struct Error {
        pub id: u8,
    }
    // Header of a `PacketType::Chunk` packet; the chunk data follows.
    #[derive(Endian)]
    #[repr(C, packed)]
    pub struct Chunk {
        pub hash: super::DynamicChunk,
        // Data follows here...
    }
    // Identity impl: the magic is a byte string and has no endianness.
    impl Endian for HelloMagic {
        fn to_be(self) -> Self {
            self
        }
        fn to_le(self) -> Self {
            self
        }
        fn from_be(self) -> Self {
            self
        }
        fn from_le(self) -> Self {
            self
        }
    }
    // Body of a `PacketType::BackupCreated` packet; the path text follows.
    #[derive(Endian)]
    #[repr(C, packed)]
    pub struct BackupCreated {
        pub path_length: u16,
        // path follows here
    }
}
// Wire structs for packets originating from the client.
pub mod client {
    use endian_trait::Endian;
    // Header of an `UploadChunk` packet; the chunk data follows.
    #[derive(Endian)]
    #[repr(C, packed)]
    pub struct UploadChunk {
        pub hash: crate::FixedChunk,
    }
    // Body of a `CreateBackup` packet (plus trailing id / name strings).
    #[derive(Endian)]
    #[repr(C, packed)]
    pub struct CreateBackup {
        pub backup_type: super::backup_type::Type,
        pub id_length: u8, // length of the ID string
        pub timestamp: u64, // seconds since the epoch
        pub flags: super::backup_flags::Type,
        pub name_length: u8, // file name length
        pub chunk_size: u32, // average or "fixed" chunk size
        pub file_size: u64, // size for fixed size files (must be 0 if DYNAMIC_CHUNKS is set)
        // ``id_length`` bytes of ID follow
        // ``name_length`` bytes of file name follow
        // Further packets contain the data or chunks
    }
    // Body of a `GetHashList` packet; the backup file name follows.
    #[derive(Endian)]
    #[repr(C, packed)]
    pub struct GetHashList {
        pub name_length: u16,
        // name follows as payload
    }
}
// Identity impl: the digest is raw bytes, and the `offset` field is
// byte-swapped manually by the code that reads/writes these entries, so
// the derived swap must not touch it here.
impl Endian for DynamicChunk {
    fn to_be(self) -> Self {
        self
    }
    fn to_le(self) -> Self {
        self
    }
    fn from_be(self) -> Self {
        self
    }
    fn from_le(self) -> Self {
        self
    }
}
// Incrementally builds a wire packet; the header is written first and its
// `length` field is patched in `finish()`.
pub struct PacketBuilder {
    data: Vec<u8>,
}
impl PacketBuilder {
    /// Start a packet of `pkttype` for request/stream `id`. The length field
    /// is left zero and patched by `finish()`.
    pub fn new(id: u8, pkttype: PacketType) -> Self {
        let data = Vec::with_capacity(mem::size_of::<Packet>());
        let mut me = Self { data };
        // NOTE: write_data() performs the little-endian conversion itself,
        // so the header must be passed in host order here. (Previously
        // `.to_le()` was applied before the call as well, so the header was
        // swapped twice and ended up in *native* order on big-endian hosts.)
        me.write_data(Packet {
            id,
            pkttype: pkttype as _,
            length: 0,
        });
        me
    }
    /// Pre-allocate `more` additional bytes.
    pub fn reserve(&mut self, more: usize) -> &mut Self {
        self.data.reserve(more);
        self
    }
    /// Append raw bytes as-is.
    pub fn write_buf(&mut self, buf: &[u8]) -> &mut Self {
        self.data.extend_from_slice(buf);
        self
    }
    /// Append `data` converted to little-endian wire order.
    pub fn write_data<T: Endian>(&mut self, data: T) -> &mut Self {
        self.write_data_noswap(&data.to_le())
    }
    /// Append `data`'s raw in-memory bytes without any byte swapping.
    pub fn write_data_noswap<T>(&mut self, data: &T) -> &mut Self {
        self.write_buf(unsafe {
            std::slice::from_raw_parts(data as *const T as *const u8, mem::size_of::<T>())
        })
    }
    /// Patch the total length into the header and return the finished packet.
    pub fn finish(mut self) -> Vec<u8> {
        let length = self.data.len();
        assert!(length >= mem::size_of::<Packet>());
        unsafe {
            // Unaligned write: the Vec's base address has no alignment
            // guarantee relative to `Packet`.
            let head = self.data.as_mut_ptr() as *mut Packet;
            std::ptr::write_unaligned((&mut (*head).length) as *mut u32, (length as u32).to_le());
        }
        self.data
    }
}

View File

@ -0,0 +1,465 @@
use std::collections::hash_map::{self, HashMap};
use std::io::{Read, Write};
use std::{mem, ptr};
use failure::*;
use endian_trait::Endian;
use crate::common;
use crate::protocol::*;
use crate::ChunkEntry;
use crate::FixedChunk;
type Result<T> = std::result::Result<T, Error>;
/// Pull-style iterator over chunk digests; `Ok(None)` signals the end.
/// (A separate trait instead of `Iterator` so it can be boxed with `Send`
/// and report errors per item.)
pub trait ChunkList: Send {
    fn next(&mut self) -> Result<Option<&[u8; 32]>>;
}
/// This provides callbacks used by a `Connection` when it receives a packet.
pub trait HandleClient {
    /// The protocol handler will call this when the client produces an irrecoverable error.
    fn error(&self);
    /// The client wants the list of hashes, the provider should provide an iterator over chunk
    /// entries.
    fn get_chunk_list(&self, backup_name: &str) -> Result<Box<dyn ChunkList>>;
    /// The client has uploaded a chunk, we should add it to the chunk store. Return whether the
    /// chunk was new.
    fn upload_chunk(&self, chunk: &ChunkEntry, data: &[u8]) -> Result<bool>;
    /// The client wants to create a backup. Since multiple backup streams can happen in parallel,
    /// this should return a handler used to create the individual streams.
    /// The handler will be informed about success via the ``finish()`` method.
    fn create_backup(
        &self,
        backup_type: &str,
        id: &str,
        timestamp: i64,
        new: bool,
    ) -> Result<Box<dyn HandleBackup + Send>>;
}
/// A single backup may contain multiple files. Currently we represent this via a hierarchy where
/// the `HandleBackup` trait is instantiated for each backup, which is responsible for
/// instantiating the `BackupFile` trait objects.
pub trait HandleBackup {
    /// All individual streams for this backup have been successfully finished.
    fn finish(&mut self) -> Result<()>;
    /// Create a specific file in this backup.
    fn create_file(
        &self,
        name: &str,
        fixed_size: Option<u64>,
        chunk_size: usize,
    ) -> Result<Box<dyn BackupFile + Send>>;
}
/// This handles backup files created by calling `create_file` on a `Backup`.
pub trait BackupFile {
    /// Backup use the server-local timestamp formatting, so we want to be able to tell the client
    /// the real remote path:
    fn relative_path(&self) -> &str;
    /// The client wants to add a chunk to a fixed index file at a certain position.
    fn add_fixed_data(&mut self, index: u64, chunk: &FixedChunk) -> Result<()>;
    /// The client wants to add a chunks to a dynamic index file.
    fn add_dynamic_data(&mut self, chunk: &DynamicChunk) -> Result<()>;
    /// This notifies the handler that the backup has finished successfully. This should commit the
    /// data to the store for good. After this the client will receive an "ok".
    ///
    /// If the Drop handler gets called before this method, the backup was aborted due to an error
    /// or the client disconnected unexpectedly, in which case cleanup of temporary files should be
    /// performed.
    fn finish(&mut self) -> Result<()>;
}
// Hash map key identifying one backup: (type, id, timestamp).
#[derive(Clone, Eq, Hash, PartialEq)]
struct BackupId(backup_type::Type, String, i64);
/// Associates a socket with the server side of the backup protocol.
/// The communcation channel should be `Read + Write` and may be non-blocking (provided it
/// correctly returns `io::ErrorKind::WouldBlock`).
/// The handler has to implement the `HandleClient` trait to provide callbacks used while
/// communicating with the client.
pub struct Connection<S, H>
where
    S: Read + Write,
    H: HandleClient,
{
    handler: H,
    common: common::Connection<S>,
    // states:
    // If this is set we are currently transferring our hash list to the client:
    hash_list: Option<(
        u8, // data stream ID
        Box<dyn ChunkList>,
    )>,
    // currently active 'backups' (handlers for a specific BackupDir)
    backups: HashMap<BackupId, Box<dyn HandleBackup + Send>>,
    // currently active backup *file* streams
    backup_files: HashMap<u8, Box<dyn BackupFile + Send>>,
}
impl<S, H> Connection<S, H>
where
S: Read + Write,
H: HandleClient,
{
    /// Wrap `socket` in a server-side protocol connection and immediately
    /// queue the initial `Hello` packet.
    pub fn new(socket: S, handler: H) -> Result<Self> {
        let mut me = Self {
            handler,
            common: common::Connection::new(socket),
            hash_list: None,
            backups: HashMap::new(),
            backup_files: HashMap::new(),
        };
        me.send_hello()?;
        Ok(me)
    }
    // Queue the protocol greeting (magic + version) on stream id 0.
    fn send_hello(&mut self) -> Result<()> {
        let mut packet = Packet::builder(0, PacketType::Hello);
        packet.write_data(server::Hello {
            magic: server::HELLO_MAGIC,
            version: server::PROTOCOL_VERSION,
        });
        self.common.queue_data(packet.finish())?;
        Ok(())
    }
    /// Whether the client hung up.
    pub fn eof(&self) -> bool {
        self.common.eof
    }
    /// It is safe to clear the error after an `io::ErrorKind::Interrupted`.
    pub fn clear_err(&mut self) {
        self.common.clear_err()
    }
    /// One iteration of the main loop: process incoming packets, then flush
    /// pending output.
    pub fn main(&mut self) -> Result<()> {
        self.poll_read()?;
        self.poll_send()?;
        Ok(())
    }
    // If this returns an error it is considered fatal and the connection should be dropped!
    fn poll_read(&mut self) -> Result<()> {
        if self.common.eof {
            // polls after EOF are errors:
            bail!("client disconnected");
        }
        if !self.common.poll_read()? {
            // No data available
            if self.common.eof {
                bail!("client disconnected");
            }
            return Ok(());
        }
        // we received a packet, handle it:
        loop {
            use PacketType::*;
            // Dispatch on the packet type; anything the server does not
            // expect to receive is a protocol violation.
            match self.common.current_packet_type {
                GetHashList => self.hash_list_requested()?,
                UploadChunk => self.receive_chunk()?,
                CreateBackup => self.create_backup()?,
                BackupDataDynamic => self.backup_data_dynamic()?,
                BackupDataFixed => self.backup_data_fixed()?,
                BackupFinished => self.backup_finished()?,
                _ => bail!(
                    "client sent an unexpected packet of type {}",
                    self.common.current_packet_type as u32,
                ),
            };
            // Drop the handled packet and continue while complete packets
            // are already buffered.
            if !self.common.next()? {
                break;
            }
        }
        Ok(())
    }
fn poll_send(&mut self) -> Result<()> {
if self.common.error {
eprintln!("refusing to send datato client in error state");
bail!("client is in error state");
}
if let Some(false) = self.common.poll_send()? {
// send queue is not finished, don't add anything else...
return Ok(());
}
// Queue has either finished or was empty, see if we should enqueue more data:
if self.hash_list.is_some() {
return self.send_hash_list();
}
Ok(())
}
    fn hash_list_requested(&mut self) -> Result<()> {
        // The payload is a `GetHashList` header followed by exactly
        // `name_length` bytes of backup file name.
        let request = self.common.read_unaligned_data::<client::GetHashList>(0)?;
        self.common
            .assert_size(mem::size_of_val(&request) + request.name_length as usize)?;
        let name_bytes = &self.common.packet_data()[mem::size_of_val(&request)..];
        let name = std::str::from_utf8(name_bytes)?;
        // We support only one active hash list stream:
        if self.hash_list.is_some() {
            return self.respond_error(ErrorId::Busy);
        }
        // Remember the stream id; poll_send() drives the actual transfer.
        self.hash_list = Some((
            self.common.current_packet.id,
            self.handler.get_chunk_list(name)?,
        ));
        Ok(())
    }
    // Stream `HashListPart` packets until the iterator is exhausted or the
    // socket's send queue fills up. An empty part terminates the list.
    fn send_hash_list(&mut self) -> Result<()> {
        loop {
            let (stream_id, hash_iter) = self.hash_list.as_mut().unwrap();
            // Fill each part up to the maximum packet size.
            let max_chunks_per_packet = (MAX_PACKET_SIZE as usize - mem::size_of::<Packet>())
                / mem::size_of::<FixedChunk>();
            let mut packet = Packet::builder(*stream_id, PacketType::HashListPart);
            packet.reserve(mem::size_of::<FixedChunk>() * max_chunks_per_packet);
            let mut count = 0;
            for _ in 0..max_chunks_per_packet {
                let entry: &[u8; 32] = match hash_iter.next() {
                    Ok(Some(entry)) => entry,
                    Ok(None) => break,
                    Err(e) => {
                        // Best effort: log and skip entries the provider
                        // failed to produce.
                        // NOTE(review): a persistently failing iterator
                        // would spin in this inner loop — confirm providers
                        // eventually return Ok(None) after errors.
                        eprintln!("error sending chunk list to client: {}", e);
                        continue;
                    }
                };
                packet.write_buf(entry);
                count += 1;
            }
            let can_send_more = self.common.queue_data(packet.finish())?;
            if count == 0 {
                // We just sent the EOF packet, clear our iterator state!
                self.hash_list = None;
                break;
            }
            if !can_send_more {
                break;
            }
        }
        Ok(())
    }
fn respond_error(&mut self, kind: ErrorId) -> Result<()> {
self.respond_value(PacketType::Error, kind)?;
Ok(())
}
    // Queue a packet of `pkttype` whose payload is `data`, addressed to the
    // id of the packet currently being handled.
    fn respond_value<T: Endian>(&mut self, pkttype: PacketType, data: T) -> Result<()> {
        let mut packet = Packet::builder(self.common.current_packet.id, pkttype);
        packet.write_data(data);
        self.common.queue_data(packet.finish())?;
        Ok(())
    }
    // Queue a payload-less packet of `pkttype` for the current request id.
    fn respond_empty(&mut self, pkttype: PacketType) -> Result<()> {
        self.common
            .queue_data(Packet::simple(self.common.current_packet.id, pkttype))?;
        Ok(())
    }
    // Generic acknowledgement for the current request.
    fn respond_ok(&mut self) -> Result<()> {
        self.respond_empty(PacketType::Ok)
    }
fn receive_chunk(&mut self) -> Result<()> {
self.common
.assert_atleast(mem::size_of::<client::UploadChunk>())?;
let data = self.common.packet_data();
let (hash, data) = data.split_at(mem::size_of::<FixedChunk>());
if data.len() == 0 {
bail!("received an empty chunk");
}
let entry = ChunkEntry::from_data(data);
if entry.hash != hash {
let cli_hash = crate::tools::digest_to_hex(hash);
let data_hash = entry.digest_to_hex();
bail!(
"client claimed data with digest {} has digest {}",
data_hash,
cli_hash
);
}
let _new = self.handler.upload_chunk(&entry, data)?;
self.respond_ok()
}
    // Handle a `CreateBackup` packet: parse metadata, find or create the
    // backup handler, open the file stream and answer with `BackupCreated`.
    fn create_backup(&mut self) -> Result<()> {
        // The packet id doubles as the stream id for subsequent
        // BackupData... packets, so it must not be in use yet.
        if self
            .backup_files
            .contains_key(&self.common.current_packet.id)
        {
            bail!("stream id already in use...");
        }
        let create_msg = self.common.read_unaligned_data::<client::CreateBackup>(0)?;
        // simple data:
        let flags = create_msg.flags;
        let backup_type = create_msg.backup_type;
        let time = create_msg.timestamp as i64;
        // text comes from the payload data after the CreateBackup struct:
        let data = self.common.packet_data();
        let payload = &data[mem::size_of_val(&create_msg)..];
        // there must be exactly the ID and the file name in the payload:
        let id_len = create_msg.id_length as usize;
        let name_len = create_msg.name_length as usize;
        let expected_len = id_len + name_len;
        if payload.len() < expected_len {
            bail!("client sent incomplete CreateBackup request");
        } else if payload.len() > expected_len {
            bail!("client sent excess data in CreateBackup request");
        }
        // id and file name must be utf8:
        let id = std::str::from_utf8(&payload[0..id_len])
            .map_err(|e| format_err!("client-requested backup id is invalid: {}", e))?;
        let file_name = std::str::from_utf8(&payload[id_len..])
            .map_err(|e| format_err!("client-requested backup file name invalid: {}", e))?;
        // Sanity check dynamic vs fixed:
        let is_dynamic = (flags & backup_flags::DYNAMIC_CHUNKS) != 0;
        let file_size = match (is_dynamic, create_msg.file_size) {
            (false, size) => Some(size),
            (true, 0) => None,
            (true, _) => bail!("file size of dynamic streams must be zero"),
        };
        // search or create the handler:
        let hashmap_id = BackupId(backup_type, id.to_string(), time);
        let handle = match self.backups.entry(hashmap_id) {
            hash_map::Entry::Vacant(entry) => entry.insert(self.handler.create_backup(
                backup_type::id_to_name(backup_type)?,
                id,
                time,
                (flags & backup_flags::EXCL) != 0,
            )?),
            hash_map::Entry::Occupied(entry) => entry.into_mut(),
        };
        let file = handle.create_file(file_name, file_size, create_msg.chunk_size as usize)?;
        // Tell the client the server-side relative path of the new file:
        let mut response =
            Packet::builder(self.common.current_packet.id, PacketType::BackupCreated);
        let path = file.relative_path();
        if path.len() > 0xffff {
            bail!("path too long");
        }
        response
            .write_data(server::BackupCreated {
                path_length: path.len() as _,
            })
            .write_buf(path.as_bytes());
        self.common.queue_data(response.finish())?;
        // Register the file stream under the packet id.
        self.backup_files
            .insert(self.common.current_packet.id, file);
        Ok(())
    }
    // Handle a `BackupDataDynamic` packet: forward each (offset, digest)
    // entry in the payload to the file stream identified by the packet id.
    fn backup_data_dynamic(&mut self) -> Result<()> {
        let stream_id = self.common.current_packet.id;
        let file = self
            .backup_files
            .get_mut(&stream_id)
            .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?;
        let mut data = self.common.packet_data();
        // Data consists of (offset: u64, hash: [u8; 32])
        let entry_len = mem::size_of::<DynamicChunk>();
        while data.len() >= entry_len {
            // Unaligned read from the wire buffer; only `offset` needs a
            // byte swap (the digest is raw bytes).
            let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const DynamicChunk) };
            data = &data[entry_len..];
            entry.offset = entry.offset.from_le();
            file.add_dynamic_data(&entry)?;
        }
        // Trailing bytes that don't form a whole entry are an error.
        if data.len() != 0 {
            bail!(
                "client sent excess data ({} bytes) after dynamic chunk indices!",
                data.len()
            );
        }
        Ok(())
    }
fn backup_data_fixed(&mut self) -> Result<()> {
let stream_id = self.common.current_packet.id;
let file = self
.backup_files
.get_mut(&stream_id)
.ok_or_else(|| format_err!("BackupDataFixed for invalid stream id {}", stream_id))?;
let mut data = self.common.packet_data();
// Data consists of (index: u64, hash: [u8; 32])
#[repr(C, packed)]
struct IndexedChunk {
index: u64,
digest: FixedChunk,
}
let entry_len = mem::size_of::<IndexedChunk>();
while data.len() >= entry_len {
let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const IndexedChunk) };
data = &data[entry_len..];
entry.index = entry.index.from_le();
file.add_fixed_data(entry.index, &entry.digest)?;
}
if data.len() != 0 {
bail!(
"client sent excess data ({} bytes) after dynamic chunk indices!",
data.len()
);
}
Ok(())
}
fn backup_finished(&mut self) -> Result<()> {
let stream_id = self.common.current_packet.id;
let mut file = self
.backup_files
.remove(&stream_id)
.ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?;
file.finish()?;
self.respond_ok()
}
}

View File

@ -0,0 +1,47 @@
use failure::{bail, Error};
/// Format a binary digest as a lowercase hex string.
pub fn digest_to_hex(digest: &[u8]) -> String {
    const HEX_CHARS: &'static [u8; 16] = b"0123456789abcdef";
    // Build the String directly instead of going through an unchecked
    // Vec<u8> -> String conversion; hex digits are plain ASCII so pushing
    // them as chars is free, and this removes the `unsafe` block.
    let mut buf = String::with_capacity(digest.len() * 2);
    for b in digest {
        buf.push(HEX_CHARS[(b >> 4) as usize] as char);
        buf.push(HEX_CHARS[(b & 0xf) as usize] as char);
    }
    buf
}
/// View `data`'s raw in-memory bytes as a byte slice.
///
/// No byte swapping happens here despite the name — presumably callers pass
/// a value already converted with `.to_le()` (TODO confirm against callers).
///
/// # Safety
///
/// `T` must not contain padding bytes the caller isn't allowed to read; the
/// returned slice borrows `data` and must not outlive it.
pub unsafe fn swapped_data_to_buf<T>(data: &T) -> &[u8] {
    std::slice::from_raw_parts(data as *const T as *const u8, std::mem::size_of::<T>())
}
/// Decode one ASCII hex digit (either case) to its 4-bit value.
fn hex_nibble(c: u8) -> Result<u8, Error> {
    match c {
        b'0'..=b'9' => Ok(c - b'0'),
        b'a'..=b'f' => Ok(c - b'a' + 0xa),
        b'A'..=b'F' => Ok(c - b'A' + 0xa),
        _ => bail!("not a hex digit: {}", c as char),
    }
}
#[inline]
pub fn parse_hex_digest<T: AsRef<[u8]>>(hex: T) -> Result<[u8; 32], Error> {
let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() };
let hex = hex.as_ref();
if hex.len() != 64 {
bail!(
"invalid hex digest ({} instead of 64 digits long)",
hex.len()
);
}
for i in 0..32 {
digest[i] = (hex_nibble(hex[i * 2])? << 4) + hex_nibble(hex[i * 2 + 1])?;
}
Ok(digest)
}

View File

@ -0,0 +1,135 @@
use std::borrow::Borrow;
use endian_trait::Endian;
use failure::*;
// Kind of index file a backup stream writes into.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IndexType {
    Fixed,
    Dynamic,
}
// A bare SHA-256 chunk digest. `repr(transparent)` guarantees the same
// layout as `[u8; 32]` (relied upon by the `Borrow` impls below).
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct FixedChunk(pub [u8; 32]);
impl FixedChunk {
    /// Wrap an existing digest.
    pub fn new(hash: [u8; 32]) -> Self {
        Self(hash)
    }
    /// Parse a 64-digit hex string.
    pub fn from_hex<T: AsRef<[u8]>>(hex: T) -> Result<Self, Error> {
        Ok(Self::new(crate::tools::parse_hex_digest(hex.as_ref())?))
    }
    /// Compute the SHA-256 digest of `data`.
    pub fn from_data(data: &[u8]) -> Self {
        let mut hasher = openssl::sha::Sha256::new();
        hasher.update(data);
        Self::new(hasher.finish())
    }
    /// Lowercase hex representation of the digest.
    pub fn digest_to_hex(&self) -> String {
        crate::tools::digest_to_hex(&self.0)
    }
}
// Identity impl: a digest is raw bytes and has no endianness.
impl Endian for FixedChunk {
    fn to_be(self) -> Self {
        self
    }
    fn to_le(self) -> Self {
        self
    }
    fn from_be(self) -> Self {
        self
    }
    fn from_le(self) -> Self {
        self
    }
}
// A chunk digest together with the chunk's size in bytes.
#[derive(Clone, Copy, Debug, Hash)]
#[repr(C, packed)]
pub struct ChunkEntry {
    pub hash: [u8; 32],
    pub size: u64,
}
impl ChunkEntry {
    /// Build an entry from an existing digest and size.
    pub fn new(hash: [u8; 32], size: u64) -> Self {
        Self { hash, size }
    }
    /// Parse a 64-digit hex digest and attach a size.
    pub fn from_hex<T: AsRef<[u8]>>(hex: T, size: u64) -> Result<Self, Error> {
        Ok(Self::new(
            crate::tools::parse_hex_digest(hex.as_ref())?,
            size,
        ))
    }
    /// Size of the chunk in bytes.
    pub fn len(&self) -> u64 {
        self.size
    }
    /// Compute digest and size from the chunk data itself.
    pub fn from_data(data: &[u8]) -> Self {
        let mut hasher = openssl::sha::Sha256::new();
        hasher.update(data);
        Self::new(hasher.finish(), data.len() as u64)
    }
    /// Lowercase hex representation of the digest.
    pub fn digest_to_hex(&self) -> String {
        crate::tools::digest_to_hex(&self.hash)
    }
    /// Drop the size, keeping only the digest.
    pub fn to_fixed(&self) -> FixedChunk {
        FixedChunk(self.hash)
    }
}
impl PartialEq for ChunkEntry {
    fn eq(&self, other: &Self) -> bool {
        // Copy the u64 field out of the packed struct before comparing:
        // `==` auto-refs its operands, and a reference to `size` inside a
        // `#[repr(packed)]` struct may be unaligned (rejected by newer
        // compilers, UB at best). `hash` has alignment 1 and is fine.
        let self_size = self.size;
        let other_size = other.size;
        self_size == other_size && self.hash == other.hash
    }
}
impl Eq for ChunkEntry {}
// Only `size` carries endianness; `hash` is a plain byte array.
impl Endian for ChunkEntry {
    // NOTE: the previous implementation discarded the converted value
    // (`self.size.to_be();` is a no-op statement), so these conversions
    // silently did nothing — broken on big-endian machines. The result
    // must be assigned back.
    fn to_be(mut self) -> Self {
        self.size = self.size.to_be();
        self
    }
    fn to_le(mut self) -> Self {
        self.size = self.size.to_le();
        self
    }
    fn from_be(mut self) -> Self {
        self.size = self.size.from_be();
        self
    }
    fn from_le(mut self) -> Self {
        self.size = self.size.from_le();
        self
    }
}
// Implement `From` rather than `Into`: the standard blanket impl then
// provides `Into<FixedChunk> for ChunkEntry` for free (so existing
// `.into()` callers keep working), and `FixedChunk::from(entry)` becomes
// available as well.
impl From<ChunkEntry> for FixedChunk {
    fn from(entry: ChunkEntry) -> Self {
        FixedChunk(entry.hash)
    }
}
// Allow HashMap/HashSet lookups keyed by FixedChunk on ChunkEntry values.
impl Borrow<FixedChunk> for ChunkEntry {
    fn borrow(&self) -> &FixedChunk {
        // SAFETY: FixedChunk is #[repr(transparent)] over [u8; 32], so the
        // reference cast is layout-compatible; `hash` has alignment 1, so
        // borrowing it out of the packed struct is fine.
        unsafe { std::mem::transmute(&self.hash) }
    }
}
// Same for bare digests.
impl Borrow<FixedChunk> for [u8; 32] {
    fn borrow(&self) -> &FixedChunk {
        // SAFETY: FixedChunk is #[repr(transparent)] over [u8; 32].
        unsafe { std::mem::transmute(self) }
    }
}