Golang | gRPC索引服务

发布于:2025-05-29 ⋅ 阅读:(25) ⋅ 点赞:(0)
  • gRPC(Google Remote Procedure Call)是 Google 开源的一种高性能、通用的开源 RPC 框架。它基于 HTTP/2 协议、使用 Protocol Buffers(protobuf)作为接口定义语言(IDL)和序列化机制,支持多种语言。
  • RPC(Remote Procedure Call):远程过程调用,允许客户端像调用本地函数一样调用远程服务器上的函数。
  • IDL(接口定义语言):使用 .proto 文件定义服务、方法、消息类型等。
  • Protocol Buffers:Google 提供的一种轻量高效的结构化数据序列化方式,比 JSON、XML 更紧凑快速。
  • 架构:Client <-> Stub <-> Channel <-> 网络 <-> Server <-> Handler,Client Stub:本地代理,封装了 RPC 的调用过程。Channel:客户端到服务端的连接。Server:服务端实现了服务的接口。Handler:处理具体业务逻辑的函数。
  • 工作原理:
    • 开发者编写 .proto 文件定义服务和消息结构。
    • 使用 protoc 编译器生成多语言代码(如 Python、C++、Go)。
    • 客户端通过 Stub 调用远程方法,参数以二进制形式发送。
    • 服务端接收到请求,调用对应的服务处理逻辑。
    • 返回结果通过同样的方式返回给客户端。
// gRPC 的完整流程(以 Go 为例,其他语言相似)

[1] 编写 proto 文件(定义服务)
          ↓
[2] 使用 protoc 生成服务端和客户端代码
          ↓
[3] 服务端实现业务逻辑
          ↓
[4] 客户端调用 gRPC 接口
          ↓
[5] 服务端部署 + 启动监听端口
          ↓
[6] 客户端连接并远程调用成功
  • 在 gRPC 中,开发者首先通过 .proto 文件定义服务接口和消息结构。
  • 客户端和服务端都使用该文件生成代码,然后分别构造请求和实现逻辑。
  • 调用时,gRPC 框架会自动将结构体参数序列化为 ProtoBuf 二进制格式,通过 HTTP/2 传输到服务端,服务端反序列化后调用实际函数,最终返回响应并反序列化给客户端。

  • .proto 文件是用来定义你要通信的数据结构和接口的“说明书”。

  • 就像你在写:

    • 什么函数可以调用(服务名 + 方法名)
    • 每个函数的参数和返回值长什么样(结构体)
  • 然后 gRPC 根据 .proto 文件自动帮你生成:

    • Go 的 struct(对应 message)
    • 接口(对应 service)
    • gRPC 通信代码(打包解包、序列化反序列化)
  • gRPC 中 .proto 文件是客户端与服务端之间共享的“契约”。客户端必须拿到这个文件,通过 protoc 生成 stub,才能正确发起调用。这是 gRPC 能实现跨语言、高效通信的核心所在。

在这里插入图片描述
在这里插入图片描述

  • .proto 文件定义了服务的“抽象语义”(函数签名 + 数据结构),然后用工具生成各语言的代码框架,最终由开发者写上具体逻辑,整个服务就能跨语言运行并返回结果了。
  • .proto 文件是用来描述“服务接口”和“数据结构”的中立语言规范,它是 gRPC 通信的基础。它定义了你要传什么数据、调用什么函数,用来生成跨语言代码框架。
  • 有了 .proto 文件,我们可以自动生成 gRPC 通信代码,客户端就能像调用本地函数一样,跨进程、跨机器调用服务端的函数,gRPC 框架会自动完成底层传输。
[.proto 文件]
     ↓(protoc 编译)
[生成客户端/服务端代码][你写服务端实现 + 启动 gRPC Server][你写客户端调用 + 建立 gRPC 连接][客户端像本地一样调用方法 → gRPC框架序列化请求 → HTTP/2 发送][服务端解包 → 调用函数 → 返回结果 → 自动回传客户端]
// doc.proto

syntax = "proto3";

package types;

message Keyword {
  string Field = 1;
  string Word = 2;
}

message Document {
  string Id = 1;          //业务使用的唯一Id,索引上此Id不会重复
  uint64 IntId = 2;       //倒排索引上使用的文档id(业务侧不用管这个字段)
  uint64 BitsFeature = 3; //每个bit都表示某种特征的取值
  repeated Keyword Keywords = 4;      //倒排索引的key
  bytes Bytes = 5;        //业务实体序列化之后的结果
}

// go install github.com/gogo/protobuf/protoc-gen-gogofaster
// protoc --gogofaster_out=./types --proto_path=./types doc.proto
// term_query.proto

syntax = "proto3";

package types;

import "types/proto/doc.proto";

message TermQuery {
  Keyword Keyword = 1;    //Keyword类型引用自doc.proto
  repeated TermQuery Must = 2;
  repeated TermQuery Should = 3;
}

// protoc -I=C:/Users/jmh00/GolandProjects/criker-search --gogofaster_out=./types/term_query --proto_path=./types/term_query term_query.proto
// 在windows上-I需使用绝对路径
// index.proto

