Compare commits

..

13 Commits

15 changed files with 863 additions and 131 deletions

4
.gitignore vendored
View File

@ -23,3 +23,7 @@
# End of https://www.gitignore.io/api/go # End of https://www.gitignore.io/api/go
.env .env
# IntelliJ
.idea/
*.iml

51
balancer.go Normal file
View File

@ -0,0 +1,51 @@
package gavalink
import (
"math"
"sort"
)
type balancePenalty struct {
node *Node
penalty int
}
func BestNodeByPenalties(nodes []*Node) (*Node, error) {
penalties := make([]balancePenalty, len(nodes))
var playerPenalty, cpuPenalty, deficitFramePenalty, nullFramePenalty int
for i, node := range nodes {
playerPenalty = 0
cpuPenalty = 0
deficitFramePenalty = 0
nullFramePenalty = 0
if node.stats != nil {
playerPenalty = node.stats.ActivePlayers
cpuPenalty = int(math.Pow(1.05, 100*node.stats.Cpu.SystemLoad)*10 - 10)
if node.stats.Frames != nil && node.stats.Frames.Deficit != -1 {
deficitFramePenalty = int(math.Pow(1.03, 500*float64(node.stats.Frames.Deficit/3000))*600 - 600)
nullFramePenalty = int(math.Pow(1.03, 500*float64(node.stats.Frames.Nulled/3000))*300 - 300)
nullFramePenalty *= 2
}
}
penalties[i] = balancePenalty{node, playerPenalty + cpuPenalty + deficitFramePenalty + nullFramePenalty}
}
sort.SliceStable(penalties, func(i, j int) bool {
return penalties[i].penalty < penalties[j].penalty
})
return penalties[0].node, nil
}
func BestNodeByLoad(n []*Node) (*Node, error) {
sort.SliceStable(n, func(i, j int) bool {
return n[i].stats.Cpu.LavalinkLoad < n[j].stats.Cpu.LavalinkLoad
})
return n[0], nil
}

View File

@ -4,8 +4,10 @@ import (
"bytes" "bytes"
"encoding/base64" "encoding/base64"
"encoding/binary" "encoding/binary"
"encoding/json"
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
"io" "io"
"time"
) )
const trackInfoVersioned int32 = 1 const trackInfoVersioned int32 = 1
@ -109,7 +111,7 @@ func Decode(r io.Reader) (*TrackInfo, error) {
Author: author, Author: author,
URI: url, URI: url,
Stream: stream == 1, Stream: stream == 1,
Length: int(length), Length: time.Duration(length) * time.Millisecond,
} }
return track, nil return track, nil
@ -143,3 +145,7 @@ func jsonStringValue(v *fastjson.Value, keys ... string) string {
return string(strB) return string(strB)
} }
func jsonUnmarshal(v *fastjson.Value, dst interface{}) error {
return json.Unmarshal(v.MarshalTo(nil), dst)
}

189
event.go
View File

