Compare commits

...

6 Commits

Author SHA1 Message Date
Tyler 2baa15923a Add auth support
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/tag Build is failing Details
2020-06-27 16:59:56 -04:00
Tyler b154704f1d Update readme
continuous-integration/drone/push Build is passing Details
2020-01-16 00:27:02 -05:00
Tyler 2d7b272b63 Add support for streamlabs obs
continuous-integration/drone/push Build is passing Details
2020-01-16 00:19:22 -05:00
Tyler 0ee045f172 Handle cases where the client won't be connected if OBS isn't running
continuous-integration/drone/push Build is passing Details
2020-01-14 23:49:46 -05:00
Tyler c5db9d1f88 Warn/show alert when state is not active
continuous-integration/drone/push Build is passing Details
2020-01-14 23:40:02 -05:00
Tyler 8de69d7c40 [ci-skip] Add readme
continuous-integration/drone/push Build was killed Details
2020-01-14 23:36:12 -05:00
12 changed files with 744 additions and 76 deletions

31
README.md Normal file
View File

@ -0,0 +1,31 @@
StreamDeck OBS Replay
=====================
Enables toggling replay recording and saving replays from Stream Deck with feedback shown on the icons.
Installation - OBS Studio
------------
Install [obs-websocket](https://github.com/Palakis/obs-websocket) and configure
Add plugin to Stream Deck (double click) and configure your icons with the host, port (default: 4444) and password (if configured).
Installation - Streamlabs OBS
-----------------------------
Set port to `59650` and the plugin will auto detect.
Credits
-------
StreamLabs OBS (Base software)
OBS Studio (Base software)
Stéphane Lepin ([obs-websocket](https://github.com/Palakis/obs-websocket))
Chris de Graaf ([go-obs-websocket](https://github.com/christopher-dG/go-obs-websocket))
Elgato/Corsair for Stream Deck + SDK/Documentation
Bumbler (https://bumbler.tv) for icons/plugin icon

35
client.go Normal file
View File

@ -0,0 +1,35 @@
package main
import (
"io"
"io/ioutil"
"net/http"
"strconv"
)
type Client interface {
Connect() error
Disconnect() error
Connected() bool
ToggleReplay() error
SaveReplay() error
}
func NewClient(key, host string, port int, password string) Client {
res, err := http.Get("http://" + host + ":" + strconv.Itoa(port) + "/api/info")
if err != nil {
return nil
}
defer res.Body.Close()
io.Copy(ioutil.Discard, res.Body)
if res.StatusCode == 200 {
return NewSlobsClient(key, host, port, password)
}
return NewOBSWSClient(key, host, port, password)
}

64
client_obsws.go Normal file
View File

@ -0,0 +1,64 @@
package main
import (
"meow.tf/streamdeck/obs-replay/obsws"
)
type OBSWSClient struct {
client *obsws.Client
key string
}
func NewOBSWSClient(key, host string, port int, password string) *OBSWSClient {
obsc := &obsws.Client{Host: host, Port: port, Password: password}
return &OBSWSClient{client: obsc, key: key}
}
func (c *OBSWSClient) Connect() error {
err := c.client.Connect()
if err != nil {
return err
}
c.client.AddEventHandler("StreamStatus", streamStatusUpdate(c.key))
c.client.AddEventHandler("ReplayStarted", func(obsws.Event) { loopContextState(c.key, 1) })
c.client.AddEventHandler("ReplayStopped", func(obsws.Event) { loopContextState(c.key, 0) })
return nil
}
func (c *OBSWSClient) Disconnect() error {
return c.client.Disconnect()
}
func (c *OBSWSClient) Connected() bool {
return c.client.Connected()
}
func (c *OBSWSClient) ToggleReplay() error {
req := obsws.NewStartStopReplayBufferRequest()
return req.Send(c.client)
}
func (c *OBSWSClient) SaveReplay() error {
req := obsws.NewSaveReplayBufferRequest()
return req.Send(c.client)
}
func streamStatusUpdate(key string) func(obsws.Event) {
return func(e obsws.Event) {
evt := e.(obsws.StreamStatusEvent)
state := 0
if evt.Replay {
state = 1
}
loopContextState(key, state)
}
}

116
client_slobs.go Normal file
View File

@ -0,0 +1,116 @@
package main
import (
"meow.tf/streamdeck/obs-replay/slobs"
"strconv"
)
type SlobsClient struct {
client *slobs.Client
key string
password string
recording bool
}
func NewSlobsClient(key, host string, port int, password string) *SlobsClient {
slobsc := slobs.NewClient(host + ":" + strconv.Itoa(port))
return &SlobsClient{client: slobsc, key: key, password: password}
}
const (
Running = "running"
Saving = "saving"
Stopping = "stopping"
Offline = "offline"
)
func (c *SlobsClient) Connect() error {
err := c.client.Connect()
if err != nil {
return err
}
c.client.Auth(c.password, func(err error) {
if err != nil {
// TODO alert that it failed to startup?
return
}
c.requestInitialData()
})
return nil
}
func (c *SlobsClient) requestInitialData() {
c.client.SendRPC("StreamingService", "getModel", func(e *slobs.RPCResponse) {
state := &slobs.IStreamingState{}
e.DecodeTo(&state)
switch state.ReplayBufferStatus {
case Saving:
fallthrough
case Running:
loopContextState(c.key, 1)
c.recording = true
case Offline:
loopContextState(c.key, 0)
c.recording = false
}
})
c.client.Subscribe("StreamingService", "replayBufferStatusChange", func(e *slobs.ResourceEvent) {
var status string
e.DecodeTo(&status)
switch status {
case Saving:
return
case Running:
loopContextState(c.key, 1)
c.recording = true
case Offline:
loopContextState(c.key, 0)
c.recording = false
}
})
}
func (c *SlobsClient) Disconnect() error {
return c.client.Disconnect()
}
func (c *SlobsClient) Connected() bool {
return c.client.Connected()
}
func (c *SlobsClient) ToggleReplay() error {
ret := make(chan struct{}, 1)
handler := func(res *slobs.RPCResponse) {
close(ret)
}
if c.recording {
c.client.SendRPC("StreamingService", "stopReplayBuffer", handler)
} else {
c.client.SendRPC("StreamingService", "startReplayBuffer", handler)
}
<-ret
return nil
}
func (c *SlobsClient) SaveReplay() error {
ret := make(chan struct{}, 1)
c.client.SendRPC("StreamingService", "saveReplay", func(res *slobs.RPCResponse) {
close(ret)
})
<-ret
return nil
}

2
go.mod
View File

@ -3,7 +3,7 @@ module meow.tf/streamdeck/obs-replay
go 1.13
require (
github.com/christopher-dG/go-obs-websocket v0.0.0-20181224025342-2efc3605bff5
github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9
github.com/gorilla/websocket v1.4.1
github.com/mitchellh/mapstructure v1.1.2
github.com/valyala/fastjson v1.4.1

5
go.sum
View File

@ -1,6 +1,5 @@
github.com/christopher-dG/go-obs-websocket v0.0.0-20181224025342-2efc3605bff5 h1:VtKPsvxzKt/+EnkhcPp0Xg7MDjt/a+CNRSj5phITbjo=
github.com/christopher-dG/go-obs-websocket v0.0.0-20181224025342-2efc3605bff5/go.mod h1:hFg9UFHefvNCvpWpYtOaP/VT2HyokIJsmV1AUBjpTeQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9 h1:74lLNRzvsdIlkTgfDSMuaPjBr4cf6k7pwQQANm/yLKU=
github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9/go.mod h1:GgB8SF9nRG+GqaDtLcwJZsQFhcogVCJ79j4EdT0c2V4=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=

View File

@ -37,12 +37,12 @@
"Author": "Meow.tf",
"Category": "OBS Replay",
"CodePathWin": "replay.exe",
"Description": "Control OBS' Replays using StreamDeck.",
"Description": "Control OBS' Replays using StreamDeck and Streamlabs OBS/OBS Studio + obs-websocket",
"Name": "OBS Replay",
"Icon": "images/pluginIcon",
"CategoryIcon": "images/pluginIcon",
"URL": "https://streamdeck.meow.tf/obsreplay",
"Version": "1.0.0",
"Version": "1.1.2",
"SDKVersion": 2,
"OS": [
{

187
replay.go
View File

@ -3,25 +3,26 @@ package main
import (
"github.com/valyala/fastjson"
"log"
"meow.tf/streamdeck/obs-replay/obsws"
"meow.tf/streamdeck/sdk"
"strconv"
"sync"
"time"
)
const (
actionReplayToggle = "tf.meow.obsreplay.replay_toggle"
actionReplaySave = "tf.meow.obsreplay.replay_save"
actionReplaySave = "tf.meow.obsreplay.replay_save"
)
var (
contextMutex sync.RWMutex
clientMutex sync.RWMutex
stateMutex sync.RWMutex
clientMutex sync.RWMutex
stateMutex sync.RWMutex
clients = make(map[string]*obsws.Client)
contexts = make(map[string]string)
cachedStates = make(map[string]int)
clients = make(map[string]Client)
contexts = make(map[string]string)
contextActions = make(map[string]string)
cachedStates = make(map[string]int)
)
func replayToggle(action, context string, payload *fastjson.Value, deviceId string) {
@ -32,16 +33,30 @@ func replayToggle(action, context string, payload *fastjson.Value, deviceId stri
return
}
req := obsws.NewStartStopReplayBufferRequest()
err := req.Send(c)
if err != nil {
if err := c.ToggleReplay(); err != nil {
sdk.ShowAlert(context)
return
}
sdk.ShowOk(context)
time.AfterFunc(1*time.Second, func() {
contextMutex.RLock()
key, exists := contexts[context]
contextMutex.RUnlock()
if !exists {
return
}
stateMutex.RLock()
state, exists := cachedStates[key]
stateMutex.RUnlock()
if !exists {
return
}
sdk.SetState(context, state)
})
}
func replaySave(action, context string, payload *fastjson.Value, deviceId string) {
@ -52,9 +67,22 @@ func replaySave(action, context string, payload *fastjson.Value, deviceId string
return
}
req := obsws.NewSaveReplayBufferRequest()
contextMutex.RLock()
key, exists := contexts[context]
contextMutex.RUnlock()
if err := req.Send(c); err != nil {
if exists {
stateMutex.RLock()
state, exists := cachedStates[key]
stateMutex.RUnlock()
if exists && state == 0 {
sdk.ShowAlert(context)
return
}
}
if err := c.SaveReplay(); err != nil {
sdk.ShowAlert(context)
return
}
@ -62,7 +90,7 @@ func replaySave(action, context string, payload *fastjson.Value, deviceId string
sdk.ShowOk(context)
}
func clientForContext(context string) *obsws.Client {
func clientForContext(context string) Client {
contextMutex.RLock()
key, exists := contexts[context]
contextMutex.RUnlock()
@ -79,6 +107,14 @@ func clientForContext(context string) *obsws.Client {
return nil
}
if !c.Connected() {
err := c.Connect()
if err != nil {
return nil
}
}
return c
}
@ -87,9 +123,15 @@ func onWillAppear(e *sdk.WillAppearEvent) {
if settings != nil {
host := sdk.JsonStringValue(settings, "host")
port := settings.GetInt("port")
portStr := sdk.JsonStringValue(settings, "port")
password := sdk.JsonStringValue(settings, "password")
port, err := strconv.Atoi(portStr)
if err != nil {
port = 4444
}
key := checkClient(host, port, password)
if key == "" {
@ -98,17 +140,24 @@ func onWillAppear(e *sdk.WillAppearEvent) {
contextMutex.Lock()
contexts[e.Context] = key
contextActions[e.Context] = e.Action
contextMutex.Unlock()
stateMutex.RLock()
if state, ok := cachedStates[key]; ok {
sdk.SetState(e.Context, state)
if e.Action == actionReplayToggle {
stateMutex.RLock()
if state, ok := cachedStates[key]; ok {
sdk.SetState(e.Context, state)
}
stateMutex.RUnlock()
}
stateMutex.RUnlock()
}
}
func checkClient(host string, port int, password string) string {
if host == "" {
host = "127.0.0.1"
}
if port == 0 {
port = 4444
}
@ -120,17 +169,9 @@ func checkClient(host string, port int, password string) string {
clientMutex.RUnlock()
if !ok {
client = &obsws.Client{Host: host, Port: port, Password: password}
client = NewClient(key, host, port, password)
err := client.Connect()
if err != nil {
return ""
}
client.AddEventHandler("StreamStatus", streamStatusUpdate(key))
client.AddEventHandler("ReplayStarted", loopContextState(key, 1))
client.AddEventHandler("ReplayStopped", loopContextState(key, 0))
defer client.Connect()
clientMutex.Lock()
clients[key] = client
@ -140,45 +181,19 @@ func checkClient(host string, port int, password string) string {
return key
}
func streamStatusUpdate(key string) func(obsws.Event) {
return func(e obsws.Event) {
evt := e.(obsws.StreamStatusEvent)
state := 0
if evt.Replay {
state = 1
}
loopContextState(key, state)
}
}
func loopContextState(key string, state int) func(obsws.Event) {
stateMutex.Lock()
cachedStates[key] = state
stateMutex.Unlock()
return func(event obsws.Event) {
contextMutex.RLock()
defer contextMutex.RUnlock()
for ctx, ctxKey := range contexts {
if ctxKey == key {
sdk.SetState(ctx, state)
}
}
}
}
func onWillDisappear(e *sdk.WillDisappearEvent) {
contextDiscounnected(e.Context)
}
func contextDiscounnected(context string) {
contextMutex.Lock()
defer contextMutex.Unlock()
// replayToggleContexts
key, ok := contexts[e.Context]
key, ok := contexts[context]
delete(contexts, e.Context)
delete(contexts, context)
delete(contextActions, context)
if !ok {
return
@ -198,18 +213,30 @@ func onWillDisappear(e *sdk.WillDisappearEvent) {
}
func onSettingsReceived(e *sdk.ReceiveSettingsEvent) {
var host, password string
host := sdk.JsonStringValue(e.Settings, "host")
portStr := sdk.JsonStringValue(e.Settings, "port")
password := sdk.JsonStringValue(e.Settings, "password")
host = sdk.JsonStringValue(e.Settings, "host")
port := e.Settings.GetInt("port")
password = sdk.JsonStringValue(e.Settings, "password")
if host == "" {
host = "127.0.0.1"
}
if port == 0 {
port, err := strconv.Atoi(portStr)
if err != nil {
port = 4444
}
key := checkClient(host, port, password)
contextMutex.RLock()
previousKey, existing := contexts[e.Context]
contextMutex.RUnlock()
if existing && previousKey != key {
contextDiscounnected(e.Context)
}
if key == "" {
return
}
@ -253,4 +280,24 @@ func cleanupSockets() {
for _, client := range clients {
client.Disconnect()
}
}
}
func loopContextState(key string, state int) {
stateMutex.Lock()
cachedStates[key] = state
stateMutex.Unlock()
contextMutex.RLock()
defer contextMutex.RUnlock()
for ctx, ctxKey := range contexts {
if ctxKey == key {
action := contextActions[ctx]
if action == actionReplayToggle {
sdk.SetState(ctx, state)
}
}
}
}

275
slobs/client.go Normal file
View File

@ -0,0 +1,275 @@
package slobs
import (
"encoding/json"
"errors"
"github.com/dchest/uniuri"
"github.com/gorilla/websocket"
"log"
"math/rand"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
)
type SubscriptionHandler func(*ResourceEvent)
type ResponseHandler func(*RPCResponse)
const (
Event = "EVENT"
Subscription = "SUBSCRIPTION"
)
var (
ErrNoRequest = errors.New("request not found")
ErrNoHandler = errors.New("handler not found")
)
type Client struct {
conn *websocket.Conn
connected bool
Address string
requestId int32
requests map[int]ResponseHandler
requestLock sync.RWMutex
subscriptions map[string]SubscriptionHandler
subscriptionLock sync.RWMutex
}
func NewClient(address string) *Client {
return &Client{
Address: address,
requests: make(map[int]ResponseHandler),
subscriptions: make(map[string]SubscriptionHandler),
}
}
func (c *Client) Connected() bool {
return c.connected
}
func (c *Client) Connect() error {
if c.connected {
return errors.New("already connected")
}
endpoint := "ws://" + c.Address + "/api/" + paddedRandomIntn(999) + "/" + uniuri.New() + "/websocket"
var err error
c.conn, _, err = websocket.DefaultDialer.Dial(endpoint, http.Header{})
if err != nil {
return err
}
_, data, err := c.conn.ReadMessage()
if err != nil {
return err
}
if data[0] != 'o' {
return errors.New("invalid initial message")
}
c.connected = true
go c.loop()
return nil
}
func (c *Client) Auth(key string, callback func(error)) {
c.SendRPC("TcpServerService", "auth", func(response *RPCResponse) {
if response.Error != nil {
callback(errors.New(response.Error.Message))
return
}
var result bool
json.Unmarshal(*response.Result, &result)
if !result {
return
}
callback(nil)
}, key)
}
func (c *Client) Disconnect() error {
return c.conn.Close()
}
func (c *Client) Subscribe(resource, method string, handler SubscriptionHandler) error {
responseCh := make(chan error, 1)
c.SendRPC(resource, method, func(response *RPCResponse) {
if response.Error != nil {
responseCh <- errors.New(response.Error.Message)
return
}
res := &ResourceEvent{}
json.Unmarshal(*response.Result, &res)
c.subscriptionLock.Lock()
c.subscriptions[res.ResourceId] = handler
c.subscriptionLock.Unlock()
close(responseCh)
})
return <-responseCh
}
func (c *Client) SendRPC(resource, method string, handler ResponseHandler, args ...string) error {
m := make(map[string]interface{})
m["resource"] = resource
m["args"] = args
atomic.AddInt32(&c.requestId, 1)
newRequestId := int(atomic.LoadInt32(&c.requestId))
request := &RPCRequest{
ID: newRequestId,
Method: method,
Params: m,
JSONRPC: "2.0",
}
b, err := json.Marshal(request)
if err != nil {
return err
}
if handler != nil {
c.requestLock.Lock()
c.requests[newRequestId] = handler
c.requestLock.Unlock()
}
return c.conn.WriteJSON([]string{string(b) + "\n"})
}
func (c *Client) loop() error {
for {
_, data, err := c.conn.ReadMessage()
if err != nil {
c.connected = false
return err
}
if len(data) < 1 {
continue
}
switch data[0] {
case 'h':
// Heartbeat
continue
case 'a':
// Normal message
arr := make([]string, 0)
err := json.Unmarshal(data[1:], &arr)
if err != nil {
continue
}
for _, message := range arr {
resp := &RPCResponse{}
message = strings.TrimSpace(message)
log.Println("Handling", message)
err = json.Unmarshal([]byte(message), &resp)
if err != nil {
continue
}
go c.handle(resp)
}
case 'c':
// Session closed
var v []interface{}
if err := json.Unmarshal(data[1:], &v); err != nil {
log.Printf("Closing session: %s", err)
return nil
}
break
default:
log.Println("Unknown:", data[0])
}
}
}
func (c *Client) handle(resp *RPCResponse) error {
if resp.ID != nil {
c.requestLock.RLock()
h, ok := c.requests[*resp.ID]
c.requestLock.RUnlock()
if !ok {
return ErrNoRequest
}
h(resp)
c.requestLock.Lock()
delete(c.requests, *resp.ID)
c.requestLock.Unlock()
return nil
}
res := &ResourceEvent{}
err := json.Unmarshal(*resp.Result, &res)
if err != nil {
return err
}
switch res.Type {
case Event:
c.subscriptionLock.RLock()
h, exists := c.subscriptions[res.ResourceId]
c.subscriptionLock.RUnlock()
if !exists {
return ErrNoHandler
}
h(res)
}
return nil
}
func paddedRandomIntn(max int) string {
var (
ml = len(strconv.Itoa(max))
ri = rand.Intn(max)
is = strconv.Itoa(ri)
)
if len(is) < ml {
is = strings.Repeat("0", ml-len(is)) + is
}
return is
}

47
slobs/rpc.go Normal file
View File

@ -0,0 +1,47 @@
package slobs
import "encoding/json"
type RPCRequest struct {
Method string `json:"method"`
Params interface{} `json:"params,omitempty"`
ID int `json:"id"`
JSONRPC string `json:"jsonrpc"`
}
// RPCResponse represents a JSON-RPC response object.
//
// Result: holds the result of the rpc call if no error occurred, nil otherwise. can be nil even on success.
//
// Error: holds an RPCError object if an error occurred. must be nil on success.
//
// ID: may always be 0 for single requests. is unique for each request in a batch call (see CallBatch())
//
// JSONRPC: must always be set to "2.0" for JSON-RPC version 2.0
//
// See: http://www.jsonrpc.org/specification#response_object
type RPCResponse struct {
JSONRPC string `json:"jsonrpc"`
Result *json.RawMessage `json:"result,omitempty"`
Error *RPCError `json:"error,omitempty"`
ID *int `json:"id"`
}
func (r *RPCResponse) DecodeTo(v interface{}) error {
return json.Unmarshal(*r.Result, &v)
}
// RPCError represents a JSON-RPC error object if an RPC error occurred.
//
// Code: holds the error code
//
// Message: holds a short error message
//
// Data: holds additional error data, may be nil
//
// See: http://www.jsonrpc.org/specification#error_object
type RPCError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}

20
slobs/slobs.go Normal file
View File

@ -0,0 +1,20 @@
package slobs
import "encoding/json"
type ResourceEvent struct {
Type string `json:"_type"`
ResourceId string `json:"resourceId"`
Emitter string `json:"emitter"`
Data *json.RawMessage `json:"data"`
}
func (e *ResourceEvent) DecodeTo(v interface{}) error {
return json.Unmarshal(*e.Data, &v)
}
type IStreamingState struct {
StreamingStatus string `json:"streamingStatus"`
RecordingStatus string `json:"recordingStatus"`
ReplayBufferStatus string `json:"replayBufferStatus"`
}

34
slobs/slobs_test.go Normal file
View File

@ -0,0 +1,34 @@
package slobs
import (
"log"
"testing"
)
func Test_StreamlabsOBS(t *testing.T) {
c := NewClient("127.0.0.1:59650")
err := c.Connect()
if err != nil {
t.Fatal(err)
}
closeCh := make(chan struct{}, 1)
c.Auth("a", func(err error) {
if err != nil {
t.Fatal(err)
closeCh <- struct{}{}
return
}
c.Subscribe("StreamingService", "replayBufferStatusChange", func(event *ResourceEvent) {
var status string
event.DecodeTo(&status)
log.Println("Event received:", status)
})
})
<-closeCh
}