go-zero(十八)结合Elasticsearch实现高效数据检索

发布于:2025-05-15 ⋅ 阅读:(8) ⋅ 点赞:(0)

go-zero结合Elasticsearch实现高效数据检索

1. Elasticsearch简单介绍

Elasticsearch(简称 ES) 是一个基于 Lucene 库 构建的 分布式、开源、实时搜索与分析引擎,采用 Apache 2.0 协议。它支持水平扩展,能高效处理大规模数据的存储、搜索、分析和可视化,是 ELK 栈(Elasticsearch、Logstash、Kibana)EFK 栈(Elasticsearch、Fluentd、Kibana) 的核心组件。

1.1核心特点

1. 分布式架构,天生可扩展
  • 自动分片(Sharding):数据自动分片到多个节点,单个索引可拆分为多个分片(Shard),支持水平扩展,轻松处理 PB 级数据。
  • 高可用性(Replication):每个分片支持多个副本(Replica),节点故障时自动切换,保证服务不中断。
  • 去中心化:无主节点依赖(7.x 后引入选举机制),节点自动发现和管理,简化集群运维。
2. 实时搜索与分析
  • 近实时(Near Real-Time):文档写入后默认 1 秒内可被搜索到,满足实时数据查询需求。
  • 全文搜索能力:基于 Lucene 实现高效的全文索引,支持分词、模糊搜索、短语匹配、高亮显示等。
3. 灵活的数据模型
  • JSON 文档存储:数据以 JSON 格式存储,支持动态映射(Dynamic Mapping),无需预定义严格 Schema(但推荐显式定义以优化性能)。
  • 多数据类型支持:处理文本、数值、日期、地理位置、二进制等数据类型,支持复杂嵌套结构。
4. 强大的查询与聚合 DSL
  • Query DSL(Domain-Specific Language):通过 JSON 格式的查询语言,支持布尔逻辑、范围查询、正则匹配、地理空间查询(如附近搜索)等。
  • 聚合分析(Aggregation):支持分组(Terms Aggregation)、统计(Avg/Max/Min)、桶分析(Bucket Aggregation)、管道聚合(Pipeline Aggregation),用于数据统计和可视化。
5. 生态系统丰富
  • 数据摄入:支持 Logstash(ETL 管道)、Beats(轻量级数据采集器)、Kafka 等数据源,以及直接通过 REST API 写入。
  • 可视化与分析:集成 Kibana 实现仪表盘、图表、日志分析;支持与 Grafana、Tableau 等工具对接。
  • 插件扩展:支持分词器(如中文分词 IK Analyzer)、安防(X-Pack Security)、机器学习(X-Pack ML)等插件。
6. 高性能与高可靠
  • 倒排索引优化:基于 Lucene 的倒排索引,查询速度随数据量增长保持稳定。
  • 分布式协调:通过集群节点自动负载均衡,支持自动故障转移和恢复。

1.2 应用场景

1. 日志管理与分析(最经典场景)
  • 场景:收集、存储、分析海量日志(如服务器日志、应用日志、微服务日志)。
  • 实现:通过 Logstash/Beats 采集日志,写入 ES,用 Kibana 进行日志检索、统计(如错误日志高频分析)、异常检测。
  • 优势:秒级查询亿级日志,支持按时间、服务名、错误码等多维度过滤和聚合。
2. 电商搜索与推荐
  • 场景:商品搜索(如淘宝、京东的搜索栏)、智能补全、相关商品推荐。
  • 功能:支持商品名称全文搜索、价格范围筛选、品牌 / 类目过滤、销量排序,结合地理位置搜索附近门店。
  • 技术点:分词器优化(如中文分词)、拼音搜索(解决输入错误)、搜索相关性排序(BM25 算法)。
3. 企业级搜索(内部知识库、文档检索)
  • 场景:企业内部文档搜索(如 Confluence、SharePoint 集成)、代码搜索、法律合同检索。
  • 优势:支持多语言文本处理、文档元数据过滤(如作者、创建时间)、权限控制(通过 X-Pack Security)。
4. 实时数据分析与仪表盘
  • 场景:业务指标监控(如网站 PV/UV、订单量实时统计)、用户行为分析(漏斗模型、留存率)。
  • 实现:将业务数据实时写入 ES,通过 Kibana 生成动态仪表盘,支持下钻分析(Drill Down)和预警通知。
5. 地理空间分析
  • 场景:物流轨迹追踪、共享单车位置查询、疫情传播热力图。
  • 功能:支持地理坐标(Geo-point)、地理区域(Geo-rectangle)、地理距离(Distance Query)查询,结合聚合生成热力图。
