Use a channel to notify when the connection is closed

This commit is contained in:
Tyler 2017-04-16 15:07:27 -04:00
parent 1aa3e42f91
commit 956d93782b
1 changed files with 50 additions and 32 deletions

View File

@ -53,6 +53,7 @@ type TwitchPubSub struct {
onceHandlers map[string][]*eventHandlerInstance onceHandlers map[string][]*eventHandlerInstance
// Responses // Responses
listening chan interface{}
responseCh chan *twitchMessage responseCh chan *twitchMessage
SubscribedTopics []string SubscribedTopics []string
@ -78,8 +79,10 @@ func (t *TwitchPubSub) Open() error {
t.wsConn = c t.wsConn = c
go t.reader(c) t.listening = make(chan interface{})
go t.pinger()
go t.reader(t.wsConn, t.listening)
go t.pinger(t.wsConn, t.listening)
if len(t.SubscribedTopics) > 0 { if len(t.SubscribedTopics) > 0 {
return t.listen(t.SubscribedTopics) return t.listen(t.SubscribedTopics)
@ -91,6 +94,11 @@ func (t *TwitchPubSub) Open() error {
} }
func (t *TwitchPubSub) Close() error { func (t *TwitchPubSub) Close() error {
if t.listening != nil {
close(t.listening)
t.listening = nil
}
if t.wsConn != nil { if t.wsConn != nil {
// To cleanly close a connection, a client should send a close // To cleanly close a connection, a client should send a close
// frame and wait for the server to close the connection. // frame and wait for the server to close the connection.
@ -197,7 +205,7 @@ func (t *TwitchPubSub) Unlisten(topics []string) error {
return nil return nil
} }
func (t *TwitchPubSub) reader(wsConn *websocket.Conn) { func (t *TwitchPubSub) reader(wsConn *websocket.Conn, listening <-chan interface{}) {
for { for {
var message twitchMessage var message twitchMessage
err := t.wsConn.ReadJSON(&message) err := t.wsConn.ReadJSON(&message)
@ -214,51 +222,61 @@ func (t *TwitchPubSub) reader(wsConn *websocket.Conn) {
return return
} }
if message.Type == Pong { select {
// PONG! case <-listening:
} else if message.Type == Message { return
var data interface{} default:
if message.Type == Pong {
// PONG!
} else if message.Type == Message {
var data interface{}
if err := json.Unmarshal([]byte(message.Data.Message), &data); err != nil { if err := json.Unmarshal([]byte(message.Data.Message), &data); err != nil {
continue continue
}
if strings.Index(message.Data.Topic, "video-playback") == 0 {
m := data.(map[string]interface{})
ch := message.Data.Topic[strings.Index(message.Data.Topic, ".") + 1:]
ty := m["type"].(string)
server_time := time.Unix(int64(m["server_time"].(float64)), 0)
if ty == "viewcount" {
go t.handle(ty, &ViewerCount{Channel: ch, Viewers: int(m["viewers"].(float64)), ServerTime: server_time})
} else if ty == "stream-up" {
go t.handle(ty, &StreamUp{Channel: ch, ServerTime: server_time})
} else if ty == "stream-down" {
go t.handle(ty, &StreamDown{Channel: ch, ServerTime: server_time})
} }
if strings.Index(message.Data.Topic, "video-playback") == 0 {
m := data.(map[string]interface{})
ch := message.Data.Topic[strings.Index(message.Data.Topic, ".") + 1:]
ty := m["type"].(string)
server_time := time.Unix(int64(m["server_time"].(float64)), 0)
if ty == "viewcount" {
go t.handle(ty, &ViewerCount{Channel: ch, Viewers: int(m["viewers"].(float64)), ServerTime: server_time})
} else if ty == "stream-up" {
go t.handle(ty, &StreamUp{Channel: ch, ServerTime: server_time})
} else if ty == "stream-down" {
go t.handle(ty, &StreamDown{Channel: ch, ServerTime: server_time})
}
}
} else if message.Type == Response {
t.responseCh <- &message
} else if message.Type == reconnectEventType {
go t.handle(message.Type, &Reconnect{})
t.reconnect()
} }
} else if message.Type == Response {
t.responseCh <- &message
} else if message.Type == reconnectEventType {
go t.handle(message.Type, &Reconnect{})
t.reconnect()
} }
} }
} }
func (t *TwitchPubSub) pinger() { func (t *TwitchPubSub) pinger(wsConn *websocket.Conn, listening <-chan interface{}) {
ticker := time.NewTicker(150 * time.Second) ticker := time.NewTicker(150 * time.Second)
defer ticker.Stop() defer ticker.Stop()
for { for {
<- ticker.C
t.wsMutex.Lock() t.wsMutex.Lock()
err := t.wsConn.WriteJSON(&twitchMessage{Type: Ping}) err := wsConn.WriteJSON(&twitchMessage{Type: Ping})
t.wsMutex.Unlock() t.wsMutex.Unlock()
if err != nil { if err != nil {
return return
} }
select {
case <-ticker.C:
// continue loop and send heartbeat
case <-listening:
return
}
} }
} }