streamdeck-obs-replay/obsws/codegen/protocol.py
Tyler eca3834fd7
Some checks failed
continuous-integration/drone/push Build is failing
initial version
2020-01-14 23:32:53 -05:00

403 lines
12 KiB
Python

# TODO: No warning is given for struct fields whose names match function names.
# Maybe bring the reserved list back.
import json
import os
import sys
from typing import Dict, List, Tuple
# Name of the Go package declared at the top of every generated file.
package = "obsws"
# Base URL of the obs-websocket protocol documentation. Generated doc comments
# append "#<lower-cased heading text>" to it to link to a specific entry.
doc = "https://github.com/Palakis/obs-websocket/blob/4.3-maintenance/docs/generated/protocol.md"
# Go comment written right after the package clause of every generated file.
disclaimer = """
// This file is automatically generated.
// https://github.com/christopher-dG/go-obs-websocket/blob/master/codegen/protocol.py
"""
# Maps lower-cased type names from the protocol description to Go types.
# go_variables() lower-cases the type name before looking it up here, so every
# key must be lower case.
# TODO: Test the less clear ones.
type_map = {
    "bool": "bool",
    "boolean": "bool",
    "int": "int",
    "integer": "int",
    "float": "float64",
    "double": "float64",
    "string": "string",
    "array": "[]interface{}",
    "object": "map[string]interface{}",
    "array of objects": "[]map[string]interface{}",
    "object|array": "interface{}",
    "scene|array": "[]map[string]interface{}",
    "source|array": "[]map[string]interface{}",
}
# Lower-cased type names whose fields should be flagged with an
# "Unknown type" TODO comment in the generated struct. Currently empty.
unknown_types = []
def process_json(d: Dict):
    """Generate Go code for the parts of the protocol selected on the command line.

    ``--events`` generates the events, ``--requests`` generates the requests,
    and ``--all`` generates both. Without any of these flags nothing is
    generated.
    """
    cli_args = sys.argv
    everything = "--all" in cli_args
    if everything or "--events" in cli_args:
        gen_events(d["events"])
    if everything or "--requests" in cli_args:
        gen_requests(d["requests"])
def gen_category(prefix: str, category: str, data: Dict):
    """Generate all events or requests in one category.

    The Go source is written to ``{prefix}_{category}.go`` in the current
    working directory, with spaces in the file name replaced by underscores.

    Args:
        prefix: ``"events"`` selects the event generator; any other value
            selects the request generator.
        category: Category name, used only to build the output file name.
        data: Iterable of event/request descriptions belonging to the category.
    """
    func = gen_event if prefix == "events" else gen_request
    generated = "\n".join(func(thing) for thing in data)
    # Drop whitespace-only lines. Truly empty lines are kept on purpose:
    # "".isspace() is False, exactly as in the original filter.
    content = "\n".join(
        line for line in generated.split("\n") if not line.isspace()
    )
    filename = f"{prefix}_{category}.go".replace(" ", "_")
    source = f"""
package {package}
{disclaimer}
{content}
"""
    # Go source files must be UTF-8, so do not rely on the platform's default
    # encoding (descriptions copied from the protocol file may be non-ASCII).
    with open(filename, "w", encoding="utf-8") as f:
        f.write(source)
def gen_events(events: Dict):
    """Generate one Go file per event category, then the shared event utilities."""
    for category in events:
        gen_category("events", category, events[category])
    # The utilities file covers every category, so it is written once at the end.
    gen_event_utils(events)
def gen_event(data: Dict) -> str:
    """Return Go code defining the struct type for a single event.

    The struct is named ``<name>Event``, holds one field per entry of
    ``data["returns"]`` (if any) and embeds ``_event``. It is preceded by a
    doc comment built from the event description, an optional
    "Since obs-websocket version" note and a link into the protocol docs.
    """
    name = data["name"]
    fields = go_struct_variables(go_variables(data.get("returns", [])))
    struct = f"""
type {name}Event struct {{
{fields}
_event `json:",squash"`
}}
"""
    description = newlinify(f"{name}Event : {data['description']}")
    if not description.endswith("."):
        description = f"{description}."
    since = data.get("since")
    if since:
        description = (
            f"{description}\n//\n// Since obs-websocket version: {since.capitalize()}."
        )
    # The documentation anchor is the lower-cased heading text.
    anchor = data["heading"]["text"].lower()
    return f"""
{description}
//
// {doc}#{anchor}
{struct}
"""
def gen_requests(requests: Dict):
    """Generate one Go file of requests and responses per request category."""
    for category in requests:
        gen_category("requests", category, requests[category])
