fastjson, cleanup

This commit is contained in:
Tyler 2019-01-13 18:37:38 -05:00
parent 6252b12453
commit 580ef99f87
6 changed files with 214 additions and 198 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"encoding/base64" "encoding/base64"
"encoding/binary" "encoding/binary"
"github.com/valyala/fastjson"
"io" "io"
) )
@ -126,3 +127,19 @@ func readString(r io.Reader) (string, error) {
return string(buf), nil return string(buf), nil
} }
func jsonStringValue(v *fastjson.Value, keys ... string) string {
value := v.Get(keys...)
if value == nil {
return ""
}
strB, err := value.StringBytes()
if err != nil {
return ""
}
return string(strB)
}

View File

@ -46,6 +46,7 @@ func NewLavalink(shards string, userID string) *Lavalink {
// AddNodes adds a node to the Lavalink manager // AddNodes adds a node to the Lavalink manager
func (lavalink *Lavalink) AddNodes(nodeConfigs ...NodeConfig) error { func (lavalink *Lavalink) AddNodes(nodeConfigs ...NodeConfig) error {
nodes := make([]Node, len(nodeConfigs)) nodes := make([]Node, len(nodeConfigs))
for i, c := range nodeConfigs { for i, c := range nodeConfigs {
n := Node{ n := Node{
config: c, config: c,
@ -57,7 +58,9 @@ func (lavalink *Lavalink) AddNodes(nodeConfigs ...NodeConfig) error {
} }
nodes[i] = n nodes[i] = n
} }
lavalink.nodes = append(lavalink.nodes, nodes...) lavalink.nodes = append(lavalink.nodes, nodes...)
return nil return nil
} }
@ -103,8 +106,10 @@ func (lavalink *Lavalink) BestNode() (*Node, error) {
// GetPlayer gets a player for a guild // GetPlayer gets a player for a guild
func (lavalink *Lavalink) GetPlayer(guild string) (*Player, error) { func (lavalink *Lavalink) GetPlayer(guild string) (*Player, error) {
p, ok := lavalink.players[guild] p, ok := lavalink.players[guild]
if !ok { if !ok {
return nil, errPlayerNotFound return nil, errPlayerNotFound
} }
return p, nil return p, nil
} }

39
messages.go Normal file
View File

@ -0,0 +1,39 @@
package gavalink
type basicMessage struct {
Op string `json:"op"`
GuildID string `json:"guildId,omitempty"`
}
type playMessage struct {
Op string `json:"op"`
GuildID string `json:"guildId,omitempty"`
Track string `json:"track,omitempty"`
StartTime string `json:"startTime,omitempty"`
EndTime string `json:"endTime,omitempty"`
}
type pauseMessage struct {
Op string `json:"op"`
GuildID string `json:"guildId,omitempty"`
Pause bool `json:"pause,omitempty"`
}
type seekMessage struct {
Op string `json:"op"`
GuildID string `json:"guildId,omitempty"`
Position *int `json:"position,omitempty"`
}
type volumeMessage struct {
Op string `json:"op"`
GuildID string `json:"guildId,omitempty"`
Volume int `json:"volume,omitempty"`
}
type voiceUpdateMessage struct {
Op string `json:"op"`
GuildID string `json:"guildId,omitempty"`
SessionID string `json:"sessionId,omitempty"`
Event *VoiceServerUpdate `json:"event,omitempty"`
}

View File

@ -68,35 +68,6 @@ const (
eventTrackStuck = "TrackStuckEvent" eventTrackStuck = "TrackStuckEvent"
) )
type message struct {
Op string `json:"op"`
GuildID string `json:"guildId,omitempty"`
SessionID string `json:"sessionId,omitempty"`
Event *VoiceServerUpdate `json:"event,omitempty"`
Track string `json:"track,omitempty"`
StartTime string `json:"startTime,omitempty"`
EndTime string `json:"endTime,omitempty"`
Pause *bool `json:"pause,omitempty"`
Position *int `json:"position,omitempty"`
Volume *int `json:"volume,omitempty"`
State *state `json:"state,omitempty"`
Type string `json:"type,omitempty"`
Reason string `json:"reason,omitempty"`
Error string `json:"error,omitempty"`
ThresholdMs int `json:"thresholdMs,omitempty"`
StatCPU *statCPU `json:"cpu,omitempty"`
// TODO: stats
}
type state struct {
Time int `json:"time"`
Position int `json:"position"`
}
type statCPU struct {
Load float32 `json:"lavalinkLoad"`
}
// VoiceServerUpdate is a raw Discord VOICE_SERVER_UPDATE event // VoiceServerUpdate is a raw Discord VOICE_SERVER_UPDATE event
type VoiceServerUpdate struct { type VoiceServerUpdate struct {
GuildID string `json:"guild_id"` GuildID string `json:"guild_id"`

91
node.go
View File

@ -3,7 +3,7 @@ package gavalink
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "github.com/valyala/fastjson"
"net/http" "net/http"
"strconv" "strconv"
@ -70,8 +70,11 @@ func (node *Node) stop() {
} }
func (node *Node) listen() { func (node *Node) listen() {
var p fastjson.Parser
for { for {
msgType, msg, err := node.wsConn.ReadMessage() msgType, msg, err := node.wsConn.ReadMessage()
if err != nil { if err != nil {
Log.Println(err) Log.Println(err)
// try to reconnect // try to reconnect
@ -84,53 +87,54 @@ func (node *Node) listen() {
Log.Println("node", node.config.WebSocket, "reconnected") Log.Println("node", node.config.WebSocket, "reconnected")
return return
} }
err = node.onEvent(msgType, msg)
// TODO: better error handling? if msgType != websocket.TextMessage {
continue
}
v, err := p.ParseBytes(msg)
if err != nil { if err != nil {
Log.Println(err) continue
} }
node.onEvent(v)
} }
} }
func (node *Node) onEvent(msgType int, msg []byte) error { func (node *Node) onEvent(v *fastjson.Value) error {
if msgType != websocket.TextMessage { op := jsonStringValue(v, "op")
return errUnknownPayload
}
m := message{} switch op {
err := json.Unmarshal(msg, &m)
if err != nil {
return err
}
switch m.Op {
case opPlayerUpdate: case opPlayerUpdate:
player, err := node.manager.GetPlayer(m.GuildID) player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId"))
if err != nil {
return err
}
player.time = m.State.Time
player.position = m.State.Position
case opEvent:
player, err := node.manager.GetPlayer(m.GuildID)
if err != nil { if err != nil {
return err return err
} }
switch m.Type { player.time = v.GetInt("state", "time")
player.position = v.GetInt("state", "position")
case opEvent:
player, err := node.manager.GetPlayer(jsonStringValue(v, "guildId"))
if err != nil {
return err
}
track := jsonStringValue(v, "track")
switch jsonStringValue(v, "type") {
case eventTrackEnd: case eventTrackEnd:
player.track = "" player.track = ""
err = player.handler.OnTrackEnd(player, m.Track, m.Reason) err = player.handler.OnTrackEnd(player, track, jsonStringValue(v, "reason"))
case eventTrackException: case eventTrackException:
err = player.handler.OnTrackException(player, m.Track, m.Reason) err = player.handler.OnTrackException(player, track, jsonStringValue(v, "reason"))
case eventTrackStuck: case eventTrackStuck:
err = player.handler.OnTrackStuck(player, m.Track, m.ThresholdMs) err = player.handler.OnTrackStuck(player, track, v.GetInt("thresholdMs"))
} }
return err return err
case opStats: case opStats:
node.load = m.StatCPU.Load node.load = float32(v.GetFloat64("cpu", "lavalinkLoad"))
default: default:
return errUnknownPayload return errUnknownPayload
} }
@ -140,20 +144,19 @@ func (node *Node) onEvent(msgType int, msg []byte) error {
// CreatePlayer creates an audio player on this node // CreatePlayer creates an audio player on this node
func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServerUpdate, handler EventHandler) (*Player, error) { func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServerUpdate, handler EventHandler) (*Player, error) {
msg := message{ msg := voiceUpdateMessage{
Op: opVoiceUpdate, Op: opVoiceUpdate,
GuildID: guildID, GuildID: guildID,
SessionID: sessionID, SessionID: sessionID,
Event: &event, Event: &event,
} }
data, err := json.Marshal(msg)
if err != nil { err := node.writeMessage(msg)
return nil, err
}
err = node.wsConn.WriteMessage(websocket.TextMessage, data)
if err != nil { if err != nil {
return nil, err return nil, err
} }
player := &Player{ player := &Player{
guildID: guildID, guildID: guildID,
manager: node.manager, manager: node.manager,
@ -161,7 +164,9 @@ func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServ
handler: handler, handler: handler,
vol: 100, vol: 100,
} }
node.manager.players[guildID] = player node.manager.players[guildID] = player
return player, nil return player, nil
} }
@ -175,24 +180,30 @@ func (node *Node) CreatePlayer(guildID string, sessionID string, event VoiceServ
// See the Lavaplayer Source Code for all valid options. // See the Lavaplayer Source Code for all valid options.
func (node *Node) LoadTracks(query string) (*Tracks, error) { func (node *Node) LoadTracks(query string) (*Tracks, error) {
url := fmt.Sprintf("%s/loadtracks?identifier=%s", node.config.REST, query) url := fmt.Sprintf("%s/loadtracks?identifier=%s", node.config.REST, query)
req, err := http.NewRequest(http.MethodGet, url, nil) req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
req.Header.Set("Authorization", node.config.Password) req.Header.Set("Authorization", node.config.Password)
resp, err := http.DefaultClient.Do(req) resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
tracks := new(Tracks) tracks := new(Tracks)
err = json.Unmarshal(data, &tracks)
if err != nil { if err := json.NewDecoder(resp.Body).Decode(&tracks); err != nil {
return nil, err return nil, err
} }
return tracks, nil return tracks, nil
} }
func (node *Node) writeMessage(v interface{}) error {
return node.wsConn.WriteJSON(v)
}

View File

@ -1,10 +1,7 @@
package gavalink package gavalink
import ( import (
"encoding/json"
"strconv" "strconv"
"github.com/gorilla/websocket"
) )
// Player is a Lavalink player // Player is a Lavalink player
@ -40,19 +37,15 @@ func (player *Player) PlayAt(track string, startTime int, endTime int) error {
start := strconv.Itoa(startTime) start := strconv.Itoa(startTime)
end := strconv.Itoa(endTime) end := strconv.Itoa(endTime)
msg := message{ msg := playMessage{
Op: opPlay, Op: opPlay,
GuildID: player.guildID, GuildID: player.guildID,
Track: track, Track: track,
StartTime: start, StartTime: start,
EndTime: end, EndTime: end,
} }
data, err := json.Marshal(msg)
if err != nil { return player.node.writeMessage(msg)
return err
}
err = player.node.wsConn.WriteMessage(websocket.TextMessage, data)
return err
} }
// Track returns the player's current track // Track returns the player's current track
@ -63,33 +56,26 @@ func (player *Player) Track() string {
// Stop will stop the currently playing track // Stop will stop the currently playing track
func (player *Player) Stop() error { func (player *Player) Stop() error {
player.track = "" player.track = ""
msg := message{
msg := basicMessage{
Op: opStop, Op: opStop,
GuildID: player.guildID, GuildID: player.guildID,
} }
data, err := json.Marshal(msg)
if err != nil { return player.node.writeMessage(msg)
return err
}
err = player.node.wsConn.WriteMessage(websocket.TextMessage, data)
return err
} }
// Pause will pause or resume the player, depending on the pause parameter // Pause will pause or resume the player, depending on the pause parameter
func (player *Player) Pause(pause bool) error { func (player *Player) Pause(pause bool) error {
player.paused = pause player.paused = pause
msg := message{ msg := pauseMessage{
Op: opPause, Op: opPause,
GuildID: player.guildID, GuildID: player.guildID,
Pause: &pause, Pause: pause,
} }
data, err := json.Marshal(msg)
if err != nil { return player.node.writeMessage(msg)
return err
}
err = player.node.wsConn.WriteMessage(websocket.TextMessage, data)
return err
} }
// Paused returns whether or not the player is currently paused // Paused returns whether or not the player is currently paused
@ -99,17 +85,13 @@ func (player *Player) Paused() bool {
// Seek will seek the player to the speicifed position, in millis // Seek will seek the player to the speicifed position, in millis
func (player *Player) Seek(position int) error { func (player *Player) Seek(position int) error {
msg := message{ msg := seekMessage{
Op: opSeek, Op: opSeek,
GuildID: player.guildID, GuildID: player.guildID,
Position: &position, Position: &position,
} }
data, err := json.Marshal(msg)
if err != nil { return player.node.wsConn.WriteJSON(msg)
return err
}
err = player.node.wsConn.WriteMessage(websocket.TextMessage, data)
return err
} }
// Position returns the player's position, as reported by Lavalink // Position returns the player's position, as reported by Lavalink
@ -127,17 +109,13 @@ func (player *Player) Volume(volume int) error {
player.vol = volume player.vol = volume
msg := message{ msg := volumeMessage{
Op: opVolume, Op: opVolume,
GuildID: player.guildID, GuildID: player.guildID,
Volume: &volume, Volume: volume,
} }
data, err := json.Marshal(msg)
if err != nil { return player.node.wsConn.WriteJSON(msg)
return err
}
err = player.node.wsConn.WriteMessage(websocket.TextMessage, data)
return err
} }
// GetVolume gets the player's volume level // GetVolume gets the player's volume level
@ -154,34 +132,29 @@ func (player *Player) GetVolume() int {
// To move a player to a new Node, first player.Destroy() it, and then // To move a player to a new Node, first player.Destroy() it, and then
// create a new player on the new node. // create a new player on the new node.
func (player *Player) Forward(sessionID string, event VoiceServerUpdate) error { func (player *Player) Forward(sessionID string, event VoiceServerUpdate) error {
msg := message{ msg := voiceUpdateMessage{
Op: opVoiceUpdate, Op: opVoiceUpdate,
GuildID: player.guildID, GuildID: player.guildID,
SessionID: sessionID, SessionID: sessionID,
Event: &event, Event: &event,
} }
data, err := json.Marshal(msg)
if err != nil { return player.node.wsConn.WriteJSON(msg)
return err
}
err = player.node.wsConn.WriteMessage(websocket.TextMessage, data)
return err
} }
// Destroy will destroy this player // Destroy will destroy this player
func (player *Player) Destroy() error { func (player *Player) Destroy() error {
msg := message{ msg := basicMessage{
Op: opDestroy, Op: opDestroy,
GuildID: player.guildID, GuildID: player.guildID,
} }
data, err := json.Marshal(msg)
if err != nil { err := player.node.wsConn.WriteJSON(msg)
return err
}
err = player.node.wsConn.WriteMessage(websocket.TextMessage, data)
if err != nil { if err != nil {
return err return err
} }
delete(player.manager.players, player.guildID) delete(player.manager.players, player.guildID)
return nil return nil
} }