diff --git a/pubsub.go b/pubsub.go index 7edc102..c458c75 100644 --- a/pubsub.go +++ b/pubsub.go @@ -53,6 +53,7 @@ type TwitchPubSub struct { onceHandlers map[string][]*eventHandlerInstance // Responses + listening chan interface{} responseCh chan *twitchMessage SubscribedTopics []string @@ -78,8 +79,10 @@ func (t *TwitchPubSub) Open() error { t.wsConn = c - go t.reader(c) - go t.pinger() + t.listening = make(chan interface{}) + + go t.reader(t.wsConn, t.listening) + go t.pinger(t.wsConn, t.listening) if len(t.SubscribedTopics) > 0 { return t.listen(t.SubscribedTopics) @@ -91,6 +94,11 @@ func (t *TwitchPubSub) Open() error { } func (t *TwitchPubSub) Close() error { + if t.listening != nil { + close(t.listening) + t.listening = nil + } + if t.wsConn != nil { // To cleanly close a connection, a client should send a close // frame and wait for the server to close the connection. @@ -197,7 +205,7 @@ func (t *TwitchPubSub) Unlisten(topics []string) error { return nil } -func (t *TwitchPubSub) reader(wsConn *websocket.Conn) { +func (t *TwitchPubSub) reader(wsConn *websocket.Conn, listening <-chan interface{}) { for { var message twitchMessage err := t.wsConn.ReadJSON(&message) @@ -214,51 +222,61 @@ func (t *TwitchPubSub) reader(wsConn *websocket.Conn) { return } - if message.Type == Pong { - // PONG! - } else if message.Type == Message { - var data interface{} + select { + case <-listening: + return + default: + if message.Type == Pong { + // PONG! + } else if message.Type == Message { + var data interface{} - if err := json.Unmarshal([]byte(message.Data.Message), &data); err != nil { - continue - } - - if strings.Index(message.Data.Topic, "video-playback") == 0 { - m := data.(map[string]interface{}) - ch := message.Data.Topic[strings.Index(message.Data.Topic, ".") + 1:] - ty := m["type"].(string) - server_time := time.Unix(int64(m["server_time"].(float64)), 0) - - if ty == "viewcount" { - go t.handle(ty, &ViewerCount{Channel: ch, Viewers: int(m["viewers"].(float64)), ServerTime: server_time}) - } else if ty == "stream-up" { - go t.handle(ty, &StreamUp{Channel: ch, ServerTime: server_time}) - } else if ty == "stream-down" { - go t.handle(ty, &StreamDown{Channel: ch, ServerTime: server_time}) + if err := json.Unmarshal([]byte(message.Data.Message), &data); err != nil { + continue } + + if strings.Index(message.Data.Topic, "video-playback") == 0 { + m := data.(map[string]interface{}) + ch := message.Data.Topic[strings.Index(message.Data.Topic, ".") + 1:] + ty := m["type"].(string) + server_time := time.Unix(int64(m["server_time"].(float64)), 0) + + if ty == "viewcount" { + go t.handle(ty, &ViewerCount{Channel: ch, Viewers: int(m["viewers"].(float64)), ServerTime: server_time}) + } else if ty == "stream-up" { + go t.handle(ty, &StreamUp{Channel: ch, ServerTime: server_time}) + } else if ty == "stream-down" { + go t.handle(ty, &StreamDown{Channel: ch, ServerTime: server_time}) + } + } + } else if message.Type == Response { + t.responseCh <- &message + } else if message.Type == reconnectEventType { + go t.handle(message.Type, &Reconnect{}) + t.reconnect() } - } else if message.Type == Response { - t.responseCh <- &message - } else if message.Type == reconnectEventType { - go t.handle(message.Type, &Reconnect{}) - t.reconnect() } } } -func (t *TwitchPubSub) pinger() { +func (t *TwitchPubSub) pinger(wsConn *websocket.Conn, listening <-chan interface{}) { ticker := time.NewTicker(150 * time.Second) defer ticker.Stop() for { - <- ticker.C - t.wsMutex.Lock() - err := t.wsConn.WriteJSON(&twitchMessage{Type: Ping}) + err := wsConn.WriteJSON(&twitchMessage{Type: Ping}) t.wsMutex.Unlock() if err != nil { return } + + select { + case <-ticker.C: + // continue loop and send heartbeat + case <-listening: + return + } } } \ No newline at end of file