// twitchpubsub/pubsub.go
//
// 264 lines
// 5.0 KiB
// Go

package twitchpubsub
import (
"time"
"github.com/gorilla/websocket"
"github.com/google/uuid"
"strings"
"errors"
"encoding/json"
"sync"
)
// Frame type identifiers compared against the "type" field of PubSub frames,
// plus the endpoint dialed by Open.
const (
// Ping is the type of the keep-alive frame written by Open and pinger.
Ping = "PING"
// Pong is a frame type that reader accepts and ignores.
Pong = "PONG"
// Message is the frame type whose payload reader decodes and dispatches.
Message = "MESSAGE"
// Response is the frame type reader forwards to responseCh, where listen
// and Unlisten wait for it.
Response = "RESPONSE"
// TwitchUrl is the websocket address dialed by Open.
TwitchUrl = "wss://pubsub-edge.twitch.tv"
)
// twitchMessage is the JSON envelope that reader decodes every incoming frame
// into. It is also the value serialized for outgoing PING frames.
type twitchMessage struct {
// Type is compared against Ping, Pong, Message, Response and
// reconnectEventType to decide how a frame is handled.
Type string `json:"type"`
// Nonce is compared with the nonce of the LISTEN/UNLISTEN request that is
// waiting for a response.
Nonce string `json:"nonce,omitempty"`
// Error, when non-empty, is turned into the error returned by listen and
// Unlisten.
Error string `json:"error,omitempty"`
Data struct {
// Topic is checked by reader for the "video-playback" prefix; the text
// after its first "." is used as the channel.
Topic string
// Message holds a JSON document as a string; reader unmarshals it in a
// second step.
Message string
} `json:"data"`
}
// twitchListen is the request frame written by listen (Type "LISTEN") and
// Unlisten (Type "UNLISTEN").
type twitchListen struct {
// Type is the request kind: "LISTEN" or "UNLISTEN".
Type string `json:"type"`
// Nonce is a per-request random identifier; the response read from
// responseCh is expected to carry the same value.
Nonce string `json:"nonce"`
// Data carries the topics the request applies to.
Data *twitchListenData `json:"data"`
}
// twitchListenData is the "data" object of a LISTEN/UNLISTEN request.
type twitchListenData struct {
// Topics lists the topic names to subscribe to or unsubscribe from.
Topics []string `json:"topics"`
// AuthToken is omitted from the JSON when empty.
// NOTE(review): nothing in the visible code sets this field — confirm
// whether authenticated topics are meant to be supported.
AuthToken string `json:"auth_token,omitempty"`
}
// TwitchPubSub is a client for the websocket endpoint at TwitchUrl. Create one
// with NewTwitchPubSub and connect it with Open.
type TwitchPubSub struct {
// The embedded lock is write-locked by Open while it replaces wsConn and
// read-locked by reader when it compares wsConn with its own connection.
sync.RWMutex
// wsConn is the current connection; Close resets it to nil.
wsConn *websocket.Conn
// wsMutex is held around the frame writes in listen, Unlisten and pinger.
wsMutex sync.RWMutex
// Event handlers
// NOTE(review): these three fields are not referenced in this part of the
// file; presumably they back the handler registry used by handle — verify.
handlersMu sync.RWMutex
handlers map[string][]*eventHandlerInstance
onceHandlers map[string][]*eventHandlerInstance
// Responses
// reader sends every RESPONSE frame on responseCh; listen and Unlisten
// receive from it after writing a request.
responseCh chan *twitchMessage
// SubscribedTopics is re-sent in a LISTEN request by Open when non-empty,
// appended to by Listen and pruned by Unlisten.
SubscribedTopics []string
}
// NewTwitchPubSub returns a client that is not connected yet; call Open to
// dial. Only the response channel is allocated here, every other field keeps
// its zero value.
func NewTwitchPubSub() *TwitchPubSub {
	return &TwitchPubSub{responseCh: make(chan *twitchMessage)}
}
// Open dials TwitchUrl, stores the new connection and starts a reader and a
// pinger goroutine for it. When SubscribedTopics is non-empty those topics are
// re-sent in a LISTEN request; otherwise a single PING frame is written.
//
// The client lock is held for the whole call. It returns the dial error, the
// error from the LISTEN round trip, or the error from writing the PING.
func (t *TwitchPubSub) Open() error {
	t.Lock()
	defer t.Unlock()

	conn, _, err := websocket.DefaultDialer.Dial(TwitchUrl, nil)
	if err != nil {
		return err
	}
	t.wsConn = conn

	go t.reader(conn)
	go t.pinger()

	if len(t.SubscribedTopics) > 0 {
		return t.listen(t.SubscribedTopics)
	}

	// Every other frame write in this file is serialized through wsMutex; do
	// the same here so this PING cannot interleave with another writer.
	t.wsMutex.Lock()
	defer t.wsMutex.Unlock()
	return conn.WriteJSON(&twitchMessage{Type: Ping})
}
// Close shuts down the current connection, if any: it writes a normal-closure
// close frame, waits one second, closes the socket and resets wsConn to nil.
// Calling it without an open connection is a no-op.
//
// The shutdown is best effort and Close always returns nil, because reader
// calls it on connections that have already failed, where write and close
// errors are expected.
func (t *TwitchPubSub) Close() error {
	// Take the client lock before touching wsConn. Unlocking a mutex that was
	// never locked is a fatal runtime error, so Lock and Unlock must pair up.
	t.Lock()
	defer t.Unlock()

	if t.wsConn == nil {
		return nil
	}

	// To cleanly close a connection, a client should send a close
	// frame and wait for the server to close the connection.
	closeFrame := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
	t.wsMutex.Lock()
	// Error ignored on purpose: the peer may already be gone.
	_ = t.wsConn.WriteMessage(websocket.CloseMessage, closeFrame)
	t.wsMutex.Unlock()

	time.Sleep(1 * time.Second)

	// Error ignored on purpose: the connection is being discarded either way.
	_ = t.wsConn.Close()
	t.wsConn = nil
	return nil
}
// reconnect calls Open until it succeeds, pausing between attempts. The pause
// starts at one second, doubles after every failure and is capped at 600
// seconds. It only returns once Open has succeeded, so the result is always
// nil.
func (t *TwitchPubSub) reconnect() error {
	const maxDelay = 600 // seconds

	delay := time.Duration(1)
	for {
		if err := t.Open(); err == nil {
			return nil
		}

		time.Sleep(delay * time.Second)

		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}
// Listen subscribes to topics and, once the server has accepted the request,
// records them in SubscribedTopics so that Open re-subscribes to them on a
// later connection.
//
// It returns the error from the LISTEN round trip; on error SubscribedTopics
// is left untouched.
func (t *TwitchPubSub) Listen(topics []string) error {
	if err := t.listen(topics); err != nil {
		return err
	}
	// Only remember topics the server actually accepted.
	t.SubscribedTopics = append(t.SubscribedTopics, topics...)
	return nil
}
// listen writes a LISTEN request for topics and blocks until a response
// arrives on responseCh.
//
// It returns an error when there is no connection, when the request cannot be
// written, when the response carries a different nonce than the request, or
// when the response has a non-empty error field.
func (t *TwitchPubSub) listen(topics []string) error {
	conn := t.wsConn
	if conn == nil {
		return errors.New("not connected")
	}

	nonce := strings.Replace(uuid.New().String(), "-", "", -1)

	t.wsMutex.Lock()
	err := conn.WriteJSON(&twitchListen{
		Type:  "LISTEN",
		Nonce: nonce,
		Data:  &twitchListenData{Topics: topics},
	})
	t.wsMutex.Unlock()
	if err != nil {
		// Nothing was sent, so no response will ever arrive; waiting on
		// responseCh here would block forever.
		return err
	}

	msg := <-t.responseCh
	if msg.Nonce != nonce {
		// Something is wrong, concurrent access?
		return errors.New("Unexpected nonce")
	}
	if len(msg.Error) > 0 {
		return errors.New(msg.Error)
	}
	return nil
}
// Unlisten removes topics from SubscribedTopics, writes an UNLISTEN request
// for them and blocks until a response arrives on responseCh.
//
// The topics are dropped from SubscribedTopics before anything is sent, so
// they are not re-subscribed by a later Open even if the request fails. It
// returns an error when there is no connection, when the request cannot be
// written, when the response carries a different nonce than the request, or
// when the response has a non-empty error field.
func (t *TwitchPubSub) Unlisten(topics []string) error {
	nonce := strings.Replace(uuid.New().String(), "-", "", -1)

	// Remove the first occurrence of each requested topic.
	for _, topic := range topics {
		for i, subscribed := range t.SubscribedTopics {
			if subscribed == topic {
				t.SubscribedTopics = append(t.SubscribedTopics[:i], t.SubscribedTopics[i+1:]...)
				break
			}
		}
	}

	conn := t.wsConn
	if conn == nil {
		return errors.New("not connected")
	}

	t.wsMutex.Lock()
	err := conn.WriteJSON(&twitchListen{
		Type:  "UNLISTEN",
		Nonce: nonce,
		Data:  &twitchListenData{Topics: topics},
	})
	t.wsMutex.Unlock()
	if err != nil {
		// Nothing was sent, so no response will ever arrive; waiting on
		// responseCh here would block forever.
		return err
	}

	msg := <-t.responseCh
	if msg.Nonce != nonce {
		// Something is wrong, concurrent access?
		return errors.New("Unexpected nonce")
	}
	if len(msg.Error) > 0 {
		return errors.New(msg.Error)
	}
	return nil
}
// reader is the receive loop for one connection. It decodes frames from wsConn
// and handles them by type:
//
//   - PONG frames are ignored.
//   - MESSAGE frames on a "video-playback" topic are decoded and dispatched to
//     handle as ViewerCount, StreamUp or StreamDown events; payloads that are
//     malformed or lack an expected field are skipped.
//   - RESPONSE frames are forwarded to responseCh.
//   - A reconnect frame dispatches a Reconnect event, opens a replacement
//     connection, closes wsConn and ends this loop.
//
// On a read error the loop ends. If wsConn is still the client's current
// connection it is closed and a reconnect is started first.
func (t *TwitchPubSub) reader(wsConn *websocket.Conn) {
	for {
		var message twitchMessage
		// Read from the connection this goroutine was started for, not from
		// t.wsConn: that field may be nil after Close or may already point at
		// a newer connection that has its own reader.
		if err := wsConn.ReadJSON(&message); err != nil {
			t.RLock()
			sameConnection := t.wsConn == wsConn
			t.RUnlock()
			if sameConnection {
				t.Close()
				t.reconnect()
			}
			return
		}

		switch message.Type {
		case Pong:
			// PONG!
		case Message:
			topic := message.Data.Topic
			if !strings.HasPrefix(topic, "video-playback") {
				continue
			}

			// Decoding straight into a map rejects payloads that are not JSON
			// objects instead of panicking on a type assertion later.
			var payload map[string]interface{}
			if err := json.Unmarshal([]byte(message.Data.Message), &payload); err != nil {
				continue
			}
			kind, ok := payload["type"].(string)
			if !ok {
				continue
			}
			seconds, ok := payload["server_time"].(float64)
			if !ok {
				continue
			}

			channel := topic[strings.Index(topic, ".")+1:]
			serverTime := time.Unix(int64(seconds), 0)

			switch kind {
			case "viewcount":
				viewers, ok := payload["viewers"].(float64)
				if !ok {
					continue
				}
				go t.handle(kind, &ViewerCount{Channel: channel, Viewers: int(viewers), ServerTime: serverTime})
			case "stream-up":
				go t.handle(kind, &StreamUp{Channel: channel, ServerTime: serverTime})
			case "stream-down":
				go t.handle(kind, &StreamDown{Channel: channel, ServerTime: serverTime})
			}
		case Response:
			t.responseCh <- &message
		case reconnectEventType:
			go t.handle(message.Type, &Reconnect{})
			t.reconnect()
			// Open has started a fresh reader for the replacement connection.
			// Release the old socket and stop, so that exactly one goroutine
			// reads from each connection. The close error is ignored because
			// the connection is being discarded.
			_ = wsConn.Close()
			return
		}
	}
}
// pinger writes a PING frame every 150 seconds to the connection that is
// current when it starts. It exits when there is no connection, when that
// connection has been closed or replaced, or when a write fails.
func (t *TwitchPubSub) pinger() {
	// Open starts this goroutine while holding the client lock, so this read
	// waits until Open has finished and then picks up the stored connection.
	t.RLock()
	conn := t.wsConn
	t.RUnlock()
	if conn == nil {
		return
	}

	ticker := time.NewTicker(150 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		// Stop once Close has reset wsConn or Open has replaced it; otherwise
		// pingers from earlier connections would accumulate on the new one.
		t.RLock()
		current := t.wsConn == conn
		t.RUnlock()
		if !current {
			return
		}

		t.wsMutex.Lock()
		err := conn.WriteJSON(&twitchMessage{Type: Ping})
		t.wsMutex.Unlock()
		if err != nil {
			return
		}
	}
}