c语言sscanf函数的用法是什么
253
2022-11-16
【Go语言实战】 (16) gRPC 集成 ETCD 进行服务注册
文章目录
写在前面1. 服务实例定义2. 服务实例注册3. 使用示例
写在前面
本文采用的 ETCD 版本库是 go.etcd.io/etcd/client/v3 采用的 gRPC 版本库是 google.golang.org/grpc
在Go语言的RPC框架中,gRPC 是比较原生的,并没有集成 ETCD 服务发现的集成,需要我们去稍微封装一下。而像 micro 框架这种封装性比较好的就有集成 ETCD、consul 等等的服务发现功能,就直接调用就行了。
本文实战例子源码在 服务实例定义
定义我们所需要注入进ETCD的服务结构体
type Server struct { Name string `json:"name"` Addr string `json:"addr"` // 地址 Version string `json:"version"` // 版本 Weight int64 `json:"weight"` // 权重}
Name:名字为服务的名字(用来进行服务的发现)Addr:服务的地址(存储服务地址)Version:服务的版本(方便服务的版本迭代)Weight:服务的权重(后续用来降级熔断)
定义服务名字前缀的函数
func BuildPrefix(server Server) string { if server.Version == "" { return fmt.Sprintf("/%s/", server.Name) } return fmt.Sprintf("/%s/%s/", server.Name, server.Version)}
定义注册的地址函数
func BuildRegisterPath(server Server) string { return fmt.Sprintf("%s%s", BuildPrefix(server), server.Addr)}
将值反序列化成一个注册 Server 服务
func ParseValue(value []byte) (Server, error) { server := Server{} if err := json.Unmarshal(value, &server); err != nil { return server, err } return server, nil}
分割路径,后续用作 Server 地址的更新
func SplitPath(path string) (Server, error) { server := Server{} strs := strings.Split(path, "/") if len(strs) == 0 { return server, errors.New("invalid path") } server.Addr = strs[len(strs)-1] return server, nil}
判断这个服务地址是否已经存在,防止服务访问冲突
func Exist(l []resolver.Address, addr resolver.Address) bool { for i := range l { if l[i].Addr == addr.Addr { return true } } return false}
移除服务
func Remove(s []resolver.Address, addr resolver.Address) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr.Addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false}
2. 服务实例注册
定义服务实例的实例,用来存储全部的实例信息,并且维持各个服务之间的执行,防止宕机等情况
type Register struct { EtcdAddrs []string DialTimeout int closeCh chan struct{} leasesID clientv3.LeaseID keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse srvInfo Server srvTTL int64 cli *clientv3.Client logger *logrus.Logger}
创建一个注册对象
func NewRegister(etcdAddrs []string, logger *logrus.Logger) *Register { return &Register{ EtcdAddrs: etcdAddrs, DialTimeout: 3, logger: logger, }}
注册服务到 ETCD 中
func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) { var err error if strings.Split(srvInfo.Addr, ":")[0] == "" { // 判断服务地址的正确性 return nil, errors.New("invalid ip address") } // 对服务进行注册 if r.cli, err = clientv3.New(clientv3.Config{ Endpoints: r.EtcdAddrs, DialTimeout: time.Duration(r.DialTimeout) * time.Second, }); err != nil { return nil, err } r.srvInfo = srvInfo // 服务信息的注册 r.srvTTL = ttl // 服务的存活时间 if err = r.register(); err != nil { return nil, err } // 初始化一个切片来判断这个服务连接是否关闭 r.closeCh = make(chan struct{}) // 异步进行心跳检测 go r.keepAlive() return r.closeCh, nil}
这里我们要先说明一个名词:租约
ETCD的 Lease 租约,它类似 TTL(Time To Live),用于 etcd 客户端与服务端之间进行活性检测。
在到达 TTL 时间之前,etcd 服务端不会删除相关租约上绑定的键值对;超过 TTL 时间,则会删除。因此我们需要在到达 TTL 时间之前续租,以实现客户端与服务端之间的保活。
func (r *Register) register() error { //设置超时时间,访问etcd有超时控制 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second) defer cancel() // 注册一个新的租约 leaseResp, err := r.cli.Grant(ctx, r.srvTTL) if err != nil { return err } // 赋值租约的ID r.leasesID = leaseResp.ID // 对这个 cli 进行心跳检测 if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), r.leasesID); err != nil { return err } data, err := json.Marshal(r.srvInfo) if err != nil { return err } // 将服务写到 ETCD 中 _, err = r.cli.Put(context.Background(), BuildRegisterPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID)) return err}
关闭服务连接
func (r *Register) Stop() { r.closeCh <- struct{}{}}
删除节点
func (r *Register) unregister() error { _, err := r.cli.Delete(context.Background(), BuildRegisterPath(r.srvInfo)) return err}
存活检测
func (r *Register) keepAlive() { ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second) for { select { case <-r.closeCh: // 是否存在这个服务 if err := r.unregister(); err != nil { r.logger.Error("unregister failed, error: ", err) } // 撤销租约 if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil { r.logger.Error("revoke failed, error: ", err) } case res := <-r.keepAliveCh: if res == nil { if err := r.register(); err != nil { r.logger.Error("register failed, error: ", err) } } case <-ticker.C: if r.keepAliveCh == nil { if err := r.register(); err != nil { r.logger.Error("register failed, error: ", err) } } } }}
获取注册服务的信息
func (r *Register) GetServerInfo() (Server, error) { resp, err := r.cli.Get(context.Background(), BuildRegisterPath(r.srvInfo)) if err != nil { return r.srvInfo, err } server := Server{} if resp.Count >= 1 { if err := json.Unmarshal(resp.Kvs[0].Value, &server); err != nil { return server, err } } return server, err}
3. 使用示例
服务注册
etcdRegister := discovery.NewRegister(etcdAddress, logrus.New())
定义一个Node存放服务信息
userNode := discovery.Server{ Name: viper.GetString("server.domain"), Addr: grpcAddress, }
注册
if _, err := etcdRegister.Register(userNode, 10); err != nil { panic(fmt.Sprintf("start server failed, err: %v", err))}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~