def gen_request(data: Dict) -> str:
    """Return Go code for one request: its types, constructor and methods.

    The output contains, in order:
      * the ``<name>Request`` struct (one field per entry of
        ``data["params"]``, the embedded ``_request`` and a response channel),
      * the ``New<name>Request`` constructor,
      * the ``Send``, ``Receive`` and ``SendReceive`` methods,
      * the ``<name>Response`` struct (one field per entry of
        ``data["returns"]`` plus the embedded ``_response``).

    Both structs get a doc comment with an optional
    "Since obs-websocket version" note and a link into the protocol docs.
    """
    name = data["name"]
    anchor = data["heading"]["text"].lower()
    since = data.get("since")
    # The same note is appended to the request and to the response comment.
    since_note = (
        f"\n//\n// Since obs-websocket version: {since.capitalize()}."
        if since
        else ""
    )

    # ---- Request -----------------------------------------------------------
    request_fields = go_struct_variables(go_variables(data.get("params", [])))
    request_struct = f"""
type {name}Request struct {{
{request_fields}
_request `json:",squash"`
response chan {name}Response
}}
"""
    request_doc = newlinify(f"{name}Request : {data['description']}")
    if request_doc and not request_doc.endswith("."):
        request_doc = f"{request_doc}."
    request_doc = f"{request_doc}{since_note}"
    constructor = gen_request_new(data)
    methods = f"""// Send sends the request.
func (r *{name}Request) Send(c *Client) error {{
if r.sent {{
return ErrAlreadySent
}}
future, err := c.sendRequest(r)
if err != nil {{
return err
}}
r.sent = true
go func() {{
m := <-future
var resp {name}Response
if err = mapToStruct(m, &resp); err != nil {{
r.err <- err
}} else if resp.Status() != StatusOK {{
r.err <- errors.New(resp.Error())
}} else {{
r.response <- resp
}}
}}()
return nil
}}
// Receive waits for the response.
func (r {name}Request) Receive() ({name}Response, error) {{
if !r.sent {{
return {name}Response{{}}, ErrNotSent
}}
if receiveTimeout == 0 {{
select {{
case resp := <-r.response:
return resp, nil
case err := <-r.err:
return {name}Response{{}}, err
}}
}} else {{
select {{
case resp := <-r.response:
return resp, nil
case err := <-r.err:
return {name}Response{{}}, err
case <-time.After(receiveTimeout):
return {name}Response{{}}, ErrReceiveTimeout
}}
}}
}}
// SendReceive sends the request then immediately waits for the response.
func (r {name}Request) SendReceive(c *Client) ({name}Response, error) {{
if err := r.Send(c); err != nil {{
return {name}Response{{}}, err
}}
return r.Receive()
}}
"""
    request = f"""
{request_doc}
//
// {doc}#{anchor}
{request_struct}
{constructor}
{methods}"""

    # ---- Response ----------------------------------------------------------
    if data.get("returns"):
        response_fields = go_struct_variables(go_variables(data["returns"]))
        response_struct = f"""
type {name}Response struct {{
{response_fields}
_response `json:",squash"`
}}
"""
    else:
        # Nothing is returned: the struct only embeds the common response part.
        response_struct = (
            f"""type {name}Response struct {{ _response `json:",squash"`}}"""
        )
    response_doc = f"// {name}Response : Response for {name}Request.{since_note}"
    response = f"""
{response_doc}
//
// {doc}#{anchor}
{response_struct}
"""
    return f"{request}\n\n{response}"
