From 78a39e051792a71321bf93cf07707972c79af7ee Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Mon, 8 Apr 2019 17:59:39 +0200 Subject: [PATCH] src/server/command_socket.rs: simple command socket --- src/server.rs | 3 ++ src/server/command_socket.rs | 67 ++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 src/server/command_socket.rs diff --git a/src/server.rs b/src/server.rs index 0786ba38..19e3b2d7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,6 +10,9 @@ pub use environment::*; mod state; pub use state::*; +mod command_socket; +pub use command_socket::*; + mod worker_task; pub use worker_task::*; diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs new file mode 100644 index 00000000..7378ed8e --- /dev/null +++ b/src/server/command_socket.rs @@ -0,0 +1,67 @@ +use failure::*; + +use futures::*; +use futures::stream::Stream; + +use tokio::net::unix::UnixListener; +use tokio::io::AsyncRead; + +use std::io::Write; + +use std::path::PathBuf; +use serde_json::Value; +use std::sync::Arc; + +/// Listens on a Unix Socket to handle simple command asynchronously +pub fn create_control_socket(path: P, f: F) -> Result, Error> + where P: Into, + F: Send + Sync +'static + Fn(Value) -> Result, +{ + let path: PathBuf = path.into(); + + let socket = UnixListener::bind(&path)?; + + let f = Arc::new(f); + let path = Arc::new(path); + let path2 = path.clone(); + let path3 = path.clone(); + + let control_future = socket.incoming() + .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); }) + .for_each(move |conn| { + let f1 = f.clone(); + + let (rx, mut tx) = conn.split(); + let path = path3.clone(); + let path2 = path3.clone(); + + tokio::io::lines(std::io::BufReader::new(rx)) + .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); }) + .and_then(move |cmd| { + let res = try_block!({ + let param = match cmd.parse::() { + Ok(p) => p, + Err(err) => bail!("ERRER {}", err), + }; + + f1(param) + }); + + let resp = match res { + Ok(v) => format!("OK: {}\n", v), + Err(err) => format!("ERROR: {}\n", err), + }; + Ok(resp) + }) + .for_each(move |resp| { + tx.write_all(resp.as_bytes()) + .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); }) + }) + + }); + + let abort_future = super::last_worker_future().map_err(|_| {}); + let task = control_future.select(abort_future).map(|_| {}).map_err(|_| {}); + + Ok(task) +}