Implementing Logging with ElasticSearch + Kafka
Installing ZooKeeper
1. Download
Download the binary release, not the source package.
2. Configure
Copy zoo_sample.cfg and rename the copy to zoo.cfg.
```properties
# Set the following options
dataDir=D:\environment\zks\apache-zookeeper-3.6.3-bin-server\data
dataLogDir=D:\environment\zks\apache-zookeeper-3.6.3-bin-server\log
```
3. Run
- Double-click bin\zkServer.cmd
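ZooKeeper listens on port 2181 by default. To confirm it is up, you can connect with the bundled CLI client:

```
bin\zkCli.cmd -server localhost:2181
```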
Installing Kafka
1. Download
- Download the binary release, not the source package.
2. Configure
Open the configuration file (config\server.properties).
```properties
# Set the following options
# Listener port
listeners=PLAINTEXT://localhost:9092
# Log files
log.dirs=D:\environment\kafka_2.12-3.2.1\logs
# Data files
dataDir=D:\environment\kafka_2.12-3.2.1\data
# ZooKeeper address
zookeeper.connect=localhost:2181
```
3. Run
From bin\windows, run:
```
kafka-server-start.bat ..\..\config\server.properties
```
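The sample code below produces to a topic named my-replicated-topic, so create it up front (single-node settings assumed; the CLI lives in the same bin\windows directory):

```
kafka-topics.bat --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic my-replicated-topic
```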
Spring Boot + Kafka + ElasticSearch
1. Add the dependencies
```xml
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.14.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.14.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.59</version>
</dependency>
```
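The controller below also needs Spring MVC; assuming a standard Spring Boot 2.x parent POM (which also manages the spring-kafka version above), add the web starter:

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
```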
2. Configuration file (application.yml)
```yaml
server:
  port: 9999
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # If set greater than 0, the client resends records whose delivery failed
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      # Fire-and-forget; note the Kafka docs state retries cannot take effect with acks=0
      acks: 0
      # Serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # RECORD: commit after each record is processed by the listener (ListenerConsumer)
      # BATCH: commit after each batch from poll() is processed
      # TIME: commit after a batch is processed, once more than TIME has elapsed since the last commit
      # COUNT: commit after a batch is processed, once at least COUNT records have been processed
      # COUNT_TIME: commit when either the TIME or the COUNT condition is met
      # MANUAL: commit after the batch is processed and Acknowledgment.acknowledge() is called
      # MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is called; the mode usually used
      ack-mode: manual_immediate
  elasticsearch:
    rest:
      uris: localhost:9200
```
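On Spring Boot 2.x, spring.elasticsearch.rest.uris auto-configures the RestHighLevelClient injected by the consumer below. If your Boot version does not auto-configure it, a minimal manual bean would look like this (the EsClientConfig class name is hypothetical):

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsClientConfig {

    // Build the high-level client against the local node from application.yml
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }
}
```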
3. Controller layer
- Producer
```java
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.Date;

@RestController
public class KafkaController {
    public static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    static SimpleDateFormat fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @RequestMapping("/send")
    public void send() {
        String date = fm.format(new Date());
        // Publish a JSON-serialized Log to partition 0 of the topic
        kafkaTemplate.send(TOPIC_NAME, 0, "key", JSON.toJSONString(new Log(date, "" + System.currentTimeMillis())));
    }
}
```
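With the application running on port 9999 (per the configuration above), a test message can be produced by hitting the endpoint, e.g.:

```
curl http://localhost:9999/send
```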
- Consumer
```java
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Component
public class KafkaConsumer {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @KafkaListener(topics = {"my-replicated-topic"})
    public void onMessage(ConsumerRecord<?, String> record, Acknowledgment ack) throws Exception {
        Log log = JSON.parseObject(record.value(), Log.class);
        // Skip records that carry neither a timestamp nor a message
        if (log.getTime() == null && log.getInfo() == null) { ack.acknowledge(); return; }
        Map<String, String> map = new HashMap<>();
        map.put("time", log.getTime());
        map.put("info", log.getInfo());
        // Index the log into the "logs" index under a random document id
        IndexRequest indexRequest = new IndexRequest("logs");
        indexRequest.source(map).id(UUID.randomUUID().toString().replace("-", ""));
        restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        // ack-mode is manual_immediate, so the offset must be committed explicitly
        ack.acknowledge();
    }
}
```
- Entity layer
```java
import java.io.Serializable;

public class Log implements Serializable {
    private String time;
    private String info;

    // No-arg constructor so fastjson can deserialize the record value
    public Log() {
    }

    public Log(String time, String info) {
        this.time = time;
        this.info = info;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getInfo() {
        return info;
    }

    public void setInfo(String info) {
        this.info = info;
    }
}
```
4. ElasticSearch initialization
```
# Create the index and its mapping
PUT /logs
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "time": {
        "type": "date",
        "store": true,
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "info": {
        "type": "keyword",
        "store": true
      }
    }
  }
}
```
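Once a few messages have been sent, you can check that the logs landed in the index with a simple query (not part of the original setup):

```
GET /logs/_search
{
  "query": { "match_all": {} },
  "sort": [ { "time": "desc" } ]
}
```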
*Notes
- This example uses ElasticSearch 7.14.0. The time field is mapped as date on the ES side, but keep it a formatted String in the Java code; converting it to a Java Date before indexing will cause an error.
- In a real system, log collection would be implemented with AOP instead of a hand-written controller (see the sketch below).
- Start ZooKeeper before Kafka, because Kafka depends on ZooKeeper here.
e": true,
“format”: “yyyy-MM-dd HH:mm:ss”
},
“info”: {
“type”: “keyword”,
“store”: true
}
}
}
}
# *注
- ElasticSearch使用的是7.14.0版本的,里面的设置字段类型为Date,不要转Date不然会报错
- 真实日志收集是采用AOP去实现的
- 要先启动zookeeper,因为这里Kafka依赖于zookeeper的
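As a concrete illustration of the AOP approach mentioned above, here is a minimal sketch, assuming spring-boot-starter-aop is on the classpath; the aspect class name and pointcut expression are hypothetical:

```java
import com.alibaba.fastjson.JSON;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Aspect
@Component
public class LogAspect {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Intercept successful controller calls and publish a Log entry to Kafka;
    // the package in the pointcut is a placeholder for your own controllers
    @AfterReturning("execution(* com.example.controller..*(..))")
    public void collect(JoinPoint joinPoint) {
        String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        Log log = new Log(time, joinPoint.getSignature().toShortString());
        kafkaTemplate.send("my-replicated-topic", JSON.toJSONString(log));
    }
}
```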