src/server/h2service.rs: implement generic h2 service

This commit is contained in:
Dietmar Maurer 2019-06-26 17:29:12 +02:00
parent fb0470837b
commit 42a87f7b96
3 changed files with 28 additions and 22 deletions

View File

@ -1,4 +1,5 @@
use failure::*; use failure::*;
use lazy_static::lazy_static;
use std::sync::Arc; use std::sync::Arc;
@ -14,16 +15,13 @@ use crate::tools;
use crate::tools::wrapped_reader_stream::*; use crate::tools::wrapped_reader_stream::*;
use crate::api_schema::router::*; use crate::api_schema::router::*;
use crate::api_schema::*; use crate::api_schema::*;
use crate::server::WorkerTask; use crate::server::{WorkerTask, H2Service};
use crate::backup::*; use crate::backup::*;
use crate::api2::types::*; use crate::api2::types::*;
mod environment; mod environment;
use environment::*; use environment::*;
mod service;
use service::*;
mod upload_chunk; mod upload_chunk;
use upload_chunk::*; use upload_chunk::*;
@ -96,7 +94,7 @@ fn upgrade_to_backup_protocol(
env.log(format!("starting new backup on datastore '{}': {:?}", store, path)); env.log(format!("starting new backup on datastore '{}': {:?}", store, path));
let service = BackupService::new(env.clone(), worker.clone(), debug); let service = H2Service::new(env.clone(), worker.clone(), &BACKUP_ROUTER, debug);
let abort_future = worker.abort_future(); let abort_future = worker.abort_future();
@ -150,6 +148,10 @@ fn upgrade_to_backup_protocol(
Ok(Box::new(futures::future::ok(response))) Ok(Box::new(futures::future::ok(response)))
} }
lazy_static!{
static ref BACKUP_ROUTER: Router = backup_api();
}
pub fn backup_api() -> Router { pub fn backup_api() -> Router {
let router = Router::new() let router = Router::new()

View File

@ -19,6 +19,9 @@ pub use command_socket::*;
mod worker_task; mod worker_task;
pub use worker_task::*; pub use worker_task::*;
mod h2service;
pub use h2service::*;
pub mod formatter; pub mod formatter;
#[macro_use] #[macro_use]

View File

@ -1,5 +1,4 @@
use failure::*; use failure::*;
use lazy_static::lazy_static;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
@ -12,22 +11,22 @@ use crate::api_schema::router::*;
use crate::server::formatter::*; use crate::server::formatter::*;
use crate::server::WorkerTask; use crate::server::WorkerTask;
use super::environment::*; /// Hyper Service implementation to handle stateful H2 connections.
///
lazy_static!{ /// We use this kind of service to handle backup protocol
static ref BACKUP_ROUTER: Router = super::backup_api(); /// connections. State is stored inside the generic ``rpcenv``. Logs
} /// goes into the ``WorkerTask`` log.
pub struct H2Service<E> {
pub struct BackupService { router: &'static Router,
rpcenv: BackupEnvironment, rpcenv: E,
worker: Arc<WorkerTask>, worker: Arc<WorkerTask>,
debug: bool, debug: bool,
} }
impl BackupService { impl <E: RpcEnvironment + Clone> H2Service<E> {
pub fn new(rpcenv: BackupEnvironment, worker: Arc<WorkerTask>, debug: bool) -> Self { pub fn new(rpcenv: E, worker: Arc<WorkerTask>, router: &'static Router, debug: bool) -> Self {
Self { rpcenv, worker, debug } Self { rpcenv, worker, router, debug }
} }
pub fn debug<S: AsRef<str>>(&self, msg: S) { pub fn debug<S: AsRef<str>>(&self, msg: S) {
@ -49,18 +48,20 @@ impl BackupService {
let mut uri_param = HashMap::new(); let mut uri_param = HashMap::new();
match BACKUP_ROUTER.find_method(&components, method, &mut uri_param) { let formatter = &JSON_FORMATTER;
match self.router.find_method(&components, method, &mut uri_param) {
MethodDefinition::None => { MethodDefinition::None => {
let err = http_err!(NOT_FOUND, "Path not found.".to_string()); let err = http_err!(NOT_FOUND, "Path not found.".to_string());
return Box::new(future::ok((self.rpcenv.formatter.format_error)(err))); return Box::new(future::ok((formatter.format_error)(err)));
} }
MethodDefinition::Simple(api_method) => { MethodDefinition::Simple(api_method) => {
return crate::server::rest::handle_sync_api_request( return crate::server::rest::handle_sync_api_request(
self.rpcenv.clone(), api_method, self.rpcenv.formatter, parts, body, uri_param); self.rpcenv.clone(), api_method, formatter, parts, body, uri_param);
} }
MethodDefinition::Async(async_method) => { MethodDefinition::Async(async_method) => {
return crate::server::rest::handle_async_api_request( return crate::server::rest::handle_async_api_request(
self.rpcenv.clone(), async_method, self.rpcenv.formatter, parts, body, uri_param); self.rpcenv.clone(), async_method, formatter, parts, body, uri_param);
} }
} }
} }
@ -82,7 +83,7 @@ impl BackupService {
} }
} }
impl hyper::service::Service for BackupService { impl <E: RpcEnvironment + Clone> hyper::service::Service for H2Service<E> {
type ReqBody = Body; type ReqBody = Body;
type ResBody = Body; type ResBody = Body;
type Error = Error; type Error = Error;