syntax = "proto3";

package index_service;

// 从-I指定的目录下寻找该proto文件
import "types/proto/doc.proto";
import "types/proto/term_query.proto";

message DocId {
  string DocId = 1;
}

message AffectedCount {
  int32 Count = 1;
}

message SearchRequest {
  types.TermQuery Query = 1;  //TermQuery类型引用自term_query.proto
  uint64 OnFlag = 2;
  uint64 OffFlag = 3;
  repeated uint64 OrFlags = 4; //repeated 这个字段是一个数组(列表),可以出现 0 次、1 次或多次。
}

message SearchResult {
  repeated types.Document Results = 1;
}

message CountRequest {
}

service IndexService {
  rpc DeleteDoc(DocId) returns (AffectedCount);
  rpc AddDoc(types.Document) returns (AffectedCount);
  rpc Search(SearchRequest) returns (SearchResult);
  rpc Count(CountRequest) returns (AffectedCount);
}

// protoc -I=C:/Users/jmh00/GolandProjects/criker-search --gogofaster_opt=Mdoc.proto=C:/Users/jmh00/GolandProjects/criker-search/types --gogofaster_opt=Mterm_query.proto=C:/Users/jmh00/GolandProjects/criker-search/types --gogofaster_out=plugins=grpc:./index_service --proto_path=./index_service/proto index.proto
// 在windows上-I需使用绝对路径
// --gogofaster_opt=M指示了.proto里的import转到.go里该怎么写,比如.proto里写import "doc.proto",转到.go里就应该写 import "github.com/Orisun/radic/v2/types"
// -I和--gogofaster_opt=M可以有多个

  • 生成代码
protoc \
-I=C:/Users/jmh00/GolandProjects/criker-search \
--gogofaster_opt=Mdoc.proto=C:/Users/jmh00/GolandProjects/criker-search/types \
--gogofaster_opt=Mterm_query.proto=C:/Users/jmh00/GolandProjects/criker-search/types \
--gogofaster_out=plugins=grpc:./index_service \
--proto_path=./index_service/proto \
index.proto

在这里插入图片描述


  • 实现服务端
// index_service.go

package index_service

import (
	"context"
	"fmt"
	"github.com/jmh000527/criker-search/index_service/service_hub"
	"github.com/jmh000527/criker-search/types"
	"github.com/jmh000527/criker-search/utils"
	"strconv"
	"time"
)

const (
	IndexService = "index_service"
)

// IndexServiceWorker 代表一个gRPC服务器,负责处理索引相关的服务请求。
// 它包括正排索引和倒排索引的管理,以及与服务注册中心的交互。
type IndexServiceWorker struct {
	Indexer  *LocalIndexer          // 正排索引和倒排索引的组合,用于处理文档的索引和搜索
	hub      service_hub.ServiceHub // 服务注册和发现相关的配置,负责服务的注册、注销和发现
	selfAddr string                 // 当前服务实例的地址,用于注册到服务中心和服务发现
}

// Init 初始化索引服务。
// 该方法负责初始化IndexServiceWorker的索引管理器,并设置相关的数据库类型和数据目录。
//
// 参数:
//   - DocNumEstimate: 预计文档数量,用于初始化倒排索引。
//   - dbtype: 数据库类型,决定使用哪种数据库存储索引数据。
//   - DataDir: 数据目录,数据库文件存放的路径。
//
// 返回值:
//   - error: 如果初始化过程中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) Init(DocNumEstimate int, dbtype int, DataDir string) error {
	// 创建一个新的Indexer实例
	w.Indexer = new(LocalIndexer)
	// 初始化Indexer实例,并传递文档数量估计、数据库类型和数据目录
	return w.Indexer.Init(DocNumEstimate, dbtype, DataDir)
}

// RegisterService 注册服务到etcd。如果提供了etcdServers,则创建EtcdServiceHub并注册服务。
// 如果etcdServers为空,则表示使用单机模式,不进行服务注册。
//
// 参数:
//   - etcdServers: etcd服务器地址列表。如果为空,则表示不进行服务注册。
//   - servicePort: 服务端口号。必须大于1024。
//
// 返回值:
//   - error: 如果传入的端口号无效或服务注册过程中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) RegisterService(etcdServers []string, servicePort int) error {
	// 检查是否需要注册服务到etcd
	if len(etcdServers) > 0 {
		// 验证服务端口号是否合法
		if servicePort <= 1024 {
			return fmt.Errorf("无效的服务端口号 %d,服务端口必须大于1024", servicePort)
		}

		// 获取本地IP地址
		localIP, err := utils.GetLocalIP()
		if err != nil {
			return fmt.Errorf("获取本地IP地址失败: %v", err)
		}

		// 单机模式下,将本地IP写死为127.0.0.1
		localIP = "127.0.0.1"
		w.selfAddr = localIP + ":" + strconv.Itoa(servicePort)

		// 设置心跳频率
		var heartbeatFrequency int64 = 3

		// 获取EtcdServiceHub实例(单例模式)
		hub := service_hub.GetServiceHub(etcdServers, heartbeatFrequency)

		// 注册服务到etcd,初始时租约ID为0
		leaseID, err := hub.RegisterService(IndexService, w.selfAddr, 0)
		if err != nil {
			return fmt.Errorf("服务注册失败: %v", err)
		}

		// 设置hub
		w.hub = hub

		// 启动一个协程,定期续约服务租约
		go func() {
			for {
				_, err := hub.RegisterService(IndexService, w.selfAddr, leaseID)
				if err != nil {
					utils.Log.Printf("续约服务租约失败,租约ID: %v, 错误: %v", leaseID, err)
				}
				// 心跳间隔时间稍短于最大超时时间
				time.Sleep(time.Duration(heartbeatFrequency)*time.Second - 100*time.Millisecond)
			}
		}()
	}
	return nil
}

