diff --git a/src/server.rs b/src/server.rs
index 0786ba38..19e3b2d7 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -10,6 +10,9 @@ pub use environment::*;
mod state;
pub use state::*;
+mod command_socket;
+pub use command_socket::*;
+
mod worker_task;
pub use worker_task::*;
diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs
new file mode 100644
index 00000000..7378ed8e
--- /dev/null
+++ b/src/server/command_socket.rs
@@ -0,0 +1,67 @@
+use failure::*;
+
+use futures::*;
+use futures::stream::Stream;
+
+use tokio::net::unix::UnixListener;
+use tokio::io::AsyncRead;
+
+use std::io::Write;
+
+use std::path::PathBuf;
+use serde_json::Value;
+use std::sync::Arc;
+
+/// Listens on a Unix Socket to handle simple command asynchronously
+pub fn create_control_socket
(path: P, f: F) -> Result, Error>
+ where P: Into,
+ F: Send + Sync +'static + Fn(Value) -> Result,
+{
+ let path: PathBuf = path.into();
+
+ let socket = UnixListener::bind(&path)?;
+
+ let f = Arc::new(f);
+ let path = Arc::new(path);
+ let path2 = path.clone();
+ let path3 = path.clone();
+
+ let control_future = socket.incoming()
+ .map_err(move |err| { eprintln!("failed to accept on control socket {:?}: {}", path2, err); })
+ .for_each(move |conn| {
+ let f1 = f.clone();
+
+ let (rx, mut tx) = conn.split();
+ let path = path3.clone();
+ let path2 = path3.clone();
+
+ tokio::io::lines(std::io::BufReader::new(rx))
+ .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); })
+ .and_then(move |cmd| {
+ let res = try_block!({
+ let param = match cmd.parse::() {
+ Ok(p) => p,
+ Err(err) => bail!("ERRER {}", err),
+ };
+
+ f1(param)
+ });
+
+ let resp = match res {
+ Ok(v) => format!("OK: {}\n", v),
+ Err(err) => format!("ERROR: {}\n", err),
+ };
+ Ok(resp)
+ })
+ .for_each(move |resp| {
+ tx.write_all(resp.as_bytes())
+ .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); })
+ })
+
+ });
+
+ let abort_future = super::last_worker_future().map_err(|_| {});
+ let task = control_future.select(abort_future).map(|_| {}).map_err(|_| {});
+
+ Ok(task)
+}