gavalink/node.go

327 lines
7.0 KiB
Go
Raw Normal View History

2018-08-24 01:24:20 +00:00
package gavalink
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/websocket"
	"github.com/valyala/fastjson"
)
// NodeConfig configures a Lavalink Node.
type NodeConfig struct {
	// Identifier is a caller-chosen name for the node (uuid, hostname, etc).
	Identifier string

	// REST is the base URL where Lavalink's REST server runs.
	//
	// This value is expected without a trailing slash, e.g. like
	// `http://localhost:2333`; request paths such as `/loadtracks` are
	// appended to it verbatim.
	REST string

	// WebSocket is the URL where Lavalink's WebSocket server runs.
	//
	// This value is expected without a trailing slash. It is handed
	// unchanged to the gorilla/websocket dialer, which only accepts the
	// `ws` and `wss` schemes, e.g. `ws://localhost:8012`.
	WebSocket string

	// Password is sent as the Authorization header on both the WebSocket
	// handshake and REST requests.
	Password string
}
// Node wraps a Lavalink Node
type Node struct {
config NodeConfig
stats *RemoteStats
2018-08-24 01:24:20 +00:00
manager *Lavalink
wsConn *websocket.Conn
2019-10-13 01:13:23 +00:00
client *http.Client
2018-08-24 01:24:20 +00:00
}
// RemoteStats is the payload a node's stats message is decoded into.
//
// Memory, Cpu and Frames are pointers and remain nil when the corresponding
// key is absent from the decoded JSON.
type RemoteStats struct {
	Op            string       `json:"op"`
	Players       int          `json:"players"`
	ActivePlayers int          `json:"playingPlayers"`
	Uptime        int64        `json:"uptime"`
	Memory        *MemoryStats `json:"memory"`
	Cpu           *CpuStats    `json:"cpu"`
	Frames        *FrameStats  `json:"frameStats"`
}
// MemoryStats is the "memory" section of a node's stats payload.
//
// Lavalink spells the last key "reservable"; the Go field keeps its original
// (misspelled) name so existing callers continue to compile.
type MemoryStats struct {
	Free        uint64 `json:"free"`
	Used        uint64 `json:"used"`
	Allocated   uint64 `json:"allocated"`
	Reserveable uint64 `json:"reservable"`
}
// CpuStats is the "cpu" section of a node's stats payload.
type CpuStats struct {
	Cores        int     `json:"cores"`
	SystemLoad   float64 `json:"systemLoad"`
	LavalinkLoad float64 `json:"lavalinkLoad"`
}
// FrameStats is the "frameStats" section of a node's stats payload.
type FrameStats struct {
	Sent    int `json:"sent"`
	Nulled  int `json:"nulled"`
	Deficit int `json:"deficit"`
}
// Opens the connection to the Lavalink server
func (node *Node) open(ctx context.Context) error {
2018-08-24 01:24:20 +00:00
header := http.Header{}
2019-10-13 01:13:23 +00:00
header.Set("User-Agent", gavalinkUserAgent())
2018-08-24 01:24:20 +00:00
header.Set("Authorization", node.config.Password)
header.Set("Num-Shards", strconv.Itoa(node.manager.shards))
2018-08-24 01:24:20 +00:00
header.Set("User-Id", node.manager.userID)
if node.manager.capabilities != nil {
v := make([]string, 0)
for k, vals := range node.manager.capabilities {
b, err := json.Marshal(vals)
if err != nil {
continue
}
v = append(v, k+"="+string(b))
}
header.Set("Capabilities", strings.Join(v, ";"))
}
ws, resp, err := websocket.DefaultDialer.DialContext(ctx, node.config.WebSocket, header)
2019-10-13 01:13:23 +00:00
2018-08-24 01:24:20 +00:00
if err != nil {
return err
}
2019-10-13 01:13:23 +00:00
2018-08-24 01:24:20 +00:00
vstr := resp.Header.Get("Lavalink-Major-Version")
2019-10-13 01:13:23 +00:00
2018-08-24 01:24:20 +00:00
v, err := strconv.Atoi(vstr)
2019-10-13 01:13:23 +00:00
2018-08-24 01:24:20 +00:00
if err != nil {
return err
}
2019-10-13 01:13:23 +00:00
2018-08-24 01:24:20 +00:00
if v < 3 {
return errInvalidVersion
}
node.wsConn = ws
go node.listen()
Log.Println("node", node.config.WebSocket, "opened")
return nil
}
func (node *Node) stop() {
// someone already stopped this
if node.wsConn == nil {
return
}
2019-10-13 01:13:23 +00:00
2018-08-24 01:24:20 +00:00
_ = node.wsConn.Close()
}
func (node *Node) listen() {
2019-01-13 23:37:38 +00:00
var p fastjson.Parser
2018-08-24 01:24:20 +00:00
for {
msgType, msg, err := node.wsConn.ReadMessage()
2019-01-13 23:37:38 +00:00
2018-08-24 01:24:20 +00:00
if err != nil {
Log.Println(err)
// try to reconnect
oerr := node.open(context.Background())
2019-10-13 01:13:23 +00:00
2018-08-24 01:24:20 +00:00
if oerr != nil {
Log.Println("node", node.config.WebSocket, "failed and could not reconnect, destroying.", err, oerr)
node.manager.removeNode(node)
return
}
2019-10-13 01:13:23 +00:00
2018-08-24 01:24:20 +00:00
Log.Println("node", node.config.WebSocket, "reconnected")
return
}
2019-01-13 23:37:38 +00:00
if msgType != websocket.TextMessage {
continue
}
v, err := p.ParseBytes(msg)
2018-08-24 01:24:20 +00:00
if err != nil {
2019-01-13 23:37:38 +00:00
continue
2018-08-24 01:24:20 +00:00
}
node.onEvent(v, msg)
2018-08-24 01:24:20 +00:00
}
2019-01-13 23:37:38 +00:00
}
2018-08-24 01:24:20 +00:00
// Handle an event from the node
func (node *Node) onEvent(v *fastjson.Value, msg []byte) error {
2019-01-13 23:37:38 +00:00
op := jsonStringValue(v, "op")
2018-08-24 01:24:20 +00:00
2019-01-13 23:37:38 +00:00
switch op {
case opStats:
node.stats = &RemoteStats{}
err := json.Unmarshal(msg, &node.stats)
if err != nil {
return err
}
2018-08-24 01:24:20 +00:00
case opPlayerUpdate:
2019-01-13 23:37:38 +00:00
player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId"))
2018-08-24 01:24:20 +00:00
if err != nil {
return err
}
2019-01-13 23:37:38 +00:00
player.time = v.GetInt("state", "time")
player.position = v.GetInt("state", "position")
2018-08-24 01:24:20 +00:00
case opEvent:
2019-01-13 23:37:38 +00:00
player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId"))
2018-08-24 01:24:20 +00:00
if err != nil {
return err
}
2019-01-13 23:37:38 +00:00
track := jsonStringValue(v, "track")
switch jsonStringValue(v, "type") {
case eventTrackStart:
player.track = track
node.manager.handle(player, eventTrackStart, &TrackStart{
Track: track,
})
2018-08-24 01:24:20 +00:00
case eventTrackEnd:
player.track = ""
2018-08-24 01:24:20 +00:00
node.manager.handle(player, eventTrackEnd, &TrackEnd{
Track: track,
Reason: jsonStringValue(v, "reason"),
})
case eventTrackException:
ex := &TrackException{
Track: track,
Error: jsonStringValue(v, "error"),
}
2019-02-04 00:31:53 +00:00
if obj := v.Get("exception"); obj != nil {
var exception Exception
jsonUnmarshal(obj, &exception)
ex.Exception = exception
}
2019-02-04 00:31:53 +00:00
node.manager.handle(player, eventTrackException, ex)
case eventTrackStuck:
node.manager.handle(player, eventTrackStuck, &TrackStuck{
Track: track,
Threshold: time.Duration(v.GetInt("thresholdMs")) * time.Millisecond,
})
case eventVoiceProcessed:
data := &VoiceProcessingData{
node: node,
UserID: jsonStringValue(v, "userId"),
URL: fmt.Sprintf("%s/audio/%s", node.config.REST, track),
File: track,
}
2019-02-04 00:31:53 +00:00
node.manager.handle(player, eventVoiceProcessed, &VoiceProcessed{
Data: data,
Hotword: v.GetBool("hotword"),
Override: v.GetBool("override"),
})
2019-02-04 00:31:53 +00:00
}
return nil
2018-08-24 01:24:20 +00:00
default:
return errUnknownPayload
}
return nil
}
// CreatePlayer creates an audio player on this node
func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServerUpdate) (*Player, error) {
2019-01-13 23:37:38 +00:00
msg := voiceUpdateMessage{
2018-08-24 01:24:20 +00:00
Op: opVoiceUpdate,
GuildID: guildID,
SessionID: sessionID,
Event: &event,
}
2019-01-13 23:37:38 +00:00
err := node.writeMessage(msg)
2018-08-24 01:24:20 +00:00
if err != nil {
return nil, err
}
2019-01-13 23:37:38 +00:00
2018-08-24 01:24:20 +00:00
player := &Player{
2019-10-13 03:57:59 +00:00
guildID: guildID,
2019-10-13 04:04:01 +00:00
sessionID: sessionID,
2019-10-13 03:57:59 +00:00
manager: node.manager,
node: node,
vol: 100,
lastVoiceServerUpdate: event,
2018-08-24 01:24:20 +00:00
}
2019-01-13 23:37:38 +00:00
node.manager.playersMu.Lock()
defer node.manager.playersMu.Unlock()
2018-08-24 01:24:20 +00:00
node.manager.players[guildID] = player
2019-01-13 23:37:38 +00:00
2018-08-24 01:24:20 +00:00
return player, nil
}
// LoadTracks queries lavalink to return a Tracks object
//
// query should be a valid Lavaplayer query, including but not limited to:
// - A direct media URI
// - A direct Youtube /watch URI
// - A search query, prefixed with ytsearch: or scsearch:
//
// See the Lavaplayer Source Code for all valid options.
func (node *Node) LoadTracks(query string) (*Tracks, error) {
2020-05-03 21:00:54 +00:00
v := url.Values{}
v.Set("identifier", query)
2019-01-13 23:37:38 +00:00
2020-05-03 21:00:54 +00:00
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/loadtracks?%s", node.config.REST, v.Encode()), nil)
2019-01-13 23:37:38 +00:00
2018-08-24 01:24:20 +00:00
if err != nil {
return nil, err
}
2019-01-13 23:37:38 +00:00
req.Header.Set("User-Agent", gavalinkUserAgent())
2018-08-24 01:24:20 +00:00
req.Header.Set("Authorization", node.config.Password)
2019-10-13 01:13:23 +00:00
resp, err := node.client.Do(req)
2019-01-13 23:37:38 +00:00
2018-08-24 01:24:20 +00:00
if err != nil {
return nil, err
}
2019-01-13 23:37:38 +00:00
2018-08-24 01:24:20 +00:00
tracks := new(Tracks)
2019-01-13 23:37:38 +00:00
if err := json.NewDecoder(resp.Body).Decode(&tracks); err != nil {
2018-08-24 01:24:20 +00:00
return nil, err
}
2019-01-13 23:37:38 +00:00
2018-08-24 01:24:20 +00:00
return tracks, nil
}
2019-01-13 23:37:38 +00:00
// Write a JSON message via the node's websocket connection
2019-01-13 23:37:38 +00:00
func (node *Node) writeMessage(v interface{}) error {
return node.wsConn.WriteJSON(v)
}