Elasticsearch + Kafka

Published: 2022-08-05

Implementing a logging feature with Elasticsearch + Kafka


Installing ZooKeeper

1. Download

ZooKeeper

  • Do not download the source package; use the binary release (apache-zookeeper-x.x.x-bin).


2. Configure

  • Copy zoo_sample.cfg and rename the copy to zoo.cfg.


    # Configure the following settings
    dataDir=D:\environment\zks\apache-zookeeper-3.6.3-bin-server\data
    dataLogDir=D:\environment\zks\apache-zookeeper-3.6.3-bin-server\log
    

3. Run

  • Run bin\zkServer.cmd (double-click it or start it from a command prompt).

Installing Kafka

1. Download

Kafka

  • Do not download the source package; use the binary release (e.g. kafka_2.12-3.2.1).


2. Configure

  • Open the configuration file config\server.properties.


    # Configure the following settings
    # Listener port
    listeners=PLAINTEXT://localhost:9092
    # Log files
    log.dirs=D:\environment\kafka_2.12-3.2.1\logs
    # Data files
    dataDir=D:\environment\kafka_2.12-3.2.1\data
    # ZooKeeper address
    zookeeper.connect=localhost:2181
    

3. Run

  • Go into bin\windows and run:

    kafka-server-start.bat ..\..\config\server.properties
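
The application later sends to a topic named my-replicated-topic. If automatic topic creation is disabled on your broker, the topic can be created up front from the same bin\windows directory; this is a sketch assuming a single-broker local setup:

    kafka-topics.bat --create --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --topic my-replicated-topic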
    

Spring Boot + Kafka + Elasticsearch

1. Add dependencies

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.14.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.14.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.59</version>
</dependency>
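
The controller below exposes a REST endpoint, which assumes the standard Spring Boot web starter is also on the classpath; if your project does not already include it, add it as well:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>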

2. Configuration file

server:
  port: 9999

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer: # Producer
      retries: 3 # If greater than 0, the client will resend records whose send failed
      batch-size: 16384
      buffer-memory: 33554432
      acks: 0
      # Serializers for the message key and the message value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # RECORD: commit after each record has been processed by the listener (ListenerConsumer)
      # BATCH: commit after each batch returned by poll() has been processed by the listener
      # TIME: commit after a batch has been processed and at least TIME has elapsed since the last commit
      # COUNT: commit after a batch has been processed and at least COUNT records have been processed since the last commit
      # COUNT_TIME: commit when either the TIME or the COUNT condition is met
      # MANUAL: commit after the batch has been processed and Acknowledgment.acknowledge() has been called
      # MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is called (the usual choice)
      ack-mode: manual_immediate

  elasticsearch:
    rest:
      uris: localhost:9200
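
With Spring Boot 2.x, spring.elasticsearch.rest.uris is picked up by the Elasticsearch REST client auto-configuration, so a RestHighLevelClient bean is normally available for injection. If that auto-configuration does not apply in your setup, a minimal manual configuration might look like the sketch below (the class name EsKafkaConfig and the single-node address are assumptions, not part of the original project); the NewTopic bean additionally lets spring-kafka's KafkaAdmin create the my-replicated-topic topic at startup if it is missing.

import org.apache.http.HttpHost;
import org.apache.kafka.clients.admin.NewTopic;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class EsKafkaConfig {

    // Manually built high-level client pointing at the local single-node Elasticsearch
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }

    // Declaring the topic as a bean makes KafkaAdmin create it on startup if it does not exist
    @Bean
    public NewTopic logsTopic() {
        return TopicBuilder.name("my-replicated-topic")
                .partitions(1)
                .replicas(1)
                .build();
    }
}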

3. Application code

  • Producer
@RestController
public class KafkaController {

    public static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // SimpleDateFormat is not thread-safe; a shared static instance is acceptable only for a demo
    static SimpleDateFormat fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @RequestMapping("/send")
    public void send() {
        String date = fm.format(new Date());
        // Serialize a Log record to JSON and send it to partition 0 of the topic with a fixed key
        kafkaTemplate.send(TOPIC_NAME, 0, "key", JSON.toJSONString(new Log(date, "" + System.currentTimeMillis())));
    }
}
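
To see whether a send actually completed, the ListenableFuture returned by KafkaTemplate.send (in the spring-kafka versions used with Spring Boot 2.x) can be given a callback. This is a sketch of a variant of the send() body, where payload stands for the JSON string built above; note that with acks: 0 the broker sends no acknowledgement, so "success" here only means the record left the producer.

// Hypothetical variant of the send() body that logs the outcome of each send
kafkaTemplate.send(TOPIC_NAME, 0, "key", payload)
        .addCallback(
                result -> System.out.println("sent to " + result.getRecordMetadata().topic()
                        + "-" + result.getRecordMetadata().partition()),
                ex -> System.err.println("send failed: " + ex.getMessage()));
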
  • Consumer
@Component
public class KafkaConsumer {

    @Autowired
    private RestHighLevelClient restHighLevelClient;


    @KafkaListener(topics = {"my-replicated-topic"})
    public void onMessage(ConsumerRecord<?, String> record, Acknowledgment ack) throws Exception {
        Log log = JSON.parseObject(record.value(), Log.class);
        if (log.getTime() == null && log.getInfo() == null) {
            // Nothing usable in the record; acknowledge it anyway so the offset is committed
            ack.acknowledge();
            return;
        }
        Map<String, String> map = new HashMap<>();
        map.put("time", log.getTime());
        map.put("info", log.getInfo());
        // Index the log into the "logs" index with a random id
        IndexRequest indexRequest = new IndexRequest("logs");
        indexRequest.source(map).id(UUID.randomUUID().toString().replace("-", ""));
        restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        // ack-mode is manual_immediate, so the offset is only committed once we acknowledge
        ack.acknowledge();
    }
}
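
To check that the consumer really wrote documents into Elasticsearch, the same RestHighLevelClient can run a quick match-all search against the logs index. This helper is a throwaway sketch, not part of the original project:

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class LogSearchCheck {

    // Prints every document currently stored in the "logs" index
    public static void printLogs(RestHighLevelClient client) throws Exception {
        SearchRequest request = new SearchRequest("logs");
        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        for (SearchHit hit : response.getHits().getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }
}
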
  • Entity class
public class Log implements Serializable {
    private String time;

    private String info;


    // No-arg constructor; JSON libraries such as fastjson typically rely on it when deserializing
    public Log() {
    }

    public Log(String time, String info) {
        this.time = time;
        this.info = info;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getInfo() {
        return info;
    }

    public void setInfo(String info) {
        this.info = info;
    }
}

4. Elasticsearch initialization

# Create the index and its mapping
PUT /logs
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "time": {
        "type": "date",
        "store": true,
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "info": {
        "type": "keyword",
        "store": true
      }
    }
  }
}
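
Once a few messages have been sent, the indexed data can be checked directly from Kibana Dev Tools (or any HTTP client), for example with a range query on the time field using the same date format; the date value below is only illustrative:

# Fetch logs since a given time
GET /logs/_search
{
  "query": {
    "range": {
      "time": {
        "gte": "2022-08-05 00:00:00"
      }
    }
  }
}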


Notes

  • Elasticsearch 7.14.0 is used here. The time field is mapped as type date with format yyyy-MM-dd HH:mm:ss, so send it as a formatted string; do not convert it to a Java Date, or indexing will fail.
  • In a real system, log collection is done with AOP rather than by calling the producer by hand (see the sketch after this list).
  • Start ZooKeeper first, because this Kafka setup depends on ZooKeeper.
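
The AOP-based collection is not shown in this post; a minimal sketch of the idea might look like the following. The aspect name, the pointcut package com.example.controller, and the reuse of the my-replicated-topic topic are assumptions, and spring-boot-starter-aop would also need to be on the classpath.

import java.text.SimpleDateFormat;
import java.util.Date;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;

@Aspect
@Component
public class LogAspect {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Intercept every controller method and publish one log record per invocation
    @Around("execution(* com.example.controller..*(..))")
    public Object collect(ProceedingJoinPoint pjp) throws Throwable {
        long start = System.currentTimeMillis();
        try {
            return pjp.proceed();
        } finally {
            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            String info = pjp.getSignature().toShortString()
                    + " took " + (System.currentTimeMillis() - start) + "ms";
            kafkaTemplate.send("my-replicated-topic", 0, "key",
                    JSON.toJSONString(new Log(time, info)));
        }
    }
}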