6. 安全与 SIEM(安全信息与事件管理)
  • 场景:网络安全日志分析、入侵检测、合规审计。
  • 实现:采集防火墙、WAF、IDS 等设备的日志,通过 ES 进行关联分析(如多维度事件关联)、异常流量检测。
7. 监控与运维(APM、基础设施监控)
  • 场景:应用性能监控(APM)、服务器指标(CPU / 内存 / 磁盘)监控、微服务链路追踪。
  • 工具链:结合 Elastic APM 采集指标数据,用 ES 存储,Kibana 可视化服务调用链、慢查询定位。

2. 环境部署

2.1部署 elasticsearch 和kibana

注意:elasticsearch 和kibana 版本号尽量一致, elasticsearch用8.x版本,kibana 也要用8.x版本,不然无法一起使用

创建 docker-compose.yml 文件:

version: '3'
services:
    elasticsearch:
      container_name: elasticsearch
      image: bitnami/elasticsearch:8.9.0
      environment:
        - TZ=Asia/Shanghai
        - discovery.type=single-node
        - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      privileged: true
      ports:
        - "9200:9200"
      restart: always
      networks:
        - go_zero_net

  kibana:
    container_name: kibana
    image: bitnami/kibana:8.9.0
    restart: always
    environment:
      - TZ=Asia/Shanghai
      - I18N_LOCALE=zh-CN
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200  # 修正服务名引用
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch  # 显式声明依赖
    networks:
      - go_zero_net


networks:
  go_zero_net:
    driver: bridge

volumes:
  esdata:
    driver: local

环境部署完成后,验证 Elasticsearch 是否启动成功:

curl http://localhost:9200

应该会返回类似以下的 JSON 响应:

{
  "name" : "d95962b2abfe",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "TtO-vhldRGmlrZ6U1cIgQw",
  "version" : {
    "number" : "8.9.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "8aa461beb06aa0417a231c345a1b8c38fb498a0d",
    "build_date" : "2023-07-19T14:43:58.555259655Z",
    "build_snapshot" : false,
    "lucene_version" : "9.7.0",
    "minimum_wire_compatibility_version" : "7.17.0",
    "minimum_index_compatibility_version" : "7.0.0"
  },
  "tagline" : "You Know, for Search"
}

使用浏览器访问 http://localhost:5601/ 如果访问成功,说明kibana正常启动

在这里插入图片描述

3.项目构建

3.1 编写 API 文件

创建 api/search/search.api 文件,定义搜索服务 API:

syntax = "v1"

type (
	SearchRequest {
		Keyword   string  `json:"keyword"`
		Page      int     `json:"page,optional,default=1"`
		PageSize  int     `json:"pageSize,optional,default=10"`
		Category  string  `json:"category,optional"`
		MinPrice  float64 `json:"minPrice,optional"`
		MaxPrice  float64 `json:"maxPrice,optional"`
		SortField string  `json:"sortField,optional"`
		SortOrder string  `json:"sortOrder,optional,options=asc|desc"`
	}
	ProductItem {
		ID          string   `json:"id"`
		Name        string   `json:"name"`
		Description string   `json:"description"`
		Price       float64  `json:"price"`
		Category    string   `json:"category"`
		Tags        []string `json:"tags"`
		CreatedAt   int64    `json:"createdAt"`
	}
	SearchResponse {
		Total    int64         `json:"total"`
		Products []ProductItem `json:"products"`
	}
	IndexProductRequest {
		Product ProductItem `json:"product"`
	}
	IndexProductResponse {
		Success bool   `json:"success"`
		Message string `json:"message,optional"`
	}
	deleteProductRequest {
		ID string `json:"id"`
	}
)

service search-api {
	@handler SearchProducts
	post /api/search/products (SearchRequest) returns (SearchResponse)

	@handler IndexProduct
	post /api/products/index (IndexProductRequest) returns (IndexProductResponse)

	@handler DeleteProduct
	post /api/delete/products (deleteProductRequest) returns (IndexProductResponse)
}

这个 API 定义文件包含了三个主要 API :

  • 搜索商品
  • 索引(创建/更新)商品
  • 删除商品

现在,切换到项目目录,使用 goctl 工具根据 API 定义生成代码:

# 生成代码
goctl api go -api api/search/search.api -dir .

3.2 封装 Elasticsearch 服务

接下来,我们需要将 Elasticsearch 集成到生成的代码中。

