package discovery import ( "context" "encoding/json" "github.com/coreos/etcd/clientv3" "github.com/google/uuid" "path" ) type Registry struct { client *clientv3.Client path string } func New(endpoints []string) (*Registry, error) { client, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, }) if err != nil { return nil, err } return &Registry{ client: client, }, nil } func (r *Registry) Register(ctx context.Context, service string, host Host) error { resp, err := r.client.Grant(context.Background(), 30) if err != nil { return err } if host.UUID == "" { host.UUID = uuid.New().String() } b, err := json.Marshal(host) if err != nil { return err } key := path.Join(r.path, service, host.UUID) _, err = r.client.Put(context.Background(), key, string(b), clientv3.WithLease(resp.ID)) if err != nil { return err } ch, err := r.client.KeepAlive(ctx, resp.ID) if err != nil { return err } go func() { for { select { case ka := <-ch: if ka == nil { return } // Removed } } }() return nil } func (r *Registry) Get(service string) ([]Host, error) { res, err := r.client.Get(context.Background(), path.Join(r.path, service), clientv3.WithPrefix()) if err != nil { return nil, err } var host Host hosts := make([]Host, len(res.Kvs)) for i, kv := range res.Kvs { host = Host{} if err = json.Unmarshal(kv.Value, &host); err != nil { continue } hosts[i] = host } return hosts, nil } func (r *Registry) Watch(service string) (chan Host, chan string) { ch := r.client.Watch(context.Background(), path.Join(r.path, service), clientv3.WithPrefix()) upChan := make(chan Host) downChan := make(chan string) go func() { for resp := range ch { for _, e := range resp.Events { if e.Type == clientv3.EventTypeDelete { // Removed downChan <- string(e.Kv.Key) } else if e.IsCreate() || e.IsModify() { host := Host{} if err := json.Unmarshal(e.Kv.Value, &host); err != nil { continue } upChan <- host } } } }() return upChan, downChan }