Compare commits

...

3 Commits

5 changed files with 65 additions and 45 deletions

2
go.mod
View File

@ -1,9 +1,11 @@
module meow.tf/astra/gavalink module meow.tf/astra/gavalink
require ( require (
github.com/bwmarrin/discordgo v0.20.3
github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300 github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300
github.com/gorilla/websocket v1.4.0 github.com/gorilla/websocket v1.4.0
github.com/valyala/fastjson v1.4.1 github.com/valyala/fastjson v1.4.1
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
) )
go 1.13 go 1.13

6
go.sum
View File

@ -1,6 +1,12 @@
github.com/bwmarrin/discordgo v0.20.3 h1:AxjcHGbyBFSC0a3Zx5nDQwbOjU7xai5dXjRnZ0YB7nU=
github.com/bwmarrin/discordgo v0.20.3/go.mod h1:O9S4p+ofTFwB02em7jkpkV8M3R0/PUVOwN61zSZ0r4Q=
github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300 h1:/p4AzwhPqvi9V0Ktsd+4jpkTpkLbrns2AyCUrylJFDo= github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300 h1:/p4AzwhPqvi9V0Ktsd+4jpkTpkLbrns2AyCUrylJFDo=
github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300/go.mod h1:MDlIQ50PLaU0fUW0JcHFOxec8Q17F9byrnGi8ok5vVQ= github.com/foxbot/gavalink v0.0.0-20181105223750-6252b1245300/go.mod h1:MDlIQ50PLaU0fUW0JcHFOxec8Q17F9byrnGi8ok5vVQ=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/valyala/fastjson v1.4.1 h1:hrltpHpIpkaxll8QltMU8c3QZ5+qIiCL8yKqPFJI/yE= github.com/valyala/fastjson v1.4.1 h1:hrltpHpIpkaxll8QltMU8c3QZ5+qIiCL8yKqPFJI/yE=
github.com/valyala/fastjson v1.4.1/go.mod h1:nV6MsjxL2IMJQUoHDIrjEI7oLyeqK6aBD7EFWPsvP8o= github.com/valyala/fastjson v1.4.1/go.mod h1:nV6MsjxL2IMJQUoHDIrjEI7oLyeqK6aBD7EFWPsvP8o=
golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16 h1:y6ce7gCWtnH+m3dCjzQ1PCuwl28DDIc3VNnvY29DlIA=
golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

View File