安装 Elasticsearch Go 客户端,因为我们环境部署的是8.X版本,所以这里go-elasticsearch也选择v8版本

go get github.com/elastic/go-elasticsearch/v8

添加 Elasticsearch 配置

修改 internal/config/config.go 文件,添加 Elasticsearch 配置:

package config

import (
	"github.com/zeromicro/go-zero/rest"
)

type Config struct {
	rest.RestConf
	Elasticsearch struct {
		Addresses []string
		Username  string
		Password  string
	}
}

同时,修改 etc/search-api.yaml 配置文件,添加 Elasticsearch 配置:

Name: search-api
Host: 0.0.0.0
Port: 8888

Elasticsearch:
  Addresses:
    - http://localhost:9200
  Username: ""
  Password: ""

创建 internal/pkg/es/es.go 文件,实现 Elasticsearch 客户端的封装:

package es

import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
)

// ElasticsearchClient 封装ES客户端
type ElasticsearchClient struct {
	client *elasticsearch.Client
}

// NewElasticsearchClient 创建新的ES客户端
func NewElasticsearchClient(addresses []string, username, password string) (*ElasticsearchClient, error) {
	cfg := elasticsearch.Config{
		Addresses: addresses,
		Username:  username,
		Password:  password,
	}

	client, err := elasticsearch.NewClient(cfg)
	if err != nil {
		return nil, err
	}

	// 测试连接
	res, err := client.Info()
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		return nil, errors.New("Elasticsearch connection failed")
	}

	return &ElasticsearchClient{
		client: client,
	}, nil
}

// CreateIndex 创建索引
func (e *ElasticsearchClient) CreateIndex(index string, mapping string) error {
	res, err := e.client.Indices.Create(
		index,
		e.client.Indices.Create.WithBody(strings.NewReader(mapping)),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.IsError() {
		return errors.New("failed to create index")
	}

	return nil
}

// IndexExists 检查索引是否存在
func (e *ElasticsearchClient) IndexExists(index string) (bool, error) {
	res, err := e.client.Indices.Exists([]string{index})
	if err != nil {
		return false, err
	}
	defer res.Body.Close()

	return res.StatusCode == 200, nil
}

// IndexDocument 索引单个文档
func (e *ElasticsearchClient) IndexDocument(index, id string, document interface{}) error {
	data, err := json.Marshal(document)
	if err != nil {
		return err
	}

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: id,
		Body:       strings.NewReader(string(data)),
		Refresh:    "true",
	}

	res, err := req.Do(context.Background(), e.client)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.IsError() {
		return errors.New("failed to index document")
	}

	return nil
}

// Search 执行搜索请求
func (e *ElasticsearchClient) Search(index string, query map[string]interface{}) (map[string]interface{}, error) {
	var buf strings.Builder
	if err := json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, err
	}

	res, err := e.client.Search(
		e.client.Search.WithContext(context.Background()),
		e.client.Search.WithIndex(index),
		e.client.Search.WithBody(strings.NewReader(buf.String())),
		e.client.Search.WithTrackTotalHits(true),
	)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		var e map[string]interface{}
		if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
			return nil, err
		}
		log.Printf("[%s] %s: %s", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"])
		return nil, errors.New("search error")
	}

	var r map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		return nil, err
	}

	return r, nil
}

// DeleteDocument 删除文档
func (e *ElasticsearchClient) DeleteDocument(index, id string) error {
	req := esapi.DeleteRequest{
		Index:      index,
		DocumentID: id,
		Refresh:    "true",
	}

	res, err := req.Do(context.Background(), e.client)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.IsError() {
		return errors.New("failed to delete document")
	}

	return nil
}

// BulkIndex 批量索引文档
func (e *ElasticsearchClient) BulkIndex(index string, documents []map[string]interface{}) error {
	var buf strings.Builder

	for _, doc := range documents {
		// 获取文档ID
		id, ok := doc["id"].(string)
		if !ok {
			id = ""
		}
		// 从文档中删除ID字段,避免重复
		delete(doc, "id")

		// 创建索引操作元数据
		meta := map[string]interface{}{
			"index": map[string]interface{}{
				"_index": index,
				"_id":    id,
			},
		}

		// 将元数据写入缓冲区
		if err := json.NewEncoder(&buf).Encode(meta); err != nil {
			return err
		}
		// 将文档数据写入缓冲区
		if err := json.NewEncoder(&buf).Encode(doc); err != nil {
			return err
		}
	}

	// 执行批量请求
	res, err := e.client.Bulk(strings.NewReader(buf.String()), e.client.Bulk.WithIndex(index), e.client.Bulk.WithRefresh("true"))
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.IsError() {
		return errors.New("failed to bulk index documents")
	}

	return nil
}