@ -1,32 +1,183 @@
package gavalink package gavalink
// EventHandler defines events that Lavalink may send to a player // EventHandler is an interface for Lavalink events.
type EventHandler interface { type EventHandler interface {
OnTrackEnd(player *Player, track string, reason string) error // Type returns the type of event this handler belongs to.
OnTrackException(player *Player, track string, reason string) error Type() string
OnTrackStuck(player *Player, track string, threshold int) error
OnVoiceProcessed(player *Player, data *VoiceProcessingData, hotword, override bool) error // Handle is called whenever an event of Type() happens.
// It is the receivers responsibility to type assert that the interface
// is the expected struct.
Handle(*Player, interface{})
} }
// DummyEventHandler provides an empty event handler for users who // EventInterfaceProvider is an interface for providing empty interfaces for
// wish to drop events outright. This is not recommended. // Lavalink events.
type DummyEventHandler struct{} type EventInterfaceProvider interface {
// Type is the type of event this handler belongs to.
Type() string
// OnTrackEnd is raised when a track ends // New returns a new instance of the struct this event handler handles.
func (d DummyEventHandler) OnTrackEnd(player *Player, track string, reason string) error { // This is called once per event.
return nil // The struct is provided to all handlers of the same Type().
New() interface{}
} }
// OnTrackException is raised when a track throws an exception // interfaceEventType is the event handler type for interface{} events.
func (d DummyEventHandler) OnTrackException(player *Player, track string, reason string) error { const interfaceEventType = "__INTERFACE__"
return nil
// interfaceEventHandler is an event handler for interface{} events.
type interfaceEventHandler func(*Player, interface{})
// Type returns the event type for interface{} events.
func (eh interfaceEventHandler) Type() string {
return interfaceEventType
} }
// OnTrackStuck is raised when a track gets stuck // Handle is the handler for an interface{} event.
func (d DummyEventHandler) OnTrackStuck(player *Player, track string, threshold int) error { func (eh interfaceEventHandler) Handle(p *Player, i interface{}) {
return nil eh(p, i)
} }
func (d DummyEventHandler) OnVoiceProcessed(player *Player, data *VoiceProcessingData, hotword, override bool) error { var registeredInterfaceProviders = map[string]EventInterfaceProvider{}
return nil
// registerInterfaceProvider registers a provider so that Gavalink can
// access it's New() method.
func registerInterfaceProvider(eh EventInterfaceProvider) {
if _, ok := registeredInterfaceProviders[eh.Type()]; ok {
return
// XXX:
// if we should error here, we need to do something with it.
// fmt.Errorf("event %s already registered", eh.Type())
}
registeredInterfaceProviders[eh.Type()] = eh
return
}
// eventHandlerInstance is a wrapper around an event handler, as functions
// cannot be compared directly.
type eventHandlerInstance struct {
eventHandler EventHandler
}
// addEventHandler adds an event handler that will be fired anytime
// the Lavalink event matching eventHandler.Type() fires.
func (l *Lavalink) addEventHandler(eventHandler EventHandler) func() {
l.handlersMu.Lock()
defer l.handlersMu.Unlock()
if l.handlers == nil {
l.handlers = map[string][]*eventHandlerInstance{}
}
ehi := &eventHandlerInstance{eventHandler}
l.handlers[eventHandler.Type()] = append(l.handlers[eventHandler.Type()], ehi)
return func() {
l.removeEventHandlerInstance(eventHandler.Type(), ehi)
}
}
// addEventHandler adds an event handler that will be fired the next time
// the Lavalink event matching eventHandler.Type() fires.
func (l *Lavalink) addEventHandlerOnce(eventHandler EventHandler) func() {
l.handlersMu.Lock()
defer l.handlersMu.Unlock()
if l.onceHandlers == nil {
l.onceHandlers = map[string][]*eventHandlerInstance{}
}
ehi := &eventHandlerInstance{eventHandler}
l.onceHandlers[eventHandler.Type()] = append(l.onceHandlers[eventHandler.Type()], ehi)
return func() {
l.removeEventHandlerInstance(eventHandler.Type(), ehi)
}
}
// AddHandler allows you to add an event handler that will be fired anytime
// the Lavalink event that matches the function fires.
// The first parameter is a *Session, and the second parameter is a pointer
// to a struct corresponding to the event for which you want to listen.
//
// eg:
// Player.AddHandler(func(s *gavalink.Player, m *gavalink.TrackStart) {
// })
//
// or:
// Player.AddHandler(func(s *gavalink.Player, m *gavalink.TrackEnd) {
// })
//
//
// The return value of this method is a function, that when called will remove the
// event handler.
func (l *Lavalink) AddHandler(handler interface{}) func() {
eh := handlerForInterface(handler)
if eh == nil {
return func() {}
}
return l.addEventHandler(eh)
}
// AddHandlerOnce allows you to add an event handler that will be fired the next time
// the Lavalink event that matches the function fires.
// See AddHandler for more details.
func (l *Lavalink) AddHandlerOnce(handler interface{}) func() {
eh := handlerForInterface(handler)
if eh == nil {
return func() {}
}
return l.addEventHandlerOnce(eh)
}
// removeEventHandler instance removes an event handler instance.
func (l *Lavalink) removeEventHandlerInstance(t string, ehi *eventHandlerInstance) {
l.handlersMu.Lock()
defer l.handlersMu.Unlock()
handlers := l.handlers[t]
for i := range handlers {
if handlers[i] == ehi {
l.handlers[t] = append(handlers[:i], handlers[i+1:]...)
}
}
onceHandlers := l.onceHandlers[t]
for i := range onceHandlers {
if onceHandlers[i] == ehi {
l.onceHandlers[t] = append(onceHandlers[:i], handlers[i+1:]...)
}
}
}
// Handles calling permanent and once handlers for an event type.
func (l *Lavalink) handle(p *Player, t string, i interface{}) {
for _, eh := range l.handlers[t] {
go eh.eventHandler.Handle(p, i)
}
if len(l.onceHandlers[t]) > 0 {
for _, eh := range l.onceHandlers[t] {
go eh.eventHandler.Handle(p, i)
}
l.onceHandlers[t] = nil
}
}
// Handles an event type by calling internal methods, firing handlers and firing the
// interface{} event.
func (l *Lavalink) handleEvent(p *Player, t string, i interface{}) {
l.handlersMu.RLock()
defer l.handlersMu.RUnlock()
// Then they are dispatched to anyone handling interface{} events.
l.handle(p, interfaceEventType, i)
// Finally they are dispatched to any typed handlers.
l.handle(p, t, i)
} }

165
eventhandlers.go Normal file
View File

