327 lines
7.0 KiB
Go
327 lines
7.0 KiB
Go
package gavalink
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/valyala/fastjson"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
// NodeConfig configures a Lavalink Node
|
|
type NodeConfig struct {
|
|
// Node identifier (uuid, hostname, etc)
|
|
Identifier string
|
|
|
|
// REST is the host where Lavalink's REST server runs
|
|
//
|
|
// This value is expected without a trailing slash, e.g. like
|
|
// `http://localhost:2333`
|
|
REST string
|
|
|
|
// WebSocket is the host where Lavalink's WebSocket server runs
|
|
//
|
|
// This value is expected without a trailing slash, e.g. like
|
|
// `http://localhost:8012`
|
|
WebSocket string
|
|
|
|
// Password is the expected Authorization header for the Node
|
|
Password string
|
|
}
|
|
|
|
// Node wraps a Lavalink Node
|
|
type Node struct {
|
|
config NodeConfig
|
|
stats *RemoteStats
|
|
manager *Lavalink
|
|
wsConn *websocket.Conn
|
|
client *http.Client
|
|
}
|
|
|
|
type RemoteStats struct {
|
|
Op string `json:"op"`
|
|
Players int `json:"players"`
|
|
ActivePlayers int `json:"playingPlayers"`
|
|
Uptime int64 `json:"uptime"`
|
|
Memory *MemoryStats `json:"memory"`
|
|
Cpu *CpuStats `json:"cpu"`
|
|
Frames *FrameStats `json:"frameStats"`
|
|
}
|
|
|
|
type MemoryStats struct {
|
|
Free uint64 `json:"free"`
|
|
Used uint64 `json:"used"`
|
|
Allocated uint64 `json:"allocated"`
|
|
Reserveable uint64 `json:"reserveable"`
|
|
}
|
|
|
|
type CpuStats struct {
|
|
Cores int `json:"cores"`
|
|
SystemLoad float64 `json:"systemLoad"`
|
|
LavalinkLoad float64 `json:"lavalinkLoad"`
|
|
}
|
|
|
|
type FrameStats struct {
|
|
Sent int `json:"sent"`
|
|
Nulled int `json:"nulled"`
|
|
Deficit int `json:"deficit"`
|
|
}
|
|
|
|
// Opens the connection to the Lavalink server
|
|
func (node *Node) open(ctx context.Context) error {
|
|
header := http.Header{}
|
|
|
|
header.Set("User-Agent", gavalinkUserAgent())
|
|
header.Set("Authorization", node.config.Password)
|
|
header.Set("Num-Shards", strconv.Itoa(node.manager.shards))
|
|
header.Set("User-Id", node.manager.userID)
|
|
|
|
if node.manager.capabilities != nil {
|
|
v := make([]string, 0)
|
|
|
|
for k, vals := range node.manager.capabilities {
|
|
b, err := json.Marshal(vals)
|
|
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
v = append(v, k+"="+string(b))
|
|
}
|
|
|
|
header.Set("Capabilities", strings.Join(v, ";"))
|
|
}
|
|
|
|
ws, resp, err := websocket.DefaultDialer.DialContext(ctx, node.config.WebSocket, header)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
vstr := resp.Header.Get("Lavalink-Major-Version")
|
|
|
|
v, err := strconv.Atoi(vstr)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if v < 3 {
|
|
return errInvalidVersion
|
|
}
|
|
|
|
node.wsConn = ws
|
|
go node.listen()
|
|
|
|
Log.Println("node", node.config.WebSocket, "opened")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (node *Node) stop() {
|
|
// someone already stopped this
|
|
if node.wsConn == nil {
|
|
return
|
|
}
|
|
|
|
_ = node.wsConn.Close()
|
|
}
|
|
|
|
func (node *Node) listen() {
|
|
var p fastjson.Parser
|
|
|
|
for {
|
|
msgType, msg, err := node.wsConn.ReadMessage()
|
|
|
|
if err != nil {
|
|
Log.Println(err)
|
|
// try to reconnect
|
|
oerr := node.open(context.Background())
|
|
|
|
if oerr != nil {
|
|
Log.Println("node", node.config.WebSocket, "failed and could not reconnect, destroying.", err, oerr)
|
|
node.manager.removeNode(node)
|
|
return
|
|
}
|
|
|
|
Log.Println("node", node.config.WebSocket, "reconnected")
|
|
return
|
|
}
|
|
|
|
if msgType != websocket.TextMessage {
|
|
continue
|
|
}
|
|
|
|
v, err := p.ParseBytes(msg)
|
|
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
node.onEvent(v, msg)
|
|
}
|
|
}
|
|
|
|
// Handle an event from the node
|
|
func (node *Node) onEvent(v *fastjson.Value, msg []byte) error {
|
|
op := jsonStringValue(v, "op")
|
|
|
|
switch op {
|
|
case opStats:
|
|
node.stats = &RemoteStats{}
|
|
|
|
err := json.Unmarshal(msg, &node.stats)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case opPlayerUpdate:
|
|
player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId"))
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
player.time = v.GetInt("state", "time")
|
|
player.position = v.GetInt("state", "position")
|
|
case opEvent:
|
|
player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
track := jsonStringValue(v, "track")
|
|
|
|
switch jsonStringValue(v, "type") {
|
|
case eventTrackStart:
|
|
player.track = track
|
|
|
|
node.manager.handle(player, eventTrackStart, &TrackStart{
|
|
Track: track,
|
|
})
|
|
case eventTrackEnd:
|
|
player.track = ""
|
|
|
|
node.manager.handle(player, eventTrackEnd, &TrackEnd{
|
|
Track: track,
|
|
Reason: jsonStringValue(v, "reason"),
|
|
})
|
|
case eventTrackException:
|
|
ex := &TrackException{
|
|
Track: track,
|
|
Error: jsonStringValue(v, "error"),
|
|
}
|
|
|
|
if obj := v.Get("exception"); obj != nil {
|
|
var exception Exception
|
|
jsonUnmarshal(obj, &exception)
|
|
ex.Exception = exception
|
|
}
|
|
|
|
node.manager.handle(player, eventTrackException, ex)
|
|
case eventTrackStuck:
|
|
node.manager.handle(player, eventTrackStuck, &TrackStuck{
|
|
Track: track,
|
|
Threshold: time.Duration(v.GetInt("thresholdMs")) * time.Millisecond,
|
|
})
|
|
case eventVoiceProcessed:
|
|
data := &VoiceProcessingData{
|
|
node: node,
|
|
UserID: jsonStringValue(v, "userId"),
|
|
URL: fmt.Sprintf("%s/audio/%s", node.config.REST, track),
|
|
File: track,
|
|
}
|
|
|
|
node.manager.handle(player, eventVoiceProcessed, &VoiceProcessed{
|
|
Data: data,
|
|
Hotword: v.GetBool("hotword"),
|
|
Override: v.GetBool("override"),
|
|
})
|
|
}
|
|
|
|
return nil
|
|
default:
|
|
return errUnknownPayload
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CreatePlayer creates an audio player on this node
|
|
func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServerUpdate) (*Player, error) {
|
|
msg := voiceUpdateMessage{
|
|
Op: opVoiceUpdate,
|
|
GuildID: guildID,
|
|
SessionID: sessionID,
|
|
Event: &event,
|
|
}
|
|
|
|
err := node.writeMessage(msg)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
player := &Player{
|
|
guildID: guildID,
|
|
sessionID: sessionID,
|
|
manager: node.manager,
|
|
node: node,
|
|
vol: 100,
|
|
lastVoiceServerUpdate: event,
|
|
}
|
|
|
|
node.manager.playersMu.Lock()
|
|
defer node.manager.playersMu.Unlock()
|
|
node.manager.players[guildID] = player
|
|
|
|
return player, nil
|
|
}
|
|
|
|
// LoadTracks queries lavalink to return a Tracks object
|
|
//
|
|
// query should be a valid Lavaplayer query, including but not limited to:
|
|
// - A direct media URI
|
|
// - A direct Youtube /watch URI
|
|
// - A search query, prefixed with ytsearch: or scsearch:
|
|
//
|
|
// See the Lavaplayer Source Code for all valid options.
|
|
func (node *Node) LoadTracks(query string) (*Tracks, error) {
|
|
v := url.Values{}
|
|
v.Set("identifier", query)
|
|
|
|
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/loadtracks?%s", node.config.REST, v.Encode()), nil)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req.Header.Set("User-Agent", gavalinkUserAgent())
|
|
req.Header.Set("Authorization", node.config.Password)
|
|
|
|
resp, err := node.client.Do(req)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tracks := new(Tracks)
|
|
|
|
if err := json.NewDecoder(resp.Body).Decode(&tracks); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return tracks, nil
|
|
}
|
|
|
|
// Write a JSON message via the node's websocket connection
|
|
func (node *Node) writeMessage(v interface{}) error {
|
|
return node.wsConn.WriteJSON(v)
|
|
}
|