3.3定义商品索引映射

创建 internal/model/product.go 文件,定义商品索引映射:

package model

const (
	ProductIndex = "products"
)

// ProductIndexMapping 商品索引映射
var ProductIndexMapping = `{
	"settings": {
		"number_of_shards": 1,
		"number_of_replicas": 0,
		"analysis": {
			"analyzer": {
				"text_analyzer": {
					"type": "custom",
					"tokenizer": "standard",
					"filter": ["lowercase", "asciifolding"]
				}
			}
		}
	},
	"mappings": {
		"properties": {
			"id": {
				"type": "keyword"
			},
			"name": {
				"type": "text",
				"analyzer": "text_analyzer",
				"fields": {
					"keyword": {
						"type": "keyword"
					}
				}
			},
			"description": {
				"type": "text",
				"analyzer": "text_analyzer"
			},
			"price": {
				"type": "double"
			},
			"category": {
				"type": "keyword"
			},
			"tags": {
				"type": "keyword"
			},
			"createdAt": {
				"type": "date",
				"format": "epoch_millis"
			}
		}
	}
}`

// Product 商品模型
type Product struct {
	ID          string   `json:"id"`
	Name        string   `json:"name"`
	Description string   `json:"description"`
	Price       float64  `json:"price"`
	Category    string   `json:"category"`
	Tags        []string `json:"tags"`
	CreatedAt   int64    `json:"createdAt"`
}

Mapping介绍

  1. Settings(索引设置)
"settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "analysis": { ... }
}
  • 分片设置

    • number_of_shards: 1 个主分片(适用于小规模或开发环境)。
    • number_of_replicas: 0 个副本(无冗余,生产环境建议至少设为 1)。
  • 分析器配置

    "analysis": {
        "analyzer": {
            "text_analyzer": {
                "type": "custom",
                "tokenizer": "standard",
                "filter": ["lowercase", "asciifolding"]
            }
        }
    }
    
    • 文本分析器text_analyzer):
      • 使用 standard 分词器(按词边界分词,适合大多数语言)。
      • 应用 lowercase 过滤器(转为小写)和 asciifolding 过滤器(将非 ASCII 字符转为等效 ASCII,如 ée)。
  1. Mappings(字段映射)
"mappings": {
    "properties": { ... }
}
  • 字段类型与用途
字段名 类型 用途与特点
id keyword 精确匹配(如商品 ID),不分析,用于过滤、排序或聚合。
name text 全文搜索字段,使用 text_analyzer 处理(支持分词、小写和 ASCII 折叠)。
.keyword 子字段,保留原始值,用于精确匹配(如聚合品牌名)。
description text 长文本描述,同样使用 text_analyzer 进行全文搜索。
price double 浮点型数值,支持范围查询(如 price > 100)。
category keyword 分类标签(如 “electronics”),用于过滤和聚合。
tags keyword 标签数组(如 ["popular", "new"]),支持多值精确匹配。
createdAt date 日期类型(Unix 毫秒时间戳),支持范围查询(如按时间筛选)。

3.4 扩展 ServiceContext

修改 internal/svc/servicecontext.go 文件,添加 Elasticsearch 客户端:

package svc

import (
	"go-zero-es-demo/internal/config"
	"go-zero-es-demo/internal/pkg/es"
)

type ServiceContext struct {
	Config   config.Config
	EsClient *es.ElasticsearchClient
}

func NewServiceContext(c config.Config) *ServiceContext {
	esClient, err := es.NewElasticsearchClient(
		c.Elasticsearch.Addresses,
		c.Elasticsearch.Username,
		c.Elasticsearch.Password,
	)
	if err != nil {
		panic(err)
	}

	return &ServiceContext{
		Config:   c,
		EsClient: esClient,
	}
}
//定义索引初始化函数
func InitElasticsearch(client *es.ElasticsearchClient) error {
	// 检查商品索引是否存在
	exists, err := client.IndexExists(model.ProductIndex)
	if err != nil {
		return err
	}

	// 如果索引不存在,则创建
	if !exists {
		err = client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)
		if err != nil {
			return err
		}
	}

	return nil
}


3.5 实现搜索逻辑

修改 internal/logic/searchproductslogic.go 文件,实现商品搜索功能:

package logic