def gen_request_new(request: Dict):
    """Return Go code for the ``New<name>Request`` constructor of a request.

    The constructor takes one argument per entry of ``request["params"]``
    (names converted with ``export=False``) and returns a struct literal
    holding those arguments followed by an initialised ``_request`` and a
    buffered response channel.
    """
    name = request["name"]
    params = go_variables(request.get("params", []), export=False)
    header = (
        f"\n// New{name}Request returns a new {name}Request."
        f"\nfunc New{name}Request("
    )
    # Positional values that end every struct literal: the embedded _request
    # and the response channel.
    embedded = f"""
_request{{
ID_: getMessageID(),
Type_: "{name}",
err: make(chan error, 1),
}},
make(chan {name}Response, 1),
"""
    if not params:
        signature = f"{header}) {name}Request"
        literal = f"{{\n{embedded}\n}}\n"
    else:
        # `type` is a reserved word in Go, so that argument is named `_type`.
        idents = ["_type" if p["name"] == "type" else p["name"] for p in params]
        arg_list = "\n".join(
            f"{ident} {p['type']}," for ident, p in zip(idents, params)
        )
        literal = "{\n" + "\n".join(f"{ident}," for ident in idents) + embedded + "}"
        if len(params) == 1:
            signature = f"{header}{arg_list}) {name}Request"
        else:
            signature = f"\n{header}\n{arg_list}\n) {name}Request"
    return f"{signature} {{ return {name}Request{literal} }}"
def gen_event_utils(events: Dict):
    """Write ``event_utils.go`` with helpers shared by all generated events.

    The file contains ``eventMap``, which maps each event name to a function
    returning a pointer to a fresh ``<name>Event`` struct, and ``derefEvent``,
    a type switch that dereferences such a pointer.
    """
    factories = {}
    pointer_types = []
    for group in events.values():
        for event in group:
            name = event["name"]
            factories[name] = f"func() Event {{ return &{name}Event{{}} }}"
            pointer_types.append(f"*{name}Event")
    event_entries = "\n".join(
        f'"{key}": {factory},' for key, factory in factories.items()
    )
    event_cases = "\n".join(
        f"case {pointer}:\nreturn *e" for pointer in pointer_types
    )
    deref = f"""
func derefEvent(e Event) Event {{
switch e := e.(type) {{
{event_cases}
default:
return nil
}}
}}
"""
    source = f"""
package {package}
{disclaimer}
var eventMap = map[string]func() Event {{
{event_entries}
}}
// derefEvent returns an Event struct from a pointer to an Event struct.
{deref}
"""
    with open("event_utils.go", "w") as f:
        f.write(source)
def go_variables(variables: List[Dict], export: bool = True) -> List[Dict]:
    """Convert protocol variable descriptions into Go field descriptions.

    Args:
        variables: Dicts with at least the keys ``"name"``, ``"type"`` and
            ``"description"``, as found in the protocol file.
        export: Passed on to ``go_var``; when true the Go name starts with an
            upper-case letter, otherwise with a lower-case one.

    Returns:
        One dict per input variable, in input order, with the keys:
        ``name`` (Go identifier), ``type`` (Go type from ``type_map``),
        ``tag`` (JSON struct tag using the original name), ``description``
        (newlines replaced by spaces), ``optional`` (whether the type was
        marked "(optional)"), ``unknown`` (whether the type is listed in
        ``unknown_types``), ``actual_type`` (the type exactly as written in
        the input) and ``duplicate`` (whether an earlier variable already
        produced the same Go identifier).

    Raises:
        KeyError: If a variable's lower-cased type name is not in ``type_map``.
    """
    vardicts = []
    # A set gives constant-time duplicate checks; only membership matters.
    seen_names = set()
    for v in variables:
        typename, optional = optional_type(v["type"])
        type_key = typename.lower()
        varname = go_var(v["name"], export=export)
        vardicts.append(
            {
                "name": varname,
                "type": type_map[type_key],
                "tag": f'`json:"{v["name"]}"`',
                "description": v["description"].replace("\n", " "),
                "optional": optional,
                "unknown": type_key in unknown_types,
                "actual_type": v["type"],
                "duplicate": varname in seen_names,
            }
        )
        seen_names.add(varname)
    return vardicts
