diff --git a/src/server/command_socket.rs b/src/server/command_socket.rs index 814d160b..92a1322c 100644 --- a/src/server/command_socket.rs +++ b/src/server/command_socket.rs @@ -18,7 +18,6 @@ pub fn create_control_socket(path: P, f: F) -> Result Result, { let path: PathBuf = path.into(); - let path1: PathBuf = path.clone(); let socket = UnixListener::bind(&path)?; @@ -35,29 +34,34 @@ pub fn create_control_socket(path: P, f: F) -> Result() { - Ok(p) => p, - Err(err) => bail!("unable to parse json value - {}", err), + let abort_future = super::last_worker_future().map_err(|_| {}); + + tokio::spawn( + tokio::io::lines(std::io::BufReader::new(rx)) + .map_err(move |err| { eprintln!("control socket {:?} read error: {}", path, err); }) + .and_then(move |cmd| { + let res = try_block!({ + let param = match cmd.parse::() { + Ok(p) => p, + Err(err) => bail!("unable to parse json value - {}", err), + }; + + f1(param) + }); + + let resp = match res { + Ok(v) => format!("OK: {}\n", v), + Err(err) => format!("ERROR: {}\n", err), }; - - f1(param) - }); - - let resp = match res { - Ok(v) => format!("OK: {}\n", v), - Err(err) => format!("ERROR: {}\n", err), - }; - Ok(resp) - }) - .for_each(move |resp| { - tx.write_all(resp.as_bytes()) - .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); }) - }) - + Ok(resp) + }) + .for_each(move |resp| { + tx.write_all(resp.as_bytes()) + .map_err(|err| { eprintln!("control socket {:?} write response error: {}", path2, err); }) + }) + .select(abort_future) + .then(move |_| { Ok(()) }) + ) }); let abort_future = super::last_worker_future().map_err(|_| {});