- 哨兵(centennial)负责接待客人,直接与调用方对接。
- 哨兵的核心组件包括service HUB和connection pool。
- service HUB用于与服务中心通信,获取可提供服务的节点信息。
- connection pool用于缓存与index worker的连接,避免每次搜索时重新建立连接。
- 连接池初始化为空map。
- 提供函数获取指定endpoint的GRPC连接。
- 函数首先检查本地缓存中是否有可用连接,若无则创建新连接。
- 创建连接时默认立即返回,可选阻塞模式直到连接可用。
- 连接建立后放入缓存并返回。
- 哨兵提供添加、删除和搜索三个核心功能。
- 添加功能:随机选择一台index worker添加新文档。
- 删除功能:遍历所有endpoint,并行删除指定文档。
- 搜索功能:将搜索请求发送到所有endpoint,合并搜索结果。
- 使用channel进行并发搜索结果的收集。
- 上游并发写入channel,下游读取channel数据到切片。
- 使用wait group等待所有搜索任务完成。
- 关闭channel后仍可读取,确保读取到所有数据。
package index_service
import (
"context"
"fmt"
"github.com/jmh000527/criker-search/index_service/service_hub"
"github.com/jmh000527/criker-search/types"
"github.com/jmh000527/criker-search/utils"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"sync"
"sync/atomic"
"time"
)
type Sentinel struct {
hub service_hub.ServiceHub
connPool sync.Map
}
func NewSentinel(etcdServers []string) *Sentinel {
return &Sentinel{
hub: service_hub.GetServiceHubProxy(etcdServers, 3, 100),
connPool: sync.Map{},
}
}
func (sentinel *Sentinel) GetGrpcConn(endpoint string) *grpc.ClientConn {
v, exists := sentinel.connPool.Load(endpoint)
if exists {
conn := v.(*grpc.ClientConn)
if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Shutdown {
utils.Log.Printf("连接到 endpoint %s 的状态为 %s", endpoint, conn.GetState().String())
conn.Close()
sentinel.connPool.Delete(endpoint)
} else {
return conn
}
}
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
grpcConn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
utils.Log.Printf("连接到 %s 的 gRPC 失败,错误: %s", endpoint, err.Error())
return nil
}
utils.Log.Printf("连接到 %s 的 gRPC 成功", endpoint)
sentinel.connPool.Store(endpoint, grpcConn)
return grpcConn
}
func (sentinel *Sentinel) AddDoc(doc types.Document) (int, error) {
endpoint := sentinel.hub.GetServiceEndpoint(IndexService)
if len(endpoint) == 0 {
return 0, fmt.Errorf("未找到服务 %s 的有效节点", IndexService)
}
grpcConn := sentinel.GetGrpcConn(endpoint)
if grpcConn == nil {
return 0, fmt.Errorf("连接到 %s 的 gRPC 失败", endpoint)
}
client := NewIndexServiceClient(grpcConn)
affected, err := client.AddDoc(context.Background(), &doc)
if err != nil {
return 0, err
}
utils.Log.Printf("成功向 worker %s 添加 %d 个文档", endpoint, affected.Count)
return int(affected.Count), nil
}
func (sentinel *Sentinel) DeleteDoc(docId string) int {
endpoints := sentinel.hub.GetServiceEndpoints(IndexService)
if len(endpoints) == 0 {
return 0
}
var n int32
wg := sync.WaitGroup{}
wg.Add(len(endpoints))
for _, endpoint := range endpoints {
go func(endpoint string) {
defer wg.Done()
grpcConn := sentinel.GetGrpcConn(endpoint)
if grpcConn == nil {
utils.Log.Printf("连接到 %s 的 gRPC 失败", endpoint)
return
}
client := NewIndexServiceClient(grpcConn)
affected, err := client.DeleteDoc(context.Background(), &DocId{docId})
if err != nil {
utils.Log.Printf("从 worker %s 删除文档 %s 失败,错误: %s", endpoint, docId, err)
return
}
if affected.Count > 0 {
atomic.AddInt32(&n, affected.Count)
utils.Log.Printf("从 worker %s 删除文档 %s 成功", endpoint, docId)
}
}(endpoint)
}
wg.Wait()
return int(atomic.LoadInt32(&n))
}
func (sentinel *Sentinel) Search(query *types.TermQuery, onFlag, offFlag uint64, orFlags []uint64) []*types.Document {
endpoints := sentinel.hub.GetServiceEndpoints(IndexService)
if len(endpoints) == 0 {
return nil
}
docs := make([]*types.Document, 0, 1000)
resultChan := make(chan *types.Document, 1000)
var wg sync.WaitGroup
wg.Add(len(endpoints))
for _, endpoint := range endpoints {
go func(endpoint string) {
defer wg.Done()
grpcConn := sentinel.GetGrpcConn(endpoint)
if grpcConn == nil {
utils.Log.Printf("连接到 %s 的 gRPC 连接失败", endpoint)
return
}
client := NewIndexServiceClient(grpcConn)
searchResult, err := client.Search(context.Background(), &SearchRequest{
Query: query,
OnFlag: onFlag,
OffFlag: offFlag,
OrFlags: orFlags,
})
if err != nil {
utils.Log.Printf("向 worker %s 执行查询 %s 失败,错误: %s", endpoint, query, err)
return
}
if len(searchResult.Results) > 0 {
utils.Log.Printf("向 worker %s 执行查询 %s 成功,获取到 %v 个文档", endpoint, query, len(searchResult.Results))
for _, result := range searchResult.Results {
resultChan <- result
}
}
}(endpoint)
}
signalChan := make(chan struct{})
go func() {
for doc := range resultChan {
docs = append(docs, doc)
}
signalChan <- struct{}{}
}()
wg.Wait()
close(resultChan)
<-signalChan
return docs
}
func (sentinel *Sentinel) Count() int {
var n int32
endpoints := sentinel.hub.GetServiceEndpoints(IndexService)
if len(endpoints) == 0 {
return 0
}
var wg sync.WaitGroup
wg.Add(len(endpoints))
for _, endpoint := range endpoints {
go func(endpoint string) {
defer wg.Done()
grpcConn := sentinel.GetGrpcConn(endpoint)
if grpcConn != nil {
client := NewIndexServiceClient(grpcConn)
affected, err := client.Count(context.Background(), new(CountRequest))
if err != nil {
utils.Log.Printf("从 worker %s 获取文档数量失败: %s", endpoint, err)
}
if affected.Count > 0 {
atomic.AddInt32(&n, affected.Count)
utils.Log.Printf("worker %s 共有 %d 个文档", endpoint, affected.Count)
}
}
}(endpoint)
}
wg.Wait()
return int(atomic.LoadInt32(&n))
}
func (sentinel *Sentinel) Close() (err error) {
sentinel.connPool.Range(func(key, value any) bool {
conn := value.(*grpc.ClientConn)
err = conn.Close()
return true
})
sentinel.hub.Close()
return
}