@ -1,7 +1,9 @@
package gavalink package gavalink
import ( import (
"context"
"errors" "errors"
"golang.org/x/sync/errgroup"
"log" "log"
"net/http" "net/http"
"os" "os"
@ -65,41 +67,29 @@ func (l *Lavalink) AddNodes(nodeConfigs ...NodeConfig) error {
Timeout: 60 * time.Second, Timeout: 60 * time.Second,
} }
wg := &sync.WaitGroup{} eg, ctx := errgroup.WithContext(context.Background())
errCh := make(chan error)
for _, c := range nodeConfigs { for _, c := range nodeConfigs {
n := &Node{ eg.Go(func() error {
config: c, n := &Node{
manager: l, config: c,
client: client, manager: l,
} client: client,
}
wg.Add(1) err := n.open(ctx)
go func(n *Node, wg *sync.WaitGroup, errCh chan error) {
defer wg.Done()
err := n.open()
if err != nil { if err != nil {
errCh <- err return err
return
} }
l.nodes = append(l.nodes, n) l.nodes = append(l.nodes, n)
}(n, wg, errCh)
return nil
})
} }
wg.Wait() return eg.Wait()
select {
case err := <-errCh:
return err
default:
return nil
}
} }
// RemoveNode removes a node from the manager // RemoveNode removes a node from the manager
@ -117,12 +107,17 @@ func (l *Lavalink) removeNode(node *Node) error {
node.stop() node.stop()
danglingPlayers := make([]*Player, 0)
l.playersMu.RLock() l.playersMu.RLock()
for _, player := range l.players { for _, player := range l.players {
if player.node == node { if player.node == node {
player.node = nil
n, err := l.BestNode() n, err := l.BestNode()
if err != nil { if err != nil || n == nil {
danglingPlayers = append(danglingPlayers, player)
continue continue
} }
@ -131,6 +126,12 @@ func (l *Lavalink) removeNode(node *Node) error {
} }
l.playersMu.RUnlock() l.playersMu.RUnlock()
if len(danglingPlayers) > 0 {
for _, player := range danglingPlayers {
player.Destroy()
}
}
// temp var for easier reading // temp var for easier reading
n := l.nodes n := l.nodes
z := len(n) - 1 z := len(n) - 1

30
node.go
View File

@ -1,6 +1,7 @@
package gavalink package gavalink
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
@ -15,16 +16,21 @@ import (
// NodeConfig configures a Lavalink Node // NodeConfig configures a Lavalink Node
type NodeConfig struct { type NodeConfig struct {
// Node identifier (uuid, hostname, etc)
Identifier string
// REST is the host where Lavalink's REST server runs // REST is the host where Lavalink's REST server runs
// //
// This value is expected without a trailing slash, e.g. like // This value is expected without a trailing slash, e.g. like
// `http://localhost:2333` // `http://localhost:2333`
REST string REST string
// WebSocket is the host where Lavalink's WebSocket server runs // WebSocket is the host where Lavalink's WebSocket server runs
// //
// This value is expected without a trailing slash, e.g. like // This value is expected without a trailing slash, e.g. like
// `http://localhost:8012` // `http://localhost:8012`
WebSocket string WebSocket string
// Password is the expected Authorization header for the Node // Password is the expected Authorization header for the Node
Password string Password string
} }
@ -68,13 +74,14 @@ type FrameStats struct {
} }
// Opens the connection to the Lavalink server // Opens the connection to the Lavalink server
func (node *Node) open() error { func (node *Node) open(ctx context.Context) error {
header := http.Header{} header := http.Header{}
header.Set("User-Agent", gavalinkUserAgent())
header.Set("Authorization", node.config.Password) header.Set("Authorization", node.config.Password)
header.Set("Num-Shards", strconv.Itoa(node.manager.shards)) header.Set("Num-Shards", strconv.Itoa(node.manager.shards))
header.Set("User-Id", node.manager.userID) header.Set("User-Id", node.manager.userID)
header.Set("Client-Name", "Gavalink")
header.Set("User-Agent", gavalinkUserAgent())
if node.manager.capabilities != nil { if node.manager.capabilities != nil {
v := make([]string, 0) v := make([]string, 0)
@ -92,22 +99,25 @@ func (node *Node) open() error {
header.Set("Capabilities", strings.Join(v, ";")) header.Set("Capabilities", strings.Join(v, ";"))
} }
ws, resp, err := websocket.DefaultDialer.Dial(node.config.WebSocket, header) ws, resp, err := websocket.DefaultDialer.DialContext(ctx, node.config.WebSocket, header)
if err != nil { if err != nil {
return err return err
} }
// TODO: This isn't officially required, so we ignore it for now if it's empty.
vstr := resp.Header.Get("Lavalink-Major-Version") vstr := resp.Header.Get("Lavalink-Major-Version")
v, err := strconv.Atoi(vstr) if vstr != "" {
v, err := strconv.Atoi(vstr)
if err != nil { if err != nil {
return err return err
} }
if v < 3 { if v < 3 {
return errInvalidVersion return errInvalidVersion
}
} }
node.wsConn = ws node.wsConn = ws
@ -136,7 +146,7 @@ func (node *Node) listen() {
if err != nil { if err != nil {
Log.Println(err) Log.Println(err)
// try to reconnect // try to reconnect
oerr := node.open() oerr := node.open(context.Background())
if oerr != nil { if oerr != nil {
Log.Println("node", node.config.WebSocket, "failed and could not reconnect, destroying.", err, oerr) Log.Println("node", node.config.WebSocket, "failed and could not reconnect, destroying.", err, oerr)

View File

@ -184,22 +184,23 @@ func (player *Player) Forward(sessionID string, event VoiceServerUpdate) error {
func (player *Player) ChangeNode(node *Node) error { func (player *Player) ChangeNode(node *Node) error {
player.node = node player.node = node
player.Forward(player.sessionID, player.lastVoiceServerUpdate) if err := player.Forward(player.sessionID, player.lastVoiceServerUpdate); err != nil {
return err
}
return player.PlayAt(player.track, player.position, 0) return player.PlayAt(player.track, player.position, 0)
} }
// Destroy will destroy this player // Destroy will destroy this player
func (player *Player) Destroy() error { func (player *Player) Destroy() error {
msg := basicMessage{ if player.node != nil && player.node.wsConn != nil {
Op: opDestroy, msg := basicMessage{
GuildID: player.guildID, Op: opDestroy,
} GuildID: player.guildID,
}
err := player.node.wsConn.WriteJSON(msg) // We don't actually care if this goes through, since the node/connection may be invalid anyway.
player.node.wsConn.WriteJSON(msg)
if err != nil {
return err
} }
player.manager.playersMu.Lock() player.manager.playersMu.Lock()