340 lines
6.5 KiB
Go
340 lines
6.5 KiB
Go
package twitchpubsub
|
|
|
|
import (
|
|
"time"
|
|
|
|
"encoding/json"
|
|
"errors"
|
|
"github.com/google/uuid"
|
|
"github.com/gorilla/websocket"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
)
|
|
|
|
const (
|
|
Ping = "PING"
|
|
Pong = "PONG"
|
|
Message = "MESSAGE"
|
|
Response = "RESPONSE"
|
|
|
|
TwitchUrl = "wss://pubsub-edge.twitch.tv"
|
|
)
|
|
|
|
type twitchMessage struct {
|
|
Type string `json:"type"`
|
|
Nonce string `json:"nonce,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
Data struct {
|
|
Topic string
|
|
Message string
|
|
} `json:"data"`
|
|
}
|
|
|
|
type twitchListen struct {
|
|
Type string `json:"type"`
|
|
Nonce string `json:"nonce"`
|
|
Data *twitchListenData `json:"data"`
|
|
}
|
|
|
|
type twitchListenData struct {
|
|
Topics []string `json:"topics"`
|
|
AuthToken string `json:"auth_token,omitempty"`
|
|
}
|
|
|
|
type TwitchPubSub struct {
|
|
sync.RWMutex
|
|
|
|
wsConn *websocket.Conn
|
|
wsMutex sync.RWMutex
|
|
|
|
debug bool
|
|
|
|
// Event handlers
|
|
handlersMu sync.RWMutex
|
|
handlers map[string][]*eventHandlerInstance
|
|
onceHandlers map[string][]*eventHandlerInstance
|
|
|
|
// Responses
|
|
listening chan interface{}
|
|
responseCh chan *twitchMessage
|
|
|
|
SubscribedTopics []string
|
|
LastPing time.Time
|
|
LastPong time.Time
|
|
}
|
|
|
|
func NewTwitchPubSub() *TwitchPubSub {
|
|
t := &TwitchPubSub{
|
|
responseCh: make(chan *twitchMessage),
|
|
SubscribedTopics: make([]string, 0),
|
|
}
|
|
|
|
return t
|
|
}
|
|
|
|
func (t *TwitchPubSub) EnableDebug() {
|
|
t.debug = true
|
|
}
|
|
|
|
func (t *TwitchPubSub) Open() error {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
|
|
if t.debug {
|
|
log.Println("Opening connection to", TwitchUrl)
|
|
}
|
|
|
|
c, _, err := websocket.DefaultDialer.Dial(TwitchUrl, nil)
|
|
|
|
if err != nil {
|
|
if t.debug {
|
|
log.Println("error opening connection:", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
t.wsConn = c
|
|
|
|
t.listening = make(chan interface{})
|
|
|
|
go t.reader(t.wsConn, t.listening)
|
|
go t.pinger(t.wsConn, t.listening)
|
|
|
|
if len(t.SubscribedTopics) > 0 {
|
|
return t.listen(t.SubscribedTopics)
|
|
} else {
|
|
t.wsMutex.Lock()
|
|
defer t.wsMutex.Unlock()
|
|
t.LastPing = time.Now()
|
|
return t.wsConn.WriteJSON(&twitchMessage{Type: Ping})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TwitchPubSub) Close() error {
|
|
if t.listening != nil {
|
|
close(t.listening)
|
|
t.listening = nil
|
|
}
|
|
|
|
t.Lock()
|
|
|
|
if t.wsConn != nil {
|
|
// To cleanly close a connection, a client should send a close
|
|
// frame and wait for the server to close the connection.
|
|
t.wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
|
|
|
time.Sleep(1 * time.Second)
|
|
|
|
t.wsConn.Close()
|
|
|
|
t.wsConn = nil
|
|
}
|
|
|
|
t.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TwitchPubSub) reconnect() error {
|
|
wait := time.Duration(1)
|
|
|
|
for {
|
|
if t.debug {
|
|
log.Println("Reconnecting")
|
|
}
|
|
|
|
err := t.Open()
|
|
|
|
if err == nil {
|
|
return err
|
|
}
|
|
|
|
if t.debug {
|
|
log.Println("Unable to reconnect")
|
|
}
|
|
|
|
<-time.After(wait * time.Second)
|
|
wait *= 2
|
|
|
|
if wait > 600 {
|
|
wait = 600
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *TwitchPubSub) Listen(topics []string) error {
|
|
err := t.listen(topics)
|
|
|
|
if err == nil {
|
|
// Update topic list
|
|
t.SubscribedTopics = append(t.SubscribedTopics, topics...)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (t *TwitchPubSub) listen(topics []string) error {
|
|
if t.debug {
|
|
log.Println("Attempting to listen to topics", topics)
|
|
}
|
|
|
|
nonce := strings.Replace(uuid.New().String(), "-", "", -1)
|
|
|
|
t.wsMutex.Lock()
|
|
t.wsConn.WriteJSON(&twitchListen{
|
|
Type: "LISTEN",
|
|
Nonce: nonce,
|
|
Data: &twitchListenData{Topics: topics},
|
|
})
|
|
t.wsMutex.Unlock()
|
|
|
|
msg := <- t.responseCh
|
|
|
|
if msg.Nonce != nonce {
|
|
// Something is wrong, concurrent access?
|
|
return errors.New("unexpected nonce")
|
|
}
|
|
|
|
if len(msg.Error) > 0 {
|
|
return errors.New(msg.Error)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TwitchPubSub) Unlisten(topics []string) error {
|
|
nonce := strings.Replace(uuid.New().String(), "-", "", -1)
|
|
|
|
for i := 0; i < len(topics); i++ {
|
|
for x := 0; x < len(t.SubscribedTopics); x++ {
|
|
if t.SubscribedTopics[x] == topics[i] {
|
|
t.SubscribedTopics = append(t.SubscribedTopics[:x], t.SubscribedTopics[x+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
t.wsMutex.Lock()
|
|
t.wsConn.WriteJSON(&twitchListen{
|
|
Type: "UNLISTEN",
|
|
Nonce: nonce,
|
|
Data: &twitchListenData{Topics: topics},
|
|
})
|
|
t.wsMutex.Unlock()
|
|
|
|
msg := <- t.responseCh
|
|
|
|
if msg.Nonce != nonce {
|
|
// Something is wrong, concurrent access?
|
|
return errors.New("unexpected nonce")
|
|
}
|
|
|
|
if len(msg.Error) > 0 {
|
|
return errors.New(msg.Error)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TwitchPubSub) reader(wsConn *websocket.Conn, listening <-chan interface{}) {
|
|
for {
|
|
t.wsConn.SetReadDeadline(t.LastPing.Add(45 * time.Second))
|
|
|
|
var message twitchMessage
|
|
err := t.wsConn.ReadJSON(&message)
|
|
|
|
if err != nil {
|
|
t.RLock()
|
|
sameConnection := t.wsConn == wsConn
|
|
t.RUnlock()
|
|
|
|
if t.debug {
|
|
log.Println("Unexpected error", err, "- attempting to reconnect")
|
|
}
|
|
|
|
if sameConnection {
|
|
t.Close()
|
|
err := t.reconnect()
|
|
|
|
if err != nil {
|
|
log.Fatalln("Unable to reconnect:", err)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-listening:
|
|
return
|
|
default:
|
|
if message.Type == Pong {
|
|
// PONG!
|
|
t.LastPong = time.Now()
|
|
} else if message.Type == Message {
|
|
var data interface{}
|
|
|
|
if err := json.Unmarshal([]byte(message.Data.Message), &data); err != nil {
|
|
continue
|
|
}
|
|
|
|
if strings.Index(message.Data.Topic, "video-playback") == 0 {
|
|
m := data.(map[string]interface{})
|
|
ch := message.Data.Topic[strings.Index(message.Data.Topic, ".") + 1:]
|
|
ty := m["type"].(string)
|
|
serverTime := time.Unix(int64(m["serverTime"].(float64)), 0)
|
|
|
|
if ty == "viewcount" {
|
|
go t.handle(ty, &ViewerCount{Channel: ch, Viewers: int(m["viewers"].(float64)), ServerTime: serverTime})
|
|
} else if ty == "stream-up" {
|
|
go t.handle(ty, &StreamUp{Channel: ch, ServerTime: serverTime})
|
|
} else if ty == "stream-down" {
|
|
go t.handle(ty, &StreamDown{Channel: ch, ServerTime: serverTime})
|
|
}
|
|
}
|
|
} else if message.Type == Response {
|
|
t.responseCh <- &message
|
|
} else if message.Type == reconnectEventType {
|
|
go t.handle(message.Type, &Reconnect{})
|
|
t.Close()
|
|
err := t.reconnect()
|
|
|
|
if err != nil {
|
|
log.Fatalln("Unable to reconnect after event:", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *TwitchPubSub) pinger(wsConn *websocket.Conn, listening <-chan interface{}) {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
if t.debug {
|
|
log.Println("Sending ping")
|
|
}
|
|
|
|
t.LastPing = time.Now()
|
|
t.wsMutex.Lock()
|
|
err := wsConn.WriteJSON(&twitchMessage{Type: Ping})
|
|
t.wsMutex.Unlock()
|
|
|
|
if err != nil {
|
|
if t.debug {
|
|
log.Println("Unable to send ping:", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-ticker.C:
|
|
// continue loop and send heartbeat
|
|
case <-listening:
|
|
return
|
|
}
|
|
}
|
|
} |