diff --git a/client.go b/client.go new file mode 100644 index 0000000..51c454c --- /dev/null +++ b/client.go @@ -0,0 +1,35 @@ +package main + +import ( + "io" + "io/ioutil" + "net/http" + "strconv" +) + +type Client interface { + Connect() error + Disconnect() error + Connected() bool + + ToggleReplay() error + SaveReplay() error +} + +func NewClient(key, host string, port int, password string) Client { + res, err := http.Get("http://" + host + ":" + strconv.Itoa(port) + "/api/info") + + if err != nil { + return nil + } + + defer res.Body.Close() + + io.Copy(ioutil.Discard, res.Body) + + if res.StatusCode == 200 { + return NewSlobsClient(key, host, port, password) + } + + return NewOBSWSClient(key, host, port, password) +} diff --git a/client_obsws.go b/client_obsws.go new file mode 100644 index 0000000..2ccb8e8 --- /dev/null +++ b/client_obsws.go @@ -0,0 +1,64 @@ +package main + +import ( + "meow.tf/streamdeck/obs-replay/obsws" +) + +type OBSWSClient struct { + client *obsws.Client + key string +} + +func NewOBSWSClient(key, host string, port int, password string) *OBSWSClient { + obsc := &obsws.Client{Host: host, Port: port, Password: password} + + return &OBSWSClient{client: obsc, key: key} +} + +func (c *OBSWSClient) Connect() error { + err := c.client.Connect() + + if err != nil { + return err + } + + c.client.AddEventHandler("StreamStatus", streamStatusUpdate(c.key)) + c.client.AddEventHandler("ReplayStarted", func(obsws.Event) { loopContextState(c.key, 1) }) + c.client.AddEventHandler("ReplayStopped", func(obsws.Event) { loopContextState(c.key, 0) }) + + return nil +} + +func (c *OBSWSClient) Disconnect() error { + return c.client.Disconnect() +} + +func (c *OBSWSClient) Connected() bool { + return c.client.Connected() +} + +func (c *OBSWSClient) ToggleReplay() error { + req := obsws.NewStartStopReplayBufferRequest() + + return req.Send(c.client) +} + +func (c *OBSWSClient) SaveReplay() error { + req := obsws.NewSaveReplayBufferRequest() + + return req.Send(c.client) +} + +func streamStatusUpdate(key string) func(obsws.Event) { + return func(e obsws.Event) { + evt := e.(obsws.StreamStatusEvent) + + state := 0 + + if evt.Replay { + state = 1 + } + + loopContextState(key, state) + } +} diff --git a/client_slobs.go b/client_slobs.go new file mode 100644 index 0000000..f9c9409 --- /dev/null +++ b/client_slobs.go @@ -0,0 +1,104 @@ +package main + +import ( + "meow.tf/streamdeck/obs-replay/slobs" + "strconv" +) + +type SlobsClient struct { + client *slobs.Client + key string + recording bool +} + +func NewSlobsClient(key, host string, port int, password string) *SlobsClient { + slobsc := slobs.NewClient(host + ":" + strconv.Itoa(port)) + + return &SlobsClient{client: slobsc, key: key} +} + +const ( + Running = "running" + Saving = "saving" + Stopping = "stopping" + Offline = "offline" +) + +func (c *SlobsClient) Connect() error { + err := c.client.Connect() + + if err != nil { + return err + } + + c.client.SendRPC("StreamingService", "getModel", func(e *slobs.RPCResponse) { + state := &slobs.IStreamingState{} + + e.DecodeTo(&state) + + switch state.ReplayBufferStatus { + case Saving: + fallthrough + case Running: + loopContextState(c.key, 1) + c.recording = true + case Offline: + loopContextState(c.key, 0) + c.recording = false + } + }) + + c.client.Subscribe("StreamingService", "replayBufferStatusChange", func(e *slobs.ResourceEvent) { + var status string + e.DecodeTo(&status) + + switch status { + case Saving: + return + case Running: + loopContextState(c.key, 1) + c.recording = true + case Offline: + loopContextState(c.key, 0) + c.recording = false + } + }) + + return nil +} + +func (c *SlobsClient) Disconnect() error { + return c.client.Disconnect() +} + +func (c *SlobsClient) Connected() bool { + return c.client.Connected() +} + +func (c *SlobsClient) ToggleReplay() error { + ret := make(chan struct{}, 1) + + handler := func(res *slobs.RPCResponse) { + close(ret) + } + + if c.recording { + c.client.SendRPC("StreamingService", "stopReplayBuffer", handler) + } else { + c.client.SendRPC("StreamingService", "startReplayBuffer", handler) + } + + <-ret + return nil +} + +func (c *SlobsClient) SaveReplay() error { + ret := make(chan struct{}, 1) + + c.client.SendRPC("StreamingService", "saveReplay", func(res *slobs.RPCResponse) { + close(ret) + }) + + <-ret + return nil +} diff --git a/go.mod b/go.mod index b7b1ee1..5b13ee5 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,11 @@ module meow.tf/streamdeck/obs-replay go 1.13 require ( - github.com/christopher-dG/go-obs-websocket v0.0.0-20181224025342-2efc3605bff5 + github.com/cenkalti/backoff v2.2.1+incompatible // indirect + github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9 github.com/gorilla/websocket v1.4.1 github.com/mitchellh/mapstructure v1.1.2 + github.com/mordillo123/sockjs-go-client v0.0.0-20161009150606-c1486c966a40 github.com/valyala/fastjson v1.4.1 meow.tf/streamdeck/sdk v0.0.0-20190519021527-54a933f8777d ) diff --git a/go.sum b/go.sum index 92f5122..0720036 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,13 @@ -github.com/christopher-dG/go-obs-websocket v0.0.0-20181224025342-2efc3605bff5 h1:VtKPsvxzKt/+EnkhcPp0Xg7MDjt/a+CNRSj5phITbjo= -github.com/christopher-dG/go-obs-websocket v0.0.0-20181224025342-2efc3605bff5/go.mod h1:hFg9UFHefvNCvpWpYtOaP/VT2HyokIJsmV1AUBjpTeQ= -github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= +github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9 h1:74lLNRzvsdIlkTgfDSMuaPjBr4cf6k7pwQQANm/yLKU= +github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9/go.mod h1:GgB8SF9nRG+GqaDtLcwJZsQFhcogVCJ79j4EdT0c2V4= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mordillo123/sockjs-go-client v0.0.0-20161009150606-c1486c966a40 h1:d/FazCBU+C1trw+F+Gdah5Q3QX5kNsOigUCes3z0gVc= +github.com/mordillo123/sockjs-go-client v0.0.0-20161009150606-c1486c966a40/go.mod h1:b+6Qgn9fCwiTtpRr6lHe51DY2g69kcMnWc53J4EV4Jc= github.com/valyala/fastjson v1.4.1 h1:hrltpHpIpkaxll8QltMU8c3QZ5+qIiCL8yKqPFJI/yE= github.com/valyala/fastjson v1.4.1/go.mod h1:nV6MsjxL2IMJQUoHDIrjEI7oLyeqK6aBD7EFWPsvP8o= meow.tf/streamdeck/sdk v0.0.0-20190519021527-54a933f8777d h1:PPZHRoZFy9p4GjXssLvTneJfX6cS0bEm51md5TqXFgU= diff --git a/plugin/manifest.json b/plugin/manifest.json index b8d9698..d397a15 100644 --- a/plugin/manifest.json +++ b/plugin/manifest.json @@ -37,12 +37,12 @@ "Author": "Meow.tf", "Category": "OBS Replay", "CodePathWin": "replay.exe", - "Description": "Control OBS' Replays using StreamDeck and obs-websocket", + "Description": "Control OBS' Replays using StreamDeck and Streamlabs OBS/OBS Studio + obs-websocket", "Name": "OBS Replay", "Icon": "images/pluginIcon", "CategoryIcon": "images/pluginIcon", "URL": "https://streamdeck.meow.tf/obsreplay", - "Version": "1.0.2", + "Version": "1.1.0", "SDKVersion": 2, "OS": [ { diff --git a/replay.go b/replay.go index f877850..f4b8623 100644 --- a/replay.go +++ b/replay.go @@ -3,10 +3,10 @@ package main import ( "github.com/valyala/fastjson" "log" - "meow.tf/streamdeck/obs-replay/obsws" "meow.tf/streamdeck/sdk" "strconv" "sync" + "time" ) const ( @@ -19,9 +19,10 @@ var ( clientMutex sync.RWMutex stateMutex sync.RWMutex - clients = make(map[string]*obsws.Client) - contexts = make(map[string]string) - cachedStates = make(map[string]int) + clients = make(map[string]Client) + contexts = make(map[string]string) + contextActions = make(map[string]string) + cachedStates = make(map[string]int) ) func replayToggle(action, context string, payload *fastjson.Value, deviceId string) { @@ -32,16 +33,30 @@ func replayToggle(action, context string, payload *fastjson.Value, deviceId stri return } - req := obsws.NewStartStopReplayBufferRequest() - - err := req.Send(c) - - if err != nil { + if err := c.ToggleReplay(); err != nil { sdk.ShowAlert(context) return } - sdk.ShowOk(context) + time.AfterFunc(1*time.Second, func() { + contextMutex.RLock() + key, exists := contexts[context] + contextMutex.RUnlock() + + if !exists { + return + } + + stateMutex.RLock() + state, exists := cachedStates[key] + stateMutex.RUnlock() + + if !exists { + return + } + + sdk.SetState(context, state) + }) } func replaySave(action, context string, payload *fastjson.Value, deviceId string) { @@ -52,18 +67,22 @@ func replaySave(action, context string, payload *fastjson.Value, deviceId string return } - stateMutex.RLock() - state, exists := cachedStates[context] - stateMutex.RUnlock() + contextMutex.RLock() + key, exists := contexts[context] + contextMutex.RUnlock() - if exists && state == 0 { - sdk.ShowAlert(context) - return + if exists { + stateMutex.RLock() + state, exists := cachedStates[key] + stateMutex.RUnlock() + + if exists && state == 0 { + sdk.ShowAlert(context) + return + } } - req := obsws.NewSaveReplayBufferRequest() - - if err := req.Send(c); err != nil { + if err := c.SaveReplay(); err != nil { sdk.ShowAlert(context) return } @@ -71,7 +90,7 @@ func replaySave(action, context string, payload *fastjson.Value, deviceId string sdk.ShowOk(context) } -func clientForContext(context string) *obsws.Client { +func clientForContext(context string) Client { contextMutex.RLock() key, exists := contexts[context] contextMutex.RUnlock() @@ -104,9 +123,15 @@ func onWillAppear(e *sdk.WillAppearEvent) { if settings != nil { host := sdk.JsonStringValue(settings, "host") - port := settings.GetInt("port") + portStr := sdk.JsonStringValue(settings, "port") password := sdk.JsonStringValue(settings, "password") + port, err := strconv.Atoi(portStr) + + if err != nil { + port = 4444 + } + key := checkClient(host, port, password) if key == "" { @@ -115,17 +140,24 @@ func onWillAppear(e *sdk.WillAppearEvent) { contextMutex.Lock() contexts[e.Context] = key + contextActions[e.Context] = e.Action contextMutex.Unlock() - stateMutex.RLock() - if state, ok := cachedStates[key]; ok { - sdk.SetState(e.Context, state) + if e.Action == actionReplayToggle { + stateMutex.RLock() + if state, ok := cachedStates[key]; ok { + sdk.SetState(e.Context, state) + } + stateMutex.RUnlock() } - stateMutex.RUnlock() } } func checkClient(host string, port int, password string) string { + if host == "" { + host = "127.0.0.1" + } + if port == 0 { port = 4444 } @@ -137,11 +169,9 @@ func checkClient(host string, port int, password string) string { clientMutex.RUnlock() if !ok { - client = &obsws.Client{Host: host, Port: port, Password: password} + client = NewClient(key, host, port, password) - client.AddEventHandler("StreamStatus", streamStatusUpdate(key)) - client.AddEventHandler("ReplayStarted", loopContextState(key, 1)) - client.AddEventHandler("ReplayStopped", loopContextState(key, 0)) + defer client.Connect() clientMutex.Lock() clients[key] = client @@ -151,45 +181,19 @@ func checkClient(host string, port int, password string) string { return key } -func streamStatusUpdate(key string) func(obsws.Event) { - return func(e obsws.Event) { - evt := e.(obsws.StreamStatusEvent) - - state := 0 - - if evt.Replay { - state = 1 - } - - loopContextState(key, state) - } -} - -func loopContextState(key string, state int) func(obsws.Event) { - stateMutex.Lock() - cachedStates[key] = state - stateMutex.Unlock() - - return func(event obsws.Event) { - contextMutex.RLock() - defer contextMutex.RUnlock() - - for ctx, ctxKey := range contexts { - if ctxKey == key { - sdk.SetState(ctx, state) - } - } - } -} - func onWillDisappear(e *sdk.WillDisappearEvent) { + contextDiscounnected(e.Context) +} + +func contextDiscounnected(context string) { contextMutex.Lock() defer contextMutex.Unlock() // replayToggleContexts - key, ok := contexts[e.Context] + key, ok := contexts[context] - delete(contexts, e.Context) + delete(contexts, context) + delete(contextActions, context) if !ok { return @@ -209,18 +213,30 @@ func onWillDisappear(e *sdk.WillDisappearEvent) { } func onSettingsReceived(e *sdk.ReceiveSettingsEvent) { - var host, password string + host := sdk.JsonStringValue(e.Settings, "host") + portStr := sdk.JsonStringValue(e.Settings, "port") + password := sdk.JsonStringValue(e.Settings, "password") - host = sdk.JsonStringValue(e.Settings, "host") - port := e.Settings.GetInt("port") - password = sdk.JsonStringValue(e.Settings, "password") + if host == "" { + host = "127.0.0.1" + } - if port == 0 { + port, err := strconv.Atoi(portStr) + + if err != nil { port = 4444 } key := checkClient(host, port, password) + contextMutex.RLock() + previousKey, existing := contexts[e.Context] + contextMutex.RUnlock() + + if existing && previousKey != key { + contextDiscounnected(e.Context) + } + if key == "" { return } @@ -265,3 +281,23 @@ func cleanupSockets() { client.Disconnect() } } + +func loopContextState(key string, state int) { + + stateMutex.Lock() + cachedStates[key] = state + stateMutex.Unlock() + + contextMutex.RLock() + defer contextMutex.RUnlock() + + for ctx, ctxKey := range contexts { + if ctxKey == key { + action := contextActions[ctx] + + if action == actionReplayToggle { + sdk.SetState(ctx, state) + } + } + } +} diff --git a/slobs/client.go b/slobs/client.go new file mode 100644 index 0000000..53b5b31 --- /dev/null +++ b/slobs/client.go @@ -0,0 +1,245 @@ +package slobs + +import ( + "encoding/json" + "errors" + "github.com/dchest/uniuri" + "github.com/gorilla/websocket" + "log" + "math/rand" + "net/http" + "strconv" + "strings" + "sync" + "sync/atomic" +) + +type SubscriptionHandler func(*ResourceEvent) +type ResponseHandler func(*RPCResponse) + +const ( + Event = "EVENT" + Subscription = "SUBSCRIPTION" +) + +var ( + ErrNoRequest = errors.New("request not found") + ErrNoHandler = errors.New("handler not found") +) + +type Client struct { + conn *websocket.Conn + connected bool + + Address string + + requestId int32 + requests map[int]ResponseHandler + requestLock sync.RWMutex + + subscriptions map[string]SubscriptionHandler + subscriptionLock sync.RWMutex +} + +func NewClient(address string) *Client { + return &Client{ + Address: address, + requests: make(map[int]ResponseHandler), + subscriptions: make(map[string]SubscriptionHandler), + } +} + +func (c *Client) Connected() bool { + return c.connected +} + +func (c *Client) Connect() error { + if c.connected { + return errors.New("already connected") + } + + endpoint := "ws://" + c.Address + "/api/" + paddedRandomIntn(999) + "/" + uniuri.New() + "/websocket" + + var err error + + c.conn, _, err = websocket.DefaultDialer.Dial(endpoint, http.Header{}) + + if err != nil { + return err + } + + _, data, err := c.conn.ReadMessage() + + if err != nil { + return err + } + + if data[0] != 'o' { + return errors.New("invalid initial message") + } + + c.connected = true + + go c.loop() + + return nil +} + +func (c *Client) Disconnect() error { + return c.conn.Close() +} + +func (c *Client) Subscribe(resource, method string, handler SubscriptionHandler) { + c.SendRPC(resource, method, func(response *RPCResponse) { + res := &ResourceEvent{} + + json.Unmarshal(*response.Result, &res) + + c.subscriptionLock.Lock() + c.subscriptions[res.ResourceId] = handler + c.subscriptionLock.Unlock() + }) +} + +func (c *Client) SendRPC(resource, method string, handler ResponseHandler) error { + m := make(map[string]interface{}) + + m["resource"] = resource + m["args"] = []string{} + + atomic.AddInt32(&c.requestId, 1) + + newRequestId := int(atomic.LoadInt32(&c.requestId)) + + request := &RPCRequest{ + ID: newRequestId, + Method: method, + Params: m, + JSONRPC: "2.0", + } + + b, err := json.Marshal(request) + + if err != nil { + return err + } + + if handler != nil { + c.requestLock.Lock() + c.requests[newRequestId] = handler + c.requestLock.Unlock() + } + + return c.conn.WriteJSON([]string{string(b) + "\n"}) +} + +func (c *Client) loop() error { + for { + _, data, err := c.conn.ReadMessage() + + if err != nil { + c.connected = false + return err + } + + if len(data) < 1 { + continue + } + + switch data[0] { + case 'h': + // Heartbeat + continue + case 'a': + // Normal message + arr := make([]string, 0) + err := json.Unmarshal(data[1:], &arr) + + if err != nil { + continue + } + + for _, message := range arr { + resp := &RPCResponse{} + + message = strings.TrimSpace(message) + + log.Println("Handling", message) + + err = json.Unmarshal([]byte(message), &resp) + + if err != nil { + continue + } + + go c.handle(resp) + } + case 'c': + // Session closed + var v []interface{} + if err := json.Unmarshal(data[1:], &v); err != nil { + log.Printf("Closing session: %s", err) + return nil + } + break + default: + log.Println("Unknown:", data[0]) + } + } +} + +func (c *Client) handle(resp *RPCResponse) error { + if resp.ID != nil { + c.requestLock.RLock() + h, ok := c.requests[*resp.ID] + c.requestLock.RUnlock() + + if !ok { + return ErrNoRequest + } + + h(resp) + + c.requestLock.Lock() + delete(c.requests, *resp.ID) + c.requestLock.Unlock() + return nil + } + + res := &ResourceEvent{} + + err := json.Unmarshal(*resp.Result, &res) + + if err != nil { + return err + } + + switch res.Type { + case Event: + c.subscriptionLock.RLock() + h, exists := c.subscriptions[res.ResourceId] + c.subscriptionLock.RUnlock() + + if !exists { + return ErrNoHandler + } + + h(res) + } + + return nil +} + +func paddedRandomIntn(max int) string { + var ( + ml = len(strconv.Itoa(max)) + ri = rand.Intn(max) + is = strconv.Itoa(ri) + ) + + if len(is) < ml { + is = strings.Repeat("0", ml-len(is)) + is + } + + return is +} diff --git a/slobs/rpc.go b/slobs/rpc.go new file mode 100644 index 0000000..c81818e --- /dev/null +++ b/slobs/rpc.go @@ -0,0 +1,47 @@ +package slobs + +import "encoding/json" + +type RPCRequest struct { + Method string `json:"method"` + Params interface{} `json:"params,omitempty"` + ID int `json:"id"` + JSONRPC string `json:"jsonrpc"` +} + +// RPCResponse represents a JSON-RPC response object. +// +// Result: holds the result of the rpc call if no error occurred, nil otherwise. can be nil even on success. +// +// Error: holds an RPCError object if an error occurred. must be nil on success. +// +// ID: may always be 0 for single requests. is unique for each request in a batch call (see CallBatch()) +// +// JSONRPC: must always be set to "2.0" for JSON-RPC version 2.0 +// +// See: http://www.jsonrpc.org/specification#response_object +type RPCResponse struct { + JSONRPC string `json:"jsonrpc"` + Result *json.RawMessage `json:"result,omitempty"` + Error *RPCError `json:"error,omitempty"` + ID *int `json:"id"` +} + +func (r *RPCResponse) DecodeTo(v interface{}) error { + return json.Unmarshal(*r.Result, &v) +} + +// RPCError represents a JSON-RPC error object if an RPC error occurred. +// +// Code: holds the error code +// +// Message: holds a short error message +// +// Data: holds additional error data, may be nil +// +// See: http://www.jsonrpc.org/specification#error_object +type RPCError struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} diff --git a/slobs/slobs.go b/slobs/slobs.go new file mode 100644 index 0000000..2532826 --- /dev/null +++ b/slobs/slobs.go @@ -0,0 +1,20 @@ +package slobs + +import "encoding/json" + +type ResourceEvent struct { + Type string `json:"_type"` + ResourceId string `json:"resourceId"` + Emitter string `json:"emitter"` + Data *json.RawMessage `json:"data"` +} + +func (e *ResourceEvent) DecodeTo(v interface{}) error { + return json.Unmarshal(*e.Data, &v) +} + +type IStreamingState struct { + StreamingStatus string `json:"streamingStatus"` + RecordingStatus string `json:"recordingStatus"` + ReplayBufferStatus string `json:"replayBufferStatus"` +} diff --git a/slobs/slobs_test.go b/slobs/slobs_test.go new file mode 100644 index 0000000..27ff69b --- /dev/null +++ b/slobs/slobs_test.go @@ -0,0 +1,25 @@ +package slobs + +import ( + "log" + "testing" +) + +func Test_StreamlabsOBS(t *testing.T) { + c := NewClient("127.0.0.1:59650") + + err := c.Connect() + + if err != nil { + t.Fatal(err) + } + + c.Subscribe("StreamingService", "replayBufferStatusChange", func(event *ResourceEvent) { + var status string + event.DecodeTo(&status) + log.Println("Event received:", status) + }) + + ch := make(chan struct{}, 1) + <-ch +}