def go_var(s: str, export: bool = True) -> str:
    """Convert a variable name from the input file into a Go identifier.

    The first character is upper-cased when ``export`` is true and lower-cased
    otherwise. Each separator ("-", "_", ".*.", "[]." and ".") is removed and
    the character following it is upper-cased; a trailing separator is simply
    dropped. Finally "Id", "Obs" and "Fps" are rewritten as "ID", "OBS" and
    "FPS".
    """
    initial = s[0].upper() if export else s[0].lower()
    name = initial + s[1:]
    # Multi-character separators must be handled before the plain "." they contain.
    for separator in ("-", "_", ".*.", "[].", "."):
        while separator in name:
            if name.endswith(separator):
                # Nothing follows a trailing separator, so just cut it off.
                name = name[: -len(separator)]
            else:
                head, _, tail = name.partition(separator)
                name = head + tail[0].upper() + tail[1:]
    for word, initialism in (("Id", "ID"), ("Obs", "OBS"), ("Fps", "FPS")):
        name = name.replace(word, initialism)
    return name
def go_struct_variables(variables: List[Dict]) -> str:
    """Return Go struct field definitions for the given field descriptions.

    Every field is preceded by comment lines: its description split into one
    sentence per line (each ending with a period), a "Required" line, and,
    if the field is flagged as unknown type or duplicate name, a TODO line.
    """
    out = []
    for field in variables:
        text = field["description"]
        if text:
            # Guard "e.g. " so it is not treated as the end of a sentence.
            text = text.replace("e.g. ", "e.g.")
            text = text.replace(". ", "\n")
            text = text.replace("e.g.", "e.g. ")
            for sentence in map(str.strip, text.split("\n")):
                needs_period = bool(sentence) and not sentence.endswith(".")
                out.append(f"// {sentence}." if needs_period else f"// {sentence}")
        required = "No" if field["optional"] else "Yes"
        out.append(f"// Required: {required}.")
        notes = []
        if field["unknown"]:
            notes.append(f"TODO: Unknown type ({field['actual_type']}).")
        if field["duplicate"]:
            notes.append("TODO: Duplicate name.")
        if notes:
            out.append("// " + " ".join(notes))
        out.append(f"{field['name']} {field['type']} {field['tag']}")
    return "\n".join(out)
def newlinify(s: str, comment: bool = True) -> str:
    """Put each sentence of a string onto its own line.

    A sentence boundary is a period followed by a space. The period stays with
    its sentence, so every line still ends the way the sentence did; the space
    is replaced by a newline. "e.g. " is not treated as a boundary.

    Args:
        s: Text to split.
        comment: When true, every resulting line that does not already start
            with "//" is prefixed with "// ".

    Returns:
        The re-flowed text, with lines separated by "\\n".
    """
    # Temporarily glue "e.g. " to the following word so the split below does
    # not see it as a sentence end, then restore the space afterwards.
    s = s.replace("e.g. ", "e.g.").replace(". ", ".\n").replace("e.g.", "e.g. ")
    if comment:
        s = "\n".join(
            line if line.startswith("//") else f"// {line}"
            for line in s.split("\n")
        )
    return s
def optional_type(s: str) -> Tuple[str, bool]:
    """Split a type description into its type name and an "optional" flag.

    A description ending in "(optional)" yields the stripped text before the
    first "(optional)" marker and ``True``; any other description is returned
    unchanged together with ``False``.
    """
    marker = "(optional)"
    if not s.endswith(marker):
        return s, False
    type_name = s.partition(marker)[0]
    return type_name.strip(), True
if __name__ == "__main__":
    # Usage: protocol.py <protocol JSON file> [--events] [--requests] [--all]
    # The flags are read from sys.argv by process_json().
    if len(sys.argv) < 2:
        print("Missing filename argument")
        # sys.exit, not the bare `exit` helper: the latter is injected by the
        # `site` module and is not available in every interpreter setup.
        sys.exit(1)
    if not os.path.isfile(sys.argv[1]):
        print(f"file '{sys.argv[1]}' does not exist")
        sys.exit(1)
    # JSON is read as UTF-8 rather than with the platform's default encoding.
    with open(sys.argv[1], encoding="utf-8") as f:
        d = json.load(f)
    process_json(d)
    # Format the generated files and fix up their imports. The shell expands
    # the glob; the exit status of goimports is not checked.
    os.system("goimports -w *.go")