package twitchpubsub

import (
	"encoding/json"
	"errors"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

// Message types exchanged with the PubSub endpoint, and the endpoint address.
const (
	Ping      = "PING"
	Pong      = "PONG"
	Message   = "MESSAGE"
	Response  = "RESPONSE"
	TwitchUrl = "wss://pubsub-edge.twitch.tv"
)

const (
	// wsReadTimeout is how long the reader waits for any frame before it
	// treats the connection as dead. It is longer than wsPingInterval so
	// that the reply to each PING normally arrives in time.
	wsReadTimeout = 45 * time.Second

	// wsPingInterval is the delay between two PING messages sent by pinger.
	wsPingInterval = 30 * time.Second

	// maxReconnectWait caps the exponential back-off used by reconnect.
	maxReconnectWait = 600 * time.Second
)

// twitchMessage is the envelope used for PING messages and for every message
// read from the socket. The fields of Data carry no JSON tags; decoding
// relies on encoding/json matching keys case-insensitively.
type twitchMessage struct {
	Type  string `json:"type"`
	Nonce string `json:"nonce,omitempty"`
	Error string `json:"error,omitempty"`
	Data  struct {
		Topic   string
		Message string
	} `json:"data"`
}

// twitchListen is the request body of LISTEN and UNLISTEN messages.
type twitchListen struct {
	Type  string            `json:"type"`
	Nonce string            `json:"nonce"`
	Data  *twitchListenData `json:"data"`
}

// twitchListenData is the payload of a twitchListen request.
type twitchListenData struct {
	Topics    []string `json:"topics"`
	AuthToken string   `json:"auth_token,omitempty"`
}

// TwitchPubSub is a client for the PubSub websocket endpoint at TwitchUrl.
//
// The embedded RWMutex guards wsConn, listening and SubscribedTopics.
// wsMutex serializes writes on the websocket connection.
type TwitchPubSub struct {
	sync.RWMutex

	wsConn  *websocket.Conn
	wsMutex sync.RWMutex
	debug   bool

	// Event handlers
	handlersMu   sync.RWMutex
	handlers     map[string][]*eventHandlerInstance
	onceHandlers map[string][]*eventHandlerInstance

	// Responses
	listening  chan interface{}
	responseCh chan *twitchMessage

	SubscribedTopics []string
}

// NewTwitchPubSub returns a client that is not yet connected; call Open to
// connect it.
func NewTwitchPubSub() *TwitchPubSub {
	t := &TwitchPubSub{
		responseCh: make(chan *twitchMessage),
	}
	return t
}

// EnableDebug turns on diagnostic logging through the standard log package.
func (t *TwitchPubSub) EnableDebug() {
	t.debug = true
}

// Open dials TwitchUrl, starts the reader and pinger goroutines for the new
// connection and then either re-subscribes to SubscribedTopics or, when there
// are none, sends a single PING.
//
// If a connection is already open it is replaced: its goroutines are told to
// stop and the old socket is closed. The struct lock is released before the
// re-subscription is sent so that the reader goroutine, which needs that lock
// when a read fails, can never deadlock against Open.
func (t *TwitchPubSub) Open() error {
	if t.debug {
		log.Println("Opening connection to", TwitchUrl)
	}

	c, _, err := websocket.DefaultDialer.Dial(TwitchUrl, nil)
	if err != nil {
		if t.debug {
			log.Println("error opening connection:", err)
		}
		return err
	}

	listening := make(chan interface{})

	t.Lock()
	prevConn, prevListening := t.wsConn, t.listening
	t.wsConn, t.listening = c, listening
	// Snapshot the topics so they can be used after the lock is released.
	topics := append([]string(nil), t.SubscribedTopics...)
	t.Unlock()

	// Retire whatever connection was installed before this one.
	if prevListening != nil {
		close(prevListening)
	}
	if prevConn != nil {
		// Best effort: the old connection is being discarded anyway.
		_ = prevConn.Close()
	}

	// The reader sets the read deadline itself before every read.
	go t.reader(c, listening)
	go t.pinger(c, listening)

	if len(topics) > 0 {
		return t.listen(topics)
	}

	t.wsMutex.Lock()
	defer t.wsMutex.Unlock()
	return c.WriteJSON(&twitchMessage{Type: Ping})
}

// Close stops the reader and pinger goroutines of the current connection,
// sends a websocket close frame, waits one second and closes the socket.
// Calling Close on a client that is not connected is a no-op.
//
// It returns the error reported by closing the underlying connection.
func (t *TwitchPubSub) Close() error {
	t.Lock()
	conn := t.wsConn
	t.wsConn = nil
	if t.listening != nil {
		close(t.listening)
		t.listening = nil
	}
	t.Unlock()

	if conn == nil {
		return nil
	}

	// To cleanly close a connection, a client should send a close
	// frame and wait for the server to close the connection.
	t.wsMutex.Lock()
	err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
	t.wsMutex.Unlock()
	if err != nil && t.debug {
		log.Println("error sending close frame:", err)
	}

	time.Sleep(1 * time.Second)
	return conn.Close()
}

// reconnect calls Open until it succeeds, sleeping between attempts. The
// delay starts at one second and doubles after every failure up to
// maxReconnectWait. It only returns once Open has succeeded, so the returned
// error is always nil.
func (t *TwitchPubSub) reconnect() error {
	wait := time.Second
	for {
		if t.debug {
			log.Println("Reconnecting")
		}

		if err := t.Open(); err == nil {
			return nil
		}

		if t.debug {
			log.Println("Unable to reconnect")
		}

		time.Sleep(wait)
		wait *= 2
		if wait > maxReconnectWait {
			wait = maxReconnectWait
		}
	}
}

// restart closes the current connection and reconnects, but only when wsConn
// is still the connection installed on t. A goroutine that belongs to a
// connection which has already been replaced or closed does nothing.
func (t *TwitchPubSub) restart(wsConn *websocket.Conn) {
	t.RLock()
	sameConnection := t.wsConn == wsConn
	t.RUnlock()

	if sameConnection {
		t.Close()
		t.reconnect()
	}
}

// Listen subscribes to topics and, when the server accepts the request,
// appends them to SubscribedTopics so that they are re-subscribed by Open
// after a reconnect.
//
// Responses are delivered over a single shared channel, so Listen and
// Unlisten must not be called concurrently with each other.
func (t *TwitchPubSub) Listen(topics []string) error {
	if err := t.listen(topics); err != nil {
		return err
	}

	// Update topic list
	t.Lock()
	t.SubscribedTopics = append(t.SubscribedTopics, topics...)
	t.Unlock()
	return nil
}

// listen sends a LISTEN request for topics and waits for its response.
func (t *TwitchPubSub) listen(topics []string) error {
	if t.debug {
		log.Println("Attempting to listen to topics", topics)
	}
	return t.sendTopicRequest("LISTEN", topics)
}

// Unlisten removes the first occurrence of each topic from SubscribedTopics
// and then sends an UNLISTEN request for topics. The topics are removed
// before the request is sent, so they stay removed even when the request
// fails.
//
// See Listen for the restriction on concurrent calls.
func (t *TwitchPubSub) Unlisten(topics []string) error {
	t.Lock()
	for _, topic := range topics {
		for i, subscribed := range t.SubscribedTopics {
			if subscribed == topic {
				t.SubscribedTopics = append(t.SubscribedTopics[:i], t.SubscribedTopics[i+1:]...)
				break
			}
		}
	}
	t.Unlock()

	return t.sendTopicRequest("UNLISTEN", topics)
}

// sendTopicRequest writes a request of the given kind ("LISTEN" or
// "UNLISTEN") for topics on the current connection and blocks until the
// reader delivers a RESPONSE or the connection is closed.
//
// It returns an error when the client is not connected, when the write
// fails, when the connection is closed while waiting, when the response
// carries a different nonce, or when the response reports an error.
func (t *TwitchPubSub) sendTopicRequest(kind string, topics []string) error {
	t.RLock()
	conn, closed := t.wsConn, t.listening
	t.RUnlock()

	if conn == nil {
		return errors.New("not connected")
	}

	nonce := strings.Replace(uuid.New().String(), "-", "", -1)

	t.wsMutex.Lock()
	err := conn.WriteJSON(&twitchListen{
		Type:  kind,
		Nonce: nonce,
		Data:  &twitchListenData{Topics: topics},
	})
	t.wsMutex.Unlock()
	if err != nil {
		return err
	}

	select {
	case msg := <-t.responseCh:
		if msg.Nonce != nonce {
			// Something is wrong, concurrent access?
			return errors.New("Unexpected nonce")
		}
		if len(msg.Error) > 0 {
			return errors.New(msg.Error)
		}
		return nil
	case <-closed:
		// The connection went away, so no response will ever arrive.
		return errors.New("connection closed while waiting for response")
	}
}

// reader reads messages from wsConn until a read fails or listening is
// closed. MESSAGE payloads are dispatched to the event handlers, RESPONSE
// messages are handed to the goroutine waiting in sendTopicRequest, and a
// reconnect event restarts the connection.
//
// When a read fails and wsConn is still the current connection, the
// connection is closed and re-established from this goroutine.
func (t *TwitchPubSub) reader(wsConn *websocket.Conn, listening <-chan interface{}) {
	for {
		var message twitchMessage

		// Renew the deadline before every read so that it measures silence
		// on the socket rather than time since the connection was opened.
		err := wsConn.SetReadDeadline(time.Now().Add(wsReadTimeout))
		if err == nil {
			err = wsConn.ReadJSON(&message)
		}
		if err != nil {
			if t.debug {
				log.Println("Unexpected error", err, "- attempting to reconnect")
			}
			t.restart(wsConn)
			return
		}

		select {
		case <-listening:
			return
		default:
		}

		switch message.Type {
		case Pong:
			// PONG!
		case Message:
			t.dispatchVideoPlayback(&message)
		case Response:
			// Hand the response over, but give up when the connection is
			// closed so that this goroutine cannot block forever.
			select {
			case t.responseCh <- &message:
			case <-listening:
				return
			}
		case reconnectEventType:
			go t.handle(message.Type, &Reconnect{})
			// Retire this connection before opening the next one; the
			// reader started by the new connection takes over from here.
			t.restart(wsConn)
			return
		}
	}
}

// dispatchVideoPlayback decodes the payload of a MESSAGE whose topic starts
// with "video-playback" and invokes the handlers for "viewcount",
// "stream-up" and "stream-down" events. Messages on other topics, payloads
// that cannot be decoded and other event types are ignored. A payload
// without "server_time" is reported with the Unix epoch as its server time.
func (t *TwitchPubSub) dispatchVideoPlayback(message *twitchMessage) {
	topic := message.Data.Topic
	if !strings.HasPrefix(topic, "video-playback") {
		return
	}

	var payload struct {
		Type       string  `json:"type"`
		ServerTime float64 `json:"server_time"`
		Viewers    float64 `json:"viewers"`
	}
	if err := json.Unmarshal([]byte(message.Data.Message), &payload); err != nil {
		if t.debug {
			log.Println("Ignoring malformed message on", topic, "-", err)
		}
		return
	}

	// The channel is whatever follows the first "." in the topic; a topic
	// without "." is used as a whole.
	channel := topic[strings.Index(topic, ".")+1:]
	serverTime := time.Unix(int64(payload.ServerTime), 0)

	switch payload.Type {
	case "viewcount":
		go t.handle(payload.Type, &ViewerCount{Channel: channel, Viewers: int(payload.Viewers), ServerTime: serverTime})
	case "stream-up":
		go t.handle(payload.Type, &StreamUp{Channel: channel, ServerTime: serverTime})
	case "stream-down":
		go t.handle(payload.Type, &StreamDown{Channel: channel, ServerTime: serverTime})
	}
}

// pinger sends a PING on wsConn immediately and then once every
// wsPingInterval. It stops when a write fails or when listening is closed.
func (t *TwitchPubSub) pinger(wsConn *websocket.Conn, listening <-chan interface{}) {
	ticker := time.NewTicker(wsPingInterval)
	defer ticker.Stop()

	for {
		if t.debug {
			log.Println("Sending ping")
		}

		t.wsMutex.Lock()
		err := wsConn.WriteJSON(&twitchMessage{Type: Ping})
		t.wsMutex.Unlock()
		if err != nil {
			return
		}

		select {
		case <-ticker.C:
			// continue loop and send heartbeat
		case <-listening:
			return
		}
	}
}