commit 649cec1c25eeadff663eb4ae361957ee1624e71f Author: Tyler Date: Sat Apr 15 01:35:29 2017 -0400 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5657f6e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +vendor \ No newline at end of file diff --git a/src/meow.tf/twitchpubsub/eventhandlers.go b/src/meow.tf/twitchpubsub/eventhandlers.go new file mode 100644 index 0000000..b9db7bf --- /dev/null +++ b/src/meow.tf/twitchpubsub/eventhandlers.go @@ -0,0 +1,111 @@ +package twitchpubsub + +const ( + viewerCountEventType = "viewcount" + streamUpEventType = "stream-up" + streamDownEventType = "stream-down" + + reconnectEventType = "RECONNECT" +) + +// viewerCountEventHandler is an event handler for ViewerCount events. +type viewerCountEventHandler func(*TwitchPubSub, *ViewerCount) + +// Type returns the event type for ViewerCount events. +func (eh viewerCountEventHandler) Type() string { + return viewerCountEventType +} + +// New returns a new instance of ViewerCount. +func (eh viewerCountEventHandler) New() interface{} { + return &ViewerCount{} +} + +// Handle is the handler for ViewerCount events. +func (eh viewerCountEventHandler) Handle(s *TwitchPubSub, i interface{}) { + if t, ok := i.(*ViewerCount); ok { + eh(s, t) + } +} + +// streamUpEventHandler is an event handler for StreamUp events. +type streamUpEventHandler func(*TwitchPubSub, *StreamUp) + +// Type returns the event type for StreamUp events. +func (eh streamUpEventHandler) Type() string { + return streamUpEventType +} + +// New returns a new instance of StreamUp. +func (eh streamUpEventHandler) New() interface{} { + return &StreamUp{} +} + +// Handle is the handler for StreamUp events. +func (eh streamUpEventHandler) Handle(s *TwitchPubSub, i interface{}) { + if t, ok := i.(*StreamUp); ok { + eh(s, t) + } +} + +// viewerCountEventHandler is an event handler for StreamDown events. +type streamDownEventHandler func(*TwitchPubSub, *StreamDown) + +// Type returns the event type for StreamDown events. +func (eh streamDownEventHandler) Type() string { + return streamDownEventType +} + +// New returns a new instance of StreamDown. +func (eh streamDownEventHandler) New() interface{} { + return &StreamDown{} +} + +// Handle is the handler for StreamDown events. +func (eh streamDownEventHandler) Handle(s *TwitchPubSub, i interface{}) { + if t, ok := i.(*StreamDown); ok { + eh(s, t) + } +} + +// viewerCountEventHandler is an event handler for StreamDown events. +type reconnectEventHandler func(*TwitchPubSub, *Reconnect) + +// Type returns the event type for StreamDown events. +func (eh reconnectEventHandler) Type() string { + return reconnectEventType +} + +// New returns a new instance of StreamDown. +func (eh reconnectEventHandler) New() interface{} { + return &Reconnect{} +} + +// Handle is the handler for StreamDown events. +func (eh reconnectEventHandler) Handle(s *TwitchPubSub, i interface{}) { + if t, ok := i.(*Reconnect); ok { + eh(s, t) + } +} + +func handlerForInterface(handler interface{}) EventHandler { + switch v := handler.(type) { + case func(*TwitchPubSub, interface{}): + return interfaceEventHandler(v) + case func(*TwitchPubSub, *ViewerCount): + return viewerCountEventHandler(v) + case func(*TwitchPubSub, *StreamUp): + return streamUpEventHandler(v) + case func(*TwitchPubSub, *StreamDown): + return streamDownEventHandler(v) + } + + return nil +} + +func init() { + registerInterfaceProvider(viewerCountEventHandler(nil)) + registerInterfaceProvider(streamUpEventHandler(nil)) + registerInterfaceProvider(streamDownEventHandler(nil)) + registerInterfaceProvider(reconnectEventHandler(nil)) +} \ No newline at end of file diff --git a/src/meow.tf/twitchpubsub/events.go b/src/meow.tf/twitchpubsub/events.go new file mode 100644 index 0000000..8d64799 --- /dev/null +++ b/src/meow.tf/twitchpubsub/events.go @@ -0,0 +1,180 @@ +package twitchpubsub + +import ( + "fmt" +) + +// EventHandler is an interface for Discord events. +type EventHandler interface { + // Type returns the type of event this handler belongs to. + Type() string + + // Handle is called whenever an event of Type() happens. + // It is the recievers responsibility to type assert that the interface + // is the expected struct. + Handle(*TwitchPubSub, interface{}) +} + +// EventInterfaceProvider is an interface for providing empty interfaces for +// Discord events. +type EventInterfaceProvider interface { + // Type is the type of event this handler belongs to. + Type() string + + // New returns a new instance of the struct this event handler handles. + // This is called once per event. + // The struct is provided to all handlers of the same Type(). + New() interface{} +} + +// interfaceEventType is the event handler type for interface{} events. +const interfaceEventType = "__INTERFACE__" + +// interfaceEventHandler is an event handler for interface{} events. +type interfaceEventHandler func(*TwitchPubSub, interface{}) + +// Type returns the event type for interface{} events. +func (eh interfaceEventHandler) Type() string { + return interfaceEventType +} + +// Handle is the handler for an interface{} event. +func (eh interfaceEventHandler) Handle(t *TwitchPubSub, i interface{}) { + eh(t, i) +} + +var registeredInterfaceProviders = map[string]EventInterfaceProvider{} + +// registerInterfaceProvider registers a provider so that DiscordGo can +// access it's New() method. +func registerInterfaceProvider(eh EventInterfaceProvider) error { + if _, ok := registeredInterfaceProviders[eh.Type()]; ok { + return fmt.Errorf("event %s already registered", eh.Type()) + } + registeredInterfaceProviders[eh.Type()] = eh + return nil +} + +// eventHandlerInstance is a wrapper around an event handler, as functions +// cannot be compared directly. +type eventHandlerInstance struct { + eventHandler EventHandler +} + +// addEventHandler adds an event handler that will be fired anytime +// the Discord WSAPI matching eventHandler.Type() fires. +func (t *TwitchPubSub) addEventHandler(eventHandler EventHandler) func() { + t.handlersMu.Lock() + defer t.handlersMu.Unlock() + + if t.handlers == nil { + t.handlers = map[string][]*eventHandlerInstance{} + } + + ehi := &eventHandlerInstance{eventHandler} + t.handlers[eventHandler.Type()] = append(t.handlers[eventHandler.Type()], ehi) + + return func() { + t.removeEventHandlerInstance(eventHandler.Type(), ehi) + } +} + +// addEventHandler adds an event handler that will be fired the next time +// the Discord WSAPI matching eventHandler.Type() fires. +func (s *TwitchPubSub) addEventHandlerOnce(eventHandler EventHandler) func() { + s.handlersMu.Lock() + defer s.handlersMu.Unlock() + + if s.onceHandlers == nil { + s.onceHandlers = map[string][]*eventHandlerInstance{} + } + + ehi := &eventHandlerInstance{eventHandler} + s.onceHandlers[eventHandler.Type()] = append(s.onceHandlers[eventHandler.Type()], ehi) + + return func() { + s.removeEventHandlerInstance(eventHandler.Type(), ehi) + } +} + +// AddHandler allows you to add an event handler that will be fired anytime +// the Discord WSAPI event that matches the function fires. +// events.go contains all the Discord WSAPI events that can be fired. +// eg: +// Session.AddHandler(func(s *discordgo.Session, m *discordgo.MessageCreate) { +// }) +// +// or: +// Session.AddHandler(func(s *discordgo.Session, m *discordgo.PresenceUpdate) { +// }) +// The return value of this method is a function, that when called will remove the +// event handler. +func (s *TwitchPubSub) AddHandler(handler interface{}) func() { + eh := handlerForInterface(handler) + + if eh == nil { + return func() {} + } + + return s.addEventHandler(eh) +} + +// AddHandlerOnce allows you to add an event handler that will be fired the next time +// the Discord WSAPI event that matches the function fires. +// See AddHandler for more details. +func (s *TwitchPubSub) AddHandlerOnce(handler interface{}) func() { + eh := handlerForInterface(handler) + + if eh == nil { + return func() {} + } + + return s.addEventHandlerOnce(eh) +} + +// removeEventHandler instance removes an event handler instance. +func (s *TwitchPubSub) removeEventHandlerInstance(t string, ehi *eventHandlerInstance) { + s.handlersMu.Lock() + defer s.handlersMu.Unlock() + + handlers := s.handlers[t] + for i := range handlers { + if handlers[i] == ehi { + s.handlers[t] = append(handlers[:i], handlers[i+1:]...) + } + } + + onceHandlers := s.onceHandlers[t] + for i := range onceHandlers { + if onceHandlers[i] == ehi { + s.onceHandlers[t] = append(onceHandlers[:i], handlers[i+1:]...) + } + } +} + +// Handles calling permanent and once handlers for an event type. +func (s *TwitchPubSub) handle(t string, i interface{}) { + for _, eh := range s.handlers[t] { + go eh.eventHandler.Handle(s, i) + } + + if len(s.onceHandlers[t]) > 0 { + for _, eh := range s.onceHandlers[t] { + go eh.eventHandler.Handle(s, i) + } + s.onceHandlers[t] = nil + } +} + +// Handles an event type by calling internal methods, firing handlers and firing the +// interface{} event. +func (s *TwitchPubSub) handleEvent(t string, i interface{}) { + s.handlersMu.RLock() + defer s.handlersMu.RUnlock() + + // Then they are dispatched to anyone handling interface{} events. + s.handle(interfaceEventType, i) + + // Finally they are dispatched to any typed handlers. + s.handle(t, i) +} \ No newline at end of file diff --git a/src/meow.tf/twitchpubsub/glide.lock b/src/meow.tf/twitchpubsub/glide.lock new file mode 100644 index 0000000..f0a47ea --- /dev/null +++ b/src/meow.tf/twitchpubsub/glide.lock @@ -0,0 +1,8 @@ +hash: 4e49f14d80169d6af200346e2f7a618e92433b4041223a8e3e7ef3b221c0679b +updated: 2017-04-12T14:11:38.1784086-04:00 +imports: +- name: github.com/google/uuid + version: 6a5e28554805e78ea6141142aba763936c4761c0 +- name: github.com/gorilla/websocket + version: a91eba7f97777409bc2c443f5534d41dd20c5720 +testImports: [] diff --git a/src/meow.tf/twitchpubsub/glide.yaml b/src/meow.tf/twitchpubsub/glide.yaml new file mode 100644 index 0000000..e88b7c8 --- /dev/null +++ b/src/meow.tf/twitchpubsub/glide.yaml @@ -0,0 +1,4 @@ +package: meow.tf/twitchpubsub +import: +- package: github.com/google/uuid +- package: github.com/gorilla/websocket diff --git a/src/meow.tf/twitchpubsub/pubsub.go b/src/meow.tf/twitchpubsub/pubsub.go new file mode 100644 index 0000000..7edc102 --- /dev/null +++ b/src/meow.tf/twitchpubsub/pubsub.go @@ -0,0 +1,264 @@ +package twitchpubsub + +import ( + "time" + + "github.com/gorilla/websocket" + "github.com/google/uuid" + "strings" + "errors" + "encoding/json" + "sync" +) + +const ( + Ping = "PING" + Pong = "PONG" + Message = "MESSAGE" + Response = "RESPONSE" + + TwitchUrl = "wss://pubsub-edge.twitch.tv" +) + +type twitchMessage struct { + Type string `json:"type"` + Nonce string `json:"nonce,omitempty"` + Error string `json:"error,omitempty"` + Data struct { + Topic string + Message string + } `json:"data"` +} + +type twitchListen struct { + Type string `json:"type"` + Nonce string `json:"nonce"` + Data *twitchListenData `json:"data"` +} + +type twitchListenData struct { + Topics []string `json:"topics"` + AuthToken string `json:"auth_token,omitempty"` +} + +type TwitchPubSub struct { + sync.RWMutex + + wsConn *websocket.Conn + wsMutex sync.RWMutex + + // Event handlers + handlersMu sync.RWMutex + handlers map[string][]*eventHandlerInstance + onceHandlers map[string][]*eventHandlerInstance + + // Responses + responseCh chan *twitchMessage + + SubscribedTopics []string +} + +func NewTwitchPubSub() *TwitchPubSub { + t := &TwitchPubSub{ + responseCh: make(chan *twitchMessage), + } + + return t +} + +func (t *TwitchPubSub) Open() error { + t.Lock() + defer t.Unlock() + + c, _, err := websocket.DefaultDialer.Dial(TwitchUrl, nil) + + if err != nil { + return err + } + + t.wsConn = c + + go t.reader(c) + go t.pinger() + + if len(t.SubscribedTopics) > 0 { + return t.listen(t.SubscribedTopics) + } else { + return t.wsConn.WriteJSON(&twitchMessage{Type: Ping}) + } + + return nil +} + +func (t *TwitchPubSub) Close() error { + if t.wsConn != nil { + // To cleanly close a connection, a client should send a close + // frame and wait for the server to close the connection. + t.wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + + time.Sleep(1 * time.Second) + + t.wsConn.Close() + + t.wsConn = nil + } + + t.Unlock() + + return nil +} + +func (t *TwitchPubSub) reconnect() error { + wait := time.Duration(1) + + for { + err := t.Open() + + if err == nil { + return err + } + + <-time.After(wait * time.Second) + wait *= 2 + + if wait > 600 { + wait = 600 + } + } +} + +func (t *TwitchPubSub) Listen(topics []string) error { + err := t.listen(topics) + + if err != nil { + // Update topic list + t.SubscribedTopics = append(t.SubscribedTopics, topics...) + } + + return err +} + +func (t *TwitchPubSub) listen(topics []string) error { + nonce := strings.Replace(uuid.New().String(), "-", "", -1) + + t.wsMutex.Lock() + t.wsConn.WriteJSON(&twitchListen{ + Type: "LISTEN", + Nonce: nonce, + Data: &twitchListenData{Topics: topics}, + }) + t.wsMutex.Unlock() + + msg := <- t.responseCh + + if msg.Nonce != nonce { + // Something is wrong, concurrent access? + return errors.New("Unexpected nonce") + } + + if len(msg.Error) > 0 { + return errors.New(msg.Error) + } + + return nil +} + +func (t *TwitchPubSub) Unlisten(topics []string) error { + nonce := strings.Replace(uuid.New().String(), "-", "", -1) + + for i := 0; i < len(topics); i++ { + for x := 0; x < len(t.SubscribedTopics); x++ { + if t.SubscribedTopics[x] == topics[i] { + t.SubscribedTopics = append(t.SubscribedTopics[:x], t.SubscribedTopics[x+1:]...) + break + } + } + } + + t.wsMutex.Lock() + t.wsConn.WriteJSON(&twitchListen{ + Type: "UNLISTEN", + Nonce: nonce, + Data: &twitchListenData{Topics: topics}, + }) + t.wsMutex.Unlock() + + msg := <- t.responseCh + + if msg.Nonce != nonce { + // Something is wrong, concurrent access? + return errors.New("Unexpected nonce") + } + + if len(msg.Error) > 0 { + return errors.New(msg.Error) + } + + return nil +} + +func (t *TwitchPubSub) reader(wsConn *websocket.Conn) { + for { + var message twitchMessage + err := t.wsConn.ReadJSON(&message) + + if err != nil { + t.RLock() + sameConnection := t.wsConn == wsConn + t.RUnlock() + + if sameConnection { + t.Close() + t.reconnect() + } + return + } + + if message.Type == Pong { + // PONG! + } else if message.Type == Message { + var data interface{} + + if err := json.Unmarshal([]byte(message.Data.Message), &data); err != nil { + continue + } + + if strings.Index(message.Data.Topic, "video-playback") == 0 { + m := data.(map[string]interface{}) + ch := message.Data.Topic[strings.Index(message.Data.Topic, ".") + 1:] + ty := m["type"].(string) + server_time := time.Unix(int64(m["server_time"].(float64)), 0) + + if ty == "viewcount" { + go t.handle(ty, &ViewerCount{Channel: ch, Viewers: int(m["viewers"].(float64)), ServerTime: server_time}) + } else if ty == "stream-up" { + go t.handle(ty, &StreamUp{Channel: ch, ServerTime: server_time}) + } else if ty == "stream-down" { + go t.handle(ty, &StreamDown{Channel: ch, ServerTime: server_time}) + } + } + } else if message.Type == Response { + t.responseCh <- &message + } else if message.Type == reconnectEventType { + go t.handle(message.Type, &Reconnect{}) + t.reconnect() + } + } +} + +func (t *TwitchPubSub) pinger() { + ticker := time.NewTicker(150 * time.Second) + defer ticker.Stop() + + for { + <- ticker.C + + t.wsMutex.Lock() + err := t.wsConn.WriteJSON(&twitchMessage{Type: Ping}) + t.wsMutex.Unlock() + + if err != nil { + return + } + } +} \ No newline at end of file diff --git a/src/meow.tf/twitchpubsub/structs.go b/src/meow.tf/twitchpubsub/structs.go new file mode 100644 index 0000000..9978869 --- /dev/null +++ b/src/meow.tf/twitchpubsub/structs.go @@ -0,0 +1,23 @@ +package twitchpubsub + +import "time" + +type ViewerCount struct { + Channel string + Viewers int + ServerTime time.Time +} + +type StreamUp struct { + Channel string + ServerTime time.Time +} + +type StreamDown struct { + Channel string + ServerTime time.Time +} + +type Reconnect struct { + +} \ No newline at end of file