// LoadFromIndexFile 从索引文件中加载数据。在系统重启后,可以通过此方法从持久化的索引文件中恢复数据。
//
// 返回值:
//   - int: 加载成功的文档数量。如果加载过程中发生错误,则返回0。
func (w *IndexServiceWorker) LoadFromIndexFile() int {
	return w.Indexer.LoadFromIndexFile()
}

// Close 关闭索引服务。如果服务在etcd中注册过,则需要注销服务;否则只需要关闭索引。
//
// 返回值:
//   - error: 如果在注销服务或关闭索引过程中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) Close() error {
	// 检查是否需要注销服务
	if w.hub != nil {
		// 注销服务
		err := w.hub.UnregisterService(IndexService, w.selfAddr)
		if err != nil {
			utils.Log.Printf("注销服务失败,服务地址: %v, 错误: %v", w.selfAddr, err)
			return err
		}
		utils.Log.Printf("注销服务成功,服务地址: %v", w.selfAddr)
	}

	// 关闭索引
	return w.Indexer.Close()
}

// DeleteDoc 从索引中删除文档。根据提供的文档ID删除对应的文档。
//
// 参数:
//   - ctx: 上下文,用于处理请求的生命周期和取消操作。
//   - docId: 包含要删除的文档ID。
//
// 返回值:
//   - *AffectedCount: 删除操作影响的文档数量。
//   - error: 如果删除操作中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) DeleteDoc(ctx context.Context, docId *DocId) (*AffectedCount, error) {
	// 调用Indexer的DeleteDoc方法删除文档,并返回影响的文档数量
	return &AffectedCount{
		Count: int32(w.Indexer.DeleteDoc(docId.DocId)),
	}, nil
}

// AddDoc 向索引中添加文档。如果文档已经存在,会先删除旧文档再添加新文档。
//
// 参数:
//   - ctx: 上下文,用于处理请求的生命周期和取消操作。
//   - doc: 要添加的文档对象。
//
// 返回值:
//   - *AffectedCount: 添加操作影响的文档数量。
//   - error: 如果添加操作中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) AddDoc(ctx context.Context, doc *types.Document) (*AffectedCount, error) {
	// 调用Indexer的AddDoc方法添加文档,并返回影响的文档数量
	n, err := w.Indexer.AddDoc(*doc)
	return &AffectedCount{
		Count: int32(n),
	}, err
}

// Search 执行检索操作,返回符合查询条件的文档列表。
//
// 参数:
//   - ctx: 上下文,用于处理请求的生命周期和取消操作。
//   - request: 包含检索查询的请求对象。
//
// 返回值:
//   - *SearchResult: 包含检索结果的文档列表。
//   - error: 如果检索操作中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) Search(ctx context.Context, request *SearchRequest) (*SearchResult, error) {
	// 调用Indexer的Search方法进行检索,并返回检索结果
	result := w.Indexer.Search(request.Query, request.OnFlag, request.OffFlag, request.OrFlags)
	return &SearchResult{
		Results: result,
	}, nil
}

// Count 返回索引中当前文档的数量。
//
// 参数:
//   - ctx: 上下文,用于处理请求的生命周期和取消操作。
//   - request: 包含计数请求的对象。
//
// 返回值:
//   - *AffectedCount: 当前索引中的文档数量。
//   - error: 如果计数操作中发生错误,则返回相应的错误。
func (w *IndexServiceWorker) Count(ctx context.Context, request *CountRequest) (*AffectedCount, error) {
	// 调用Indexer的Count方法获取文档数量,并返回结果
	return &AffectedCount{
		Count: int32(w.Indexer.Count()),
	}, nil
}

  • 分布式搜索引擎中,每台私人服务器只存储部分数据。
  • 多个index worker之间需要互相通信,协同合作。
  • 通信方式包括网络接口,采用gRPC方式进行服务接口的提供。
  • index service包含三个接口:删除document、添加document和搜索。
  • 删除接口接收document ID,返回受影响的条数。
  • 添加接口接收document,返回添加成功的条数。
  • 搜索接口接收搜索请求,返回搜索结果。
  • index.proto引用了doc.proto和term_query.proto,跨文件和跨目录引用。
  • 使用-I参数指定proto文件的搜索路径。
  • 转成go代码后,import路径需要修改为正确的路径。

网站公告

今日签到

点亮在社区的每一天
去签到