gRPC服务发现

发布于:2025-07-06 ⋅ 阅读:(14) ⋅ 点赞:(0)

基于 etcd 实现的服务发现,按照非规范化的 etcd key 实现,详细见代码注释。

package discovery

import (
	"context"
	"encoding/json"
	"fmt"
	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc/resolver"
	"strings"
	"time"
)

// gRPC 的服务一般会使用 protobuf 作为数据传输的介质
// gRPC 服务定义在 proto 的文件中,例如:service RoutingService {}
// protoc 将 proto 后缀文件转为 go 文件,文件内自动生成了 gRPC 服务的描述信息、服务注册的函数、客户端声明的函数等内容
// 如下,它们的格式是固定的,注意函数的参数
// 服务描述信息:RoutingService_ServiceDesc,格式:服务名_ServiceDesc
// 服务注册函数:RegisterRoutingServiceServer,格式:Register你服务名Server
// 客户端声明函数:NewRoutingServiceClient,格式:New服务名Client
// 其中客户端声明函数的参数是 gRPC 连接,返回值是 gRPC 服务的客户端接口,这样就可以调用客户端接口定义的 rpc 方法了
// gRPC 连接不会与某个 gRPC 服务绑定,它只是一个连接。
// 获取 gRPC 连接的方式如下两种,第一个参数就是 gRPC 服务的地址,可以写死 ip + port,也可以使用服务发现来获取 gRPC 服务的地址。
// grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName))
// grpc.Dial(fmt.Sprintf("%s:///%s", scheme, serviceName))(废弃)
// 服务发现是实现 Builder 和 Resolver 接口,Builder 用于创建 Resolver 实例,Resolver 用于解析服务地址。
// Builder 的 Scheme 方法返回值是 与 grpc.NewClient 中的 scheme 对应
// Builder 的 Build 第一个参数 target.Endpoint() 得到的结果是 grpc.NewClient 中的 serviceName,Build 方法的触发分情况:
// grpc.NewClient 声明不会触发 Build 方法,首次调用 rpc 方法时触发 Build
// grpc.Dial 声明会触发 Build 方法,但已经废弃了
// Resolver 的 ResolveNow 方法是 gRPC 主动调用的,我们可以使用它动态去 etcd 中获取服务地址,也可以不实现它,自定义服务发现的逻辑

// 服务发现的实现方式:
// 假如我们有三个应用,user-center、device-center、网关,user-center 和 device-center 暴露了很多 gRPC 服务,网关需要调用它们的服务
// 假如我们使用 etcd 作为注册中心,同时规范化 etcd 的 key ,例如:grpc/services/{serviceName}/实例ID
// grpc/services/user-center/实例1
// grpc/services/user-center/实例2
// grpc/services/device-center/实例1
// grpc/services/device-center/实例2
// 网关中分别实现 Builder 和 Resolver,并将 Builder 的实例注册在 grpc 的地址解析中,resolver.Register(Builder实现的实例)
// 获取 user-center 和 device-center 的 grpc 连接
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "user-center"))
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "device-center"))
// 当 gRPC 连接建立时,gRPC 会调用 Builder 的 Build 方法,我们获取 target.Endpoint() 就是 serviceName
// 这样 fmt.Sprintf("grpc/services/%s", serviceName) 获取 serviceName 的 etcd 的 key 前缀
// 如:grpc/services/user-center/
// Build 方法中按前缀匹配查询 etcd 的数据,这样就获取到了 user-center 的所有实例的地址,再同步到 Resolver 中
// 如上就实现了规范化 etcd 的 key 前缀的服务发现,不管有多少个应用,代码中只需要一个服务发现的实例

// 如果没有规范化 etcd 的 key 前缀,那么我们需要为各个服务声明不同的 scheme,每个 scheme 对应一个服务发现的实例
// Builder 的实现必须包含 etcd 的 key 前缀 ,不能利用 target.Endpoint() 去实现服务发现
// 如:ServiceDiscovery 实现了 Builder
// type ServiceDiscovery struct {
//		serverKey string
// }
// grpc/services/user-center/ 固定写死赋值给 serverKey
// 声明 ServiceDiscovery { serverKey },注册 resolver.Register(ServiceDiscovery实例)
// grpc.NewClient(fmt.Sprintf("%s:///%s", "user", "user-center"))

// 普通 rpc 调用时,服务端挂掉:
// 服务发现找不到数据时:rpc error: code = Unavailable desc = no children to pick from
// 服务挂掉但etcd/服务发现还有数据:rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.202.160.190:6888: connect: connection refused"
// 服务重启后客户端连接可以恢复
// 当某个服务节点不可用时,可以自动连接到可用的服务节点

// 流式 rpc 调用,服务端挂掉:
// 客户端发送方:EOF
// 客户端接收方:rpc error: code = Unavailable desc = error reading from server: EOF
// 服务重启后客户端连接不可恢复,必须重新建立连接

