目录
十、生态集成——基于Spring Data Elasticsearch
一、前言提要
Elasticsearch(ES)是一个开源的分布式搜索和分析引擎,基于Apache Lucene构建,专为海量数据的实时搜索、分析和可视化而设计。它是Elastic Stack(ELK Stack)的核心组件,广泛应用于日志分析、监控、电商搜索、安全事件检测等场景。
二、核心特性
特性 | 说明 |
---|---|
分布式架构 | 自动分片、负载均衡、副本容灾,支持水平扩展 |
近实时搜索 | 数据写入后1秒内即可被搜索到(NRT,Near Real-Time) |
RESTful API | 通过HTTP+JSON即可操作,支持多种编程语言(Java、Python、Go等) |
强大的查询语言 | 支持全文检索、聚合分析、地理位置查询、模糊匹配等 |
Schema-Free | 无需预定义严格表结构(动态映射),兼容结构化/非结构化数据 |
高可用性 | 自动发现节点、故障转移,支持跨集群复制(CCR) |
三、应用场景
1. 日志收集与分析
- 使用Logstash/Beats收集日志 → Elasticsearch存储 → Kibana可视化
- 例如:Nginx日志中查找500错误的IP分布。
2. 电商商品搜索
- 支持分词搜索、拼音搜索、价格排序、聚合统计(如品牌、价格区间筛选)。
3. APM(应用性能监控)
- 存储系统指标(CPU、内存)、链路追踪数据(SkyWalking、Jaeger集成ES)。
4. 安全事件检测
- 实时分析网络流量,检测异常行为(如暴力破解)。
四、主要优势
- PB级数据:毫秒级响应(如Twitter每日千亿级日志)。
- 生态完善:与Kafka、Spark、Flink无缝集成。
- 开箱即用:单节点即可启动,无需复杂配置。
五、集成方式
1. 官方高级REST客户端
// Maven依赖
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.9</version> <!-- 使用与你的ES集群匹配的版本 -->
</dependency>
// 初始化客户端
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http")
)
);
2. 新版Java API Client(8.x+推荐)
// Maven依赖
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.11.0</version>
</dependency>
// 初始化客户端
ElasticsearchClient client = new ElasticsearchClient(
RestClient.builder(
new HttpHost("localhost", 9200)
)
);
六、基础操作
1. 索引文档
// 传统方式
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("user", "kimchy", "postDate", new Date(), "message", "trying out Elasticsearch");
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
// 新版Java API
client.index(i -> i
.index("posts")
.id("1")
.document(new Post("kimchy", new Date(), "trying out Elasticsearch"))
);
2. 搜索文档
// 传统方式
SearchRequest searchRequest = new SearchRequest("posts");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy"));
searchRequest.source(sourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
// 新版Java API
SearchResponse<Post> response = client.search(s -> s
.index("posts")
.query(q -> q
.term(t -> t
.field("user")
.value("kimchy")
)
),
Post.class
);
七、高级特性
1. 批量操作
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("posts").id("2").source(/* your doc */));
bulkRequest.add(new DeleteRequest("posts").id("1"));
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
2. 聚合查询
SearchRequest request = new SearchRequest("sales");
TermsAggregationBuilder aggregation = AggregationBuilders.terms("top_tags")
.field("tags.keyword")
.size(10);
request.source().aggregation(aggregation);
3. 连接池配置
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials("user", "password")
);
RestClientBuilder builder = RestClient.builder(
new HttpHost("localhost", 9200, "http")
).setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
.setMaxConnTotal(100)
.setMaxConnPerRoute(20)
);
八、概念类比——与关系型数据库
关系型数据库(MySQL) | Elasticsearch | 示例 |
---|---|---|
Database(数据库) | Index(索引) | order_db → order_index |
Table(表) | Type(已废弃)/Mapping | user_table → user_doc |
Row(行) | Document(文档) | {"id":1, "name":"Alice"} |
Column(列) | Field(字段) | name 字段存储字符串 |
Schema(表结构) | Mapping(映射) | 定义字段类型、分词器等 |
SQL查询 | DSL(JSON查询语言) | SELECT * FROM user → GET /user/_search |
九、简单示例——实现存储与搜索
1. 存储文档(PUT)
PUT /products/_doc/1
{
"name": "iPhone 15",
"price": 7999,
"category": "手机"
}
2. 搜索文档(GET)
GET /products/_search
{
"query": {
"match": {
"name": "iPhone"
}
},
"sort": [
{"price": "desc"}
]
}
十、生态集成——基于Spring Data Elasticsearch
1. Spring Boot配置
@Configuration
public class ElasticsearchConfig {
@Bean
public ClientConfiguration clientConfiguration() {
return ClientConfiguration.builder()
.connectedTo("localhost:9200")
.withBasicAuth("user", "password")
.build();
}
@Bean
public ElasticsearchTemplate elasticsearchTemplate() {
return new ElasticsearchTemplate(elasticsearchClient());
}
}
2. 实体映射
@Document(indexName = "posts")
public class Post {
@Id private String id;
@Field(type = FieldType.Text) private String title;
@Field(type = FieldType.Date) private Date date;
// getters/setters
}
3. Repository接口
public interface PostRepository extends ElasticsearchRepository<Post, String> {
List<Post> findByTitleContaining(String title);
@Query("{\"bool\": {\"must\": [{\"match\": {\"title\": \"?0\"}}]}}")
Page<Post> findByTitle(String title, Pageable pageable);
}
十一、性能优化建议
1. 批量处理:使用bulk API进行批量索引
2. 连接管理:合理配置连接池大小
3. 索引优化:合理设置分片数和副本数
4. 查询优化:避免深度分页,使用scroll API处理大数据量
5. 缓存利用:合理使用filter上下文和系统缓存
十二、总结归纳概述
> Elasticsearch = 搜索引擎 + 分布式数据库 + 实时分析工具
> 专为“搜索一切”而生,从代码日志到宇宙射线数据皆可处理!