import (
	"context"

	"go-zero-es-demo/internal/model"
	"go-zero-es-demo/internal/svc"
	"go-zero-es-demo/internal/types"

	"github.com/zeromicro/go-zero/core/logx"
)

type SearchProductsLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

func NewSearchProductsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SearchProductsLogic {
	return &SearchProductsLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

func (l *SearchProductsLogic) SearchProducts(req *types.SearchRequest) (resp *types.SearchResponse, err error) {
	// 构建搜索查询
	from := (req.Page - 1) * req.PageSize
	size := req.PageSize

	// 基本查询结构
	query := map[string]interface{}{
		"from": from,
		"size": size,
	}

	// 构建搜索条件
	boolQuery := map[string]interface{}{}
	mustClauses := []map[string]interface{}{}

	// 关键词搜索
	if req.Keyword != "" {
		mustClauses = append(mustClauses, map[string]interface{}{
			"multi_match": map[string]interface{}{
				"query":  req.Keyword,
				"fields": []string{"name^3", "description", "tags"},
			},
		})
	}

	// 分类过滤
	if req.Category != "" {
		mustClauses = append(mustClauses, map[string]interface{}{
			"term": map[string]interface{}{
				"category": req.Category,
			},
		})
	}

	// 价格范围过滤
	if req.MinPrice > 0 || req.MaxPrice > 0 {
		rangeQuery := map[string]interface{}{}

		if req.MinPrice > 0 {
			rangeQuery["gte"] = req.MinPrice
		}

		if req.MaxPrice > 0 {
			rangeQuery["lte"] = req.MaxPrice
		}

		mustClauses = append(mustClauses, map[string]interface{}{
			"range": map[string]interface{}{
				"price": rangeQuery,
			},
		})
	}

	// 添加bool查询
	if len(mustClauses) > 0 {
		boolQuery["must"] = mustClauses
		query["query"] = map[string]interface{}{
			"bool": boolQuery,
		}
	} else {
		query["query"] = map[string]interface{}{
			"match_all": map[string]interface{}{},
		}
	}

	// 排序
	if req.SortField != "" {
		order := "asc"
		if req.SortOrder == "desc" {
			order = "desc"
		}

		query["sort"] = []map[string]interface{}{
			{
				req.SortField: map[string]interface{}{
					"order": order,
				},
			},
		}
	}

	// 执行搜索请求
	result, err := l.svcCtx.EsClient.Search(model.ProductIndex, query)
	if err != nil {
		return nil, err
	}

	// 解析搜索结果
	total := int64(result["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))
	hits := result["hits"].(map[string]interface{})["hits"].([]interface{})

	products := make([]types.ProductItem, 0, len(hits))
	for _, hit := range hits {
		source := hit.(map[string]interface{})["_source"].(map[string]interface{})

		// 提取标签数组
		tags := []string{}
		if tagsRaw, ok := source["tags"].([]interface{}); ok {
			for _, tag := range tagsRaw {
				if tagStr, ok := tag.(string); ok {
					tags = append(tags, tagStr)
				}
			}
		}

		product := types.ProductItem{
			ID:          source["id"].(string),
			Name:        source["name"].(string),
			Description: source["description"].(string),
			Price:       source["price"].(float64),
			Category:    source["category"].(string),
			Tags:        tags,
			CreatedAt:   int64(source["createdAt"].(float64)),
		}

		products = append(products, product)
	}

	return &types.SearchResponse{
		Total:    total,
		Products: products,
	}, nil
}

3.6 实现索引和删除逻辑

修改 internal/logic/indexproductlogic.go 文件,实现商品索引功能:

package logic

import (
	"context"
	"time"

	"go-zero-es-demo/internal/model"
	"go-zero-es-demo/internal/svc"
	"go-zero-es-demo/internal/types"

	"github.com/zeromicro/go-zero/core/logx"
)

type IndexProductLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

func NewIndexProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *IndexProductLogic {
	return &IndexProductLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

func (l *IndexProductLogic) IndexProduct(req *types.IndexProductRequest) (resp *types.IndexProductResponse, err error) {
	// 如果未提供创建时间,则使用当前时间
	if req.Product.CreatedAt == 0 {
		req.Product.CreatedAt = time.Now().UnixMilli()
	}

	// 索引文档
	err = l.svcCtx.EsClient.IndexDocument(model.ProductIndex, req.Product.ID, req.Product)
	if err != nil {
		return &types.IndexProductResponse{
			Success: false,
			Message: "Failed to index product: " + err.Error(),
		}, nil
	}

	return &types.IndexProductResponse{
		Success: true,
		Message: "Product indexed successfully",
	}, nil
}

修改 internal/logic/deleteproductlogic.go 文件,实现商品删除功能:

package logic

import (
	"context"

	"go-zero-es-demo/internal/model"
	"go-zero-es-demo/internal/svc"
	"go-zero-es-demo/internal/types"

	"github.com/zeromicro/go-zero/core/logx"
)

type DeleteProductLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

func NewDeleteProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteProductLogic {
	return &DeleteProductLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

func (l *DeleteProductLogic) DeleteProduct(req *types.DeleteProductRequest) (resp *types.IndexProductResponse, err error) {
	// todo: add your logic here and delete this line
	// 删除文档
	err = l.svcCtx.EsClient.DeleteDocument(model.ProductIndex, req.ID)
	if err != nil {
		return &types.IndexProductResponse{
			Success: false,
			Message: "Failed to delete product: " + err.Error(),
		}, nil
	}

	return &types.IndexProductResponse{
		Success: true,
		Message: "Product deleted successfully",
	}, nil
}


最后,修改主程序入口 search.go 文件,添加 Elasticsearch 索引初始化逻辑:


func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)

	server := rest.MustNewServer(c.RestConf)
	defer server.Stop()
	ctx := svc.NewServiceContext(c)
	
	// 初始化 Elasticsearch 索引
	if err := svc.InitElasticsearch(ctx.EsClient); err != nil {
		panic(fmt.Sprintf("初始化 Elasticsearch 失败: %v", err))
	}

	handler.RegisterHandlers(server, ctx)

	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
	server.Start()
}

4. 项目测试

现在开始启动项目来测试

go run search.go

测试添加索引,我这里只粘贴了一个,其他数据自行补充:

curl --location '127.0.0.1:8888/api/products/index' \
--header 'Content-Type: application/json' \
--data '{
    "product":
    {
        "id": "1",
        "name": "陶瓷马克杯",
        "description": "简约北欧风格陶瓷马克杯,采用高温烧制,釉面光滑易清洗,适合日常咖啡或茶饮。",
        "price": 39.9,
        "category": "家居",
        "tags": ["厨房用品", "创意礼品", "陶瓷制品"],
        "createdAt": 1689321456
        }
}

运行结果:


{
    "success": true,
    "message": "Product indexed successfully"
}

测试索引搜索

curl --location '127.0.0.1:8888/api/search/products' \
--header 'Content-Type: application/json' \
--data '{
    "keyword" :"面包"
}
'

得到以下类似的结果:

{
    "total": 4,
    "products": [
        {
            "id": "5",
            "name": "全麦面包",
            "description": "无添加全麦面包,富含膳食纤维和蛋白质,口感松软,适合早餐搭配果酱或奶酪食用。",
            "price": 19.8,
            "category": "食品",
            "tags": [
                "健康食品",
                "早餐必备",
                "全麦谷物"
            ],
            "createdAt": 1690001234
        },
        {
            "id": "4",
            "name": "保湿面霜",
            "description": "深层保湿面霜,含透明质酸和胶原蛋白成分,有效改善干燥肌肤,适合所有肤质日常护理。",
            "price": 129,
            "category": "美妆",
            "tags": [
                "护肤用品",
                "保湿补水",
                "温和配方"
            ],
            "createdAt": 1687654321
        }
    ]
}

测试索引删除

curl --location '127.0.0.1:8888/api/delete/products' \
--header 'Content-Type: application/json' \
--data '
{
    "id" :"666"
}'

5 功能拓展

完成基本功能后,我们可以添加一些高级功能,使搜索服务更加强大。

6.1 聚合查询

首先,在 API 定义文件 api/search/search.api 中添加新的类型和接口:

type (
    CategoryStat {
        Category  string  `json:"category"`
        Count     int64   `json:"count"`
        AvgPrice  float64 `json:"avgPrice"`
        MaxPrice  float64 `json:"maxPrice"`
        MinPrice  float64 `json:"minPrice"`
    }

    CategoryStatsResponse {
        CategoryStats []CategoryStat `json:"categoryStats"`
    }
)

service search-api {
    // ...existing endpoints...

    @handler GetCategoryStats
    get /api/stats/categories returns (CategoryStatsResponse)
}

使用 goctl 更新生成的代码:

goctl api go -api api/search/search.api -dir .

internal/pkg/es/es.go 中添加聚合查询方法:

// Aggregate 执行聚合查询
func (e *ElasticsearchClient) Aggregate(index string, query map[string]interface{}) (map[string]interface{}, error) {
	var buf strings.Builder
	if err := json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, err
	}

	res, err := e.client.Search(
		e.client.Search.WithContext(context.Background()),
		e.client.Search.WithIndex(index),
		e.client.Search.WithBody(strings.NewReader(buf.String())),
		e.client.Search.WithSize(0), // 聚合查询通常不需要返回文档
	)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.IsError() {
		var e map[string]interface{}
		if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
			return nil, err
		}
		return nil, errors.New("aggregate error")
	}

	var r map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		return nil, err
	}

	return r, nil
}

