diff --git a/proxmox-protocol/Cargo.toml b/proxmox-protocol/Cargo.toml new file mode 100644 index 00000000..e214533c --- /dev/null +++ b/proxmox-protocol/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "proxmox-protocol" +version = "0.1.0" +authors = [ + "Dietmar Maurer ", + "Wolfgang Bumiller ", +] +edition = "2018" + +[dependencies] +chrono = "0.4" +endian_trait = "0.6" +failure = "0.1" +openssl = "0.10" diff --git a/proxmox-protocol/src/c_client.rs b/proxmox-protocol/src/c_client.rs new file mode 100644 index 00000000..be4c630d --- /dev/null +++ b/proxmox-protocol/src/c_client.rs @@ -0,0 +1,403 @@ +//! For the C API we need to provide a `Client` compatible with C. In rust `Client` takes a +//! `T: io::Read + io::Write`, so we need to provide a way for C to provide callbacks to us to +//! implement this. + +use std::ffi::{CStr, CString}; +use std::io; +use std::os::raw::{c_char, c_int, c_ulong, c_void}; + +use failure::{bail, format_err, Error}; + +/// Read callback. The first parameter is the `opaque` parameter passed to `proxmox_backup_new`, +/// the rest are the usual read function parameters. This should return the number of bytes +/// actually read, zero on EOF, or a negative `errno` value on error (eg. `-EAGAIN`). +pub type ReadFn = extern "C" fn(opaque: *mut c_void, buf: *mut u8, size: u64) -> i64; + +/// Write callback. The first parameter is the `opaque` parameter passed to `proxmox_backup_new`, +/// the rest are the usual write function parameters. This should return the number of bytes +/// actually written, or a negative `errno` value on error (eg. `-EAGAIN`). +pub type WriteFn = extern "C" fn(opaque: *mut c_void, buf: *const u8, size: u64) -> i64; + +/// Stores the external C callbacks for communicating with the protocol socket. +struct CApiSocket { + opaque: *mut c_void, + read: ReadFn, + write: WriteFn, +} + +/// A client instance using C callbacks for reading from and writing to the protocol socket. +pub struct CClient { + client: crate::Client, + error: Option, + upload: Option<(*const u8, usize)>, +} + +impl CClient { + fn set_error(&mut self, err: Error) -> c_int { + self.error = Some(match CString::new(err.to_string()) { + Ok(cs) => cs, + Err(_) => CString::new("").unwrap(), + }); + -1 + } + + #[inline(always)] + fn bool_result(&mut self, res: Result) -> c_int { + match res { + Ok(false) => 0, + Ok(true) => 1, + Err(e) => self.set_error(e), + } + } + + #[inline(always)] + fn bool_call(&mut self, func: F) -> c_int + where + F: FnOnce(&mut crate::Client) -> Result, + { + let res = func(&mut self.client); + self.bool_result(res) + } + + #[inline(always)] + fn int_call(&mut self, func: F) -> c_int + where + F: FnOnce(&mut crate::Client) -> Result, + { + match func(&mut self.client) { + Ok(v) => v, + Err(e) => self.set_error(e.into()), + } + } +} + +impl io::Read for CApiSocket { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let rc = (self.read)(self.opaque, buf.as_mut_ptr(), buf.len() as u64); + if rc < 0 { + Err(io::Error::from_raw_os_error((-rc) as i32)) + } else { + Ok(rc as usize) + } + } +} + +impl io::Write for CApiSocket { + fn write(&mut self, buf: &[u8]) -> io::Result { + let rc = (self.write)(self.opaque, buf.as_ptr(), buf.len() as u64); + if rc < 0 { + Err(io::Error::from_raw_os_error((-rc) as i32)) + } else { + Ok(rc as usize) + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +/// Creates a new instance of a backup protocol client. +/// +/// # Arguments +/// +/// * `opaque` - An opaque pointer passed to the two provided callback methods. +/// * `read` - The read callback. +/// * `write` - The write callback. +#[no_mangle] +pub extern "C" fn proxmox_backup_new( + opaque: *mut c_void, + read: ReadFn, + write: WriteFn, +) -> *mut CClient { + Box::leak(Box::new(CClient { + client: crate::Client::new(CApiSocket { + opaque, + read, + write, + }), + error: None, + upload: None, + })) +} + +/// Drops an instance of a backup protocol client. The pointer must be valid or `NULL`. +#[no_mangle] +pub extern "C" fn proxmox_backup_done(me: *mut CClient) { + if !me.is_null() { + unsafe { + Box::from_raw(me); + } + } +} + +/// Returns a C String describing the last error or `NULL` if there was none. +#[no_mangle] +pub extern "C" fn proxmox_backup_get_error(me: *const CClient) -> *const c_char { + let me = unsafe { &*me }; + match me.error { + Some(ref e) => e.as_ptr(), + None => std::ptr::null(), + } +} + +/// Returns true if the `read` callback had previously returned `EOF`. +#[no_mangle] +pub extern "C" fn proxmox_backup_is_eof(me: *const CClient) -> bool { + let me = unsafe { &*me }; + me.client.eof() +} + +/// The data polling methods usually pass errors from the callbacks through to the original caller. +/// Since the protocol needs to be non-blocking-IO safe and therefore able to resumine at any point +/// where `-EAGAIN` can be returned by the callbacks, it is up to the caller which errors are to be +/// considered fatal, but any error returned by callbacks which is not `-EAGAIN` will result in an +/// internal error flag to be set which has to be cleared before trying to resume normal +/// operations. +#[no_mangle] +pub extern "C" fn proxmox_backup_clear_err(me: *mut CClient) { + let me = unsafe { &mut *me }; + me.client.clear_err(); + me.error = None; +} + +/// Polls for data and checks whether the protocol handshake has been made successfully. +/// Returns `1` if the handshake was successful, `0` if it is not yet complete or `-1` on error. +#[no_mangle] +pub extern "C" fn proxmox_backup_wait_for_handshake(me: *mut CClient) -> c_int { + let me = unsafe { &mut *me }; + me.bool_call(move |c| c.wait_for_handshake()) +} + +fn check_string(s: *const c_char) -> Result<&'static str, Error> { + if s.is_null() { + bail!("NULL string"); + } + Ok(std::str::from_utf8(unsafe { + CStr::from_ptr(s).to_bytes() + })?) +} + +/// Request the list of hashes for a backup file in order to prevent duplicates from being sent to +/// the server. This simply causes an internal list to be filled. Only one such operation can be +/// performed simultaneously. To wait for its completion see `proxmox_backup_wait_for_hashes`. +/// +/// If the file name is `NULL` or not a valid UTF-8 string, this function returns an error without +/// putting the protocol handler in an error state. +/// +/// Returns `0` on success, `-1` otherwise. +#[no_mangle] +pub extern "C" fn proxmox_backup_query_hashes(me: *mut CClient, file_name: *const c_char) -> c_int { + let me = unsafe { &mut *me }; + + me.int_call(move |client| { + let file_name = check_string(file_name)?; + client.query_hashes(file_name)?; + Ok(0) + }) +} + +/// If there is an ongoing hash list request, this will poll the data stream. +/// +/// Returns `1` if the transfer is complete (or there was no transfer to begin with), `0` if it is +/// incomplete, or `-1` if an error occurred. +#[no_mangle] +pub extern "C" fn proxmox_backup_wait_for_hashes(me: *mut CClient) -> c_int { + let me = unsafe { &mut *me }; + me.bool_call(move |c| c.wait_for_hashes()) +} + +/// Check if a chunk of the provided digest is known to the this client instance. Note that this +/// does not query the server for this information, and is only useful after a call to +/// `proxmox_backup_query_hashes` or after uploading something. +#[no_mangle] +pub extern "C" fn proxmox_backup_is_chunk_available(me: *const CClient, digest: *const u8) -> bool { + let me = unsafe { &*me }; + let digest = unsafe { &*(digest as *const [u8; 32]) }; + me.client.is_chunk_available(digest) +} + +/// Begin uploading a chunk to the server. This attempts to upload the data right away, but if the +/// writer may fail due to non-blocking I/O in which case the `proxmox_backup_continue_upload` +/// function must be used. +/// +/// Returns `0` if the upload is incomplete, a positive ID if the upload was completed immediately, +/// or `-1` on error. +/// +/// The ID returned on success can be used to wait for the server to acknowledge that the chunk has +/// been written successfully. Use `proxmox_backup_wait_for_id` to do this. If confirmation is not +/// required, the ID should be released via `proxmox_backup_discard_id`. +#[no_mangle] +pub extern "C" fn proxmox_backup_upload_chunk( + me: *mut CClient, + digest: *const u8, + data: *const u8, + size: u64, +) -> c_int { + let me = unsafe { &mut *me }; + let digest: &[u8; 32] = unsafe { &*(digest as *const [u8; 32]) }; + let size = size as usize; + let slice: &[u8] = unsafe { std::slice::from_raw_parts(data, size) }; + match me.client.upload_chunk(digest, slice) { + Ok(Some(id)) => id.0 as c_int, + Ok(None) => { + me.upload = Some((data, size)); + 0 + } + Err(e) => me.set_error(e), + } +} + +/// If an upload did not finish immediately (`proxmox_backup_upload_chunk` returned `0`), this +/// function must be used to retry sending the rest of the data. +/// +/// Returns `0` if the upload is incomplete, a positive ID if the upload was completed immediately, +/// or `-1` on error. +#[no_mangle] +pub extern "C" fn proxmox_backup_continue_upload(me: *mut CClient) -> c_int { + let me = unsafe { &mut *me }; + match me.upload { + Some((data, len)) => { + let slice: &[u8] = unsafe { std::slice::from_raw_parts(data, len) }; + match me.client.continue_upload_chunk(slice) { + Ok(Some(id)) => id.0 as c_int, + Ok(None) => 0, + Err(e) => me.set_error(e), + } + } + None => me.set_error(format_err!("no upload currently running")), + } +} + +/// Run the main receive loop. Returns `0` on success, `-1` on error. +#[no_mangle] +pub extern "C" fn proxmox_backup_poll_read(me: *mut CClient) -> c_int { + let me = unsafe { &mut *me }; + match me.client.poll_read() { + Ok(_) => 0, + Err(e) => me.set_error(e), + } +} + +/// Run the main send loop. If the `write` callback returned `-EAGAIN`, during an operation, the +/// protocol handler keeps the data to be sent in a write queue. This function will attempt to +/// continue writing out the remaining data. See individual function descriptions for when this is +/// necessary. +/// +/// Returns `1` if the queue is now empty, `0` if there is still data in the queue, or `-1` on +/// error. +#[no_mangle] +pub extern "C" fn proxmox_backup_poll_send(me: *mut CClient) -> c_int { + let me = unsafe { &mut *me }; + me.bool_call(move |c| Ok(c.poll_send()?.unwrap_or(true))) +} + +/// Run the main receive loop and check for confirmation of a stream with the specified ID. +/// +/// Returns `1` if the transaction was confirmed, `0` if not, or `-1` on error. +/// +/// Note that once this function returned `1` for an ID, the id is considered to be free for +/// recycling and should not be used for further calls. +#[no_mangle] +pub extern "C" fn proxmox_backup_wait_for_id(me: *mut CClient, id: c_int) -> c_int { + let me = unsafe { &mut *me }; + me.bool_call(move |c| c.wait_for_id(crate::StreamId(id as u8))) +} + +/// Notifies the protocol handler that we do not bother waiting for confirmation of an ID. The ID +/// may immediately be recycled for future transactions, thus the user should not use it for any +/// further function calls. +/// +/// Returns `0` on success, `-1` on error. +#[no_mangle] +pub extern "C" fn proxmox_backup_discard_id(me: *mut CClient, id: c_int) -> c_int { + let me = unsafe { &mut *me }; + match me.client.discard_id(crate::StreamId(id as u8)) { + Ok(_) => 0, + Err(e) => me.set_error(e), + } +} + +/// Create a new backup. The returned ID should be waited upon via `proxmox_backup_wait_for_id`, +/// which returns true once the server confirmed the creation of the backup. +#[no_mangle] +pub extern "C" fn proxmox_backup_create( + me: *mut CClient, + dynamic: bool, + backup_type: *const c_char, // "host", "ct", "vm" + backup_id: *const c_char, + time_epoch: i64, + file_name: *const c_char, + chunk_size: c_ulong, + file_size: i64, + is_new: bool, +) -> c_int { + let me = unsafe { &mut *me }; + me.int_call(move |client| { + let index_type = match dynamic { + false => crate::IndexType::Fixed, + _ => crate::IndexType::Dynamic, + }; + + let backup_type = check_string(backup_type)?; + let backup_id = check_string(backup_id)?; + let file_name = check_string(file_name)?; + + Ok(client + .create_backup( + index_type, + backup_type, + backup_id, + time_epoch, + file_name, + chunk_size as _, + if file_size < 0 { + None + } else { + Some(file_size as u64) + }, + is_new, + )? + .0 as c_int) + }) +} + +/// Send a dynamic chunk entry. +/// +/// If the entry was sent out successfully this returns `1`. If the `write` callback returned +/// `-EAGAIN` this returns `0` and the data is queued, after which `proxmox_backup_poll_send` +/// should be used to continue sending the data. +/// On error `-1` is returned. +#[no_mangle] +pub extern "C" fn proxmox_backup_dynamic_data( + me: *mut CClient, + stream: c_int, + digest: *const [u8; 32], + size: u64, +) -> c_int { + let me = unsafe { &mut *me }; + me.bool_call(move |client| { + client.dynamic_data(crate::BackupStream(stream as u8), unsafe { &*digest }, size) + }) +} + +/// Send a fixed chunk entry. +/// +/// If the entry was sent out successfully this returns `1`. If the `write` callback returned +/// `-EAGAIN` this returns `0` and the data is queued, after which `proxmox_backup_poll_send` +/// should be used to continue sending the data. +/// On error `-1` is returned. +#[no_mangle] +pub extern "C" fn proxmox_backup_fixed_data( + me: *mut CClient, + stream: c_int, + index: c_ulong, + digest: *const [u8; 32], +) -> c_int { + let me = unsafe { &mut *me }; + me.bool_call(move |client| { + client.fixed_data(crate::BackupStream(stream as u8), index as usize, unsafe { + &*digest + }) + }) +} diff --git a/proxmox-protocol/src/chunk_stream.rs b/proxmox-protocol/src/chunk_stream.rs new file mode 100644 index 00000000..4502b90b --- /dev/null +++ b/proxmox-protocol/src/chunk_stream.rs @@ -0,0 +1,118 @@ +use std::io::Read; + +use failure::Error; + +use crate::Chunker; + +pub struct ChunkStream { + input: T, + buffer: Vec, + fill: usize, + pos: usize, + keep: bool, + eof: bool, + chunker: Chunker, +} + +impl ChunkStream { + pub fn new(input: T) -> Self { + Self { + input, + buffer: Vec::new(), + fill: 0, + pos: 0, + keep: false, + eof: false, + chunker: Chunker::new(4 * 1024 * 1024), + } + } + + pub fn stream(&mut self) -> &mut Self { + self + } + + fn fill_buf(&mut self) -> Result { + if self.fill == self.buffer.len() { + let mut more = self.buffer.len(); // just double it + if more == 0 { + more = 1024 * 1024; // at the start, make a 1M buffer + } + // we need more data: + self.buffer.reserve(more); + unsafe { + self.buffer.set_len(self.buffer.capacity()); + } + } + + match self.input.read(&mut self.buffer[self.fill..]) { + Ok(more) => { + if more == 0 { + self.eof = true; + } + self.fill += more; + Ok(true) + } + Err(err) => { + if err.kind() == std::io::ErrorKind::WouldBlock { + Ok(false) + } else { + Err(err.into()) + } + } + } + } + + fn consume(&mut self) { + assert!(self.fill >= self.pos); + + let remaining = self.fill - self.pos; + unsafe { + std::ptr::copy_nonoverlapping( + &self.buffer[self.pos] as *const u8, + self.buffer.as_mut_ptr(), + remaining, + ); + } + self.fill = remaining; + self.pos = 0; + } + + pub fn next(&mut self) { + self.keep = false; + } + + // This crate should not depend on the futures create, so we use another Option instead of + // Async. + pub fn get(&mut self) -> Result>, Error> { + if self.keep { + return Ok(Some(Some(&self.buffer[0..self.pos]))); + } + + if self.eof { + return Ok(Some(None)); + } + + if self.pos != 0 { + self.consume(); + } + + loop { + match self.fill_buf() { + Ok(true) => (), + Ok(false) => return Ok(None), + Err(err) => return Err(err), + } + + // Note that if we hit EOF we hit a hard boundary... + let boundary = self.chunker.scan(&self.buffer[self.pos..self.fill]); + if boundary == 0 && !self.eof { + self.pos = self.fill; + continue; + } + + self.pos += boundary; + self.keep = true; + return Ok(Some(Some(&self.buffer[0..self.pos]))); + } + } +} diff --git a/proxmox-protocol/src/chunker.rs b/proxmox-protocol/src/chunker.rs new file mode 100644 index 00000000..48841b33 --- /dev/null +++ b/proxmox-protocol/src/chunker.rs @@ -0,0 +1,4 @@ +include!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/../src/backup/chunker.rs" +)); diff --git a/proxmox-protocol/src/client.rs b/proxmox-protocol/src/client.rs new file mode 100644 index 00000000..40cfec6e --- /dev/null +++ b/proxmox-protocol/src/client.rs @@ -0,0 +1,596 @@ +use std::borrow::Borrow; +use std::collections::hash_map; +use std::collections::{HashMap, HashSet}; +use std::io::{Read, Write}; +use std::mem; +use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; + +use endian_trait::Endian; +use failure::*; + +use crate::common; +use crate::protocol::*; +use crate::tools::swapped_data_to_buf; +use crate::{ChunkEntry, FixedChunk, IndexType}; + +#[derive(Clone, Copy, Eq, PartialEq)] +#[repr(transparent)] +pub struct BackupStream(pub(crate) u8); + +#[derive(Clone, Copy, Eq, PartialEq)] +#[repr(transparent)] +pub struct StreamId(pub(crate) u8); + +impl From for StreamId { + fn from(v: BackupStream) -> Self { + Self(v.0) + } +} + +struct BackupStreamData { + id: u8, + index_type: IndexType, + pos: u64, + path: Option, +} + +pub enum AckState { + Waiting, // no ack received yet. + Received, // already received an ack, but the user hasn't seen it yet. + Ignore, // client doesn't care. + AwaitingData, // waiting for something other than an 'Ok' packet +} + +pub struct Client +where + S: Read + Write, +{ + chunks: RwLock>, + common: common::Connection, + handshake_done: bool, + + cur_id: u8, + free_ids: Vec, + waiting_ids: HashMap, + hash_download: Option, + + upload_chunk: Option, + upload_id: u8, + upload_pos: usize, + upload_state: u8, + + streams: HashMap, +} + +type Result = std::result::Result; + +impl Client +where + S: Read + Write, +{ + pub fn new(socket: S) -> Self { + Self { + chunks: RwLock::new(HashSet::new()), + common: common::Connection::new(socket), + handshake_done: false, + + cur_id: 1, + free_ids: Vec::new(), + waiting_ids: HashMap::new(), + hash_download: None, + + upload_state: 0, + upload_pos: 0, + upload_id: 0, + upload_chunk: None, + + streams: HashMap::new(), + } + } + + pub fn eof(&self) -> bool { + self.common.eof + } + + pub fn error(&self) -> bool { + self.common.error + } + + /// It is safe to clear the error after an `io::ErrorKind::Interrupted`. + pub fn clear_err(&mut self) { + self.common.clear_err() + } + + pub fn wait_for_handshake(&mut self) -> Result { + if !self.handshake_done { + self.poll_read()?; + } + Ok(self.handshake_done) + } + + pub fn query_hashes(&mut self, file_name: &str) -> Result<()> { + if self.hash_download.is_some() { + bail!("hash query already in progress"); + } + + let id = self.next_id()?; + let mut packet = Packet::builder(id, PacketType::GetHashList); + packet + .write_data(client::GetHashList { + name_length: file_name.len() as u16, + }) + .write_buf(file_name.as_bytes()); + self.common.queue_data(packet.finish())?; + self.hash_download = Some(id); + Ok(()) + } + + pub fn wait_for_hashes(&mut self) -> Result { + if self.hash_download.is_some() { + self.poll_read()?; + } + Ok(self.hash_download.is_none()) + } + + fn chunk_read_lock(&self) -> Result>> { + self.chunks + .read() + .map_err(|_| format_err!("lock poisoned, disconnecting client...")) + } + + pub fn is_chunk_available>(&self, chunk: &T) -> bool { + self.chunk_read_lock().unwrap().contains(chunk.borrow()) + } + + /// Attempts to upload a chunk. Returns an error state only on fatal errors. If the underlying + /// writer returns a `WouldBlock` error this returns `None` and `continue_upload_chunk` has to + /// be called until the chunk is uploaded completely. During this time no other operations + /// should be performed on the this object! + /// See `continue_upload_chunk()` for a description of the returned value. + pub fn upload_chunk(&mut self, info: &T, data: &[u8]) -> Result> + where + T: Borrow, + { + if self.upload_chunk.is_some() { + bail!("cannot concurrently upload multiple chunks"); + } + + self.upload_id = self.next_id()?; + self.upload_chunk = Some(info.borrow().clone()); + self.next_upload_state(0); + self.continue_upload_chunk(data) + } + + fn next_upload_state(&mut self, state: u8) { + self.upload_state = state; + self.upload_pos = 0; + } + + // This is split into a static method not borrowing self so the buffer can point into self... + fn do_upload_write( + writer: &mut common::Connection, + buf: &[u8], + ) -> Result> { + match writer.write_some(buf) { + Ok(put) => Ok(Some((put, put == buf.len()))), + Err(e) => { + if e.kind() == std::io::ErrorKind::WouldBlock { + Ok(None) + } else { + Err(e.into()) + } + } + } + } + + fn after_upload_write(&mut self, put: Option<(usize, bool)>, next_state: u8) -> bool { + match put { + None => return false, + Some((put, done)) => { + if done { + self.next_upload_state(next_state); + } else { + self.upload_pos += put; + } + done + } + } + } + + fn upload_write(&mut self, buf: &[u8], next_state: u8) -> Result { + let wrote = Self::do_upload_write(&mut self.common, buf)?; + Ok(self.after_upload_write(wrote, next_state)) + } + + /// If an `upload_chunk()` call returned `Ok(false)` this needs to be used to complete the + /// upload process as the chunk may have already been partially written to the socket. + /// This function will return `Ok(false)` on `WouldBlock` errors just like `upload_chunk()` + /// will, after which the caller should wait for the writer to become write-ready and then + /// call this method again. + /// Once the complete chunk packet has been written to the underlying socket, this returns a + /// packet ID which can be waited upon for completion + pub fn continue_upload_chunk(&mut self, data: &[u8]) -> Result> { + loop { + match self.upload_state { + // Writing the packet header: + 0 => { + let len = mem::size_of::() + + mem::size_of::() + + data.len(); + let packet = Packet { + id: self.upload_id, + pkttype: PacketType::UploadChunk as _, + length: len as _, + } + .to_le(); + let buf = unsafe { swapped_data_to_buf(&packet) }; + if !self.upload_write(&buf[self.upload_pos..], 1)? { + return Ok(None); + } + } + // Writing the hash: + 1 => { + let chunk = self.upload_chunk.as_ref().unwrap(); + let buf = &chunk.0[self.upload_pos..]; + let wrote = Self::do_upload_write(&mut self.common, buf)?; + if !self.after_upload_write(wrote, 2) { + return Ok(None); + } + } + // Writing the data: + 2 => { + if !self.upload_write(&data[self.upload_pos..], 3)? { + return Ok(None); + } + } + // Done: + 3 => { + self.upload_chunk = None; + self.expect_ok_for_id(self.upload_id); + return Ok(Some(StreamId(self.upload_id))); + } + n => bail!("bad chunk upload state: {}", n), + } + } + } + + // generic data polling method + pub fn poll_read(&mut self) -> Result<()> { + if self.common.eof { + // polls after EOF are errors: + bail!("server disconnected"); + } + + if !self.common.poll_read()? { + // On the client side we do not expect a server-side disconnect, so error out! + if self.common.eof { + bail!("server disconnected"); + } + return Ok(()); + } + + loop { + match self.common.current_packet_type { + PacketType::Ok => self.recv_ok()?, + PacketType::Hello => self.recv_hello()?, + PacketType::HashListPart => self.recv_hash_list()?, + PacketType::BackupCreated => self.backup_created()?, + _ => bail!( + "server sent an unexpected packet of type {}", + self.common.current_packet_type as u32, + ), + } + if !self.common.next()? { + break; + } + } + Ok(()) + } + + // None => nothing was queued + // Some(true) => queue finished + // Some(false) => queue not finished + pub fn poll_send(&mut self) -> Result> { + self.common.poll_send() + } + + // private helpermethods + + fn next_id(&mut self) -> Result { + if let Some(id) = self.free_ids.pop() { + return Ok(id); + } + if self.cur_id < 0xff { + self.cur_id += 1; + return Ok(self.cur_id - 1); + } + bail!("too many concurrent transactions"); + } + + fn free_id(&mut self, id: u8) { + self.free_ids.push(id); + } + + fn expect_ok_for_id(&mut self, id: u8) { + self.waiting_ids.insert(id, AckState::Waiting); + } + + fn expect_response_for_id(&mut self, id: u8) { + self.waiting_ids.insert(id, AckState::AwaitingData); + } + + // Methods handling received packets: + + fn recv_hello(&mut self) -> Result<()> { + let hello = self.common.read_unaligned_data::(0)?; + + if hello.magic != server::HELLO_MAGIC { + bail!("received an invalid hello packet"); + } + + let ver = hello.version; + if ver != server::PROTOCOL_VERSION { + bail!( + "hello packet contained incompatible protocol version: {} (!= {})", + ver, + server::PROTOCOL_VERSION, + ); + } + + self.handshake_done = true; + self.free_id(0); + Ok(()) + } + + fn chunk_write_lock(&self) -> Result>> { + self.chunks + .write() + .map_err(|_| format_err!("lock poisoned, disconnecting client...")) + } + + fn recv_hash_list(&mut self) -> Result<()> { + let data = self.common.packet_data(); + if data.len() == 0 { + // No more hashes, we're done + self.hash_download = None; + return Ok(()); + } + + if (data.len() % mem::size_of::()) != 0 { + bail!("hash list contains invalid size"); + } + + let chunk_list: &[FixedChunk] = unsafe { + std::slice::from_raw_parts( + data.as_ptr() as *const FixedChunk, + data.len() / mem::size_of::(), + ) + }; + + let mut my_chunks = self.chunk_write_lock()?; + for c in chunk_list { + eprintln!("Got chunk '{}'", c.digest_to_hex()); + my_chunks.insert(c.clone()); + } + + Ok(()) + } + + fn ack_id(&mut self, id: u8, data_packet: bool) -> Result<()> { + use hash_map::Entry::*; + + match self.waiting_ids.entry(id) { + Vacant(_) => bail!("received unexpected packet for transaction id {}", id), + Occupied(mut entry) => match entry.get() { + AckState::Ignore => { + entry.remove(); + } + AckState::Received => bail!("duplicate Ack received for transaction id {}", id), + AckState::Waiting => { + if data_packet { + bail!("received data packet while expecting simple Ok for {}", id); + } + *entry.get_mut() = AckState::Received; + } + AckState::AwaitingData => { + if !data_packet { + bail!( + "received empty Ok while waiting for data on stream id {}", + id + ); + } + *entry.get_mut() = AckState::Received; + } + }, + } + Ok(()) + } + + fn recv_ok(&mut self) -> Result<()> { + self.ack_id(self.common.current_packet.id, false) + } + + pub fn wait_for_id(&mut self, id: StreamId) -> Result { + if !self.waiting_ids.contains_key(&id.0) { + bail!("wait_for_id() called on unexpected id {}", id.0); + } + self.poll_read()?; + + use hash_map::Entry::*; + + match self.waiting_ids.entry(id.0) { + Vacant(_) => Ok(true), + Occupied(entry) => match entry.get() { + AckState::Received => { + entry.remove(); + Ok(true) + } + _ => Ok(false), + }, + } + } + + pub fn discard_id(&mut self, id: StreamId) -> Result<()> { + use hash_map::Entry::*; + match self.waiting_ids.entry(id.0) { + Vacant(_) => bail!("discard_id called with unknown id {}", id.0), + Occupied(mut entry) => match entry.get() { + AckState::Ignore => (), + AckState::Received => { + entry.remove(); + } + AckState::Waiting | AckState::AwaitingData => { + *entry.get_mut() = AckState::Ignore; + } + }, + } + Ok(()) + } + + pub fn create_backup( + &mut self, + index_type: IndexType, + backup_type: &str, + id: &str, + timestamp: i64, + file_name: &str, + chunk_size: usize, + file_size: Option, + is_new: bool, + ) -> Result { + let backup_type = backup_type::name_to_id(backup_type)?; + + if id.len() > 0xff { + bail!("id too long"); + } + + if file_name.len() > 0xff { + bail!("file name too long"); + } + + let mut flags: backup_flags::Type = 0; + if is_new { + flags |= backup_flags::EXCL; + } + if index_type == IndexType::Dynamic { + flags |= backup_flags::DYNAMIC_CHUNKS; + if file_size.is_some() { + bail!("file size must be None on dynamic backup streams"); + } + } else if file_size.is_none() { + bail!("file size is mandatory for fixed backup streams"); + } + + let packet_id = self.next_id()?; + let mut packet = Packet::builder(packet_id, PacketType::CreateBackup); + packet + .write_data(client::CreateBackup { + backup_type, + id_length: id.len() as _, + timestamp: timestamp as u64, + flags, + name_length: file_name.len() as _, + chunk_size: chunk_size as _, + file_size: file_size.unwrap_or(0) as u64, + }) + .write_buf(id.as_bytes()) + .write_buf(file_name.as_bytes()); + + self.streams.insert( + packet_id, + BackupStreamData { + id: packet_id, + index_type, + pos: 0, + path: None, + }, + ); + + self.expect_response_for_id(packet_id); + self.common.queue_data(packet.finish())?; + Ok(BackupStream(packet_id)) + } + + fn backup_created(&mut self) -> Result<()> { + let info = self + .common + .read_unaligned_data::(0)?; + let data = &self.common.packet_data()[mem::size_of_val(&info)..]; + if data.len() != info.path_length as usize { + bail!("backup-created packet has invalid length"); + } + let name = std::str::from_utf8(data)?; + let pkt_id = self.common.current_packet.id; + self.streams + .get_mut(&pkt_id) + .ok_or_else(|| format_err!("BackupCreated response for invalid stream: {}", pkt_id))? + .path = Some(name.to_string()); + self.ack_id(pkt_id, true)?; + Ok(()) + } + + pub fn dynamic_chunk(&mut self, stream: BackupStream, entry: &ChunkEntry) -> Result { + self.dynamic_data(stream, &entry.hash, entry.size) + } + + pub fn dynamic_data>( + &mut self, + stream: BackupStream, + digest: &T, + size: u64, + ) -> Result { + let data = self + .streams + .get_mut(&stream.0) + .ok_or_else(|| format_err!("no such active backup stream"))?; + + if data.index_type != IndexType::Dynamic { + bail!("dynamic_data called for stream of static chunks"); + } + + let mut packet = Packet::builder(data.id, PacketType::BackupDataDynamic); + packet + .write_data(data.pos as u64) + .write_buf(&digest.borrow().0); + data.pos += size; + + self.common.queue_data(packet.finish()) + } + + pub fn fixed_data>( + &mut self, + stream: BackupStream, + index: usize, + digest: &T, + ) -> Result { + let data = self + .streams + .get_mut(&stream.0) + .ok_or_else(|| format_err!("no such active backup stream"))?; + + if data.index_type != IndexType::Fixed { + bail!("fixed_data called for stream of dynamic chunks"); + } + + let mut packet = Packet::builder(data.id, PacketType::BackupDataFixed); + packet + .write_data(index as u64) + .write_buf(&digest.borrow().0); + + self.common.queue_data(packet.finish()) + } + + pub fn finish_backup(&mut self, stream: BackupStream) -> Result<(StreamId, String, bool)> { + let path = self + .streams + .remove(&stream.0) + .ok_or_else(|| format_err!("no such active backup stream"))? + .path + .unwrap_or_else(|| "".to_string()); + let ack = self + .common + .queue_data(Packet::simple(stream.0, PacketType::BackupFinished))?; + self.expect_ok_for_id(stream.0); + Ok((StreamId(stream.0), path, ack)) + } +} diff --git a/proxmox-protocol/src/common.rs b/proxmox-protocol/src/common.rs new file mode 100644 index 00000000..8c67f41d --- /dev/null +++ b/proxmox-protocol/src/common.rs @@ -0,0 +1,282 @@ +use std::io::{self, Read, Write}; +use std::mem; +use std::ptr; + +use failure::*; + +use endian_trait::Endian; + +use crate::protocol::*; + +type Result = std::result::Result; + +pub(crate) struct Connection +where + S: Read + Write, +{ + socket: S, + pub buffer: Vec, + pub current_packet: Packet, + pub current_packet_type: PacketType, + pub error: bool, + pub eof: bool, + upload_queue: Option<(Vec, usize)>, +} + +impl Connection +where + S: Read + Write, +{ + pub fn new(socket: S) -> Self { + Self { + socket, + buffer: Vec::new(), + current_packet: unsafe { mem::zeroed() }, + current_packet_type: PacketType::Error, + error: false, + eof: false, + upload_queue: None, + } + } + + pub fn write_some(&mut self, buf: &[u8]) -> std::io::Result { + self.socket.write(buf) + } + + /// It is safe to clear the error after an `io::ErrorKind::Interrupted`. + pub fn clear_err(&mut self) { + self.error = false; + } + + // None => nothing was queued + // Some(true) => queue finished + // Some(false) => queue not finished + pub fn poll_send(&mut self) -> Result> { + if let Some((ref data, ref mut pos)) = self.upload_queue { + loop { + match self.socket.write(&data[*pos..]) { + Ok(put) => { + *pos += put; + if *pos == data.len() { + self.upload_queue = None; + return Ok(Some(true)); + } + // Keep writing + continue; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + return Ok(Some(false)); + } + Err(e) => return Err(e.into()), + } + } + } else { + Ok(None) + } + } + + // Returns true when the data was also sent out, false if the queue is now full. + // For now we only allow a single dataset to be queued at once. + pub fn queue_data(&mut self, buf: Vec) -> Result { + if self.upload_queue.is_some() { + bail!("upload queue clash"); + } + + self.upload_queue = Some((buf, 0)); + + match self.poll_send()? { + None => unreachable!(), // We literally just set self.upload_queue to Some(value) + Some(v) => Ok(v), + } + } + + // Returns 'true' if there's data available, 'false' if there isn't (if the + // underlying reader returned `WouldBlock` or the `read()` was short). + // Other errors are propagated. + pub fn poll_read(&mut self) -> Result { + if self.eof { + return Ok(false); + } + + if self.error { + eprintln!("refusing to read from a client in error state"); + bail!("client is in error state"); + } + + match self.poll_data_do() { + Ok(has_packet) => Ok(has_packet), + Err(e) => { + // To support AsyncRead/AsyncWrite we do not enter a failed + // state when we read from a non-blocking source which fails + // with WouldBlock. + if let Some(ioe) = e.downcast_ref::() { + if ioe.kind() == io::ErrorKind::WouldBlock { + return Ok(false); + } + } + self.error = true; + Err(e) + } + } + } + + fn poll_data_do(&mut self) -> Result { + if !self.read_packet()? { + return Ok(false); + } + + if self.current_packet.length > MAX_PACKET_SIZE { + bail!("client tried to send a huge packet"); + } + + if !self.fill_packet()? { + return Ok(false); + } + + Ok(true) + } + + pub fn packet_length(&self) -> usize { + self.current_packet.length as usize + } + + pub fn packet_data(&self) -> &[u8] { + let beg = mem::size_of::(); + let end = self.packet_length(); + &self.buffer[beg..end] + } + + pub fn next(&mut self) -> Result { + let pktlen = self.packet_length(); + unsafe { + if self.buffer.len() != pktlen { + std::ptr::copy_nonoverlapping( + &self.buffer[pktlen], + &mut self.buffer[0], + self.buffer.len() - pktlen, + ); + } + self.buffer.set_len(self.buffer.len() - pktlen); + } + self.poll_data_do() + } + + // NOTE: After calling this you must `self.buffer.set_len()` when done! + #[must_use] + fn buffer_set_min_size(&mut self, size: usize) -> usize { + if self.buffer.capacity() < size { + self.buffer.reserve(size - self.buffer.len()); + } + let start = self.buffer.len(); + unsafe { + self.buffer.set_len(size); + } + start + } + + fn fill_buffer(&mut self, size: usize) -> Result { + if self.buffer.len() >= size { + return Ok(true); + } + let mut filled = self.buffer_set_min_size(size); + loop { + // We don't use read_exact to not block too long or busy-read on nonblocking sockets... + match self.socket.read(&mut self.buffer[filled..]) { + Ok(got) => { + if got == 0 { + self.eof = true; + unsafe { + self.buffer.set_len(filled); + } + return Ok(false); + } + filled += got; + if filled >= size { + unsafe { + self.buffer.set_len(filled); + } + return Ok(true); + } + // reloop + } + Err(e) => { + unsafe { + self.buffer.set_len(filled); + } + return Err(e.into()); + } + } + } + } + + fn read_packet_do(&mut self) -> Result { + if !self.fill_buffer(mem::size_of::())? { + return Ok(false); + } + + self.current_packet = self.read_unaligned::(0)?.from_le(); + + self.current_packet_type = match PacketType::try_from(self.current_packet.pkttype) { + Some(t) => t, + None => bail!("unexpected packet type"), + }; + + let length = self.current_packet.length; + if (length as usize) < mem::size_of::() { + bail!("received packet of bad length ({})", length); + } + + Ok(true) + } + + fn read_packet(&mut self) -> Result { + match self.read_packet_do() { + Ok(b) => Ok(b), + Err(e) => { + if let Some(ioe) = e.downcast_ref::() { + if ioe.kind() == io::ErrorKind::WouldBlock { + return Ok(false); + } + } + Err(e) + } + } + } + + fn read_unaligned(&self, offset: usize) -> Result { + if offset + mem::size_of::() > self.buffer.len() { + bail!("buffer underrun"); + } + Ok(unsafe { ptr::read_unaligned(&self.buffer[offset] as *const _ as *const T) }.from_le()) + } + + pub fn read_unaligned_data(&self, offset: usize) -> Result { + self.read_unaligned(offset + mem::size_of::()) + } + + fn fill_packet(&mut self) -> Result { + self.fill_buffer(self.current_packet.length as usize) + } + + // convenience helpers: + + pub fn assert_size(&self, size: usize) -> Result<()> { + if self.packet_data().len() != size { + bail!( + "protocol error: invalid packet size (type {})", + self.current_packet.pkttype, + ); + } + Ok(()) + } + + pub fn assert_atleast(&self, size: usize) -> Result<()> { + if self.packet_data().len() < size { + bail!( + "protocol error: invalid packet size (type {})", + self.current_packet.pkttype, + ); + } + Ok(()) + } +} diff --git a/proxmox-protocol/src/lib.rs b/proxmox-protocol/src/lib.rs new file mode 100644 index 00000000..8a2da6cf --- /dev/null +++ b/proxmox-protocol/src/lib.rs @@ -0,0 +1,19 @@ +pub(crate) mod common; + +pub mod protocol; +pub mod server; +pub mod tools; + +mod chunk_stream; +pub use chunk_stream::*; + +mod chunker; +pub use chunker::*; + +mod client; +pub use client::*; + +mod types; +pub use types::*; + +pub mod c_client; diff --git a/proxmox-protocol/src/protocol.rs b/proxmox-protocol/src/protocol.rs new file mode 100644 index 00000000..c1169f83 --- /dev/null +++ b/proxmox-protocol/src/protocol.rs @@ -0,0 +1,313 @@ +use std::mem; + +use endian_trait::Endian; + +// There's no reason to have more than that in a single packet... +pub const MAX_PACKET_SIZE: u32 = 16 * 1024 * 1024; + +// Each packet has a transaction ID (eg. when uploading multiple disks each +// upload is a separate stream). +#[derive(Endian)] +#[repr(C, packed)] +pub struct Packet { + pub id: u8, // request/command id + pub pkttype: u8, // packet type + pub length: u32, // data length before the next packet + + // content is attached directly afterwards +} + +impl Packet { + pub fn builder(id: u8, pkttype: PacketType) -> PacketBuilder { + PacketBuilder::new(id, pkttype) + } + + pub fn simple(id: u8, pkttype: PacketType) -> Vec { + Self::builder(id, pkttype).finish() + } +} + +#[derive(Endian, Clone, Copy)] +#[repr(u8)] +pub enum ErrorId { + Generic, + Busy, +} + +#[repr(u8)] +#[derive(Clone, Copy)] +pub enum PacketType { + /// First packet sent by the server. + Hello, + + /// Generic acknowledgement. + Ok, + + /// Error packet sent by the server, this cancels the request for which it is produced. + Error, + + /// The client wants the list of available hashes in order to know which ones require an + /// upload. + /// + /// The body should contain a backup file name of which to retrieve the hash list. + /// + /// Server responds with a sequence of ``HashListPart`` packets. + GetHashList, + + /// Array of hashes. The number of hashes in a packet is calculated from the packet length as + /// provided in the ``Packet`` struct. An empty packet indicates the end of the list. We send a + /// sequence of such packets because we don't know whether the server will be keeping the list + /// in memory yet, so it might not know the number in advance and may be iterating through + /// directories until it hits an end. It can produce the network packets asynchronously while + /// walking the chunk dir. + HashListPart, + + /// Client requests to download chunks via a hash list from the server. The number of chunks + /// can be derived from the length of this request, so it works similar to ``HashListPart``, + /// but there's only 1 chunk list per request ID. + /// + /// The server responds with a sequence of ``Chunk`` packets or ``Error``. + DownloadChunks, + + /// The response to ``DownloadChunks``. One packet per requested chunk. + Chunk, + + /// The upload of a chunk can happen independent from the ongoing backup + /// streams. Server responds with an ``OK``. + UploadChunk, + + /// Create a file in a new or existing backup. Contains all the metadata of + /// a file. + /// + /// The server responds with ``BackupCreated`` or ``Error``. On ``BackupCreated`` the client + /// may proceed to send as many ``BackupData...`` packets as necessary to fill the file. + /// The sequence is finished by the client with a ``BackupFinished``. + CreateBackup, + + /// Successful from the server to a client's ``CreateBackup`` packet. Contains the server side + /// path relative to the store. + BackupCreated, + + /// This packet contains an array of references to fixed sized chunks. Clients should upload + /// chunks via ``UploadChunk`` packets before using them in this type of packet. A non-existent + /// chunk is an error. + /// + /// The server produces an ``Error`` packet in case of an error. + BackupDataFixed, + + /// This packet contains an array of references to dynamic sized chunks. Clients should upload + /// chunks via ``UploadChunk`` packets before using them in this type of packet. A non-existent + /// chunk is an error. + /// + /// The server produces an ``Error`` packet in case of an error. + BackupDataDynamic, + + /// This ends a backup file. The server responds with an ``OK`` or an ``Error`` packet. + BackupFinished, +} + +// Nightly has a std::convert::TryFrom, actually... +impl PacketType { + pub fn try_from(v: u8) -> Option { + if v <= PacketType::BackupFinished as u8 { + Some(unsafe { std::mem::transmute(v) }) + } else { + None + } + } +} + +// Not using bitflags! for Endian derive... +pub mod backup_flags { + pub type Type = u8; + /// The backup must not exist yet. + pub const EXCL: Type = 0x00000001; + /// The data represents a raw file + pub const RAW: Type = 0x00000002; + /// The data uses dynamically sized chunks (catar file) + pub const DYNAMIC_CHUNKS: Type = 0x00000004; +} + +pub mod backup_type { + pub type Type = u8; + pub const VM: Type = 0; + pub const CT: Type = 1; + pub const HOST: Type = 2; + + use failure::{bail, Error}; + pub fn id_to_name(id: Type) -> Result<&'static str, Error> { + Ok(match id { + VM => "vm", + CT => "ct", + HOST => "host", + n => bail!("unknown backup type id: {}", n), + }) + } + + pub fn name_to_id(id: &str) -> Result { + Ok(match id { + "vm" => VM, + "ct" => CT, + "host" => HOST, + n => bail!("unknown backup type name: {}", n), + }) + } +} + +#[repr(C, packed)] +pub struct DynamicChunk { + pub offset: u64, + pub digest: [u8; 32], +} + +pub mod server { + use endian_trait::Endian; + + pub const PROTOCOL_VERSION: u32 = 1; + + #[derive(Eq, PartialEq)] + #[repr(C, packed)] + pub struct HelloMagic([u8; 8]); + pub const HELLO_MAGIC: HelloMagic = HelloMagic(*b"PMXBCKUP"); + + pub const HELLO_VERSION: u32 = 1; // the current version + #[derive(Endian)] + #[repr(C, packed)] + pub struct Hello { + pub magic: HelloMagic, + pub version: u32, + } + + #[derive(Endian)] + #[repr(C, packed)] + pub struct Error { + pub id: u8, + } + + #[derive(Endian)] + #[repr(C, packed)] + pub struct Chunk { + pub hash: super::DynamicChunk, + // Data follows here... + } + + impl Endian for HelloMagic { + fn to_be(self) -> Self { + self + } + fn to_le(self) -> Self { + self + } + fn from_be(self) -> Self { + self + } + fn from_le(self) -> Self { + self + } + } + + #[derive(Endian)] + #[repr(C, packed)] + pub struct BackupCreated { + pub path_length: u16, + // path follows here + } +} + +pub mod client { + use endian_trait::Endian; + + #[derive(Endian)] + #[repr(C, packed)] + pub struct UploadChunk { + pub hash: crate::FixedChunk, + } + + #[derive(Endian)] + #[repr(C, packed)] + pub struct CreateBackup { + pub backup_type: super::backup_type::Type, + pub id_length: u8, // length of the ID string + pub timestamp: u64, // seconds since the epoch + pub flags: super::backup_flags::Type, + pub name_length: u8, // file name length + pub chunk_size: u32, // average or "fixed" chunk size + pub file_size: u64, // size for fixed size files (must be 0 if DYNAMIC_CHUNKS is set) + + // ``id_length`` bytes of ID follow + // ``name_length`` bytes of file name follow + // Further packets contain the data or chunks + } + + #[derive(Endian)] + #[repr(C, packed)] + pub struct GetHashList { + pub name_length: u16, + // name follows as payload + } +} + +impl Endian for DynamicChunk { + fn to_be(self) -> Self { + self + } + fn to_le(self) -> Self { + self + } + fn from_be(self) -> Self { + self + } + fn from_le(self) -> Self { + self + } +} + +pub struct PacketBuilder { + data: Vec, +} + +impl PacketBuilder { + pub fn new(id: u8, pkttype: PacketType) -> Self { + let data = Vec::with_capacity(mem::size_of::()); + let mut me = Self { data }; + me.write_data( + Packet { + id, + pkttype: pkttype as _, + length: 0, + } + .to_le(), + ); + me + } + + pub fn reserve(&mut self, more: usize) -> &mut Self { + self.data.reserve(more); + self + } + + pub fn write_buf(&mut self, buf: &[u8]) -> &mut Self { + self.data.extend_from_slice(buf); + self + } + + pub fn write_data(&mut self, data: T) -> &mut Self { + self.write_data_noswap(&data.to_le()) + } + + pub fn write_data_noswap(&mut self, data: &T) -> &mut Self { + self.write_buf(unsafe { + std::slice::from_raw_parts(data as *const T as *const u8, mem::size_of::()) + }) + } + + pub fn finish(mut self) -> Vec { + let length = self.data.len(); + assert!(length >= mem::size_of::()); + unsafe { + let head = self.data.as_mut_ptr() as *mut Packet; + std::ptr::write_unaligned((&mut (*head).length) as *mut u32, (length as u32).to_le()); + } + self.data + } +} diff --git a/proxmox-protocol/src/server.rs b/proxmox-protocol/src/server.rs new file mode 100644 index 00000000..d7e3b23a --- /dev/null +++ b/proxmox-protocol/src/server.rs @@ -0,0 +1,465 @@ +use std::collections::hash_map::{self, HashMap}; +use std::io::{Read, Write}; +use std::{mem, ptr}; + +use failure::*; + +use endian_trait::Endian; + +use crate::common; +use crate::protocol::*; +use crate::ChunkEntry; +use crate::FixedChunk; + +type Result = std::result::Result; + +pub trait ChunkList: Send { + fn next(&mut self) -> Result>; +} + +/// This provides callbacks used by a `Connection` when it receives a packet. +pub trait HandleClient { + /// The protocol handler will call this when the client produces an irrecoverable error. + fn error(&self); + + /// The client wants the list of hashes, the provider should provide an iterator over chunk + /// entries. + fn get_chunk_list(&self, backup_name: &str) -> Result>; + + /// The client has uploaded a chunk, we should add it to the chunk store. Return whether the + /// chunk was new. + fn upload_chunk(&self, chunk: &ChunkEntry, data: &[u8]) -> Result; + + /// The client wants to create a backup. Since multiple backup streams can happen in parallel, + /// this should return a handler used to create the individual streams. + /// The handler will be informed about success via the ``finish()`` method. + fn create_backup( + &self, + backup_type: &str, + id: &str, + timestamp: i64, + new: bool, + ) -> Result>; +} + +/// A single backup may contain multiple files. Currently we represent this via a hierarchy where +/// the `HandleBackup` trait is instantiated for each backup, which is responsible for +/// instantiating the `BackupFile` trait objects. +pub trait HandleBackup { + /// All individual streams for this backup have been successfully finished. + fn finish(&mut self) -> Result<()>; + + /// Create a specific file in this backup. + fn create_file( + &self, + name: &str, + fixed_size: Option, + chunk_size: usize, + ) -> Result>; +} + +/// This handles backup files created by calling `create_file` on a `Backup`. +pub trait BackupFile { + /// Backup use the server-local timestamp formatting, so we want to be able to tell the client + /// the real remote path: + fn relative_path(&self) -> &str; + + /// The client wants to add a chunk to a fixed index file at a certain position. + fn add_fixed_data(&mut self, index: u64, chunk: &FixedChunk) -> Result<()>; + + /// The client wants to add a chunks to a dynamic index file. + fn add_dynamic_data(&mut self, chunk: &DynamicChunk) -> Result<()>; + + /// This notifies the handler that the backup has finished successfully. This should commit the + /// data to the store for good. After this the client will receive an "ok". + /// + /// If the Drop handler gets called before this method, the backup was aborted due to an error + /// or the client disconnected unexpectedly, in which case cleanup of temporary files should be + /// performed. + fn finish(&mut self) -> Result<()>; +} + +#[derive(Clone, Eq, Hash, PartialEq)] +struct BackupId(backup_type::Type, String, i64); + +/// Associates a socket with the server side of the backup protocol. +/// The communcation channel should be `Read + Write` and may be non-blocking (provided it +/// correctly returns `io::ErrorKind::WouldBlock`). +/// The handler has to implement the `HandleClient` trait to provide callbacks used while +/// communicating with the client. +pub struct Connection +where + S: Read + Write, + H: HandleClient, +{ + handler: H, + common: common::Connection, + + // states: + + // If this is set we are currently transferring our hash list to the client: + hash_list: Option<( + u8, // data stream ID + Box, + )>, + + // currently active 'backups' (handlers for a specific BackupDir) + backups: HashMap>, + + // currently active backup *file* streams + backup_files: HashMap>, +} + +impl Connection +where + S: Read + Write, + H: HandleClient, +{ + pub fn new(socket: S, handler: H) -> Result { + let mut me = Self { + handler, + common: common::Connection::new(socket), + hash_list: None, + backups: HashMap::new(), + backup_files: HashMap::new(), + }; + + me.send_hello()?; + Ok(me) + } + + fn send_hello(&mut self) -> Result<()> { + let mut packet = Packet::builder(0, PacketType::Hello); + packet.write_data(server::Hello { + magic: server::HELLO_MAGIC, + version: server::PROTOCOL_VERSION, + }); + self.common.queue_data(packet.finish())?; + Ok(()) + } + + pub fn eof(&self) -> bool { + self.common.eof + } + + /// It is safe to clear the error after an `io::ErrorKind::Interrupted`. + pub fn clear_err(&mut self) { + self.common.clear_err() + } + + pub fn main(&mut self) -> Result<()> { + self.poll_read()?; + self.poll_send()?; + Ok(()) + } + + // If this returns an error it is considered fatal and the connection should be dropped! + fn poll_read(&mut self) -> Result<()> { + if self.common.eof { + // polls after EOF are errors: + bail!("client disconnected"); + } + + if !self.common.poll_read()? { + // No data available + if self.common.eof { + bail!("client disconnected"); + } + return Ok(()); + } + + // we received a packet, handle it: + + loop { + use PacketType::*; + match self.common.current_packet_type { + GetHashList => self.hash_list_requested()?, + UploadChunk => self.receive_chunk()?, + CreateBackup => self.create_backup()?, + BackupDataDynamic => self.backup_data_dynamic()?, + BackupDataFixed => self.backup_data_fixed()?, + BackupFinished => self.backup_finished()?, + _ => bail!( + "client sent an unexpected packet of type {}", + self.common.current_packet_type as u32, + ), + }; + if !self.common.next()? { + break; + } + } + + Ok(()) + } + + fn poll_send(&mut self) -> Result<()> { + if self.common.error { + eprintln!("refusing to send datato client in error state"); + bail!("client is in error state"); + } + + if let Some(false) = self.common.poll_send()? { + // send queue is not finished, don't add anything else... + return Ok(()); + } + + // Queue has either finished or was empty, see if we should enqueue more data: + if self.hash_list.is_some() { + return self.send_hash_list(); + } + Ok(()) + } + + fn hash_list_requested(&mut self) -> Result<()> { + // Verify protocol: GetHashList is an empty packet. + let request = self.common.read_unaligned_data::(0)?; + self.common + .assert_size(mem::size_of_val(&request) + request.name_length as usize)?; + let name_bytes = &self.common.packet_data()[mem::size_of_val(&request)..]; + let name = std::str::from_utf8(name_bytes)?; + + // We support only one active hash list stream: + if self.hash_list.is_some() { + return self.respond_error(ErrorId::Busy); + } + + self.hash_list = Some(( + self.common.current_packet.id, + self.handler.get_chunk_list(name)?, + )); + + Ok(()) + } + + fn send_hash_list(&mut self) -> Result<()> { + loop { + let (stream_id, hash_iter) = self.hash_list.as_mut().unwrap(); + + let max_chunks_per_packet = (MAX_PACKET_SIZE as usize - mem::size_of::()) + / mem::size_of::(); + + let mut packet = Packet::builder(*stream_id, PacketType::HashListPart); + packet.reserve(mem::size_of::() * max_chunks_per_packet); + + let mut count = 0; + for _ in 0..max_chunks_per_packet { + let entry: &[u8; 32] = match hash_iter.next() { + Ok(Some(entry)) => entry, + Ok(None) => break, + Err(e) => { + eprintln!("error sending chunk list to client: {}", e); + continue; + } + }; + + packet.write_buf(entry); + count += 1; + } + + let can_send_more = self.common.queue_data(packet.finish())?; + + if count == 0 { + // We just sent the EOF packet, clear our iterator state! + self.hash_list = None; + break; + } + + if !can_send_more { + break; + } + } + Ok(()) + } + + fn respond_error(&mut self, kind: ErrorId) -> Result<()> { + self.respond_value(PacketType::Error, kind)?; + Ok(()) + } + + fn respond_value(&mut self, pkttype: PacketType, data: T) -> Result<()> { + let mut packet = Packet::builder(self.common.current_packet.id, pkttype); + packet.write_data(data); + self.common.queue_data(packet.finish())?; + Ok(()) + } + + fn respond_empty(&mut self, pkttype: PacketType) -> Result<()> { + self.common + .queue_data(Packet::simple(self.common.current_packet.id, pkttype))?; + Ok(()) + } + + fn respond_ok(&mut self) -> Result<()> { + self.respond_empty(PacketType::Ok) + } + + fn receive_chunk(&mut self) -> Result<()> { + self.common + .assert_atleast(mem::size_of::())?; + let data = self.common.packet_data(); + let (hash, data) = data.split_at(mem::size_of::()); + if data.len() == 0 { + bail!("received an empty chunk"); + } + let entry = ChunkEntry::from_data(data); + if entry.hash != hash { + let cli_hash = crate::tools::digest_to_hex(hash); + let data_hash = entry.digest_to_hex(); + bail!( + "client claimed data with digest {} has digest {}", + data_hash, + cli_hash + ); + } + let _new = self.handler.upload_chunk(&entry, data)?; + self.respond_ok() + } + + fn create_backup(&mut self) -> Result<()> { + if self + .backup_files + .contains_key(&self.common.current_packet.id) + { + bail!("stream id already in use..."); + } + + let create_msg = self.common.read_unaligned_data::(0)?; + + // simple data: + let flags = create_msg.flags; + let backup_type = create_msg.backup_type; + let time = create_msg.timestamp as i64; + + // text comes from the payload data after the CreateBackup struct: + let data = self.common.packet_data(); + let payload = &data[mem::size_of_val(&create_msg)..]; + + // there must be exactly the ID and the file name in the payload: + let id_len = create_msg.id_length as usize; + let name_len = create_msg.name_length as usize; + let expected_len = id_len + name_len; + if payload.len() < expected_len { + bail!("client sent incomplete CreateBackup request"); + } else if payload.len() > expected_len { + bail!("client sent excess data in CreateBackup request"); + } + + // id and file name must be utf8: + let id = std::str::from_utf8(&payload[0..id_len]) + .map_err(|e| format_err!("client-requested backup id is invalid: {}", e))?; + let file_name = std::str::from_utf8(&payload[id_len..]) + .map_err(|e| format_err!("client-requested backup file name invalid: {}", e))?; + + // Sanity check dynamic vs fixed: + let is_dynamic = (flags & backup_flags::DYNAMIC_CHUNKS) != 0; + let file_size = match (is_dynamic, create_msg.file_size) { + (false, size) => Some(size), + (true, 0) => None, + (true, _) => bail!("file size of dynamic streams must be zero"), + }; + + // search or create the handler: + let hashmap_id = BackupId(backup_type, id.to_string(), time); + let handle = match self.backups.entry(hashmap_id) { + hash_map::Entry::Vacant(entry) => entry.insert(self.handler.create_backup( + backup_type::id_to_name(backup_type)?, + id, + time, + (flags & backup_flags::EXCL) != 0, + )?), + hash_map::Entry::Occupied(entry) => entry.into_mut(), + }; + let file = handle.create_file(file_name, file_size, create_msg.chunk_size as usize)?; + + let mut response = + Packet::builder(self.common.current_packet.id, PacketType::BackupCreated); + let path = file.relative_path(); + if path.len() > 0xffff { + bail!("path too long"); + } + response + .write_data(server::BackupCreated { + path_length: path.len() as _, + }) + .write_buf(path.as_bytes()); + self.common.queue_data(response.finish())?; + + self.backup_files + .insert(self.common.current_packet.id, file); + + Ok(()) + } + + fn backup_data_dynamic(&mut self) -> Result<()> { + let stream_id = self.common.current_packet.id; + let file = self + .backup_files + .get_mut(&stream_id) + .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?; + + let mut data = self.common.packet_data(); + // Data consists of (offset: u64, hash: [u8; 32]) + let entry_len = mem::size_of::(); + + while data.len() >= entry_len { + let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const DynamicChunk) }; + data = &data[entry_len..]; + + entry.offset = entry.offset.from_le(); + file.add_dynamic_data(&entry)?; + } + + if data.len() != 0 { + bail!( + "client sent excess data ({} bytes) after dynamic chunk indices!", + data.len() + ); + } + + Ok(()) + } + + fn backup_data_fixed(&mut self) -> Result<()> { + let stream_id = self.common.current_packet.id; + let file = self + .backup_files + .get_mut(&stream_id) + .ok_or_else(|| format_err!("BackupDataFixed for invalid stream id {}", stream_id))?; + + let mut data = self.common.packet_data(); + // Data consists of (index: u64, hash: [u8; 32]) + #[repr(C, packed)] + struct IndexedChunk { + index: u64, + digest: FixedChunk, + } + let entry_len = mem::size_of::(); + + while data.len() >= entry_len { + let mut entry = unsafe { ptr::read_unaligned(data.as_ptr() as *const IndexedChunk) }; + data = &data[entry_len..]; + + entry.index = entry.index.from_le(); + file.add_fixed_data(entry.index, &entry.digest)?; + } + + if data.len() != 0 { + bail!( + "client sent excess data ({} bytes) after dynamic chunk indices!", + data.len() + ); + } + + Ok(()) + } + + fn backup_finished(&mut self) -> Result<()> { + let stream_id = self.common.current_packet.id; + let mut file = self + .backup_files + .remove(&stream_id) + .ok_or_else(|| format_err!("BackupDataDynamic for invalid stream id {}", stream_id))?; + file.finish()?; + self.respond_ok() + } +} diff --git a/proxmox-protocol/src/tools.rs b/proxmox-protocol/src/tools.rs new file mode 100644 index 00000000..a614e28f --- /dev/null +++ b/proxmox-protocol/src/tools.rs @@ -0,0 +1,47 @@ +use failure::{bail, Error}; + +pub fn digest_to_hex(digest: &[u8]) -> String { + const HEX_CHARS: &'static [u8; 16] = b"0123456789abcdef"; + + let mut buf = Vec::::with_capacity(digest.len() * 2); + + for i in 0..digest.len() { + buf.push(HEX_CHARS[(digest[i] >> 4) as usize]); + buf.push(HEX_CHARS[(digest[i] & 0xf) as usize]); + } + + unsafe { String::from_utf8_unchecked(buf) } +} + +pub unsafe fn swapped_data_to_buf(data: &T) -> &[u8] { + std::slice::from_raw_parts(data as *const T as *const u8, std::mem::size_of::()) +} + +fn hex_nibble(c: u8) -> Result { + Ok(match c { + b'0'..=b'9' => c - b'0', + b'a'..=b'f' => c - b'a' + 0xa, + b'A'..=b'F' => c - b'A' + 0xa, + _ => bail!("not a hex digit: {}", c as char), + }) +} + +#[inline] +pub fn parse_hex_digest>(hex: T) -> Result<[u8; 32], Error> { + let mut digest: [u8; 32] = unsafe { std::mem::uninitialized() }; + + let hex = hex.as_ref(); + + if hex.len() != 64 { + bail!( + "invalid hex digest ({} instead of 64 digits long)", + hex.len() + ); + } + + for i in 0..32 { + digest[i] = (hex_nibble(hex[i * 2])? << 4) + hex_nibble(hex[i * 2 + 1])?; + } + + Ok(digest) +} diff --git a/proxmox-protocol/src/types.rs b/proxmox-protocol/src/types.rs new file mode 100644 index 00000000..72561920 --- /dev/null +++ b/proxmox-protocol/src/types.rs @@ -0,0 +1,135 @@ +use std::borrow::Borrow; + +use endian_trait::Endian; +use failure::*; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum IndexType { + Fixed, + Dynamic, +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +#[repr(transparent)] +pub struct FixedChunk(pub [u8; 32]); + +impl FixedChunk { + pub fn new(hash: [u8; 32]) -> Self { + Self(hash) + } + + pub fn from_hex>(hex: T) -> Result { + Ok(Self::new(crate::tools::parse_hex_digest(hex.as_ref())?)) + } + + pub fn from_data(data: &[u8]) -> Self { + let mut hasher = openssl::sha::Sha256::new(); + hasher.update(data); + Self::new(hasher.finish()) + } + + pub fn digest_to_hex(&self) -> String { + crate::tools::digest_to_hex(&self.0) + } +} + +impl Endian for FixedChunk { + fn to_be(self) -> Self { + self + } + fn to_le(self) -> Self { + self + } + fn from_be(self) -> Self { + self + } + fn from_le(self) -> Self { + self + } +} + +#[derive(Clone, Copy, Debug, Hash)] +#[repr(C, packed)] +pub struct ChunkEntry { + pub hash: [u8; 32], + pub size: u64, +} + +impl ChunkEntry { + pub fn new(hash: [u8; 32], size: u64) -> Self { + Self { hash, size } + } + + pub fn from_hex>(hex: T, size: u64) -> Result { + Ok(Self::new( + crate::tools::parse_hex_digest(hex.as_ref())?, + size, + )) + } + + pub fn len(&self) -> u64 { + self.size + } + + pub fn from_data(data: &[u8]) -> Self { + let mut hasher = openssl::sha::Sha256::new(); + hasher.update(data); + Self::new(hasher.finish(), data.len() as u64) + } + + pub fn digest_to_hex(&self) -> String { + crate::tools::digest_to_hex(&self.hash) + } + + pub fn to_fixed(&self) -> FixedChunk { + FixedChunk(self.hash) + } +} + +impl PartialEq for ChunkEntry { + fn eq(&self, other: &Self) -> bool { + self.size == other.size && self.hash == other.hash + } +} + +impl Eq for ChunkEntry {} + +impl Endian for ChunkEntry { + fn to_be(self) -> Self { + self.size.to_be(); + self + } + + fn to_le(self) -> Self { + self.size.to_le(); + self + } + + fn from_be(self) -> Self { + self.size.from_be(); + self + } + + fn from_le(self) -> Self { + self.size.from_le(); + self + } +} + +impl Into for ChunkEntry { + fn into(self) -> FixedChunk { + FixedChunk(self.hash) + } +} + +impl Borrow for ChunkEntry { + fn borrow(&self) -> &FixedChunk { + unsafe { std::mem::transmute(&self.hash) } + } +} + +impl Borrow for [u8; 32] { + fn borrow(&self) -> &FixedChunk { + unsafe { std::mem::transmute(self) } + } +}