Elasticsearch Java Development (Spring Boot)

Published: 2025-09-03

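Elasticsearch is a distributed search and analytics engine built on Lucene that provides near-real-time search. Its main characteristics are:

• Distributed architecture
• RESTful API
• Multi-tenancy support
• Powerful query DSL
• Near-real-time search
• Automatic sharding and replication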

Basic Usage - Java API

1. Environment Setup
First, add the Maven dependency:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.15.2</version>
</dependency>

2. Client Initialization

RestHighLevelClient client = new RestHighLevelClient(
    RestClient.builder(
        new HttpHost("localhost", 9200, "http"),
        new HttpHost("localhost", 9201, "http")
    )
);

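Code walkthrough:

• RestHighLevelClient is the high-level Java client for Elasticsearch; it wraps most Elasticsearch APIs in typed request and response objects. Note that it has been deprecated since 7.15 in favor of the newer Java API Client.
• RestClient.builder() builds the underlying low-level REST client.
• HttpHost specifies the address of an Elasticsearch node. Listing several nodes lets the client spread requests across them (load balancing) and keep working when one node goes down (high availability).
• Production setups usually list several node addresses, and the client handles failover between them automatically. Call client.close() when the application shuts down to release the underlying connections.

3. Index Operations
Creating an index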

CreateIndexRequest request = new CreateIndexRequest("products");
request.settings(Settings.builder()
    .put("index.number_of_shards", 3)
    .put("index.number_of_replicas", 2)
);

request.mapping(
    "{\"properties\":{\"name\":{\"type\":\"text\"},\"price\":{\"type\":\"double\"}}}", 
    XContentType.JSON
);

CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);

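Code walkthrough:

• CreateIndexRequest represents a request to create an index.
• settings() sets index-level configuration:
  number_of_shards: number of primary shards (cannot be changed after the index is created)
  number_of_replicas: number of replica copies of each primary shard (can be changed dynamically)
• mapping() defines the field mappings:
  It accepts a JSON string or a Map.
  XContentType.JSON declares the content type of the string.
• RequestOptions.DEFAULT uses the default request options.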
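
Why the primary shard count is fixed at creation time is easier to see from how documents are routed to shards. The sketch below is a deliberate simplification: Elasticsearch hashes the routing value with Murmur3 and applies a routing factor, whereas this example uses String.hashCode() purely for illustration. The class ShardRoutingSketch and its method are hypothetical names, not part of any client library.

```java
// Simplified illustration of document-to-shard routing (not Elasticsearch's actual hash).
final class ShardRoutingSketch {

    // Maps a routing key (by default the document ID) to a primary shard number.
    static int shardFor(String routingKey, int numberOfPrimaryShards) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(routingKey.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        // The same document ID maps to a different shard once the shard count changes,
        // so existing documents could no longer be found without reindexing.
        System.out.println(shardFor("1", 3)); // prints 1
        System.out.println(shardFor("1", 5)); // prints 4
    }
}
```

Because the shard number depends on the primary shard count, changing that count after documents have been indexed would break lookups; the replica count plays no part in routing and can therefore be changed freely.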

Getting index information

GetIndexRequest request = new GetIndexRequest("products");
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);

Map<String, Settings> settings = response.getSettings();
Map<String, MappingMetadata> mappings = response.getMappings();

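Code walkthrough:

• GetIndexRequest represents a request for index information.
• getSettings() returns the index settings, keyed by index name.
• getMappings() returns the index mappings, keyed by index name.
• You can request one specific index or use a wildcard to fetch several indices at once.

4. Document Operations
Indexing a document (create/update)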

IndexRequest request = new IndexRequest("products")
    .id("1") // document ID, optional
    .source(
        "name", "iPhone 13",
        "price", 799.99,
        "description", "Latest iPhone model",
        "created", "2023-01-15"
    );

IndexResponse response = client.index(request, RequestOptions.DEFAULT);

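Code walkthrough:

• IndexRequest creates a document, or replaces the whole document if the ID already exists.
• .id() sets the document ID; if omitted, Elasticsearch generates one.
• .source() sets the document body and accepts several formats:
  Map object
  JSON string
  XContentBuilder
  Key-value pairs (as in the example above)
• IndexResponse carries the outcome of the operation:
  getResult(): the result (CREATED/UPDATED)
  getId(): the document ID
  getVersion(): the document version number

Getting a document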

ObjectMapper mapper = new ObjectMapper(); // Jackson; real code should reuse one shared instance

GetRequest request = new GetRequest("products", "1");
GetResponse response = client.get(request, RequestOptions.DEFAULT);

if (response.isExists()) {
    String sourceAsString = response.getSourceAsString(); // raw JSON string
    Map<String, Object> sourceAsMap = response.getSourceAsMap(); // source parsed into a Map
    Product product = mapper.readValue(sourceAsString, Product.class); // deserialized into an object
}

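Code walkthrough:

• GetRequest takes the index name and the document ID.
• isExists() checks whether the document exists.
• The document body can be read in several ways:
  getSourceAsString(): the raw JSON string
  getSourceAsMap(): the source parsed into a Map
  getSourceAsBytes(): the raw byte array
• A JSON library such as Jackson can convert the result into a Java object.

Updating a document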

UpdateRequest request = new UpdateRequest("products", "1")
    .doc(
        "price", 749.99,
        "updated", new Date()
    );

UpdateResponse response = client.update(request, RequestOptions.DEFAULT);

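Code walkthrough:

• UpdateRequest takes the index name and the document ID.
• .doc() lists the fields to change (a partial update); other fields are left untouched.
• .script() can be used instead for scripted updates.
• .upsert() supplies a document to insert when the target document does not exist.

Deleting a document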

DeleteRequest request = new DeleteRequest("products", "1");
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);

if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // handle the case where the document does not exist
}

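Code walkthrough:

• DeleteRequest takes the index name and the document ID.
• getResult() returns the outcome:
  DELETED: the document was deleted
  NOT_FOUND: the document did not exist

5. Search Operations
Basic query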

SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

// Build the query
sourceBuilder.query(QueryBuilders.matchQuery("name", "iPhone"));

// Set pagination
sourceBuilder.from(0);
sourceBuilder.size(10);

searchRequest.source(sourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

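Code walkthrough:

• SearchRequest represents a search request and can target one or more indices.
• SearchSourceBuilder builds the search body.
• QueryBuilders offers factory methods for the query types:
  matchQuery(): full-text match (the query text is analyzed)
  termQuery(): exact match (the query text is not analyzed)
  rangeQuery(): range query
  boolQuery(): boolean query
• from() and size() implement pagination.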
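
Since from() takes a zero-based offset rather than a page number, a small helper can keep the arithmetic in one place. The following is a minimal sketch; PagingSketch and its methods are hypothetical names. It also checks the request against the default index.max_result_window of 10,000, beyond which Elasticsearch rejects from + size pagination.

```java
// Helper for translating page numbers into from/size values (illustrative only).
final class PagingSketch {

    // Default value of the index.max_result_window index setting.
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    // Converts a 1-based page number into the zero-based offset expected by from().
    static int offset(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        return Math.multiplyExact(page - 1, size);
    }

    // True when from + size stays within the default result window.
    static boolean withinWindow(int from, int size) {
        return (long) from + size <= DEFAULT_MAX_RESULT_WINDOW;
    }

    public static void main(String[] args) {
        System.out.println(offset(3, 10));            // prints 20
        System.out.println(withinWindow(9_990, 10));  // prints true
        System.out.println(withinWindow(9_991, 10));  // prints false
    }
}
```

For result sets deeper than the window, the search_after or scroll APIs are the usual alternatives to from/size.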

Bool query

BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
    .must(QueryBuilders.matchQuery("name", "iPhone")) // must match
    .should(QueryBuilders.matchQuery("description", "pro")) // optional; raises the score when it matches
    .mustNot(QueryBuilders.termQuery("status", "disabled")) // must not match
    .filter(QueryBuilders.rangeQuery("price").gte(500).lte(1000)); // filter (does not affect the score)

sourceBuilder.query(boolQuery);

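Code walkthrough:

• boolQuery() builds a boolean query that combines several clauses.
• There are four clause types:
  must: has to match; contributes to the score
  should: optional when a must or filter clause is present; contributes to the score when it matches
  mustNot: must not match; does not contribute to the score
  filter: has to match but does not contribute to the score (faster, because no scoring is needed and results can be cached)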
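
The way the four clause types interact can be illustrated with a small in-memory model. This is only a sketch of the semantics, not the Elasticsearch API: documents are plain maps, clauses are predicates, and the "score" is a toy count of matched must and should clauses (Elasticsearch computes real relevance scores with BM25). BoolQuerySketch, Clause, and scoreProduct are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// In-memory illustration of bool-query clause semantics (not the Elasticsearch API).
final class BoolQuerySketch {

    // Shorthand for a clause that tests a document represented as a field map.
    interface Clause extends Predicate<Map<String, Object>> {}

    // Returns -1 if the document is excluded; otherwise a toy score equal to the
    // number of must and should clauses that matched.
    static int evaluate(Map<String, Object> doc, List<Clause> must, List<Clause> should,
                        List<Clause> mustNot, List<Clause> filter) {
        for (Clause c : must) if (!c.test(doc)) return -1;    // every must clause is required
        for (Clause c : filter) if (!c.test(doc)) return -1;  // every filter clause is required
        for (Clause c : mustNot) if (c.test(doc)) return -1;  // any mustNot match excludes the document
        int score = must.size();                               // filter clauses add nothing to the score
        for (Clause c : should) if (c.test(doc)) score++;      // should clauses only raise the score
        return score;
    }

    // The clauses from the bool query above, rebuilt as plain predicates.
    static int scoreProduct(Map<String, Object> doc) {
        List<Clause> must = List.of(d -> String.valueOf(d.get("name")).contains("iPhone"));
        List<Clause> should = List.of(d -> String.valueOf(d.get("description")).contains("pro"));
        List<Clause> mustNot = List.of(d -> "disabled".equals(d.get("status")));
        List<Clause> filter = List.of(d -> {
            Object price = d.get("price");
            if (!(price instanceof Number)) return false;
            double value = ((Number) price).doubleValue();
            return value >= 500 && value <= 1000;
        });
        return evaluate(doc, must, should, mustNot, filter);
    }

    public static void main(String[] args) {
        Map<String, Object> doc = Map.of("name", "iPhone 13", "description", "pro model",
                                         "status", "active", "price", 799.99);
        System.out.println(scoreProduct(doc)); // prints 2
    }
}
```

A document that fails the price filter or has status "disabled" is dropped regardless of how well it matches the other clauses, while a missing "pro" in the description only lowers the toy score from 2 to 1.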

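Aggregation query

SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

// Group by name. A terms aggregation needs a keyword field; this assumes the mapping
// defines a name.keyword sub-field (a plain text field would be rejected).
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_name")
    .field("name.keyword")
    .size(10);

// Compute the average price within each group
aggregation.subAggregation(
    AggregationBuilders.avg("avg_price").field("price")
);

sourceBuilder.aggregation(aggregation);
sourceBuilder.size(0); // do not return the matching documents

searchRequest.source(sourceBuilder); // attach the search body to the request
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

// Parse the aggregation result
Terms byName = response.getAggregations().get("by_name");
for (Terms.Bucket bucket : byName.getBuckets()) {
    String name = bucket.getKeyAsString();
    Avg avgPrice = bucket.getAggregations().get("avg_price");
    System.out.println(name + ": " + avgPrice.getValue());
}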

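Code walkthrough:

• AggregationBuilders offers factory methods for the aggregation types.
• Common aggregations:
  terms: group by value and count
  avg: average
  sum: sum
  max/min: maximum/minimum
  dateHistogram: date histogram
• Aggregations can be nested with subAggregation().
• size(0) returns only the aggregation results, without the matching documents.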
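
To make the result of a terms aggregation with an avg sub-aggregation concrete, the same computation can be written over an in-memory list with Java streams. This is only an analogy under simplifying assumptions: it ignores bucket ordering by document count and the size limit, and the names TermsAvgSketch, Product, and avgPriceByName are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// In-memory analogy for "terms by name, then avg of price per bucket".
final class TermsAvgSketch {

    // Minimal stand-in for an indexed product document.
    static final class Product {
        final String name;
        final double price;

        Product(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    // Groups products by name (the "terms" bucket) and averages the price per group (the "avg" metric).
    static Map<String, Double> avgPriceByName(List<Product> products) {
        return products.stream().collect(
            Collectors.groupingBy(p -> p.name, TreeMap::new, Collectors.averagingDouble(p -> p.price)));
    }

    public static void main(String[] args) {
        List<Product> products = List.of(
            new Product("iPhone 13", 800.0),
            new Product("iPhone 13", 700.0),
            new Product("MacBook Pro", 2000.0));
        System.out.println(avgPriceByName(products)); // prints {MacBook Pro=2000.0, iPhone 13=750.0}
    }
}
```

Each map entry corresponds to one bucket in the Elasticsearch response: the key is what bucket.getKeyAsString() returns, and the value is what the avg_price sub-aggregation reports.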

Highlighting

SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchQuery("description", "apple"));

HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("description")
    .preTags("<strong>")
    .postTags("</strong>")
    .fragmentSize(200); // length of each highlighted fragment, in characters

sourceBuilder.highlighter(highlightBuilder);

searchRequest.source(sourceBuilder); // attach the search body to the request
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

for (SearchHit hit : response.getHits()) {
    Map<String, HighlightField> highlights = hit.getHighlightFields();
    HighlightField descriptionHighlight = highlights.get("description");
    if (descriptionHighlight != null) {
        Text[] fragments = descriptionHighlight.getFragments();
        if (fragments.length > 0) {
            String highlighted = fragments[0].string();
        }
    }
}

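Code walkthrough:

• HighlightBuilder builds the highlighting configuration.
• Key parameters:
  preTags/postTags: the tags wrapped around each highlighted term
  fragmentSize: the fragment length
  numOfFragments: the number of fragments to return
• The highlighted fragments are read from hit.getHighlightFields().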
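
What a highlighted fragment looks like can be shown with a plain-Java approximation. The sketch below wraps whole-word, case-insensitive occurrences of a term in the configured tags. It is an assumption-laden simplification: Elasticsearch highlights based on the field's analyzer and splits long text into fragments, neither of which is modeled here. HighlightSketch is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java approximation of what preTags/postTags do to a matched term.
final class HighlightSketch {

    // Wraps each whole-word, case-insensitive occurrence of term in preTag/postTag.
    static String highlight(String text, String term, String preTag, String postTag) {
        Pattern pattern = Pattern.compile("\\b" + Pattern.quote(term) + "\\b", Pattern.CASE_INSENSITIVE);
        // "$0" re-inserts the matched text, so the original casing is preserved.
        String replacement = Matcher.quoteReplacement(preTag) + "$0" + Matcher.quoteReplacement(postTag);
        return pattern.matcher(text).replaceAll(replacement);
    }

    public static void main(String[] args) {
        System.out.println(highlight("Apple ships a new apple-themed case", "apple", "<strong>", "</strong>"));
        // prints <strong>Apple</strong> ships a new <strong>apple</strong>-themed case
    }
}
```

The string returned by fragments[0].string() in the search example has this shape, which is why it can be dropped straight into an HTML template.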

6. Bulk Operations
Bulk indexing

BulkRequest request = new BulkRequest();

request.add(new IndexRequest("products").id("1")
    .source(XContentType.JSON, 
        "name", "iPhone 13",
        "price", 799.99));
        
request.add(new IndexRequest("products").id("2")
    .source(XContentType.JSON, 
        "name", "MacBook Pro",
        "price", 1999.99));

BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);

if (response.hasFailures()) {
    for (BulkItemResponse item : response.getItems()) {
        if (item.isFailed()) {
            BulkItemResponse.Failure failure = item.getFailure();
            System.err.println("Failed to index: " + failure.getId() + 
                ", cause: " + failure.getMessage());
        }
    }
}

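Code walkthrough:

• A BulkRequest can contain any mix of index, update, and delete operations.
• Sending many operations in one request is much more efficient than sending them one by one, because it saves a network round trip per operation.
• A bulk request can partially succeed, so check hasFailures() to see whether any operation failed.
• getItems() returns the detailed result of each operation.

Multi-search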

MultiSearchRequest request = new MultiSearchRequest();

SearchRequest firstSearch = new SearchRequest("products");
SearchSourceBuilder firstSource = new SearchSourceBuilder();
firstSource.query(QueryBuilders.matchQuery("name", "iPhone"));
firstSearch.source(firstSource);
request.add(firstSearch);

SearchRequest secondSearch = new SearchRequest("products");
SearchSourceBuilder secondSource = new SearchSourceBuilder();
secondSource.query(QueryBuilders.rangeQuery("price").gte(1000));
secondSearch.source(secondSource);
request.add(secondSearch);

MultiSearchResponse response = client.msearch(request, RequestOptions.DEFAULT);

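for (MultiSearchResponse.Item item : response.getResponses()) {
    if (item.isFailure()) {
        continue; // this search failed; item.getFailure() holds the exception
    }
    SearchResponse searchResponse = item.getResponse();
    // process the result of each search
}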

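Code walkthrough:

• A MultiSearchRequest can contain several search requests.
• It suits cases where several unrelated queries need to run at the same time.
• The returned MultiSearchResponse holds one result per query, in the order the requests were added.

Spring Boot Integration

1. Add the dependency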

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

2. Configure Elasticsearch

spring.elasticsearch.rest.uris=http://localhost:9200
spring.elasticsearch.rest.connection-timeout=1s
spring.elasticsearch.rest.read-timeout=1m

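These property names apply to Spring Boot versions before 2.6; later versions rename them to spring.elasticsearch.uris, spring.elasticsearch.connection-timeout, and spring.elasticsearch.socket-timeout.

Alternatively, use a Java configuration class: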

@Configuration
public class ElasticsearchConfig {

    @Value("${spring.elasticsearch.rest.uris}")
    private String elasticsearchUrl;

    @Bean
    public RestHighLevelClient elasticsearchClient() {
        return new RestHighLevelClient(
            RestClient.builder(HttpHost.create(elasticsearchUrl))
        );
    }
}

3. Entity Mapping

@Document(indexName = "products", createIndex = false)
public class Product {
    
    @Id
    private String id;
    
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String name;
    
    @Field(type = FieldType.Double)
    private double price;
    
    @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second)
    private Date created;
    
    @Field(type = FieldType.Nested)
    private List<Category> categories;
    
    // getters and setters
}

public class Category {
    @Field(type = FieldType.Keyword)
    private String name;
    
    @Field(type = FieldType.Integer)
    private int level;
}

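Code walkthrough:

• @Document: marks the class as an Elasticsearch document
  indexName: the index name
  createIndex: whether to create the index automatically
• @Id: marks the document ID field
• @Field: configures the field mapping
  type: the field type
  analyzer: the analyzer to use (ik_max_word comes from the IK analysis plugin, which must be installed on the cluster)
  format: the date format
• Nested objects use @Field(type = FieldType.Nested)

4. Repository Custom Queries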

public interface ProductRepository extends ElasticsearchRepository<Product, String> {
    
    // Queries derived from the method name
    List<Product> findByName(String name);
    
    Page<Product> findByPriceBetween(double minPrice, double maxPrice, Pageable pageable);
    
    // Custom query with the @Query annotation
    @Query("{\"bool\": {\"must\": {\"match\": {\"name\": \"?0\"}}}}")
    List<Product> searchByName(String name);
    
    // @Query with several parameters and paging
    @Query("{\"bool\": {\"must\": [{\"match\": {\"name\": \"?0\"}}, {\"range\": {\"price\": {\"gte\": ?1, \"lte\": ?2}}}]}}")
    Page<Product> searchByNameAndPriceRange(String name, double minPrice, double maxPrice, Pageable pageable);
    
    // Aggregations cannot be declared here: Spring Data Elasticsearch has no @Aggregation
    // annotation. Build them with NativeSearchQueryBuilder and run them through
    // ElasticsearchOperations in a custom repository implementation instead.
}

Code walkthrough:

• Derived queries: the query is generated from the method name.
• @Query: accepts a raw JSON query DSL string; ?0, ?1, ... are replaced by the method arguments.
• A Pageable parameter adds paging and sorting.
• Aggregations are not available through repository annotations; run them with ElasticsearchOperations in a custom repository implementation.

5. Custom Repository Implementation

public interface CustomProductRepository {
    List<Product> customSearch(String name, double minPrice, double maxPrice);
}

public class CustomProductRepositoryImpl implements CustomProductRepository {
    
    @Autowired
    private ElasticsearchOperations operations;
    
    @Override
    public List<Product> customSearch(String name, double minPrice, double maxPrice) {
        Query query = new NativeSearchQueryBuilder()
            .withQuery(QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("name", name))
                .filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice))
            )
            .build();
            
        return operations.search(query, Product.class)
            .stream()
            .map(SearchHit::getContent)
            .collect(Collectors.toList());
    }
}

// The main repository interface extends the custom interface
public interface ProductRepository extends ElasticsearchRepository<Product, String>, CustomProductRepository {
}

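Code walkthrough:

• Naming convention: the implementation class is named after the custom interface plus the Impl suffix (CustomProductRepository and CustomProductRepositoryImpl).
• ElasticsearchOperations runs the more complex queries.
• NativeSearchQueryBuilder builds a query from the native Elasticsearch query builders.
• The main repository interface extends the custom interface.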

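6. Event Listening

@Component
public class ProductEntityCallbacks
        implements BeforeConvertCallback<Product>, AfterSaveCallback<Product> {

    // Invoked before the entity is converted into a document
    @Override
    public Product onBeforeConvert(Product product, IndexCoordinates index) {
        if (product.getCreated() == null) {
            product.setCreated(new Date());
        }
        return product;
    }

    // Invoked after the document has been saved
    @Override
    public Product onAfterSave(Product product, IndexCoordinates index) {
        System.out.println("Saved product: " + product.getName());
        return product;
    }
}

Code walkthrough:

• Spring Data Elasticsearch exposes lifecycle hooks as entity callbacks (package org.springframework.data.elasticsearch.core.event) rather than as application events, so the listener implements callback interfaces instead of using @EventListener.
• BeforeConvertCallback: invoked before the entity is converted into a document
• AfterConvertCallback: invoked after a document read from Elasticsearch has been converted into an entity
• AfterSaveCallback: invoked after the document has been saved
• Each callback returns the entity, possibly modified, that processing continues with.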

Performance Optimization Tips

1. Bulk operation optimization

// Use BulkProcessor to batch requests automatically
BulkProcessor bulkProcessor = BulkProcessor.builder(
    (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
    new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            // called before each bulk request is sent
        }
        
        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // the bulk request completed; individual items may still have failed (check response.hasFailures())
        }
        
        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // the whole bulk request failed
        }
    })
    .setBulkActions(1000) // flush after every 1000 operations
    .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush after every 5 MB
    .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds
    .setConcurrentRequests(1) // number of concurrent bulk requests
    .build();

// Add documents to the bulk processor
for (Product product : products) {
    IndexRequest request = new IndexRequest("products")
        .id(product.getId())
        .source(/*...*/);
    bulkProcessor.add(request);
}

// Finally, flush whatever is still buffered; on shutdown, call awaitClose(...) so in-flight requests can finish
bulkProcessor.flush();
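
The interplay of the action-count and byte-size thresholds can be sketched without any Elasticsearch classes. The following is a toy model, not BulkProcessor itself: it flushes as soon as either threshold is reached, leaves out the time-based flush interval and concurrency, and only counts flushes instead of sending requests. BatchingSketch and its members are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of threshold-based batching (count or size, whichever is reached first).
final class BatchingSketch {
    private final int maxActions;
    private final long maxBytes;
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private int flushCount = 0;

    BatchingSketch(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    // Buffers one document and flushes when either threshold is reached.
    void add(String document) {
        buffer.add(document);
        bufferedBytes += document.getBytes(StandardCharsets.UTF_8).length;
        if (buffer.size() >= maxActions || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    // Empties the buffer; a real implementation would send one bulk request here.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushCount++;
        buffer.clear();
        bufferedBytes = 0;
    }

    int flushCount() { return flushCount; }

    int pending() { return buffer.size(); }

    public static void main(String[] args) {
        BatchingSketch batcher = new BatchingSketch(1000, 5L * 1024 * 1024);
        for (int i = 0; i < 2500; i++) {
            batcher.add("{}");
        }
        System.out.println(batcher.flushCount() + " flushes, " + batcher.pending() + " pending");
        // prints 2 flushes, 500 pending
    }
}
```

The 500 documents left in the buffer are the reason for the final manual flush: without it (or a flush interval), they would stay unsent until more documents arrive.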

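2. Search optimization

SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

// Return only the fields that are needed
sourceBuilder.fetchSource(new String[]{"name", "price"}, null);

// Skip score calculation by running the condition in filter context
sourceBuilder.query(QueryBuilders.boolQuery()
    .filter(QueryBuilders.termQuery("status", "active"))
);

// Sort by a field value instead of by score
sourceBuilder.sort("price", SortOrder.ASC);

// Collapse results so that only the top hit per category is returned
CollapseBuilder collapseBuilder = new CollapseBuilder("category.keyword");
sourceBuilder.collapse(collapseBuilder);

searchRequest.source(sourceBuilder);

// Enable the shard request cache. This is set on the SearchRequest, not on the source
// builder; by default only requests with size = 0 are cached.
searchRequest.requestCache(true);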

3. Index design optimization

CreateIndexRequest request = new CreateIndexRequest("products");

// Tuned index settings
request.settings(Settings.builder()
    .put("index.number_of_shards", 3)
    .put("index.number_of_replicas", 1)
    .put("index.refresh_interval", "30s") // refresh less often (the default is 1s)
    .put("index.merge.scheduler.max_thread_count", 1) // fewer merge threads (suited to spinning disks)
);

// Tuned mapping
request.mapping(
    "{\"properties\":{" +
    "\"name\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\"}}}," +
    "\"price\":{\"type\":\"scaled_float\",\"scaling_factor\":100}," +
    "\"tags\":{\"type\":\"keyword\"}" +
    "}}",
    XContentType.JSON
);
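
The scaled_float type used for price in this mapping stores each value as a long: the value is multiplied by scaling_factor at index time and rounded to the nearest long. The sketch below reproduces that arithmetic in plain Java to show what precision a factor of 100 keeps; ScaledFloatSketch and its methods are hypothetical names.

```java
// Arithmetic behind scaled_float: value * scaling_factor, rounded to the nearest long.
final class ScaledFloatSketch {

    // What gets stored in the index for a given value.
    static long encode(double value, double scalingFactor) {
        return Math.round(value * scalingFactor);
    }

    // The value that searches and aggregations effectively see.
    static double decode(long stored, double scalingFactor) {
        return stored / scalingFactor;
    }

    public static void main(String[] args) {
        System.out.println(encode(799.99, 100));               // prints 79999
        System.out.println(decode(encode(799.99, 100), 100));  // prints 799.99
        // A third decimal place is lost with a scaling factor of 100.
        System.out.println(decode(encode(19.999, 100), 100));  // prints 20.0
    }
}
```

A factor of 100 is therefore a good fit for prices with two decimal places; values that need finer precision require a larger factor, at the cost of less compact storage.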