实现统计逻辑,在 internal/logic/getcategorystatslogic.go 文件中:

package logic

import (
	"context"

	"go-zero-es-demo/internal/model"
	"go-zero-es-demo/internal/svc"
	"go-zero-es-demo/internal/types"

	"github.com/zeromicro/go-zero/core/logx"
)

type GetCategoryStatsLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

func NewGetCategoryStatsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCategoryStatsLogic {
	return &GetCategoryStatsLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

func (l *GetCategoryStatsLogic) GetCategoryStats() (resp *types.CategoryStatsResponse, err error) {
	// 构建聚合查询
	query := map[string]interface{}{
		"aggs": map[string]interface{}{
			"categories": map[string]interface{}{
				"terms": map[string]interface{}{
					"field": "category",
					"size":  20,
				},
				"aggs": map[string]interface{}{
					"avg_price": map[string]interface{}{
						"avg": map[string]interface{}{
							"field": "price",
						},
					},
					"max_price": map[string]interface{}{
						"max": map[string]interface{}{
							"field": "price",
						},
					},
					"min_price": map[string]interface{}{
						"min": map[string]interface{}{
							"field": "price",
						},
					},
				},
			},
		},
	}

	// 执行聚合查询
	result, err := l.svcCtx.EsClient.Aggregate(model.ProductIndex, query)
	if err != nil {
		return nil, err
	}

	// 解析结果
	aggregations := result["aggregations"].(map[string]interface{})
	categories := aggregations["categories"].(map[string]interface{})
	buckets := categories["buckets"].([]interface{})

	stats := make([]types.CategoryStat, 0, len(buckets))
	for _, bucket := range buckets {
		b := bucket.(map[string]interface{})
		key := b["key"].(string)
		docCount := int64(b["doc_count"].(float64))
		avgPrice := b["avg_price"].(map[string]interface{})["value"].(float64)
		maxPrice := b["max_price"].(map[string]interface{})["value"].(float64)
		minPrice := b["min_price"].(map[string]interface{})["value"].(float64)

		stats = append(stats, types.CategoryStat{
			Category:  key,
			Count:     docCount,
			AvgPrice:  avgPrice,
			MaxPrice:  maxPrice,
			MinPrice:  minPrice,
		})
	}

	return &types.CategoryStatsResponse{
		CategoryStats: stats,
	}, nil
}

运行测试


curl --location '127.0.0.1:8888/api/stats/categories'

会得到如下的类似的结果:

{
    "categoryStats": [
        {
            "category": "家居",
            "count": 2,
            "avgPrice": 26.4,
            "maxPrice": 39.9,
            "minPrice": 12.9
        },
        {
            "category": "服饰",
            "count": 1,
            "avgPrice": 59.99,
            "maxPrice": 59.99,
            "minPrice": 59.99
        },
        {
            "category": "电子",
            "count": 1,
            "avgPrice": 89.5,
            "maxPrice": 89.5,
            "minPrice": 89.5
        },
        {
            "category": "美妆",
            "count": 1,
            "avgPrice": 129,
            "maxPrice": 129,
            "minPrice": 129
        },
        {
            "category": "运动",
            "count": 1,
            "avgPrice": 79.5,
            "maxPrice": 79.5,
            "minPrice": 79.5
        },
        {
            "category": "食品",
            "count": 1,
            "avgPrice": 19.8,
            "maxPrice": 19.8,
            "minPrice": 19.8
        }
    ]
}

6.2 同义词搜索

要启用同义词搜索,需要修改索引映射。首先,修改 internal/model/product.go 文件中的索引映射:

// 修改 ProductIndexMapping 变量
var ProductIndexMapping = `{
	"settings": {
		"number_of_shards": 1,
		"number_of_replicas": 0,
		"analysis": {
			"filter": {
				"synonym_filter": {
					"type": "synonym",
					"synonyms": [
						"音响, 音箱, 音像",
						"衣服, 服装, 服饰",
						"首饰, 手饰, 饰品"
					]
				}
			},
			"analyzer": {
				"text_analyzer": {
					"type": "custom",
					"tokenizer": "standard",
					"filter": ["lowercase", "asciifolding", "synonym_filter"]
				}
			}
		}
	},
	"mappings": {
		"properties": {
			"id": {
				"type": "keyword"
			},
			"name": {
				"type": "text",
				"analyzer": "text_analyzer",
				"fields": {
					"keyword": {
						"type": "keyword"
					}
				}
			},
			"description": {
				"type": "text",
				"analyzer": "text_analyzer"
			},
			"price": {
				"type": "double"
			},
			"category": {
				"type": "keyword"
			},
			"tags": {
				"type": "keyword"
			},
			"createdAt": {
				"type": "date",
				"format": "epoch_millis"
			}
		}
	}
}`

增加ynonym_filter: 同义词过滤器,定义了3组同义词:

"音响, 音箱, 音像",
"衣服, 服装, 服饰",
"首饰, 手饰, 饰品"

当修改索引映射后,需要重建索引。因此在 internal/pkg/es/es.go 中添加删除索引的方法:

// DeleteIndex 删除索引
func (e *ElasticsearchClient) DeleteIndex(index string) error {
	res, err := e.client.Indices.Delete([]string{index})
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.IsError() {
		return errors.New("failed to delete index")
	}

	return nil
}

/svc/servicecontext.go 中添加更新映射的逻辑:

// UpdateElasticsearchIndex 更新索引映射(需要重建索引)
func UpdateElasticsearchIndex(client *es.ElasticsearchClient) error {
	// 检查索引是否存在
	exists, err := client.IndexExists(model.ProductIndex)
	if err != nil {
		return err
	}

	// 如果索引已存在,则删除并重建
	if exists {
		// 获取原索引的所有文档
		query := map[string]interface{}{
			"query": map[string]interface{}{
				"match_all": map[string]interface{}{},
			},
			"size": 10000, // 注意:实际应用中应使用滚动API处理大量数据
		}

		result, err := client.Search(model.ProductIndex, query)
		if err != nil {
			return err
		}

		// 提取文档
		hits := result["hits"].(map[string]interface{})["hits"].([]interface{})
		documents := make([]map[string]interface{}, 0, len(hits))

		for _, hit := range hits {
			hitMap := hit.(map[string]interface{})
			source := hitMap["_source"].(map[string]interface{})
			id := hitMap["_id"].(string)

			// 确保文档有ID
			source["id"] = id
			documents = append(documents, source)
		}

		// 删除索引
		err = client.DeleteIndex(model.ProductIndex)
		if err != nil {
			return err
		}

		// 创建新索引
		err = client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)
		if err != nil {
			return err
		}

		// 如果有文档,重新索引它们
		if len(documents) > 0 {
			err = client.BulkIndex(model.ProductIndex, documents)
			if err != nil {
				return err
			}
		}

		return nil
	}

	// 如果索引不存在,则创建
	return client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)
}

