【Go语言实战】 (16) gRPC 集成 ETCD 进行服务注册

网友投稿 253 2022-11-16

【Go语言实战】 (16) gRPC 集成 ETCD 进行服务注册

文章目录

​​写在前面​​​​1. 服务实例定义​​​​2. 服务实例注册​​​​3. 使用示例​​

写在前面

本文采用的 ETCD 版本库是 ​​go.etcd.io/etcd/client/v3​​​ 采用的 gRPC 版本库是 ​​​google.golang.org/grpc​​

在Go语言的RPC框架中,gRPC 是比较原生的,并没有集成 ETCD 服务发现的集成,需要我们去稍微封装一下。而像 micro 框架这种封装性比较好的就有集成 ETCD、consul 等等的服务发现功能,就直接调用就行了。

本文实战例子源码在 ​​服务实例定义

定义我们所需要注入进ETCD的服务结构体

type Server struct { Name string `json:"name"` Addr string `json:"addr"` // 地址 Version string `json:"version"` // 版本 Weight int64 `json:"weight"` // 权重}

Name:名字为服务的名字(用来进行服务的发现)Addr:服务的地址(存储服务地址)Version:服务的版本(方便服务的版本迭代)Weight:服务的权重(后续用来降级熔断)

定义服务名字前缀的函数

func BuildPrefix(server Server) string { if server.Version == "" { return fmt.Sprintf("/%s/", server.Name) } return fmt.Sprintf("/%s/%s/", server.Name, server.Version)}

定义注册的地址函数

func BuildRegisterPath(server Server) string { return fmt.Sprintf("%s%s", BuildPrefix(server), server.Addr)}

将值反序列化成一个注册 Server 服务

func ParseValue(value []byte) (Server, error) { server := Server{} if err := json.Unmarshal(value, &server); err != nil { return server, err } return server, nil}

分割路径,后续用作 Server 地址的更新

func SplitPath(path string) (Server, error) { server := Server{} strs := strings.Split(path, "/") if len(strs) == 0 { return server, errors.New("invalid path") } server.Addr = strs[len(strs)-1] return server, nil}

判断这个服务地址是否已经存在,防止服务访问冲突

func Exist(l []resolver.Address, addr resolver.Address) bool { for i := range l { if l[i].Addr == addr.Addr { return true } } return false}

移除服务

func Remove(s []resolver.Address, addr resolver.Address) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr.Addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false}

2. 服务实例注册

定义服务实例的实例,用来存储全部的实例信息,并且维持各个服务之间的执行,防止宕机等情况

type Register struct { EtcdAddrs []string DialTimeout int closeCh chan struct{} leasesID clientv3.LeaseID keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse srvInfo Server srvTTL int64 cli *clientv3.Client logger *logrus.Logger}

创建一个注册对象

func NewRegister(etcdAddrs []string, logger *logrus.Logger) *Register { return &Register{ EtcdAddrs: etcdAddrs, DialTimeout: 3, logger: logger, }}

注册服务到 ETCD 中

func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) { var err error if strings.Split(srvInfo.Addr, ":")[0] == "" { // 判断服务地址的正确性 return nil, errors.New("invalid ip address") } // 对服务进行注册 if r.cli, err = clientv3.New(clientv3.Config{ Endpoints: r.EtcdAddrs, DialTimeout: time.Duration(r.DialTimeout) * time.Second, }); err != nil { return nil, err } r.srvInfo = srvInfo // 服务信息的注册 r.srvTTL = ttl // 服务的存活时间 if err = r.register(); err != nil { return nil, err } // 初始化一个切片来判断这个服务连接是否关闭 r.closeCh = make(chan struct{}) // 异步进行心跳检测 go r.keepAlive() return r.closeCh, nil}

这里我们要先说明一个名词:租约

ETCD的 ​​Lease​​​ 租约,它类似 ​​TTL(Time To Live)​​,用于 etcd 客户端与服务端之间进行活性检测。

在到达 TTL 时间之前,etcd 服务端不会删除相关租约上绑定的键值对;超过 TTL 时间,则会删除。因此我们需要在到达 TTL 时间之前续租,以实现客户端与服务端之间的保活。

func (r *Register) register() error { //设置超时时间,访问etcd有超时控制 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second) defer cancel() // 注册一个新的租约 leaseResp, err := r.cli.Grant(ctx, r.srvTTL) if err != nil { return err } // 赋值租约的ID r.leasesID = leaseResp.ID // 对这个 cli 进行心跳检测 if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), r.leasesID); err != nil { return err } data, err := json.Marshal(r.srvInfo) if err != nil { return err } // 将服务写到 ETCD 中 _, err = r.cli.Put(context.Background(), BuildRegisterPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID)) return err}

关闭服务连接

func (r *Register) Stop() { r.closeCh <- struct{}{}}

删除节点

func (r *Register) unregister() error { _, err := r.cli.Delete(context.Background(), BuildRegisterPath(r.srvInfo)) return err}

存活检测

func (r *Register) keepAlive() { ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second) for { select { case <-r.closeCh: // 是否存在这个服务 if err := r.unregister(); err != nil { r.logger.Error("unregister failed, error: ", err) } // 撤销租约 if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil { r.logger.Error("revoke failed, error: ", err) } case res := <-r.keepAliveCh: if res == nil { if err := r.register(); err != nil { r.logger.Error("register failed, error: ", err) } } case <-ticker.C: if r.keepAliveCh == nil { if err := r.register(); err != nil { r.logger.Error("register failed, error: ", err) } } } }}

获取注册服务的信息

func (r *Register) GetServerInfo() (Server, error) { resp, err := r.cli.Get(context.Background(), BuildRegisterPath(r.srvInfo)) if err != nil { return r.srvInfo, err } server := Server{} if resp.Count >= 1 { if err := json.Unmarshal(resp.Kvs[0].Value, &server); err != nil { return server, err } } return server, err}

3. 使用示例

服务注册

etcdRegister := discovery.NewRegister(etcdAddress, logrus.New())

定义一个Node存放服务信息

userNode := discovery.Server{ Name: viper.GetString("server.domain"), Addr: grpcAddress, }

注册

if _, err := etcdRegister.Register(userNode, 10); err != nil { panic(fmt.Sprintf("start server failed, err: %v", err))}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:大数据Hadoop之——基于Hive的内存型SQL查询引擎Impala(Impala环境部署)
下一篇:【ZYNQ Ultrascale+ MPSOC FPGA教程】第十二章RS422实验
相关文章

 发表评论

暂时没有评论,来抢沙发吧~