etcd實現服務發現和注冊

bytemode · · 198 次點擊 · · 開始瀏覽    
## 原理 etcd實現服務發現和注冊,使用的是kv存儲、租約、watch. 向etcd 注冊 該服務(其實就是 存一個值)然后向etcd 發送心跳,當etcd 沒有檢測到心跳就會 把這個鍵值對 刪了(這整個動作是etcd里的租約模式),網關那邊 就只需要 watch 這個 key ,就能夠知道 所有服務的所有動態了. 注冊的時候可以使用前綴這樣在watch的時候可以watch所有的服務器. ## 服務注冊 1. 租約模式,客戶端申請一個租約設置過期時間,keepalive沒個固定時間進行租約續期,通過租約存儲key.過期不續租則etcd會刪除租約上的所有key 2. 相同服務存儲的key的前綴可以設置成一樣 3. 注冊服務就是向服務端使用租約模式寫入一個key ``` package main import ( "context" "fmt" "time" "go.etcd.io/etcd/clientv3" ) //創建租約注冊服務 type ServiceRegister struct { etcdClient *clientv3.Client //etcd client lease clientv3.Lease //租約 leaseResp *clientv3.LeaseGrantResponse //設置租約時間返回 canclefunc func() //租約撤銷 //租約keepalieve相應chan keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse key string //注冊的key } func NewServiceRegister(addr []string, timeNum int64) (*ServiceReg, error) { conf := clientv3.Config{ Endpoints: addr, DialTimeout: 5 * time.Second, } var ( client *clientv3.Client ) //連接etcd if clientTem, err := clientv3.New(conf); err == nil { etcdClient = clientTem } else { return nil, err } ser := &ServiceRegister{ etcdClient: client, } //申請租約設置時間keepalive if err := ser.setLease(timeNum); err != nil { return nil, err } //監聽續租相應chan go ser.ListenLeaseRespChan() return ser, nil } //設置租約 func (this *ServiceRegister) setLease(timeNum int64) error { //申請租約 lease := clientv3.NewLease(this.etcdClient) //設置租約時間 leaseResp, err := lease.Grant(context.TODO(), timeNum) if err != nil { return err } //設置續租 定期發送需求請求 ctx, cancelFunc := context.WithCancel(context.TODO()) leaseRespChan, err := lease.KeepAlive(ctx, leaseResp.ID) if err != nil { return err } this.lease = lease this.leaseResp = leaseResp this.canclefunc = cancelFunc this.keepAliveChan = leaseRespChan return nil } //監聽 續租情況 func (this *ServiceRegister) ListenLeaseRespChan() { for { select { case leaseKeepResp := <-this.keepAliveChan: if leaseKeepResp == nil { fmt.Printf("已經關閉續租功能\n") return } else { fmt.Printf("續租成功\n") } } } } //通過租約 注冊服務 func (this *ServiceRegister) PutService(key, val string) error { //帶租約的模式寫入數據即注冊服務 kv := clientv3.NewKV(this.etcdClient) _, err := kv.Put(context.TODO(), key, val, clientv3.WithLease(this.leaseResp.ID)) return err } //撤銷租約 func (this *ServiceRegister) RevokeLease() error { this.canclefunc() time.Sleep(2 * time.Second) _, err := this.lease.Revoke(context.TODO(), this.leaseResp.ID) return err } func main() { ser,_ := NewServiceRegister([]string{"127.0.0.1:2379"}, 5) ser.PutService("/server/node1", "node1") select{} } ``` ## 服務發現 1. 創建一個client 連到etcd. 2. 匹配到所有相同前綴的 key. 存儲信息到本地 3. watch這個key前綴,當有增加或者刪除的時候就修改本地 4. 本地維護server的列表 ``` import ( "go.etcd.io/etcd/clientv3" "time" "context" "go.etcd.io/etcd/mvcc/mvccpb" "sync" "log" ) type ServiceDiscovery struct { client *clientv3.Client serverList map[string]string lock sync.Mutex } func NewServiceDiscovery (addr []string)( *ServiceDiscovery, error){ conf := clientv3.Config{ Endpoints: addr, DialTimeout: 5 * time.Second, } if client, err := clientv3.New(conf); err == nil { return &ClientDis{ client:client, serverList:make(map[string]string), }, nil } else { return nil ,err } } func (this * ServiceDiscovery) GetService(prefix string) ([]string ,error){ //使用key前桌獲取所有的etcd上所有的server resp, err := this.client.Get(context.Background(), prefix, clientv3.WithPrefix()) if err != nil { return nil, err } //解析出所有的server放入本地 addrs := this.extractAddrs(resp) //warch server前綴 將變更寫入本地 go this.watcher(prefix) return addrs ,nil } // 監聽key前綴 func (this *ServiceDiscovery) watcher(prefix string) { //監聽 返回監聽事件chan rch := this.client.Watch(context.Background(), prefix, clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { case mvccpb.PUT: //修改或者新增 this.SetServiceList(string(ev.Kv.Key),string(ev.Kv.Value)) case mvccpb.DELETE: //刪除 this.DelServiceList(string(ev.Kv.Key)) } } } } func (this *ServiceDiscovery) extractAddrs(resp *clientv3.GetResponse) []string { addrs := make([]string,0) if resp == nil || resp.Kvs == nil { return addrs } for i := range resp.Kvs { if v := resp.Kvs[i].Value; v != nil { this.SetServiceList(string(resp.Kvs[i].Key),string(resp.Kvs[i].Value)) addrs = append(addrs, string(v)) } } return addrs } func (this *ServiceDiscovery) SetServiceList(key,val string) { this.lock.Lock() defer this.lock.Unlock() this.serverList[key] = string(val) log.Println("set data key :",key,"val:",val) } func (this *ServiceDiscovery) DelServiceList(key string) { this.lock.Lock() defer this.lock.Unlock() delete(this.serverList,key) log.Println("del data key:", key) } func (this *ServiceDiscovery) SerList2Array()[]string { this.lock.Lock() defer this.lock.Unlock() addrs := make([]string,0) for _, v := range this.serverList { addrs = append(addrs,v) } return addrs } func main () { cli,_ := NewServiceDiscovery([]string{"127.0.0.1:2379"}) cli.GetService("/server") select {} } ```

入群交流(和以上內容無關):Go中文網 QQ 交流群:729884609 或加微信入微信群:274768166 備注:入群;關注公眾號:Go語言中文網

198 次點擊  
加入收藏 微博
上一篇:grpc快速使用
下一篇:etcd快速入門
暫無回復
添加一條新回復 (您需要 登錄 后才能回復 沒有賬號 ?)
  • 請盡量讓自己的回復能夠對別人有幫助
  • 支持 Markdown 格式, **粗體**、~~刪除線~~、`單行代碼`
  • 支持 @ 本站用戶;支持表情(輸入 : 提示),見 Emoji cheat sheet
  • 圖片支持拖拽、截圖粘貼等方式上傳