@ -0,0 +1,165 @@
// Code generated by "eventhandlers"; DO NOT EDIT
// See events.go
package gavalink
// Following are all the event types.
// Event type values are used to match the events returned by Lavalink.
const (
trackEndEventType = "TrackEndEvent"
trackExceptionEventType = "TrackExceptionEvent"
trackStartEventType = "TrackStartEvent"
trackStuckEventType = "TrackStuckEvent"
voiceProcessedEventType = "VoiceProcessedEvent"
webSocketClosedEventType = "WebSocketClosedEvent"
)
// trackEndEventHandler is an event handler for TrackEnd events.
type trackEndEventHandler func(*Player, *TrackEnd)
// Type returns the event type for TrackEnd events.
func (eh trackEndEventHandler) Type() string {
return trackEndEventType
}
// New returns a new instance of TrackEnd.
func (eh trackEndEventHandler) New() interface{} {
return &TrackEnd{}
}
// Handle is the handler for TrackEnd events.
func (eh trackEndEventHandler) Handle(p *Player, i interface{}) {
if t, ok := i.(*TrackEnd); ok {
eh(p, t)
}
}
// trackExceptionEventHandler is an event handler for TrackException events.
type trackExceptionEventHandler func(*Player, *TrackException)
// Type returns the event type for TrackException events.
func (eh trackExceptionEventHandler) Type() string {
return trackExceptionEventType
}
// New returns a new instance of TrackException.
func (eh trackExceptionEventHandler) New() interface{} {
return &TrackException{}
}
// Handle is the handler for TrackException events.
func (eh trackExceptionEventHandler) Handle(p *Player, i interface{}) {
if t, ok := i.(*TrackException); ok {
eh(p, t)
}
}
// trackStartEventHandler is an event handler for TrackStart events.
type trackStartEventHandler func(*Player, *TrackStart)
// Type returns the event type for TrackStart events.
func (eh trackStartEventHandler) Type() string {
return trackStartEventType
}
// New returns a new instance of TrackStart.
func (eh trackStartEventHandler) New() interface{} {
return &TrackStart{}
}
// Handle is the handler for TrackStart events.
func (eh trackStartEventHandler) Handle(p *Player, i interface{}) {
if t, ok := i.(*TrackStart); ok {
eh(p, t)
}
}
// trackStuckEventHandler is an event handler for TrackStuck events.
type trackStuckEventHandler func(*Player, *TrackStuck)
// Type returns the event type for TrackStuck events.
func (eh trackStuckEventHandler) Type() string {
return trackStuckEventType
}
// New returns a new instance of TrackStuck.
func (eh trackStuckEventHandler) New() interface{} {
return &TrackStuck{}
}
// Handle is the handler for TrackStuck events.
func (eh trackStuckEventHandler) Handle(p *Player, i interface{}) {
if t, ok := i.(*TrackStuck); ok {
eh(p, t)
}
}
// voiceProcessedEventHandler is an event handler for VoiceProcessed events.
type voiceProcessedEventHandler func(*Player, *VoiceProcessed)
// Type returns the event type for VoiceProcessed events.
func (eh voiceProcessedEventHandler) Type() string {
return voiceProcessedEventType
}
// New returns a new instance of VoiceProcessed.
func (eh voiceProcessedEventHandler) New() interface{} {
return &VoiceProcessed{}
}
// Handle is the handler for VoiceProcessed events.
func (eh voiceProcessedEventHandler) Handle(p *Player, i interface{}) {
if t, ok := i.(*VoiceProcessed); ok {
eh(p, t)
}
}
// webSocketClosedEventHandler is an event handler for WebSocketClosed events.
type webSocketClosedEventHandler func(*Player, *WebSocketClosed)
// Type returns the event type for WebSocketClosed events.
func (eh webSocketClosedEventHandler) Type() string {
return webSocketClosedEventType
}
// New returns a new instance of WebSocketClosed.
func (eh webSocketClosedEventHandler) New() interface{} {
return &WebSocketClosed{}
}
// Handle is the handler for WebSocketClosed events.
func (eh webSocketClosedEventHandler) Handle(p *Player, i interface{}) {
if t, ok := i.(*WebSocketClosed); ok {
eh(p, t)
}
}
func handlerForInterface(handler interface{}) EventHandler {
switch v := handler.(type) {
case func(*Player, interface{}):
return interfaceEventHandler(v)
case func(*Player, *TrackEnd):
return trackEndEventHandler(v)
case func(*Player, *TrackException):
return trackExceptionEventHandler(v)
case func(*Player, *TrackStart):
return trackStartEventHandler(v)
case func(*Player, *TrackStuck):
return trackStuckEventHandler(v)
case func(*Player, *VoiceProcessed):
return voiceProcessedEventHandler(v)
case func(*Player, *WebSocketClosed):
return webSocketClosedEventHandler(v)
}
return nil
}
func init() {
registerInterfaceProvider(trackEndEventHandler(nil))
registerInterfaceProvider(trackExceptionEventHandler(nil))
registerInterfaceProvider(trackStartEventHandler(nil))
registerInterfaceProvider(trackStuckEventHandler(nil))
registerInterfaceProvider(voiceProcessedEventHandler(nil))
registerInterfaceProvider(webSocketClosedEventHandler(nil))
}

41
events.go Normal file
View File

@ -0,0 +1,41 @@
package gavalink
import "time"
// Event for when a track starts playing
type TrackStart struct {
Track string
}
// Event for when a track ends.
type TrackEnd struct {
Track string
Reason string
}
// Event for when a track encounters an error in playback.
type TrackException struct {
Track string
Error string
Exception Exception
}
// Event when a track gets stuck
type TrackStuck struct {
Track string
Threshold time.Duration
}
// Event for when voice is processed and sent back to the client.
type VoiceProcessed struct {
Data *VoiceProcessingData
Hotword bool
Override bool
}
// Event fired when the websocket is closed.
type WebSocketClosed struct {
Code int
Reason string
ByRemote bool
}

View File

