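Elasticsearch is a distributed search and analytics engine built on Lucene that provides near-real-time search. Its main features are:
• Distributed architecture
• RESTful API
• Multi-tenancy support
• A powerful query DSL
• Near-real-time search
• Automatic sharding and replication
Basic usage - Java API
1. Environment setup
First, add the Maven dependency: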
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.15.2</version>
</dependency>
2. Client initialization
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")
)
);
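Code walkthrough:
• RestHighLevelClient is the high-level Java client for Elasticsearch and exposes methods for the main Elasticsearch APIs. It has been deprecated since 7.15 in favor of the Java API Client.
• RestClient.builder() builds the underlying low-level REST client.
• HttpHost specifies the address of an Elasticsearch node; listing several nodes gives load balancing and high availability.
• Production setups usually list several node addresses; the client rotates requests across them and skips nodes that have failed.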
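The rotation described above can be pictured with a small stand-alone model. This is a simplified sketch, not the client's actual implementation: the class RoundRobinHosts and its methods are hypothetical names, and the real client also retries failed nodes after a back-off period, which is left out here.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical toy model of round-robin node selection with failed-node skipping. */
final class RoundRobinHosts {
    private final List<String> hosts;
    private final Set<String> dead = new HashSet<>();
    private int cursor;

    RoundRobinHosts(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = List.copyOf(hosts);
    }

    /** Returns the next host that is not marked dead, advancing the cursor. */
    String next() {
        for (int i = 0; i < hosts.size(); i++) {
            String candidate = hosts.get(cursor);
            cursor = (cursor + 1) % hosts.size();
            if (!dead.contains(candidate)) {
                return candidate;
            }
        }
        throw new IllegalStateException("no live hosts");
    }

    void markDead(String host) {
        dead.add(host);
    }

    void markAlive(String host) {
        dead.remove(host);
    }
}
```

With the two hosts from the example, calls alternate between localhost:9200 and localhost:9201 until one is marked dead, after which every call returns the other.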
3. Index operations
Creating an index
CreateIndexRequest request = new CreateIndexRequest("products");
request.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 2)
);
request.mapping(
"{\"properties\":{\"name\":{\"type\":\"text\"},\"price\":{\"type\":\"double\"}}}",
XContentType.JSON
);
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
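Code walkthrough:
• CreateIndexRequest represents a create-index request.
• settings() sets index-level configuration:
    number_of_shards: number of primary shards (cannot be changed in place once the index exists)
    number_of_replicas: number of replica copies of each primary shard (can be changed dynamically)
• mapping() defines the field mappings:
    It accepts a JSON string or a Map
    XContentType.JSON declares the content type
• RequestOptions.DEFAULT uses the default request options.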
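Why the primary shard count is fixed becomes clearer from how a document is routed: the target shard is derived from a hash of the routing value (the document ID by default) modulo the number of primary shards. The sketch below illustrates that idea only. ShardRoutingSketch is a hypothetical name, and String.hashCode() stands in for the Murmur3 hash Elasticsearch actually uses.

```java
/** Toy model of document-to-shard routing; String.hashCode stands in for Elasticsearch's Murmur3 hash. */
final class ShardRoutingSketch {

    /** Returns the primary shard index for a routing value (the document ID by default). */
    static int shardFor(String routing, int numberOfPrimaryShards) {
        if (numberOfPrimaryShards < 1) {
            throw new IllegalArgumentException("numberOfPrimaryShards must be >= 1");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        for (String id : new String[] {"1", "2", "3"}) {
            System.out.println("id=" + id
                    + " with 3 shards -> " + shardFor(id, 3)
                    + ", with 4 shards -> " + shardFor(id, 4));
        }
    }
}
```

Because the modulus changes with the shard count, the same ID can land on a different shard after a change, so existing documents would no longer be found where they were written. That is why changing the count means reindexing into a new index.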
Getting index information
GetIndexRequest request = new GetIndexRequest("products");
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
Map<String, Settings> settings = response.getSettings();
Map<String, MappingMetadata> mappings = response.getMappings();
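Code walkthrough:
• GetIndexRequest represents a get-index request.
• getSettings() returns the index settings, keyed by index name.
• getMappings() returns the index mappings, keyed by index name.
• You can request a single index or use a wildcard to match several.
4. Document operations
Indexing a document (create/update)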
IndexRequest request = new IndexRequest("products")
.id("1") // document ID, optional
.source(
"name", "iPhone 13",
"price", 799.99,
"description", "Latest iPhone model",
"created", "2023-01-15"
);
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
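Code walkthrough:
• IndexRequest creates a document, or replaces it if a document with the same ID already exists.
• .id() sets the document ID; if it is omitted, Elasticsearch generates one.
• .source() sets the document body and accepts several forms:
    A Map
    A JSON string
    An XContentBuilder
    Key-value pairs (as in the example above)
• IndexResponse carries the outcome:
    getResult(): the result (CREATED or UPDATED)
    getId(): the document ID
    getVersion(): the document version
Getting a document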
GetRequest request = new GetRequest("products", "1");
GetResponse response = client.get(request, RequestOptions.DEFAULT);
if (response.isExists()) {
String sourceAsString = response.getSourceAsString(); // source as a JSON string
Map<String, Object> sourceAsMap = response.getSourceAsMap(); // source as a Map
Product product = mapper.readValue(sourceAsString, Product.class); // mapper is a Jackson ObjectMapper created elsewhere
}
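Code walkthrough:
• GetRequest takes the index name and the document ID.
• isExists() reports whether the document was found.
• The source can be read in several ways:
    getSourceAsString(): raw JSON string
    getSourceAsMap(): converted to a Map
    getSourceAsBytes(): raw byte array
• A JSON library such as Jackson can convert the result to a Java object.
Updating a document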
UpdateRequest request = new UpdateRequest("products", "1")
.doc(
"price", 749.99,
"updated", new Date()
);
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
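Code walkthrough:
• UpdateRequest takes the index name and the document ID.
• .doc() gives the fields to change (a partial update); fields that are not listed keep their values.
• .script() can be used instead for a scripted update.
• .upsert() supplies a document to insert when the target document does not exist.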
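The interplay of .doc() and .upsert() can be summarized with plain maps. This is a minimal sketch of the semantics only, with hypothetical names, and it merges at the top level; Elasticsearch merges nested objects recursively.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of partial-update and upsert semantics using plain maps. */
final class PartialUpdateSketch {

    /**
     * Applies partialDoc on top of the stored source. When nothing is stored,
     * the upsert document is inserted as-is and partialDoc is not applied.
     */
    static Map<String, Object> update(Map<String, Object> stored,
                                      Map<String, Object> partialDoc,
                                      Map<String, Object> upsert) {
        if (stored == null) {
            if (upsert == null) {
                // Corresponds to the "document missing" error returned without an upsert.
                throw new IllegalStateException("document missing and no upsert given");
            }
            return new HashMap<>(upsert);
        }
        Map<String, Object> merged = new HashMap<>(stored);
        merged.putAll(partialDoc); // listed fields are overwritten, the rest are kept
        return merged;
    }
}
```

Updating {name, price} with a partial document that contains only price leaves name untouched, which is the difference from re-indexing the whole document with IndexRequest.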
Deleting a document
DeleteRequest request = new DeleteRequest("products", "1");
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
// handle the missing document
}
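Code walkthrough:
• DeleteRequest takes the index name and the document ID.
• getResult() returns the outcome:
    DELETED: the document was deleted
    NOT_FOUND: the document did not exist
5. Search operations
Basic query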
SearchRequest searchRequest = new SearchRequest("products");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// Build the query
sourceBuilder.query(QueryBuilders.matchQuery("name", "iPhone"));
// Set pagination
sourceBuilder.from(0);
sourceBuilder.size(10);
searchRequest.source(sourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
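Code walkthrough:
• SearchRequest represents a search request and can target one or more indices.
• SearchSourceBuilder builds the search body.
• QueryBuilders provides factory methods for the query types:
    matchQuery(): full-text match (the query text is analyzed)
    termQuery(): exact match (the term is not analyzed)
    rangeQuery(): range query
    boolQuery(): bool query
• from() and size() implement pagination.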
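Translating a page number into from() and size() is simple arithmetic, but it is worth guarding against deep pages: by default Elasticsearch rejects requests where from + size exceeds index.max_result_window (10,000). The helper below is a sketch with hypothetical names, assuming zero-based page numbers.

```java
/** Hypothetical helper that maps a zero-based page number to the from() offset. */
final class PageOffsets {
    /** Default value of the index.max_result_window setting. */
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    /** Offset to pass to SearchSourceBuilder.from() for the given page. */
    static int fromFor(int page, int size) {
        if (page < 0 || size < 1) {
            throw new IllegalArgumentException("page must be >= 0 and size must be >= 1");
        }
        return Math.multiplyExact(page, size);
    }

    /** True when from + size stays within the default result window. */
    static boolean withinDefaultWindow(int page, int size) {
        return (long) fromFor(page, size) + size <= DEFAULT_MAX_RESULT_WINDOW;
    }
}
```

With size 10, page 999 is the last page that fits in the default window. For deeper result sets, search_after is the usual alternative to from/size.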
Bool query
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("name", "iPhone")) // must match
.should(QueryBuilders.matchQuery("description", "pro")) // should match (raises the score)
.mustNot(QueryBuilders.termQuery("status", "disabled")) // must not match
.filter(QueryBuilders.rangeQuery("price").gte(500).lte(1000)); // filter (does not affect the score)
sourceBuilder.query(boolQuery);
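Code walkthrough:
• boolQuery() builds a bool query that combines several clauses.
• The four clause types:
    must: has to match and contributes to the score
    should: optional when a must or filter clause is present; contributes to the score when it matches
    mustNot: must not match
    filter: has to match but does not contribute to the score (cacheable, so usually faster)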
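The four clause types can be modeled in memory to make the difference between matching and scoring concrete. This is a toy model under stated assumptions: BoolQuerySketch and its helpers are hypothetical, term matching is a plain substring check instead of analyzed tokens, and the score is a clause count instead of BM25.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Toy in-memory model of bool query semantics; real scoring uses BM25, not clause counts. */
final class BoolQuerySketch {
    private final List<Predicate<Map<String, Object>>> must = new ArrayList<>();
    private final List<Predicate<Map<String, Object>>> should = new ArrayList<>();
    private final List<Predicate<Map<String, Object>>> mustNot = new ArrayList<>();
    private final List<Predicate<Map<String, Object>>> filter = new ArrayList<>();

    BoolQuerySketch must(Predicate<Map<String, Object>> clause) { must.add(clause); return this; }
    BoolQuerySketch should(Predicate<Map<String, Object>> clause) { should.add(clause); return this; }
    BoolQuerySketch mustNot(Predicate<Map<String, Object>> clause) { mustNot.add(clause); return this; }
    BoolQuerySketch filter(Predicate<Map<String, Object>> clause) { filter.add(clause); return this; }

    /** must and filter decide inclusion, mustNot decides exclusion. */
    boolean matches(Map<String, Object> doc) {
        boolean required = must.stream().allMatch(c -> c.test(doc))
                && filter.stream().allMatch(c -> c.test(doc))
                && mustNot.stream().noneMatch(c -> c.test(doc));
        // With no must or filter clause, at least one should clause has to match.
        boolean shouldRequired = must.isEmpty() && filter.isEmpty() && !should.isEmpty();
        return required && (!shouldRequired || should.stream().anyMatch(c -> c.test(doc)));
    }

    /** Only must and should clauses add to the toy score; filter and mustNot never do. */
    long toyScore(Map<String, Object> doc) {
        if (!matches(doc)) {
            return 0;
        }
        return must.stream().filter(c -> c.test(doc)).count()
                + should.stream().filter(c -> c.test(doc)).count();
    }

    /** Mirrors the bool query from the example above. */
    static BoolQuerySketch example() {
        return new BoolQuerySketch()
                .must(doc -> containsTerm(doc, "name", "iphone"))
                .should(doc -> containsTerm(doc, "description", "pro"))
                .mustNot(doc -> "disabled".equals(doc.get("status")))
                .filter(doc -> inRange(doc, "price", 500, 1000));
    }

    private static boolean containsTerm(Map<String, Object> doc, String field, String term) {
        Object value = doc.get(field);
        return value != null && value.toString().toLowerCase().contains(term);
    }

    private static boolean inRange(Map<String, Object> doc, String field, double min, double max) {
        Object value = doc.get(field);
        if (!(value instanceof Number)) {
            return false;
        }
        double number = ((Number) value).doubleValue();
        return number >= min && number <= max;
    }
}
```

A document that satisfies must and filter is returned whether or not the should clause matches; matching it only moves the document higher in the ranking.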
Aggregation query
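SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// Group by name. A terms aggregation needs a keyword field, so this assumes "name"
// has a "keyword" sub-field (as in the optimized mapping at the end of this article).
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_name")
.field("name.keyword")
.size(10);
// Compute the average price within each group
aggregation.subAggregation(
AggregationBuilders.avg("avg_price").field("price")
);
sourceBuilder.aggregation(aggregation);
sourceBuilder.size(0); // do not return the matching documents
searchRequest.source(sourceBuilder); // attach the source to the SearchRequest created earlier
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
// Parse the aggregation result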
Terms byName = response.getAggregations().get("by_name");
for (Terms.Bucket bucket : byName.getBuckets()) {
String name = bucket.getKeyAsString();
Avg avgPrice = bucket.getAggregations().get("avg_price");
System.out.println(name + ": " + avgPrice.getValue());
}
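Code walkthrough:
• AggregationBuilders provides factory methods for the aggregation types.
• Common aggregation types:
    terms: group by value and count
    avg: average
    sum: sum
    max/min: maximum/minimum
    dateHistogram: date histogram
• Aggregations can be nested with subAggregation().
• size(0) returns only the aggregation results, not the documents.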
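For readers who think in Java streams, a terms aggregation with an avg sub-aggregation computes roughly what groupingBy with averagingDouble computes over an in-memory list. The sketch below is only an analogy with hypothetical names: it returns every group in alphabetical order, whereas the terms aggregation returns the top size buckets ordered by document count.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** In-memory analogy for terms("by_name") with an avg("avg_price") sub-aggregation. */
final class TermsAvgSketch {

    /** Minimal stand-in for an indexed product document. */
    static final class Product {
        private final String name;
        private final double price;

        Product(String name, double price) {
            this.name = name;
            this.price = price;
        }

        String name() { return name; }
        double price() { return price; }
    }

    /** Groups products by name (one bucket per name) and averages the price per bucket. */
    static Map<String, Double> avgPriceByName(List<Product> products) {
        return products.stream().collect(
                Collectors.groupingBy(Product::name, TreeMap::new,
                        Collectors.averagingDouble(Product::price)));
    }

    public static void main(String[] args) {
        List<Product> products = List.of(
                new Product("iPhone 13", 700.0),
                new Product("iPhone 13", 800.0),
                new Product("MacBook Pro", 2000.0));
        avgPriceByName(products).forEach((name, avg) -> System.out.println(name + ": " + avg));
    }
}
```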
Highlighting
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchQuery("description", "apple"));
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("description")
.preTags("<strong>")
.postTags("</strong>")
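.fragmentSize(200); // fragment length in characters
sourceBuilder.highlighter(highlightBuilder);
searchRequest.source(sourceBuilder); // attach the source to the SearchRequest created earlier
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);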
for (SearchHit hit : response.getHits()) {
Map<String, HighlightField> highlights = hit.getHighlightFields();
HighlightField descriptionHighlight = highlights.get("description");
if (descriptionHighlight != null) {
Text[] fragments = descriptionHighlight.getFragments();
String highlighted = fragments[0].string();
}
}
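Code walkthrough:
• HighlightBuilder builds the highlight configuration.
• Key parameters:
    preTags/postTags: the tags wrapped around each match
    fragmentSize: fragment length in characters
    numOfFragments: number of fragments to return
• Highlight results are read with hit.getHighlightFields().
6. Bulk operations
Bulk indexing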
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("products").id("1")
.source(XContentType.JSON,
"name", "iPhone 13",
"price", 799.99));
request.add(new IndexRequest("products").id("2")
.source(XContentType.JSON,
"name", "MacBook Pro",
"price", 1999.99));
BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
if (response.hasFailures()) {
for (BulkItemResponse item : response.getItems()) {
if (item.isFailed()) {
BulkItemResponse.Failure failure = item.getFailure();
System.err.println("Failed to index: " + failure.getId() +
", cause: " + failure.getMessage());
}
}
}
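Code walkthrough:
• BulkRequest can mix index, update, and delete operations.
• A bulk request avoids one network round trip per document, so it is much faster than sending the operations one by one.
• Check hasFailures(): a bulk request can succeed as a whole while individual operations fail.
• getItems() gives the detailed result of each operation.
Multi-search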
MultiSearchRequest request = new MultiSearchRequest();
SearchRequest firstSearch = new SearchRequest("products");
SearchSourceBuilder firstSource = new SearchSourceBuilder();
firstSource.query(QueryBuilders.matchQuery("name", "iPhone"));
firstSearch.source(firstSource);
request.add(firstSearch);
SearchRequest secondSearch = new SearchRequest("products");
SearchSourceBuilder secondSource = new SearchSourceBuilder();
secondSource.query(QueryBuilders.rangeQuery("price").gte(1000));
secondSearch.source(secondSource);
request.add(secondSearch);
MultiSearchResponse response = client.msearch(request, RequestOptions.DEFAULT);
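for (MultiSearchResponse.Item item : response.getResponses()) {
if (item.isFailure()) {
continue; // item.getFailure() holds the exception for this search
}
SearchResponse searchResponse = item.getResponse();
// process each search result
}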
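Code walkthrough:
• MultiSearchRequest bundles several search requests.
• It suits cases where several unrelated queries have to run at the same time.
• The returned MultiSearchResponse holds one item per search, in request order.
Spring Boot integration
1. Add the dependency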
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2. Configure Elasticsearch
spring.elasticsearch.rest.uris=http://localhost:9200
spring.elasticsearch.rest.connection-timeout=1s
spring.elasticsearch.rest.read-timeout=1m
Or use a Java configuration class:
@Configuration
public class ElasticsearchConfig {
@Value("${spring.elasticsearch.rest.uris}")
private String elasticsearchUrl;
@Bean
public RestHighLevelClient elasticsearchClient() {
return new RestHighLevelClient(
RestClient.builder(HttpHost.create(elasticsearchUrl))
);
}
}
3. Entity mapping
@Document(indexName = "products", createIndex = false)
public class Product {
@Id
private String id;
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String name;
@Field(type = FieldType.Double)
private double price;
@Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second)
private Date created;
@Field(type = FieldType.Nested)
private List<Category> categories;
// getters and setters
}
public class Category {
@Field(type = FieldType.Keyword)
private String name;
@Field(type = FieldType.Integer)
private int level;
}
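Code walkthrough:
• @Document: marks the class as an Elasticsearch document
    indexName: the index name
    createIndex: whether the index is created automatically
• @Id: marks the document ID field
• @Field: field mapping configuration
    type: the field type
    analyzer: the analyzer to use (ik_max_word requires the IK analysis plugin on the cluster)
    format: the date format
• Nested objects use @Field(type = FieldType.Nested)
4. Custom queries in a Repository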
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
// Queries derived from the method name
List<Product> findByName(String name);
Page<Product> findByPriceBetween(double minPrice, double maxPrice, Pageable pageable);
// Custom query with the @Query annotation
@Query("{\"bool\": {\"must\": {\"match\": {\"name\": \"?0\"}}}}")
List<Product> searchByName(String name);
// @Query with several parameters
@Query("{\"bool\": {\"must\": [{\"match\": {\"name\": \"?0\"}}, {\"range\": {\"price\": {\"gte\": ?1, \"lte\": ?2}}}]}}")
Page<Product> searchByNameAndPriceRange(String name, double minPrice, double maxPrice, Pageable pageable);
// Aggregations: Spring Data Elasticsearch has no @Aggregation annotation for repository methods.
// Run them through ElasticsearchOperations with a native query (see the custom Repository implementation in the next section).
}
Code walkthrough:
• Derived queries: the query is generated from the method name.
• @Query: accepts raw JSON query DSL; ?0, ?1, ... are replaced with the method arguments.
• A Pageable parameter adds paging and sorting.
• Aggregations are not available through a repository annotation; build them with NativeSearchQueryBuilder and run them through ElasticsearchOperations.
5. Custom Repository implementation
public interface CustomProductRepository {
List<Product> customSearch(String name, double minPrice, double maxPrice);
}
public class CustomProductRepositoryImpl implements CustomProductRepository {
@Autowired
private ElasticsearchOperations operations;
@Override
public List<Product> customSearch(String name, double minPrice, double maxPrice) {
Query query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("name", name))
.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice))
)
.build();
return operations.search(query, Product.class)
.stream()
.map(SearchHit::getContent)
.collect(Collectors.toList());
}
}
// The main Repository interface extends the custom interface
public interface ProductRepository extends ElasticsearchRepository<Product, String>, CustomProductRepository {
}
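Code walkthrough:
• Naming convention for the custom interface and its implementation: XxxRepository + XxxRepositoryImpl (here CustomProductRepository and CustomProductRepositoryImpl).
• ElasticsearchOperations runs the complex query.
• NativeSearchQueryBuilder builds a native query.
• The main Repository interface extends the custom interface.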
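6. Entity callbacks
@Component
public class ProductEntityCallbacks
implements BeforeConvertCallback<Product>, AfterSaveCallback<Product> {
@Override
public Product onBeforeConvert(Product product, IndexCoordinates index) {
if (product.getCreated() == null) {
product.setCreated(new Date());
}
return product;
}
@Override
public Product onAfterSave(Product product, IndexCoordinates index) {
System.out.println("Saved product: " + product.getName());
return product;
}
}
Code walkthrough:
• Spring Data Elasticsearch exposes lifecycle hooks as entity callbacks (package org.springframework.data.elasticsearch.core.event) rather than as application events handled with @EventListener.
• BeforeConvertCallback: invoked before the entity is converted to a document
• AfterSaveCallback: invoked after the document has been saved
• AfterConvertCallback: invoked after a document read from Elasticsearch has been converted to an entity
• Each callback returns the entity, which may be a modified instance.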
Performance optimization tips
1. Bulk operation tuning
// Use BulkProcessor to batch requests automatically
BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
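// called before a bulk request is sent
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// the bulk request completed; individual items may still have failed, so check response.hasFailures()
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// the whole bulk request failed
}
})
.setBulkActions(1000) // flush after every 1000 actions
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush once 5 MB are buffered
.setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds
.setConcurrentRequests(1) // number of concurrent in-flight bulk requests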
.build();
// Add documents to the bulk processor
for (Product product : products) {
IndexRequest request = new IndexRequest("products")
.id(product.getId())
.source(/*...*/);
bulkProcessor.add(request);
}
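// Flush whatever is still buffered, then close the processor and wait for in-flight requests
bulkProcessor.flush();
bulkProcessor.awaitClose(30, TimeUnit.SECONDS); // throws InterruptedException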
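The final manual flush matters because a count threshold only fires on full batches. The following stand-alone sketch models just the action-count trigger. CountingBatcher is a hypothetical class, it is not thread-safe, and it ignores the size and time triggers that BulkProcessor also applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, single-threaded model of count-based batching. */
final class CountingBatcher<T> {
    private final int maxActions;
    private final Consumer<List<T>> sink;
    private List<T> buffer = new ArrayList<>();

    CountingBatcher(int maxActions, Consumer<List<T>> sink) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be >= 1");
        }
        this.maxActions = maxActions;
        this.sink = sink;
    }

    /** Buffers the item and sends a batch as soon as the threshold is reached. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= maxActions) {
            flush();
        }
    }

    /** Sends whatever is buffered; does nothing when the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        sink.accept(batch);
    }
}
```

Adding 2,500 items with a threshold of 1,000 sends two batches automatically and leaves 500 items buffered until flush() is called, or, in the real BulkProcessor, until the flush interval elapses.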
2. Search tuning
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
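// Return only the fields that are needed
sourceBuilder.fetchSource(new String[]{"name", "price"}, null);
// Skip scoring by running the condition in filter context
sourceBuilder.query(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("status", "active"))
);
// Sort by price at query time (index sorting is a separate index setting, index.sort.field)
sourceBuilder.sort("price", SortOrder.ASC);
// Collapse results on a field
CollapseBuilder collapseBuilder = new CollapseBuilder("category.keyword");
sourceBuilder.collapse(collapseBuilder);
// Enable the shard request cache. It is set on the SearchRequest, not on the source builder,
// and by default only requests with size(0) are cached.
SearchRequest searchRequest = new SearchRequest("products");
searchRequest.source(sourceBuilder);
searchRequest.requestCache(true);
3. Index design optimization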
CreateIndexRequest request = new CreateIndexRequest("products");
// Tuned index settings
request.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 1)
.put("index.refresh_interval", "30s") // refresh less often (the default is 1s)
.put("index.merge.scheduler.max_thread_count", 1) // fewer merge threads (suits spinning disks)
);
// Tuned mapping
request.mapping(
"{\"properties\":{" +
"\"name\":{\"type\":\"text\",\"fields\":{\"keyword\":{\"type\":\"keyword\"}}}," +
"\"price\":{\"type\":\"scaled_float\",\"scaling_factor\":100}," +
"\"tags\":{\"type\":\"keyword\"}" +
"}}",
XContentType.JSON
);
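The scaled_float type in the mapping above stores price as a long: the value is multiplied by scaling_factor and rounded to the nearest integer. The sketch below shows that arithmetic with hypothetical helper names, to make the precision trade-off visible.

```java
/** Hypothetical illustration of how a scaled_float value is represented. */
final class ScaledFloatSketch {

    /** Value as stored: multiplied by the scaling factor and rounded to the nearest long. */
    static long encode(double value, double scalingFactor) {
        return Math.round(value * scalingFactor);
    }

    /** Value as seen by queries and aggregations after scaling back. */
    static double decode(long stored, double scalingFactor) {
        return stored / scalingFactor;
    }

    public static void main(String[] args) {
        double[] prices = {799.99, 1999.99, 9.999};
        for (double price : prices) {
            long stored = encode(price, 100);
            System.out.println(price + " -> stored " + stored + " -> read back " + decode(stored, 100));
        }
    }
}
```

With scaling_factor 100, two decimal places survive and anything finer is rounded away, which suits prices in currency units and usually compresses better than double.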