// ServiceDiscovery is a gRPC resolver that uses etcd for service discovery.
// 配合 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 来使用
// Build 方法的 target.Endpoint() 就是 serviceName
type ServiceDiscovery struct {
	scheme     string // 自定义的 grpc 服务的 scheme,例如:grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)
	serviceKey string // etcd 中服务的 key 前缀,例如:grpc/maxwell-ai/GatewayInfoService/1.0/
	etcdClient *clientv3.Client
}

// ServiceResolver is a gRPC resolver that resolves service addresses from etcd.
// 一个 scheme 对应一个 ServiceResolver,当 grpc 建立连接时触发 ServiceDiscovery 的 Build 方法
// 注意:
// grpc.NewClient 不会触发 Build 方法
// grpc.Dial 会触发 Build 方法,但已经废弃了
type ServiceResolver struct {
	scheme     string
	serviceKey string

	target  resolver.Target
	client  *clientv3.Client
	cc      resolver.ClientConn
	addrMap map[string]resolver.Address
	closed  chan struct{}
}

func NewServiceDiscovery(scheme string, serviceKey string, etcdClient *clientv3.Client) *ServiceDiscovery {
	return &ServiceDiscovery{
		scheme:     scheme,
		serviceKey: serviceKey,
		etcdClient: etcdClient,
	}
}

// Build creates a new ServiceDiscovery resolver.
// grpc.NewClient 不会触发 Build 方法
// grpc.Dial 会触发 Build 方法,但已经废弃了
// target: grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 中的 serviceName 就是 target
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	// 创建服务解析器
	serviceResolver := &ServiceResolver{
		target:     target,
		cc:         cc,
		scheme:     s.scheme,
		serviceKey: s.serviceKey,
		client:     s.etcdClient,
		closed:     make(chan struct{}),
		addrMap:    make(map[string]resolver.Address),
	}

	// 首次拉取所有数据
	if err := serviceResolver.rePull(); err != nil {
		return nil, err
	}
	// 开启 watcher 监听 etcd 中的服务地址变化
	go serviceResolver.watcher()

	return serviceResolver, nil
}

// Scheme returns the scheme of the resolver.
// scheme 是 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)
func (s *ServiceDiscovery) Scheme() string {
	return s.scheme
}

// ResolveNow is called by gRPC to resolve the service address immediately.
// grpc 主动调用去解析服务地址,这里可以实现从 etcd 获取服务地址的逻辑
// 但是不在这里实现,因为这里实现有同步和异步从 etcd 中查询数据
// 同步会阻塞
// 异步会开启很多 goroutine,可能会导致 goroutine 泄漏
func (s *ServiceResolver) ResolveNow(options resolver.ResolveNowOptions) {

}

func (s *ServiceResolver) Close() {
	close(s.closed)
}

func (s *ServiceResolver) rePull() error {
	ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelFunc()
	resp, err := s.client.Get(ctx, s.serviceKey, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	s.addrMap = make(map[string]resolver.Address)
	for _, ev := range resp.Kvs {
		key := strings.TrimPrefix(string(ev.Key), s.serviceKey)
		s.addServer(key, ev.Value)
	}
	s.syncToGrpc()
	return nil
}

func (s *ServiceResolver) addServer(key string, value []byte) {
	var si ServiceInfo
	if err := json.Unmarshal(value, &si); err != nil {
		return
	}
	s.addrMap[key] = resolver.Address{
		Addr: fmt.Sprintf("%s:%d", si.Ip, si.Port),
	}
}

func (s *ServiceResolver) delServer(key string) {
	if _, ok := s.addrMap[key]; ok {
		delete(s.addrMap, key)
	}
}

func (s *ServiceResolver) syncToGrpc() {
	addrSlice := make([]resolver.Address, 0, 10)
	for _, v := range s.addrMap {
		addrSlice = append(addrSlice, v)
	}
	err := s.cc.UpdateState(resolver.State{Addresses: addrSlice})
	if err != nil {
		return
	}
}

func (s *ServiceResolver) watcher() {
	rePull := false
	for {
		select {
		case <-s.closed:
			return
		default:
		}

		if rePull {
			if err := s.rePull(); err != nil {
				time.Sleep(5 * time.Second)
				continue
			}
		}
		rch := s.client.Watch(context.Background(), s.serviceKey, clientv3.WithPrefix())
	loop:
		for {
			select {
			case <-s.closed:
				return
			case resp, ok := <-rch:
				if !ok {
					rePull = true
					break loop
				}
				for _, ev := range resp.Events {
					key := strings.TrimPrefix(string(ev.Kv.Key), s.serviceKey)
					switch ev.Type {
					case mvccpb.PUT:
						s.addServer(key, ev.Kv.Value)
					case mvccpb.DELETE:
						s.delServer(key)
					}
				}
				s.syncToGrpc()
			}
		}
	}
}