@ -10,7 +10,7 @@ import (
"syscall" "syscall"
"github.com/bwmarrin/discordgo" "github.com/bwmarrin/discordgo"
"github.com/foxbot/gavalink" "meow.tf/astra/gavalink"
) )
var token string var token string
@ -55,7 +55,7 @@ func ready(s *discordgo.Session, event *discordgo.Ready) {
log.Println("discordgo ready!") log.Println("discordgo ready!")
s.UpdateStatus(0, "gavalink") s.UpdateStatus(0, "gavalink")
lavalink = gavalink.NewLavalink("1", event.User.ID) lavalink = gavalink.NewLavalink(1, event.User.ID)
err := lavalink.AddNodes(gavalink.NodeConfig{ err := lavalink.AddNodes(gavalink.NodeConfig{
REST: "http://localhost:2333", REST: "http://localhost:2333",
@ -162,8 +162,7 @@ func voiceServerUpdate(s *discordgo.Session, event *discordgo.VoiceServerUpdate)
return return
} }
handler := new(gavalink.DummyEventHandler) player, err = node.CreatePlayer(event.GuildID, s.State.SessionID, vsu)
player, err = node.CreatePlayer(event.GuildID, s.State.SessionID, vsu, handler)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return

2
go.mod
View File

@ -1,9 +1,11 @@
module meow.tf/astra/gavalink module meow.tf/astra/gavalink
require ( require (
github.com/bwmarrin/discordgo v0.20.3
github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300 github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300
github.com/gorilla/websocket v1.4.0 github.com/gorilla/websocket v1.4.0
github.com/valyala/fastjson v1.4.1 github.com/valyala/fastjson v1.4.1
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
) )
go 1.13 go 1.13

6
go.sum
View File

@ -1,6 +1,12 @@
github.com/bwmarrin/discordgo v0.20.3 h1:AxjcHGbyBFSC0a3Zx5nDQwbOjU7xai5dXjRnZ0YB7nU=
github.com/bwmarrin/discordgo v0.20.3/go.mod h1:O9S4p+ofTFwB02em7jkpkV8M3R0/PUVOwN61zSZ0r4Q=
github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300 h1:/p4AzwhPqvi9V0Ktsd+4jpkTpkLbrns2AyCUrylJFDo= github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300 h1:/p4AzwhPqvi9V0Ktsd+4jpkTpkLbrns2AyCUrylJFDo=
github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300/go.mod h1:MDlIQ50PLaU0fUW0JcHFOxec8Q17F9byrnGi8ok5vVQ= github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300/go.mod h1:MDlIQ50PLaU0fUW0JcHFOxec8Q17F9byrnGi8ok5vVQ=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/valyala/fastjson v1.4.1 h1:hrltpHpIpkaxll8QltMU8c3QZ5+qIiCL8yKqPFJI/yE= github.com/valyala/fastjson v1.4.1 h1:hrltpHpIpkaxll8QltMU8c3QZ5+qIiCL8yKqPFJI/yE=
github.com/valyala/fastjson v1.4.1/go.mod h1:nV6MsjxL2IMJQUoHDIrjEI7oLyeqK6aBD7EFWPsvP8o= github.com/valyala/fastjson v1.4.1/go.mod h1:nV6MsjxL2IMJQUoHDIrjEI7oLyeqK6aBD7EFWPsvP8o=
golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16 h1:y6ce7gCWtnH+m3dCjzQ1PCuwl28DDIc3VNnvY29DlIA=
golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

View File

@ -1,11 +1,14 @@
package gavalink package gavalink
import ( import (
"context"
"errors" "errors"
"golang.org/x/sync/errgroup"
"log" "log"
"net/http" "net/http"
"os" "os"
"sort" "runtime"
"sync"
"time" "time"
) )
@ -18,11 +21,20 @@ func init() {
// Lavalink manages a connection to Lavalink Nodes // Lavalink manages a connection to Lavalink Nodes
type Lavalink struct { type Lavalink struct {
shards string shards int
userID string userID string
nodes []*Node nodes []*Node
players map[string]*Player players map[string]*Player
playersMu sync.RWMutex
// Event handlers
handlersMu sync.RWMutex
handlers map[string][]*eventHandlerInstance
onceHandlers map[string][]*eventHandlerInstance
capabilities map[string]interface{}
BestNodeFunc func([]*Node) (*Node, error) BestNodeFunc func([]*Node) (*Node, error)
} }
@ -34,53 +46,56 @@ var (
errVolumeOutOfRange = errors.New("Volume is out of range, must be within [0, 1000]") errVolumeOutOfRange = errors.New("Volume is out of range, must be within [0, 1000]")
errInvalidVersion = errors.New("This library requires Lavalink >= 3") errInvalidVersion = errors.New("This library requires Lavalink >= 3")
errUnknownPayload = errors.New("Lavalink sent an unknown payload") errUnknownPayload = errors.New("Lavalink sent an unknown payload")
errNilHandler = errors.New("You must provide an event handler. Use gavalink.DummyEventHandler if you wish to ignore events")
) )
// NewLavalink creates a new Lavalink manager // NewLavalink creates a new Lavalink manager
func NewLavalink(shards string, userID string) *Lavalink { func NewLavalink(shards int, userID string) *Lavalink {
return &Lavalink{ return &Lavalink{
shards: shards, shards: shards,
userID: userID, userID: userID,
players: make(map[string]*Player), players: make(map[string]*Player),
BestNodeFunc: BestNodeByLoad, BestNodeFunc: BestNodeByPenalties,
} }
} }
// AddNodes adds a node to the Lavalink manager // AddNodes adds a node to the Lavalink manager
func (lavalink *Lavalink) AddNodes(nodeConfigs ...NodeConfig) error { // This function calls all of the node connect methods at once.
nodes := make([]*Node, len(nodeConfigs)) // TODO perhaps add a pool/max at a time limit?
func (l *Lavalink) AddNodes(nodeConfigs ...NodeConfig) error {
client := &http.Client{ client := &http.Client{
Timeout: 60 * time.Second, Timeout: 60 * time.Second,
} }
for i, c := range nodeConfigs { eg, ctx := errgroup.WithContext(context.Background())
for _, c := range nodeConfigs {
eg.Go(func() error {
n := &Node{ n := &Node{
config: c, config: c,
manager: lavalink, manager: l,
client: client, client: client,
} }
err := n.open() err := n.open(ctx)
if err != nil { if err != nil {
return err return err
} }
nodes[i] = n l.nodes = append(l.nodes, n)
}
lavalink.nodes = append(lavalink.nodes, nodes...)
return nil return nil
})
}
return eg.Wait()
} }
// RemoveNode removes a node from the manager // RemoveNode removes a node from the manager
func (lavalink *Lavalink) removeNode(node *Node) error { func (l *Lavalink) removeNode(node *Node) error {
idx := -1 idx := -1
for i, n := range lavalink.nodes { for i, n := range l.nodes {
if n == node { if n == node {
idx = i idx = i
break break
@ -92,41 +107,57 @@ func (lavalink *Lavalink) removeNode(node *Node) error {
node.stop() node.stop()
for _, player := range lavalink.players { danglingPlayers := make([]*Player, 0)
if player.node == node {
n, err := lavalink.BestNode()
if err != nil { l.playersMu.RLock()
for _, player := range l.players {
if player.node == node {
player.node = nil
n, err := l.BestNode()
if err != nil || n == nil {
danglingPlayers = append(danglingPlayers, player)
continue continue
} }
player.ChangeNode(n) player.ChangeNode(n)
} }
} }
l.playersMu.RUnlock()
if len(danglingPlayers) > 0 {
for _, player := range danglingPlayers {
player.Destroy()
}
}
// temp var for easier reading // temp var for easier reading
n := lavalink.nodes n := l.nodes
z := len(n) - 1 z := len(n) - 1
n[idx] = n[z] // swap idx with last n[idx] = n[z] // swap idx with last
n = n[:z] n = n[:z]
lavalink.nodes = n l.nodes = n
return nil return nil
} }
// BestNode returns the Node with the lowest latency // BestNode returns the Node with the lowest latency
func (lavalink *Lavalink) BestNode() (*Node, error) { func (l *Lavalink) BestNode() (*Node, error) {
if len(lavalink.nodes) < 1 { if len(l.nodes) < 1 {
return nil, errNoNodes return nil, errNoNodes
} }
return lavalink.BestNodeFunc(lavalink.nodes) return l.BestNodeFunc(l.nodes)
} }
// GetPlayer gets a player for a guild // GetPlayer gets a player for a guild
func (lavalink *Lavalink) GetPlayer(guild string) (*Player, error) { func (l *Lavalink) GetPlayer(guild string) (*Player, error) {
p, ok := lavalink.players[guild] l.playersMu.RLock()
defer l.playersMu.RUnlock()
p, ok := l.players[guild]
if !ok { if !ok {
return nil, errPlayerNotFound return nil, errPlayerNotFound
@ -135,10 +166,15 @@ func (lavalink *Lavalink) GetPlayer(guild string) (*Player, error) {
return p, nil return p, nil
} }
func BestNodeByLoad(n []*Node) (*Node, error) { // Add capabilities mappings to the client, letting the server know what we support
sort.SliceStable(n, func(i, j int) bool { func (l *Lavalink) AddCapability(key string, i interface{}) {
return n[i].load < n[j].load if l.capabilities == nil {
}) l.capabilities = make(map[string]interface{})
}
return n[0], nil
l.capabilities[key] = i
}
func gavalinkUserAgent() string {
return "Gavalink (v1.0, " + runtime.Version() + ")"
} }

View File

@ -1,9 +1,11 @@
package gavalink package gavalink
import ( import (
"encoding/json"
"io" "io"
"net/http" "net/http"
"os" "os"
"time"
) )
const ( const (
@ -27,7 +29,7 @@ type Tracks struct {
// NoMatches, or LoadFailed // NoMatches, or LoadFailed
Type string `json:"loadType"` Type string `json:"loadType"`
PlaylistInfo *PlaylistInfo `json:"playlistInfo"` PlaylistInfo *PlaylistInfo `json:"playlistInfo"`
Tracks []Track `json:"tracks"` Tracks []*Track `json:"tracks"`
} }
// PlaylistInfo contains information about a loaded playlist // PlaylistInfo contains information about a loaded playlist
@ -43,7 +45,7 @@ type PlaylistInfo struct {
type Track struct { type Track struct {
// Data contains the base64 encoded Lavaplayer track // Data contains the base64 encoded Lavaplayer track
Data string `json:"track"` Data string `json:"track"`
Info TrackInfo `json:"info"` Info *TrackInfo `json:"info"`
} }
// TrackInfo contains more data about a loaded track // TrackInfo contains more data about a loaded track
@ -54,13 +56,38 @@ type TrackInfo struct {
URI string `json:"uri"` URI string `json:"uri"`
Seekable bool `json:"isSeekable"` Seekable bool `json:"isSeekable"`
Stream bool `json:"isStream"` Stream bool `json:"isStream"`
Length int `json:"length"` Length time.Duration `json:"length"`
Position int `json:"position"` Position int `json:"position"`
} }
func (t *TrackInfo) MarshalJSON() ([]byte, error) {
type Alias TrackInfo
return json.Marshal(&struct {
Length int64 `json:"length"`
*Alias
}{
Length: int64(t.Length / time.Millisecond),
Alias: (*Alias)(t),
})
}
func (t *TrackInfo) UnmarshalJSON(data []byte) error {
type Alias TrackInfo
aux := &struct {
Length int64 `json:"length"`
*Alias
}{
Alias: (*Alias)(t),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
t.Length = time.Duration(aux.Length) * time.Millisecond
return nil
}
const ( const (
opVoiceUpdate = "voiceUpdate" opVoiceUpdate = "voiceUpdate"
opVoiceProcessed = "voiceProcessed"
opUserJoin = "userJoin" opUserJoin = "userJoin"
opUserLeave = "userLeave" opUserLeave = "userLeave"
opUserListen = "userListen" opUserListen = "userListen"
@ -77,8 +104,16 @@ const (
eventTrackEnd = "TrackEndEvent" eventTrackEnd = "TrackEndEvent"
eventTrackException = "TrackExceptionEvent" eventTrackException = "TrackExceptionEvent"
eventTrackStuck = "TrackStuckEvent" eventTrackStuck = "TrackStuckEvent"
eventVoiceProcessed = "VoiceProcessedEvent"
) )
// Exception is a message from the Lavalink server
type Exception struct {
Message string
Severity string
Cause string
}
// VoiceServerUpdate is a raw Discord VOICE_SERVER_UPDATE event // VoiceServerUpdate is a raw Discord VOICE_SERVER_UPDATE event
type VoiceServerUpdate struct { type VoiceServerUpdate struct {
GuildID string `json:"guild_id"` GuildID string `json:"guild_id"`
@ -90,7 +125,9 @@ type VoiceServerUpdate struct {
type VoiceProcessingData struct { type VoiceProcessingData struct {
io.ReadCloser io.ReadCloser
Client *http.Client node *Node
UserID string
URL string URL string
File string File string
@ -98,7 +135,11 @@ type VoiceProcessingData struct {
} }
func (v *VoiceProcessingData) open() error { func (v *VoiceProcessingData) open() error {
res, err := v.Client.Get(v.URL) req, err := http.NewRequest(http.MethodGet, v.URL, nil)
req.Header.Set("Authorization", v.node.config.Password)
res, err := v.node.client.Do(req)
if err != nil { if err != nil {
return err return err

33
model_test.go Normal file
View File

@ -0,0 +1,33 @@
package gavalink
import (
"encoding/json"
"testing"
"time"
)
func TestTrackInfo_JSON(t *testing.T) {
i := &TrackInfo{
Length: 10 * time.Second,
}
b, err := json.Marshal(i)
if err != nil {
t.Fatal(err)
}
t.Log(string(b))
deserialize := &TrackInfo{}
if err = json.Unmarshal(b, &deserialize); err != nil {
t.Fatal(err)
}
t.Log("Deserialized length:", deserialize.Length)
if deserialize.Length != time.Second*10 {
t.Fatal("Expected deserialized time to be 10 seconds!")
}
}

152
node.go
View File

@ -1,27 +1,36 @@
package gavalink package gavalink
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
"net/http" "net/http"
"net/url"
"strconv" "strconv"
"strings"
"time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
// NodeConfig configures a Lavalink Node // NodeConfig configures a Lavalink Node
type NodeConfig struct { type NodeConfig struct {
// Node identifier (uuid, hostname, etc)
Identifier string
// REST is the host where Lavalink's REST server runs // REST is the host where Lavalink's REST server runs
// //
// This value is expected without a trailing slash, e.g. like // This value is expected without a trailing slash, e.g. like
// `http://localhost:2333` // `http://localhost:2333`
REST string REST string
// WebSocket is the host where Lavalink's WebSocket server runs // WebSocket is the host where Lavalink's WebSocket server runs
// //
// This value is expected without a trailing slash, e.g. like // This value is expected without a trailing slash, e.g. like
// `http://localhost:8012` // `http://localhost:8012`
WebSocket string WebSocket string
// Password is the expected Authorization header for the Node // Password is the expected Authorization header for the Node
Password string Password string
} }
@ -29,27 +38,77 @@ type NodeConfig struct {
// Node wraps a Lavalink Node // Node wraps a Lavalink Node
type Node struct { type Node struct {
config NodeConfig config NodeConfig
load float32 stats *RemoteStats
manager *Lavalink manager *Lavalink
wsConn *websocket.Conn wsConn *websocket.Conn
client *http.Client client *http.Client
} }
func (node *Node) open() error { type RemoteStats struct {
Op string `json:"op"`
Players int `json:"players"`
ActivePlayers int `json:"playingPlayers"`
Uptime int64 `json:"uptime"`
Memory *MemoryStats `json:"memory"`
Cpu *CpuStats `json:"cpu"`
Frames *FrameStats `json:"frameStats"`
}
type MemoryStats struct {
Free uint64 `json:"free"`
Used uint64 `json:"used"`
Allocated uint64 `json:"allocated"`
Reserveable uint64 `json:"reserveable"`
}
type CpuStats struct {
Cores int `json:"cores"`
SystemLoad float64 `json:"systemLoad"`
LavalinkLoad float64 `json:"lavalinkLoad"`
}
type FrameStats struct {
Sent int `json:"sent"`
Nulled int `json:"nulled"`
Deficit int `json:"deficit"`
}
// Opens the connection to the Lavalink server
func (node *Node) open(ctx context.Context) error {
header := http.Header{} header := http.Header{}
header.Set("Authorization", node.config.Password) header.Set("Authorization", node.config.Password)
header.Set("Num-Shards", node.manager.shards) header.Set("Num-Shards", strconv.Itoa(node.manager.shards))
header.Set("User-Id", node.manager.userID) header.Set("User-Id", node.manager.userID)
header.Set("Client-Name", "Gavalink")
header.Set("User-Agent", gavalinkUserAgent())
ws, resp, err := websocket.DefaultDialer.Dial(node.config.WebSocket, header) if node.manager.capabilities != nil {
v := make([]string, 0)
for k, vals := range node.manager.capabilities {
b, err := json.Marshal(vals)
if err != nil {
continue
}
v = append(v, k+"="+string(b))
}
header.Set("Capabilities", strings.Join(v, ";"))
}
ws, resp, err := websocket.DefaultDialer.DialContext(ctx, node.config.WebSocket, header)
if err != nil { if err != nil {
return err return err
} }
// TODO: This isn't officially required, so we ignore it for now if it's empty.
vstr := resp.Header.Get("Lavalink-Major-Version") vstr := resp.Header.Get("Lavalink-Major-Version")
if vstr != "" {
v, err := strconv.Atoi(vstr) v, err := strconv.Atoi(vstr)
if err != nil { if err != nil {
@ -59,6 +118,7 @@ func (node *Node) open() error {
if v < 3 { if v < 3 {
return errInvalidVersion return errInvalidVersion
} }
}
node.wsConn = ws node.wsConn = ws
go node.listen() go node.listen()
@ -86,7 +146,7 @@ func (node *Node) listen() {
if err != nil { if err != nil {
Log.Println(err) Log.Println(err)
// try to reconnect // try to reconnect
oerr := node.open() oerr := node.open(context.Background())
if oerr != nil { if oerr != nil {
Log.Println("node", node.config.WebSocket, "failed and could not reconnect, destroying.", err, oerr) Log.Println("node", node.config.WebSocket, "failed and could not reconnect, destroying.", err, oerr)
@ -108,16 +168,26 @@ func (node *Node) listen() {
continue continue
} }
node.onEvent(v) node.onEvent(v, msg)
} }
} }
func (node *Node) onEvent(v *fastjson.Value) error { // Handle an event from the node
func (node *Node) onEvent(v *fastjson.Value, msg []byte) error {
op := jsonStringValue(v, "op") op := jsonStringValue(v, "op")
switch op { switch op {
case opStats:
node.stats = &RemoteStats{}
err := json.Unmarshal(msg, &node.stats)
if err != nil {
return err
}
case opPlayerUpdate: case opPlayerUpdate:
player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId")) player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId"))
if err != nil { if err != nil {
return err return err
} }
@ -133,33 +203,53 @@ func (node *Node) onEvent(v *fastjson.Value) error {
track := jsonStringValue(v, "track") track := jsonStringValue(v, "track")
switch jsonStringValue(v, "type") { switch jsonStringValue(v, "type") {
case eventTrackStart:
player.track = track
node.manager.handle(player, eventTrackStart, &TrackStart{
Track: track,
})
case eventTrackEnd: case eventTrackEnd:
player.track = "" player.track = ""
err = player.handler.OnTrackEnd(player, track, jsonStringValue(v, "reason"))
node.manager.handle(player, eventTrackEnd, &TrackEnd{
Track: track,
Reason: jsonStringValue(v, "reason"),
})
case eventTrackException: case eventTrackException:
err = player.handler.OnTrackException(player, track, jsonStringValue(v, "reason")) ex := &TrackException{
Track: track,
Error: jsonStringValue(v, "error"),
}
if obj := v.Get("exception"); obj != nil {
var exception Exception
jsonUnmarshal(obj, &exception)
ex.Exception = exception
}
node.manager.handle(player, eventTrackException, ex)
case eventTrackStuck: case eventTrackStuck:
err = player.handler.OnTrackStuck(player, track, v.GetInt("thresholdMs")) node.manager.handle(player, eventTrackStuck, &TrackStuck{
} Track: track,
Threshold: time.Duration(v.GetInt("thresholdMs")) * time.Millisecond,
return err })
case opVoiceProcessed: case eventVoiceProcessed:
player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId"))
if err != nil {
return err
}
track := jsonStringValue(v, "track")
data := &VoiceProcessingData{ data := &VoiceProcessingData{
node: node,
UserID: jsonStringValue(v, "userId"),
URL: fmt.Sprintf("%s/audio/%s", node.config.REST, track), URL: fmt.Sprintf("%s/audio/%s", node.config.REST, track),
File: track, File: track,
} }
return player.handler.OnVoiceProcessed(player, data, v.GetBool("hotword"), v.GetBool("override")) node.manager.handle(player, eventVoiceProcessed, &VoiceProcessed{
case opStats: Data: data,
node.load = float32(v.GetFloat64("cpu", "lavalinkLoad")) Hotword: v.GetBool("hotword"),
Override: v.GetBool("override"),
})
}
return nil
default: default:
return errUnknownPayload return errUnknownPayload
} }
@ -168,7 +258,7 @@ func (node *Node) onEvent(v *fastjson.Value) error {
} }
// CreatePlayer creates an audio player on this node // CreatePlayer creates an audio player on this node
func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServerUpdate, handler EventHandler) (*Player, error) { func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServerUpdate) (*Player, error) {
msg := voiceUpdateMessage{ msg := voiceUpdateMessage{
Op: opVoiceUpdate, Op: opVoiceUpdate,
GuildID: guildID, GuildID: guildID,
@ -187,11 +277,12 @@ func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServ
sessionID: sessionID, sessionID: sessionID,
manager: node.manager, manager: node.manager,
node: node, node: node,
handler: handler,
vol: 100, vol: 100,
lastVoiceServerUpdate: event, lastVoiceServerUpdate: event,
} }
node.manager.playersMu.Lock()
defer node.manager.playersMu.Unlock()
node.manager.players[guildID] = player node.manager.players[guildID] = player
return player, nil return player, nil
@ -206,14 +297,16 @@ func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServ
// //
// See the Lavaplayer Source Code for all valid options. // See the Lavaplayer Source Code for all valid options.
func (node *Node) LoadTracks(query string) (*Tracks, error) { func (node *Node) LoadTracks(query string) (*Tracks, error) {
url := fmt.Sprintf("%s/loadtracks?identifier=%s", node.config.REST, query) v := url.Values{}
v.Set("identifier", query)
req, err := http.NewRequest(http.MethodGet, url, nil) req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/loadtracks?%s", node.config.REST, v.Encode()), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
req.Header.Set("User-Agent", gavalinkUserAgent())
req.Header.Set("Authorization", node.config.Password) req.Header.Set("Authorization", node.config.Password)
resp, err := node.client.Do(req) resp, err := node.client.Do(req)
@ -231,6 +324,7 @@ func (node *Node) LoadTracks(query string) (*Tracks, error) {
return tracks, nil return tracks, nil
} }
// Write a JSON message via the node's websocket connection
func (node *Node) writeMessage(v interface{}) error { func (node *Node) writeMessage(v interface{}) error {
return node.wsConn.WriteJSON(v) return node.wsConn.WriteJSON(v)
} }

View File

@ -15,7 +15,6 @@ type Player struct {
track string track string
manager *Lavalink manager *Lavalink
node *Node node *Node
handler EventHandler
lastVoiceServerUpdate VoiceServerUpdate lastVoiceServerUpdate VoiceServerUpdate
} }
@ -185,24 +184,27 @@ func (player *Player) Forward(sessionID string, event VoiceServerUpdate) error {
func (player *Player) ChangeNode(node *Node) error { func (player *Player) ChangeNode(node *Node) error {
player.node = node player.node = node
player.Forward(player.sessionID, player.lastVoiceServerUpdate) if err := player.Forward(player.sessionID, player.lastVoiceServerUpdate); err != nil {
return err
}
return player.PlayAt(player.track, player.position, 0) return player.PlayAt(player.track, player.position, 0)
} }
// Destroy will destroy this player // Destroy will destroy this player
func (player *Player) Destroy() error { func (player *Player) Destroy() error {
if player.node != nil && player.node.wsConn != nil {
msg := basicMessage{ msg := basicMessage{
Op: opDestroy, Op: opDestroy,
GuildID: player.guildID, GuildID: player.guildID,
} }
err := player.node.wsConn.WriteJSON(msg) // We don't actually care if this goes through, since the node/connection may be invalid anyway.
player.node.wsConn.WriteJSON(msg)
if err != nil {
return err
} }
player.manager.playersMu.Lock()
defer player.manager.playersMu.Unlock()
delete(player.manager.players, player.guildID) delete(player.manager.players, player.guildID)
return nil return nil
} }

101
tools/cmd/eventhandlers.go Normal file
View File

@ -0,0 +1,101 @@
package main
import (
"bytes"
"go/format"
"go/parser"
"go/token"
"io/ioutil"
"log"
"path/filepath"
"sort"
"strings"
"text/template"
)
var eventHandlerTmpl = template.Must(template.New("eventHandler").Funcs(template.FuncMap{
"constName": constName,
"privateName": privateName,
}).Parse(`// Code generated by "eventhandlers"; DO NOT EDIT
// See events.go
package gavalink
// Following are all the event types.
// Event type values are used to match the events returned by Lavalink.
const ({{range .}}
{{privateName .}}EventType = "{{constName .}}"{{end}}
)
{{range .}}
// {{privateName .}}EventHandler is an event handler for {{.}} events.
type {{privateName .}}EventHandler func(*Player, *{{.}})
// Type returns the event type for {{.}} events.
func (eh {{privateName .}}EventHandler) Type() string {
return {{privateName .}}EventType
}
// New returns a new instance of {{.}}.
func (eh {{privateName .}}EventHandler) New() interface{} {
return &{{.}}{}
}
// Handle is the handler for {{.}} events.
func (eh {{privateName .}}EventHandler) Handle(p *Player, i interface{}) {
if t, ok := i.(*{{.}}); ok {
eh(p, t)
}
}
{{end}}
func handlerForInterface(handler interface{}) EventHandler {
switch v := handler.(type) {
case func(*Player, interface{}):
return interfaceEventHandler(v){{range .}}
case func(*Player, *{{.}}):
return {{privateName .}}EventHandler(v){{end}}
}
return nil
}
func init() { {{range .}}
registerInterfaceProvider({{privateName .}}EventHandler(nil)){{end}}
}
`))
func main() {
var buf bytes.Buffer
dir := filepath.Dir(".")
fs := token.NewFileSet()
parsedFile, err := parser.ParseFile(fs, "events.go", nil, 0)
if err != nil {
log.Fatalf("warning: internal error: could not parse events.go: %s", err)
return
}
names := []string{}
for object := range parsedFile.Scope.Objects {
names = append(names, object)
}
sort.Strings(names)
eventHandlerTmpl.Execute(&buf, names)
src, err := format.Source(buf.Bytes())
if err != nil {
log.Println("warning: internal error: invalid Go generated:", err)
src = buf.Bytes()
}
err = ioutil.WriteFile(filepath.Join(dir, strings.ToLower("eventhandlers.go")), src, 0644)
if err != nil {
log.Fatal(buf, "writing output: %s", err)
}
}
func constName(name string) string {
return name + "Event"
}
func privateName(name string) string {
return strings.ToLower(string(name[0])) + name[1:]
}