package gavalink

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/websocket"
	"github.com/valyala/fastjson"
)

// NodeConfig configures a Lavalink Node
type NodeConfig struct {
	// REST is the host where Lavalink's REST server runs
	//
	// This value is expected without a trailing slash, e.g. like
	// `http://localhost:2333`
	REST string
	// WebSocket is the host where Lavalink's WebSocket server runs
	//
	// This value is expected without a trailing slash, e.g. like
	// `ws://localhost:8012`. It is passed unchanged to the websocket
	// dialer, which only accepts `ws://` and `wss://` URLs.
	WebSocket string
	// Password is the expected Authorization header for the Node
	Password string
}

// Node wraps a Lavalink Node
type Node struct {
	// config holds the endpoints and password used to reach the node.
	config NodeConfig
	// stats is the most recent successfully decoded stats payload.
	stats *RemoteStats
	// manager is the owning Lavalink instance (players, shard info).
	manager *Lavalink
	// wsConn is the current websocket connection; open assigns it.
	wsConn *websocket.Conn
	// client performs the REST requests (see LoadTracks).
	client *http.Client
}

// RemoteStats is the payload decoded from a node's stats message.
type RemoteStats struct {
	Op            string       `json:"op"`
	Players       int          `json:"players"`
	ActivePlayers int          `json:"playingPlayers"`
	Uptime        int64        `json:"uptime"`
	Memory        *MemoryStats `json:"memory"`
	Cpu           *CpuStats    `json:"cpu"`
	Frames        *FrameStats  `json:"frameStats"`
}

// MemoryStats is the "memory" section of RemoteStats.
type MemoryStats struct {
	Free        uint64 `json:"free"`
	Used        uint64 `json:"used"`
	Allocated   uint64 `json:"allocated"`
	Reserveable uint64 `json:"reserveable"`
}

// CpuStats is the "cpu" section of RemoteStats.
type CpuStats struct {
	Cores        int     `json:"cores"`
	SystemLoad   float64 `json:"systemLoad"`
	LavalinkLoad float64 `json:"lavalinkLoad"`
}

// FrameStats is the "frameStats" section of RemoteStats.
type FrameStats struct {
	Sent    int `json:"sent"`
	Nulled  int `json:"nulled"`
	Deficit int `json:"deficit"`
}

// open dials the node's websocket, verifies the advertised Lavalink major
// version and starts the listen goroutine.
//
// It returns the dial error, the strconv error when the
// Lavalink-Major-Version header is missing or not a number, or
// errInvalidVersion when the version is below 3. In the latter two cases the
// freshly dialed connection is closed before returning so it does not leak.
func (node *Node) open() error {
	header := http.Header{}
	header.Set("Authorization", node.config.Password)
	header.Set("Num-Shards", node.manager.shards)
	header.Set("User-Id", node.manager.userID)

	if node.manager.capabilities != nil {
		v := make([]string, 0, len(node.manager.capabilities))
		for k, vals := range node.manager.capabilities {
			b, err := json.Marshal(vals)
			if err != nil {
				// Best effort: a capability that cannot be encoded is
				// left out of the header instead of failing the connect.
				continue
			}
			v = append(v, k+"="+string(b))
		}
		header.Set("Capabilities", strings.Join(v, ";"))
	}

	ws, resp, err := websocket.DefaultDialer.Dial(node.config.WebSocket, header)
	if err != nil {
		return err
	}

	vstr := resp.Header.Get("Lavalink-Major-Version")
	v, err := strconv.Atoi(vstr)
	if err != nil {
		// The handshake succeeded but the node is unusable; release the socket.
		_ = ws.Close()
		return err
	}
	if v < 3 {
		_ = ws.Close()
		return errInvalidVersion
	}

	node.wsConn = ws
	go node.listen()

	Log.Println("node", node.config.WebSocket, "opened")

	return nil
}

// stop closes the node's websocket connection, if there is one.
func (node *Node) stop() {
	// someone already stopped this
	if node.wsConn == nil {
		return
	}
	_ = node.wsConn.Close()
}

// listen reads messages from the node's websocket until a read fails.
//
// On a read error the failed connection is closed and a single reconnect is
// attempted through open, which starts a fresh listen goroutine on success.
// If the reconnect fails the node is removed from its manager. Either way
// this goroutine returns.
//
// NOTE(review): a read error caused by stop() closing the socket also takes
// the reconnect path below — confirm that is intended.
func (node *Node) listen() {
	// Pin the connection this goroutine owns: open replaces node.wsConn on
	// reconnect, and the one that failed is the one that must be closed.
	conn := node.wsConn

	var p fastjson.Parser
	for {
		msgType, msg, err := conn.ReadMessage()
		if err != nil {
			Log.Println(err)
			// The connection is unusable after a read error; close it so the
			// underlying socket is released before it gets replaced.
			_ = conn.Close()
			// try to reconnect
			oerr := node.open()
			if oerr != nil {
				Log.Println("node", node.config.WebSocket, "failed and could not reconnect, destroying.", err, oerr)
				node.manager.removeNode(node)
				return
			}
			Log.Println("node", node.config.WebSocket, "reconnected")
			return
		}

		// Only text frames are parsed; everything else is skipped.
		if msgType != websocket.TextMessage {
			continue
		}

		v, err := p.ParseBytes(msg)
		if err != nil {
			continue
		}

		// Best effort: a payload that cannot be handled must not stop the
		// read loop, so the error is deliberately discarded here.
		_ = node.onEvent(v, msg)
	}
}