修改mian函数:


func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)

	server := rest.MustNewServer(c.RestConf)
	defer server.Stop()
	ctx := svc.NewServiceContext(c)
	
	//更新并初始化索引
	err := svc.UpdateElasticsearchIndex(ctx.EsClient)
	if err != nil {
		return
	}

	/*
		// 初始化 Elasticsearch 索引
		if err := svc.InitElasticsearch(ctx.EsClient); err != nil {
			panic(fmt.Sprintf("初始化 Elasticsearch 失败: %v", err))
		}

	*/

	handler.RegisterHandlers(server, ctx)

	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
	server.Start()
}

为了方便测试,我们增加几个新的数据:

[
  {
    "id": "1",
    "name": "豪华音像",
    "description": "",
    "price": 999.99,
    "category": "数码",
    "tags": ["smartphone", "popular", "new"],
    "createdAt": 1715788800000  // 2025-05-14
  },
  {
    "id": "2",
    "name": "智能音箱",
    "description": "续航长达18小时",
    "price": 1199.99,
    "category": "数码",
    "tags": ["laptop", "apple", "portable"],
    "createdAt": 1715702400000  // 2025-05-13
  },
  {
    "id": "3",
    "name": "轻便音响",
    "description": "",
    "price": 1799.99,
    "category": "数码",
    "tags": ["tv", "4k", "oled"],
    "createdAt": 1715616000000  // 2025-05-12
  }
]