Kafka简单使用

发布于:2025-02-10 ⋅ 阅读:(88) ⋅ 点赞:(0)

说明:kafka是一款消息中间件,可实现微服务之间的异步调用。本文介绍kafka的简单使用。windows操作系统下的kafka安装,参考下面这篇文章

启动

按照上面博客的介绍,使用CMD命令启动,如下:

在这里插入图片描述

Demo

Github上有一个现成的Demo,地址:https://github.com/xiaour/SpringBootDemo,clone到本地,里面有一个kafka的demo,打开。

在这里插入图片描述

启动

打开后pom文件中这个版本号需要修改成如下,不然启动会提示一个方法没找到

在这里插入图片描述

其他配置都可以不改(当然如果端口冲突了,可以换个端口),启动

在这里插入图片描述

运行

接着来看下代码,代码中消息生产者、消费者如下:

(消息生产者,手动创建一个对象,推到名为test的topic里)

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
public class Producer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    private static Gson gson = new GsonBuilder().create();

    //发送消息方法
    public void send() {
        Message message = new Message();
        message.setId("KFK_"+System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        kafkaTemplate.send("test", gson.toJson(message));
    }
}

(Message是自定义对象,如下)

import java.util.Date;

public class Message {

    private String id;

    private String msg;

    private Date sendTime;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}

(消息消费者,监听名为test的topic,打印消息内容)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
public class Consumer {

    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record){
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("---->"+record);
            System.out.println("---->"+message);
        }
    }
}

定义一个controller,手动触发,发送一个消息

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class SendController {

    @Autowired
    private Producer producer;

    @RequestMapping(value = "/send")
    public String send() {
        producer.send();
        return "{\"code\":0}";
    }
}

调用接口,往kafka中发送一个消息

在这里插入图片描述

可在kafka的可视化界面中,看到消息内容

在这里插入图片描述

控制台,可见消息消费者这边消费了消息,打印了消息内容

在这里插入图片描述

到这,kafka的简单使用就完成了。

实际开发中,可以在业务需要的地方发送消息到kafka中,如发送验证码、数据存入缓存、资源上传到OSS、生成静态资源文件等一些不需要实时进行的操作,可以发个消息到kafka,在消息消费者这边完成对应的业务逻辑。

总结

本文介绍了kafka在Spring Boot中的简单使用


网站公告


今日签到

点亮在社区的每一天
去签到