// onEvent dispatches one parsed websocket payload according to its "op".
//
// v is the parsed form of msg; msg is the raw payload, used to decode stats.
// It returns errUnknownPayload for an unrecognised op, the error from
// GetPlayer when the guild's player cannot be resolved, or the JSON error
// when a stats payload does not decode.
func (node *Node) onEvent(v *fastjson.Value, msg []byte) error {
	op := jsonStringValue(v, "op")

	switch op {
	case opStats:
		// Decode into a local first so a malformed payload does not replace
		// the last good stats with a partially filled struct.
		stats := &RemoteStats{}
		if err := json.Unmarshal(msg, stats); err != nil {
			return err
		}
		node.stats = stats
	case opPlayerUpdate:
		player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId"))
		if err != nil {
			return err
		}
		player.time = v.GetInt("state", "time")
		player.position = v.GetInt("state", "position")
	case opEvent:
		player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId"))
		if err != nil {
			return err
		}

		track := jsonStringValue(v, "track")

		// An event type not listed below is ignored without an error.
		switch jsonStringValue(v, "type") {
		case eventTrackStart:
			player.track = track
			player.handle(eventTrackStart, &TrackStart{
				Track: track,
			})
		case eventTrackEnd:
			player.track = ""
			player.handle(eventTrackEnd, &TrackEnd{
				Track:  track,
				Reason: jsonStringValue(v, "reason"),
			})
		case eventTrackException:
			ex := &TrackException{
				Track: track,
				Error: jsonStringValue(v, "error"),
			}
			// The structured "exception" object is optional in the payload.
			if obj := v.Get("exception"); obj != nil {
				var exception Exception
				jsonUnmarshal(obj, &exception)
				ex.Exception = exception
			}
			player.handle(eventTrackException, ex)
		case eventTrackStuck:
			player.handle(eventTrackStuck, &TrackStuck{
				Track:     track,
				Threshold: time.Duration(v.GetInt("thresholdMs")) * time.Millisecond,
			})
		case eventVoiceProcessed:
			// For this event the "track" value is used as a file name under
			// the node's REST /audio/ path.
			data := &VoiceProcessingData{
				node:   node,
				UserID: jsonStringValue(v, "userId"),
				URL:    fmt.Sprintf("%s/audio/%s", node.config.REST, track),
				File:   track,
			}
			player.handle(eventVoiceProcessed, &VoiceProcessed{
				Data:     data,
				Hotword:  v.GetBool("hotword"),
				Override: v.GetBool("override"),
			})
		}
	default:
		return errUnknownPayload
	}

	return nil
}

// CreatePlayer creates an audio player on this node
//
// It sends a voice update for the guild to the node and, once the message is
// written, registers the new Player with the manager under guildID. The
// player starts with a volume of 100.
func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServerUpdate) (*Player, error) {
	msg := voiceUpdateMessage{
		Op:        opVoiceUpdate,
		GuildID:   guildID,
		SessionID: sessionID,
		Event:     &event,
	}
	err := node.writeMessage(msg)
	if err != nil {
		return nil, err
	}
	player := &Player{
		guildID:               guildID,
		sessionID:             sessionID,
		manager:               node.manager,
		node:                  node,
		vol:                   100,
		lastVoiceServerUpdate: event,
	}
	node.manager.players[guildID] = player
	return player, nil
}

// LoadTracks queries lavalink to return a Tracks object
//
// query should be a valid Lavaplayer query, including but not limited to:
// - A direct media URI
// - A direct Youtube /watch URI
// - A search query, prefixed with ytsearch: or scsearch:
//
// See the Lavaplayer Source Code for all valid options.
//
// The response body is always closed. An error is returned when the request
// cannot be built or sent, or when the body is not valid JSON.
//
// NOTE(review): the HTTP status code is not inspected; a non-2xx response is
// only reported if its body fails to decode.
func (node *Node) LoadTracks(query string) (*Tracks, error) {
	v := url.Values{}
	v.Set("identifier", query)

	req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/loadtracks?%s", node.config.REST, v.Encode()), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", node.config.Password)

	resp, err := node.client.Do(req)
	if err != nil {
		return nil, err
	}
	// The body must be closed or the connection is never released.
	defer resp.Body.Close()

	// Decode into the struct itself (not a pointer to the pointer) so a
	// JSON `null` body cannot turn a successful call into a nil *Tracks.
	tracks := new(Tracks)
	if err := json.NewDecoder(resp.Body).Decode(tracks); err != nil {
		return nil, err
	}
	return tracks, nil
}

// writeMessage encodes v as JSON and writes it to the node's websocket.
func (node *Node) writeMessage(v interface{}) error {
	return node.wsConn.WriteJSON(v)
}