Update to new etcd clientv3 version

This commit is contained in:
Tyler 2022-11-25 06:11:24 -05:00
parent 324d45b871
commit fe505d6188
3 changed files with 1236 additions and 85 deletions

View File

@ -1,127 +1,127 @@
package discovery package discovery
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/coreos/etcd/clientv3" "github.com/google/uuid"
"github.com/google/uuid" "go.etcd.io/etcd/client/v3"
"path" "path"
) )
type Registry struct { type Registry struct {
client *clientv3.Client client *clientv3.Client
path string path string
} }
func New(endpoints []string) (*Registry, error) { func New(endpoints []string) (*Registry, error) {
client, err := clientv3.New(clientv3.Config{ client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints, Endpoints: endpoints,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Registry{ return &Registry{
client: client, client: client,
}, nil }, nil
} }
func (r *Registry) Register(ctx context.Context, service string, host Host) error { func (r *Registry) Register(ctx context.Context, service string, host Host) error {
resp, err := r.client.Grant(context.Background(), 30) resp, err := r.client.Grant(context.Background(), 30)
if err != nil { if err != nil {
return err return err
} }
if host.UUID == "" { if host.UUID == "" {
host.UUID = uuid.New().String() host.UUID = uuid.New().String()
} }
b, err := json.Marshal(host) b, err := json.Marshal(host)
if err != nil { if err != nil {
return err return err
} }
key := path.Join(r.path, service, host.UUID) key := path.Join(r.path, service, host.UUID)
_, err = r.client.Put(context.Background(), key, string(b), clientv3.WithLease(resp.ID)) _, err = r.client.Put(context.Background(), key, string(b), clientv3.WithLease(resp.ID))
if err != nil { if err != nil {
return err return err
} }
ch, err := r.client.KeepAlive(ctx, resp.ID) ch, err := r.client.KeepAlive(ctx, resp.ID)
if err != nil { if err != nil {
return err return err
} }
go func() { go func() {
for { for {
select { select {
case ka := <-ch: case ka := <-ch:
if ka == nil { if ka == nil {
return return
} }
// Removed // Removed
} }
} }
}() }()
return nil return nil
} }
func (r *Registry) Get(service string) ([]Host, error) { func (r *Registry) Get(service string) ([]Host, error) {
res, err := r.client.Get(context.Background(), path.Join(r.path, service), clientv3.WithPrefix()) res, err := r.client.Get(context.Background(), path.Join(r.path, service), clientv3.WithPrefix())
if err != nil { if err != nil {
return nil, err return nil, err
} }
var host Host var host Host
hosts := make([]Host, len(res.Kvs)) hosts := make([]Host, len(res.Kvs))
for i, kv := range res.Kvs { for i, kv := range res.Kvs {
host = Host{} host = Host{}
if err = json.Unmarshal(kv.Value, &host); err != nil { if err = json.Unmarshal(kv.Value, &host); err != nil {
continue continue
} }
hosts[i] = host hosts[i] = host
} }
return hosts, nil return hosts, nil
} }
func (r *Registry) Watch(service string) (chan Host, chan string) { func (r *Registry) Watch(service string) (chan Host, chan string) {
ch := r.client.Watch(context.Background(), path.Join(r.path, service), clientv3.WithPrefix()) ch := r.client.Watch(context.Background(), path.Join(r.path, service), clientv3.WithPrefix())
upChan := make(chan Host) upChan := make(chan Host)
downChan := make(chan string) downChan := make(chan string)
go func() { go func() {
for resp := range ch { for resp := range ch {
for _, e := range resp.Events { for _, e := range resp.Events {
if e.Type == clientv3.EventTypeDelete { if e.Type == clientv3.EventTypeDelete {
// Removed // Removed
downChan <- string(e.Kv.Key) downChan <- string(e.Kv.Key)
} else if e.IsCreate() || e.IsModify() { } else if e.IsCreate() || e.IsModify() {
host := Host{} host := Host{}
if err := json.Unmarshal(e.Kv.Value, &host); err != nil { if err := json.Unmarshal(e.Kv.Value, &host); err != nil {
continue continue
} }
upChan <- host upChan <- host
} }
} }
} }
}() }()
return upChan, downChan return upChan, downChan
} }

17
go.mod
View File

@ -3,6 +3,17 @@ module meow.tf/go/service-discovery
go 1.15 go 1.15
require ( require (
github.com/coreos/etcd v3.3.13+incompatible github.com/coreos/etcd v3.3.27+incompatible
github.com/google/uuid v1.1.1 github.com/coreos/go-semver v0.3.0 // indirect
) github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.3.0
go.etcd.io/etcd/client/v3 v3.5.6 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/net v0.2.0 // indirect
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect
google.golang.org/grpc v1.51.0 // indirect
)

1140
go.sum Normal file

File diff suppressed because it is too large Load Diff