go-zero结合Elasticsearch实现高效数据检索
1. Elasticsearch简单介绍
Elasticsearch(简称 ES) 是一个基于 Lucene 库 构建的 分布式、开源、实时搜索与分析引擎,采用 Apache 2.0 协议。它支持水平扩展,能高效处理大规模数据的存储、搜索、分析和可视化,是 ELK 栈(Elasticsearch、Logstash、Kibana) 和 EFK 栈(Elasticsearch、Fluentd、Kibana) 的核心组件。
1.1核心特点
1. 分布式架构,天生可扩展
- 自动分片(Sharding):数据自动分片到多个节点,单个索引可拆分为多个分片(Shard),支持水平扩展,轻松处理 PB 级数据。
- 高可用性(Replication):每个分片支持多个副本(Replica),节点故障时自动切换,保证服务不中断。
- 去中心化:无主节点依赖(7.x 后引入选举机制),节点自动发现和管理,简化集群运维。
2. 实时搜索与分析
- 近实时(Near Real-Time):文档写入后默认 1 秒内可被搜索到,满足实时数据查询需求。
- 全文搜索能力:基于 Lucene 实现高效的全文索引,支持分词、模糊搜索、短语匹配、高亮显示等。
3. 灵活的数据模型
- JSON 文档存储:数据以 JSON 格式存储,支持动态映射(Dynamic Mapping),无需预定义严格 Schema(但推荐显式定义以优化性能)。
- 多数据类型支持:处理文本、数值、日期、地理位置、二进制等数据类型,支持复杂嵌套结构。
4. 强大的查询与聚合 DSL
- Query DSL(Domain-Specific Language):通过 JSON 格式的查询语言,支持布尔逻辑、范围查询、正则匹配、地理空间查询(如附近搜索)等。
- 聚合分析(Aggregation):支持分组(Terms Aggregation)、统计(Avg/Max/Min)、桶分析(Bucket Aggregation)、管道聚合(Pipeline Aggregation),用于数据统计和可视化。
5. 生态系统丰富
- 数据摄入:支持 Logstash(ETL 管道)、Beats(轻量级数据采集器)、Kafka 等数据源,以及直接通过 REST API 写入。
- 可视化与分析:集成 Kibana 实现仪表盘、图表、日志分析;支持与 Grafana、Tableau 等工具对接。
- 插件扩展:支持分词器(如中文分词 IK Analyzer)、安防(X-Pack Security)、机器学习(X-Pack ML)等插件。
6. 高性能与高可靠
- 倒排索引优化:基于 Lucene 的倒排索引,查询速度随数据量增长保持稳定。
- 分布式协调:通过集群节点自动负载均衡,支持自动故障转移和恢复。
1.2 应用场景
1. 日志管理与分析(最经典场景)
- 场景:收集、存储、分析海量日志(如服务器日志、应用日志、微服务日志)。
- 实现:通过 Logstash/Beats 采集日志,写入 ES,用 Kibana 进行日志检索、统计(如错误日志高频分析)、异常检测。
- 优势:秒级查询亿级日志,支持按时间、服务名、错误码等多维度过滤和聚合。
2. 电商搜索与推荐
- 场景:商品搜索(如淘宝、京东的搜索栏)、智能补全、相关商品推荐。
- 功能:支持商品名称全文搜索、价格范围筛选、品牌 / 类目过滤、销量排序,结合地理位置搜索附近门店。
- 技术点:分词器优化(如中文分词)、拼音搜索(解决输入错误)、搜索相关性排序(BM25 算法)。
3. 企业级搜索(内部知识库、文档检索)
- 场景:企业内部文档搜索(如 Confluence、SharePoint 集成)、代码搜索、法律合同检索。
- 优势:支持多语言文本处理、文档元数据过滤(如作者、创建时间)、权限控制(通过 X-Pack Security)。
4. 实时数据分析与仪表盘
- 场景:业务指标监控(如网站 PV/UV、订单量实时统计)、用户行为分析(漏斗模型、留存率)。
- 实现:将业务数据实时写入 ES,通过 Kibana 生成动态仪表盘,支持下钻分析(Drill Down)和预警通知。
5. 地理空间分析
- 场景:物流轨迹追踪、共享单车位置查询、疫情传播热力图。
- 功能:支持地理坐标(Geo-point)、地理区域(Geo-rectangle)、地理距离(Distance Query)查询,结合聚合生成热力图。
6. 安全与 SIEM(安全信息与事件管理)
- 场景:网络安全日志分析、入侵检测、合规审计。
- 实现:采集防火墙、WAF、IDS 等设备的日志,通过 ES 进行关联分析(如多维度事件关联)、异常流量检测。
7. 监控与运维(APM、基础设施监控)
- 场景:应用性能监控(APM)、服务器指标(CPU / 内存 / 磁盘)监控、微服务链路追踪。
- 工具链:结合 Elastic APM 采集指标数据,用 ES 存储,Kibana 可视化服务调用链、慢查询定位。
2. 环境部署
2.1部署 elasticsearch 和kibana
注意:elasticsearch 和kibana 版本号尽量一致, elasticsearch用8.x版本,kibana 也要用8.x版本,不然无法一起使用
创建 docker-compose.yml
文件:
version: '3'
services:
elasticsearch:
container_name: elasticsearch
image: bitnami/elasticsearch:8.9.0
environment:
- TZ=Asia/Shanghai
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
privileged: true
ports:
- "9200:9200"
restart: always
networks:
- go_zero_net
kibana:
container_name: kibana
image: bitnami/kibana:8.9.0
restart: always
environment:
- TZ=Asia/Shanghai
- I18N_LOCALE=zh-CN
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200 # 修正服务名引用
ports:
- "5601:5601"
depends_on:
- elasticsearch # 显式声明依赖
networks:
- go_zero_net
networks:
go_zero_net:
driver: bridge
volumes:
esdata:
driver: local
环境部署完成后,验证 Elasticsearch 是否启动成功:
curl http://localhost:9200
应该会返回类似以下的 JSON 响应:
{
"name" : "d95962b2abfe",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "TtO-vhldRGmlrZ6U1cIgQw",
"version" : {
"number" : "8.9.0",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "8aa461beb06aa0417a231c345a1b8c38fb498a0d",
"build_date" : "2023-07-19T14:43:58.555259655Z",
"build_snapshot" : false,
"lucene_version" : "9.7.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
使用浏览器访问 http://localhost:5601/
如果访问成功,说明kibana正常启动
3.项目构建
3.1 编写 API 文件
创建 api/search/search.api
文件,定义搜索服务 API:
syntax = "v1"
type (
SearchRequest {
Keyword string `json:"keyword"`
Page int `json:"page,optional,default=1"`
PageSize int `json:"pageSize,optional,default=10"`
Category string `json:"category,optional"`
MinPrice float64 `json:"minPrice,optional"`
MaxPrice float64 `json:"maxPrice,optional"`
SortField string `json:"sortField,optional"`
SortOrder string `json:"sortOrder,optional,options=asc|desc"`
}
ProductItem {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
Category string `json:"category"`
Tags []string `json:"tags"`
CreatedAt int64 `json:"createdAt"`
}
SearchResponse {
Total int64 `json:"total"`
Products []ProductItem `json:"products"`
}
IndexProductRequest {
Product ProductItem `json:"product"`
}
IndexProductResponse {
Success bool `json:"success"`
Message string `json:"message,optional"`
}
deleteProductRequest {
ID string `json:"id"`
}
)
service search-api {
@handler SearchProducts
post /api/search/products (SearchRequest) returns (SearchResponse)
@handler IndexProduct
post /api/products/index (IndexProductRequest) returns (IndexProductResponse)
@handler DeleteProduct
post /api/delete/products (deleteProductRequest) returns (IndexProductResponse)
}
这个 API 定义文件包含了三个主要 API :
- 搜索商品
- 索引(创建/更新)商品
- 删除商品
现在,切换到项目目录,使用 goctl 工具根据 API 定义生成代码:
# 生成代码
goctl api go -api api/search/search.api -dir .
3.2 封装 Elasticsearch 服务
接下来,我们需要将 Elasticsearch 集成到生成的代码中。
安装 Elasticsearch Go 客户端,因为我们环境部署的是8.X版本,所以这里go-elasticsearch也选择v8版本
go get github.com/elastic/go-elasticsearch/v8
添加 Elasticsearch 配置
修改 internal/config/config.go
文件,添加 Elasticsearch 配置:
package config
import (
"github.com/zeromicro/go-zero/rest"
)
type Config struct {
rest.RestConf
Elasticsearch struct {
Addresses []string
Username string
Password string
}
}
同时,修改 etc/search-api.yaml
配置文件,添加 Elasticsearch 配置:
Name: search-api
Host: 0.0.0.0
Port: 8888
Elasticsearch:
Addresses:
- http://localhost:9200
Username: ""
Password: ""
创建 internal/pkg/es/es.go
文件,实现 Elasticsearch 客户端的封装:
package es
import (
"context"
"encoding/json"
"errors"
"log"
"strings"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
// ElasticsearchClient 封装ES客户端
type ElasticsearchClient struct {
client *elasticsearch.Client
}
// NewElasticsearchClient 创建新的ES客户端
func NewElasticsearchClient(addresses []string, username, password string) (*ElasticsearchClient, error) {
cfg := elasticsearch.Config{
Addresses: addresses,
Username: username,
Password: password,
}
client, err := elasticsearch.NewClient(cfg)
if err != nil {
return nil, err
}
// 测试连接
res, err := client.Info()
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.IsError() {
return nil, errors.New("Elasticsearch connection failed")
}
return &ElasticsearchClient{
client: client,
}, nil
}
// CreateIndex 创建索引
func (e *ElasticsearchClient) CreateIndex(index string, mapping string) error {
res, err := e.client.Indices.Create(
index,
e.client.Indices.Create.WithBody(strings.NewReader(mapping)),
)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
return errors.New("failed to create index")
}
return nil
}
// IndexExists 检查索引是否存在
func (e *ElasticsearchClient) IndexExists(index string) (bool, error) {
res, err := e.client.Indices.Exists([]string{index})
if err != nil {
return false, err
}
defer res.Body.Close()
return res.StatusCode == 200, nil
}
// IndexDocument 索引单个文档
func (e *ElasticsearchClient) IndexDocument(index, id string, document interface{}) error {
data, err := json.Marshal(document)
if err != nil {
return err
}
req := esapi.IndexRequest{
Index: index,
DocumentID: id,
Body: strings.NewReader(string(data)),
Refresh: "true",
}
res, err := req.Do(context.Background(), e.client)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
return errors.New("failed to index document")
}
return nil
}
// Search 执行搜索请求
func (e *ElasticsearchClient) Search(index string, query map[string]interface{}) (map[string]interface{}, error) {
var buf strings.Builder
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return nil, err
}
res, err := e.client.Search(
e.client.Search.WithContext(context.Background()),
e.client.Search.WithIndex(index),
e.client.Search.WithBody(strings.NewReader(buf.String())),
e.client.Search.WithTrackTotalHits(true),
)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
return nil, err
}
log.Printf("[%s] %s: %s", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"])
return nil, errors.New("search error")
}
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return nil, err
}
return r, nil
}
// DeleteDocument 删除文档
func (e *ElasticsearchClient) DeleteDocument(index, id string) error {
req := esapi.DeleteRequest{
Index: index,
DocumentID: id,
Refresh: "true",
}
res, err := req.Do(context.Background(), e.client)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
return errors.New("failed to delete document")
}
return nil
}
// BulkIndex 批量索引文档
func (e *ElasticsearchClient) BulkIndex(index string, documents []map[string]interface{}) error {
var buf strings.Builder
for _, doc := range documents {
// 获取文档ID
id, ok := doc["id"].(string)
if !ok {
id = ""
}
// 从文档中删除ID字段,避免重复
delete(doc, "id")
// 创建索引操作元数据
meta := map[string]interface{}{
"index": map[string]interface{}{
"_index": index,
"_id": id,
},
}
// 将元数据写入缓冲区
if err := json.NewEncoder(&buf).Encode(meta); err != nil {
return err
}
// 将文档数据写入缓冲区
if err := json.NewEncoder(&buf).Encode(doc); err != nil {
return err
}
}
// 执行批量请求
res, err := e.client.Bulk(strings.NewReader(buf.String()), e.client.Bulk.WithIndex(index), e.client.Bulk.WithRefresh("true"))
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
return errors.New("failed to bulk index documents")
}
return nil
}
3.3定义商品索引映射
创建 internal/model/product.go
文件,定义商品索引映射:
package model
const (
ProductIndex = "products"
)
// ProductIndexMapping 商品索引映射
var ProductIndexMapping = `{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"analysis": {
"analyzer": {
"text_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "asciifolding"]
}
}
}
},
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text",
"analyzer": "text_analyzer",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"description": {
"type": "text",
"analyzer": "text_analyzer"
},
"price": {
"type": "double"
},
"category": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"createdAt": {
"type": "date",
"format": "epoch_millis"
}
}
}
}`
// Product 商品模型
type Product struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
Category string `json:"category"`
Tags []string `json:"tags"`
CreatedAt int64 `json:"createdAt"`
}
Mapping介绍
- Settings(索引设置)
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"analysis": { ... }
}
分片设置:
number_of_shards
: 1 个主分片(适用于小规模或开发环境)。number_of_replicas
: 0 个副本(无冗余,生产环境建议至少设为 1)。
分析器配置:
"analysis": { "analyzer": { "text_analyzer": { "type": "custom", "tokenizer": "standard", "filter": ["lowercase", "asciifolding"] } } }
- 文本分析器(
text_analyzer
):- 使用
standard
分词器(按词边界分词,适合大多数语言)。 - 应用
lowercase
过滤器(转为小写)和asciifolding
过滤器(将非 ASCII 字符转为等效 ASCII,如é
→e
)。
- 使用
- 文本分析器(
- Mappings(字段映射)
"mappings": {
"properties": { ... }
}
- 字段类型与用途:
字段名 | 类型 | 用途与特点 |
---|---|---|
id |
keyword |
精确匹配(如商品 ID),不分析,用于过滤、排序或聚合。 |
name |
text |
全文搜索字段,使用 text_analyzer 处理(支持分词、小写和 ASCII 折叠)。 |
.keyword |
子字段,保留原始值,用于精确匹配(如聚合品牌名)。 | |
description |
text |
长文本描述,同样使用 text_analyzer 进行全文搜索。 |
price |
double |
浮点型数值,支持范围查询(如 price > 100 )。 |
category |
keyword |
分类标签(如 “electronics”),用于过滤和聚合。 |
tags |
keyword |
标签数组(如 ["popular", "new"] ),支持多值精确匹配。 |
createdAt |
date |
日期类型(Unix 毫秒时间戳),支持范围查询(如按时间筛选)。 |
3.4 扩展 ServiceContext
修改 internal/svc/servicecontext.go
文件,添加 Elasticsearch 客户端:
package svc
import (
"go-zero-es-demo/internal/config"
"go-zero-es-demo/internal/pkg/es"
)
type ServiceContext struct {
Config config.Config
EsClient *es.ElasticsearchClient
}
func NewServiceContext(c config.Config) *ServiceContext {
esClient, err := es.NewElasticsearchClient(
c.Elasticsearch.Addresses,
c.Elasticsearch.Username,
c.Elasticsearch.Password,
)
if err != nil {
panic(err)
}
return &ServiceContext{
Config: c,
EsClient: esClient,
}
}
//定义索引初始化函数
func InitElasticsearch(client *es.ElasticsearchClient) error {
// 检查商品索引是否存在
exists, err := client.IndexExists(model.ProductIndex)
if err != nil {
return err
}
// 如果索引不存在,则创建
if !exists {
err = client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)
if err != nil {
return err
}
}
return nil
}
3.5 实现搜索逻辑
修改 internal/logic/searchproductslogic.go
文件,实现商品搜索功能:
package logic
import (
"context"
"go-zero-es-demo/internal/model"
"go-zero-es-demo/internal/svc"
"go-zero-es-demo/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type SearchProductsLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewSearchProductsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SearchProductsLogic {
return &SearchProductsLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *SearchProductsLogic) SearchProducts(req *types.SearchRequest) (resp *types.SearchResponse, err error) {
// 构建搜索查询
from := (req.Page - 1) * req.PageSize
size := req.PageSize
// 基本查询结构
query := map[string]interface{}{
"from": from,
"size": size,
}
// 构建搜索条件
boolQuery := map[string]interface{}{}
mustClauses := []map[string]interface{}{}
// 关键词搜索
if req.Keyword != "" {
mustClauses = append(mustClauses, map[string]interface{}{
"multi_match": map[string]interface{}{
"query": req.Keyword,
"fields": []string{"name^3", "description", "tags"},
},
})
}
// 分类过滤
if req.Category != "" {
mustClauses = append(mustClauses, map[string]interface{}{
"term": map[string]interface{}{
"category": req.Category,
},
})
}
// 价格范围过滤
if req.MinPrice > 0 || req.MaxPrice > 0 {
rangeQuery := map[string]interface{}{}
if req.MinPrice > 0 {
rangeQuery["gte"] = req.MinPrice
}
if req.MaxPrice > 0 {
rangeQuery["lte"] = req.MaxPrice
}
mustClauses = append(mustClauses, map[string]interface{}{
"range": map[string]interface{}{
"price": rangeQuery,
},
})
}
// 添加bool查询
if len(mustClauses) > 0 {
boolQuery["must"] = mustClauses
query["query"] = map[string]interface{}{
"bool": boolQuery,
}
} else {
query["query"] = map[string]interface{}{
"match_all": map[string]interface{}{},
}
}
// 排序
if req.SortField != "" {
order := "asc"
if req.SortOrder == "desc" {
order = "desc"
}
query["sort"] = []map[string]interface{}{
{
req.SortField: map[string]interface{}{
"order": order,
},
},
}
}
// 执行搜索请求
result, err := l.svcCtx.EsClient.Search(model.ProductIndex, query)
if err != nil {
return nil, err
}
// 解析搜索结果
total := int64(result["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
hits := result["hits"].(map[string]interface{})["hits"].([]interface{})
products := make([]types.ProductItem, 0, len(hits))
for _, hit := range hits {
source := hit.(map[string]interface{})["_source"].(map[string]interface{})
// 提取标签数组
tags := []string{}
if tagsRaw, ok := source["tags"].([]interface{}); ok {
for _, tag := range tagsRaw {
if tagStr, ok := tag.(string); ok {
tags = append(tags, tagStr)
}
}
}
product := types.ProductItem{
ID: source["id"].(string),
Name: source["name"].(string),
Description: source["description"].(string),
Price: source["price"].(float64),
Category: source["category"].(string),
Tags: tags,
CreatedAt: int64(source["createdAt"].(float64)),
}
products = append(products, product)
}
return &types.SearchResponse{
Total: total,
Products: products,
}, nil
}
3.6 实现索引和删除逻辑
修改 internal/logic/indexproductlogic.go
文件,实现商品索引功能:
package logic
import (
"context"
"time"
"go-zero-es-demo/internal/model"
"go-zero-es-demo/internal/svc"
"go-zero-es-demo/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type IndexProductLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewIndexProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *IndexProductLogic {
return &IndexProductLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *IndexProductLogic) IndexProduct(req *types.IndexProductRequest) (resp *types.IndexProductResponse, err error) {
// 如果未提供创建时间,则使用当前时间
if req.Product.CreatedAt == 0 {
req.Product.CreatedAt = time.Now().UnixMilli()
}
// 索引文档
err = l.svcCtx.EsClient.IndexDocument(model.ProductIndex, req.Product.ID, req.Product)
if err != nil {
return &types.IndexProductResponse{
Success: false,
Message: "Failed to index product: " + err.Error(),
}, nil
}
return &types.IndexProductResponse{
Success: true,
Message: "Product indexed successfully",
}, nil
}
修改 internal/logic/deleteproductlogic.go
文件,实现商品删除功能:
package logic
import (
"context"
"go-zero-es-demo/internal/model"
"go-zero-es-demo/internal/svc"
"go-zero-es-demo/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type DeleteProductLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewDeleteProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteProductLogic {
return &DeleteProductLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *DeleteProductLogic) DeleteProduct(req *types.DeleteProductRequest) (resp *types.IndexProductResponse, err error) {
// todo: add your logic here and delete this line
// 删除文档
err = l.svcCtx.EsClient.DeleteDocument(model.ProductIndex, req.ID)
if err != nil {
return &types.IndexProductResponse{
Success: false,
Message: "Failed to delete product: " + err.Error(),
}, nil
}
return &types.IndexProductResponse{
Success: true,
Message: "Product deleted successfully",
}, nil
}
最后,修改主程序入口 search.go
文件,添加 Elasticsearch 索引初始化逻辑:
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
server := rest.MustNewServer(c.RestConf)
defer server.Stop()
ctx := svc.NewServiceContext(c)
// 初始化 Elasticsearch 索引
if err := svc.InitElasticsearch(ctx.EsClient); err != nil {
panic(fmt.Sprintf("初始化 Elasticsearch 失败: %v", err))
}
handler.RegisterHandlers(server, ctx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start()
}
4. 项目测试
现在开始启动项目来测试
go run search.go
测试添加索引,我这里只粘贴了一个,其他数据自行补充:
curl --location '127.0.0.1:8888/api/products/index' \
--header 'Content-Type: application/json' \
--data '{
"product":
{
"id": "1",
"name": "陶瓷马克杯",
"description": "简约北欧风格陶瓷马克杯,采用高温烧制,釉面光滑易清洗,适合日常咖啡或茶饮。",
"price": 39.9,
"category": "家居",
"tags": ["厨房用品", "创意礼品", "陶瓷制品"],
"createdAt": 1689321456
}
}
运行结果:
{
"success": true,
"message": "Product indexed successfully"
}
测试索引搜索
curl --location '127.0.0.1:8888/api/search/products' \
--header 'Content-Type: application/json' \
--data '{
"keyword" :"面包"
}
'
得到以下类似的结果:
{
"total": 4,
"products": [
{
"id": "5",
"name": "全麦面包",
"description": "无添加全麦面包,富含膳食纤维和蛋白质,口感松软,适合早餐搭配果酱或奶酪食用。",
"price": 19.8,
"category": "食品",
"tags": [
"健康食品",
"早餐必备",
"全麦谷物"
],
"createdAt": 1690001234
},
{
"id": "4",
"name": "保湿面霜",
"description": "深层保湿面霜,含透明质酸和胶原蛋白成分,有效改善干燥肌肤,适合所有肤质日常护理。",
"price": 129,
"category": "美妆",
"tags": [
"护肤用品",
"保湿补水",
"温和配方"
],
"createdAt": 1687654321
}
]
}
测试索引删除
curl --location '127.0.0.1:8888/api/delete/products' \
--header 'Content-Type: application/json' \
--data '
{
"id" :"666"
}'
5 功能拓展
完成基本功能后,我们可以添加一些高级功能,使搜索服务更加强大。
6.1 聚合查询
首先,在 API 定义文件 api/search/search.api
中添加新的类型和接口:
type (
CategoryStat {
Category string `json:"category"`
Count int64 `json:"count"`
AvgPrice float64 `json:"avgPrice"`
MaxPrice float64 `json:"maxPrice"`
MinPrice float64 `json:"minPrice"`
}
CategoryStatsResponse {
CategoryStats []CategoryStat `json:"categoryStats"`
}
)
service search-api {
// ...existing endpoints...
@handler GetCategoryStats
get /api/stats/categories returns (CategoryStatsResponse)
}
使用 goctl 更新生成的代码:
goctl api go -api api/search/search.api -dir .
在 internal/pkg/es/es.go
中添加聚合查询方法:
// Aggregate 执行聚合查询
func (e *ElasticsearchClient) Aggregate(index string, query map[string]interface{}) (map[string]interface{}, error) {
var buf strings.Builder
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return nil, err
}
res, err := e.client.Search(
e.client.Search.WithContext(context.Background()),
e.client.Search.WithIndex(index),
e.client.Search.WithBody(strings.NewReader(buf.String())),
e.client.Search.WithSize(0), // 聚合查询通常不需要返回文档
)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
return nil, err
}
return nil, errors.New("aggregate error")
}
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return nil, err
}
return r, nil
}
实现统计逻辑,在 internal/logic/getcategorystatslogic.go
文件中:
package logic
import (
"context"
"go-zero-es-demo/internal/model"
"go-zero-es-demo/internal/svc"
"go-zero-es-demo/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetCategoryStatsLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetCategoryStatsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCategoryStatsLogic {
return &GetCategoryStatsLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetCategoryStatsLogic) GetCategoryStats() (resp *types.CategoryStatsResponse, err error) {
// 构建聚合查询
query := map[string]interface{}{
"aggs": map[string]interface{}{
"categories": map[string]interface{}{
"terms": map[string]interface{}{
"field": "category",
"size": 20,
},
"aggs": map[string]interface{}{
"avg_price": map[string]interface{}{
"avg": map[string]interface{}{
"field": "price",
},
},
"max_price": map[string]interface{}{
"max": map[string]interface{}{
"field": "price",
},
},
"min_price": map[string]interface{}{
"min": map[string]interface{}{
"field": "price",
},
},
},
},
},
}
// 执行聚合查询
result, err := l.svcCtx.EsClient.Aggregate(model.ProductIndex, query)
if err != nil {
return nil, err
}
// 解析结果
aggregations := result["aggregations"].(map[string]interface{})
categories := aggregations["categories"].(map[string]interface{})
buckets := categories["buckets"].([]interface{})
stats := make([]types.CategoryStat, 0, len(buckets))
for _, bucket := range buckets {
b := bucket.(map[string]interface{})
key := b["key"].(string)
docCount := int64(b["doc_count"].(float64))
avgPrice := b["avg_price"].(map[string]interface{})["value"].(float64)
maxPrice := b["max_price"].(map[string]interface{})["value"].(float64)
minPrice := b["min_price"].(map[string]interface{})["value"].(float64)
stats = append(stats, types.CategoryStat{
Category: key,
Count: docCount,
AvgPrice: avgPrice,
MaxPrice: maxPrice,
MinPrice: minPrice,
})
}
return &types.CategoryStatsResponse{
CategoryStats: stats,
}, nil
}
运行测试
curl --location '127.0.0.1:8888/api/stats/categories'
会得到如下的类似的结果:
{
"categoryStats": [
{
"category": "家居",
"count": 2,
"avgPrice": 26.4,
"maxPrice": 39.9,
"minPrice": 12.9
},
{
"category": "服饰",
"count": 1,
"avgPrice": 59.99,
"maxPrice": 59.99,
"minPrice": 59.99
},
{
"category": "电子",
"count": 1,
"avgPrice": 89.5,
"maxPrice": 89.5,
"minPrice": 89.5
},
{
"category": "美妆",
"count": 1,
"avgPrice": 129,
"maxPrice": 129,
"minPrice": 129
},
{
"category": "运动",
"count": 1,
"avgPrice": 79.5,
"maxPrice": 79.5,
"minPrice": 79.5
},
{
"category": "食品",
"count": 1,
"avgPrice": 19.8,
"maxPrice": 19.8,
"minPrice": 19.8
}
]
}
6.2 同义词搜索
要启用同义词搜索,需要修改索引映射。首先,修改 internal/model/product.go
文件中的索引映射:
// 修改 ProductIndexMapping 变量
var ProductIndexMapping = `{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"analysis": {
"filter": {
"synonym_filter": {
"type": "synonym",
"synonyms": [
"音响, 音箱, 音像",
"衣服, 服装, 服饰",
"首饰, 手饰, 饰品"
]
}
},
"analyzer": {
"text_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "asciifolding", "synonym_filter"]
}
}
}
},
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text",
"analyzer": "text_analyzer",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"description": {
"type": "text",
"analyzer": "text_analyzer"
},
"price": {
"type": "double"
},
"category": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"createdAt": {
"type": "date",
"format": "epoch_millis"
}
}
}
}`
增加ynonym_filter
: 同义词过滤器,定义了3组同义词:
"音响, 音箱, 音像",
"衣服, 服装, 服饰",
"首饰, 手饰, 饰品"
当修改索引映射后,需要重建索引。因此在 internal/pkg/es/es.go
中添加删除索引的方法:
// DeleteIndex 删除索引
func (e *ElasticsearchClient) DeleteIndex(index string) error {
res, err := e.client.Indices.Delete([]string{index})
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
return errors.New("failed to delete index")
}
return nil
}
在 /svc/servicecontext.go
中添加更新映射的逻辑:
// UpdateElasticsearchIndex 更新索引映射(需要重建索引)
func UpdateElasticsearchIndex(client *es.ElasticsearchClient) error {
// 检查索引是否存在
exists, err := client.IndexExists(model.ProductIndex)
if err != nil {
return err
}
// 如果索引已存在,则删除并重建
if exists {
// 获取原索引的所有文档
query := map[string]interface{}{
"query": map[string]interface{}{
"match_all": map[string]interface{}{},
},
"size": 10000, // 注意:实际应用中应使用滚动API处理大量数据
}
result, err := client.Search(model.ProductIndex, query)
if err != nil {
return err
}
// 提取文档
hits := result["hits"].(map[string]interface{})["hits"].([]interface{})
documents := make([]map[string]interface{}, 0, len(hits))
for _, hit := range hits {
hitMap := hit.(map[string]interface{})
source := hitMap["_source"].(map[string]interface{})
id := hitMap["_id"].(string)
// 确保文档有ID
source["id"] = id
documents = append(documents, source)
}
// 删除索引
err = client.DeleteIndex(model.ProductIndex)
if err != nil {
return err
}
// 创建新索引
err = client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)
if err != nil {
return err
}
// 如果有文档,重新索引它们
if len(documents) > 0 {
err = client.BulkIndex(model.ProductIndex, documents)
if err != nil {
return err
}
}
return nil
}
// 如果索引不存在,则创建
return client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)
}
修改mian函数:
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
server := rest.MustNewServer(c.RestConf)
defer server.Stop()
ctx := svc.NewServiceContext(c)
//更新并初始化索引
err := svc.UpdateElasticsearchIndex(ctx.EsClient)
if err != nil {
return
}
/*
// 初始化 Elasticsearch 索引
if err := svc.InitElasticsearch(ctx.EsClient); err != nil {
panic(fmt.Sprintf("初始化 Elasticsearch 失败: %v", err))
}
*/
handler.RegisterHandlers(server, ctx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start()
}
为了方便测试,我们增加几个新的数据:
[
{
"id": "1",
"name": "豪华音像",
"description": "",
"price": 999.99,
"category": "数码",
"tags": ["smartphone", "popular", "new"],
"createdAt": 1715788800000 // 2025-05-14
},
{
"id": "2",
"name": "智能音箱",
"description": "续航长达18小时",
"price": 1199.99,
"category": "数码",
"tags": ["laptop", "apple", "portable"],
"createdAt": 1715702400000 // 2025-05-13
},
{
"id": "3",
"name": "轻便音响",
"description": "",
"price": 1799.99,
"category": "数码",
"tags": ["tv", "4k", "oled"],
"createdAt": 1715616000000 // 2025-05-12
}
]