Springboot3.0 集成 RocketMQ5

发布于:2025-07-28 ⋅ 阅读:(15) ⋅ 点赞:(0)
1 引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.1</version>
</dependency>

2配置
rocketmq:
  name-server: 192.168.150.50:9876
  producer:
    group: dist-test # 生产者组
  pull-consumer: # pull模式消费者
    group: test
    topic: MyTopic

3 启动类引入配置
创建主题:
[root@localhost bin]# sh mqadmin updateTopic -n 192.168.150.50:9876 -b 192.168.150.50:10911 -t MyTopic -w 4 -r 4
create topic to 192.168.150.50:10911 success.
TopicConfig [topicName=MyTopic, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]

-n 192.168.150.50:9876:Name Server地址。
-b 192.168.150.50:10911:Broker地址。
-t MyTopic:主题名称。
-w 4:写队列数量。
-r 4:读队列数量。

引入配置类
@SpringBootApplication
@ImportAutoConfiguration({RocketMQAutoConfiguration.class})

4 测试,用SendController测试发送,用SendController测试接收

// RocketMQReplyListener<T, R> 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive
// RocketMQListener<T> 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSend

broker的配置也可以不用配置
namesrvAddr=
autoCreateTopicEnable=true
brokerIP1=192.168.150.50
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rocketmq")
public class SendController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send")
    public String send() {
        /**
         * 普通消息:只负责发送无需等待响应
         * 参数1:topic:tag tag可省略不写
         * 参数2:Object类型的消息体
         * 消费监听器:一般配合着 RocketMQListener<T> 使用
         */
        rocketMQTemplate.convertAndSend("MyTopic2", "hello world");

//        Message message = new Message("topic", "tag", "key", "message body".getBytes());
//        message.putUserProperty("REPLY_TO_CLIENT", "yourClientID"); // Set the reply property
//        producer.send(message);
        return "success";
    }

    @GetMapping("/send2")
    public String send2() {
        /**
         * 普通消息:等待消费者响应
         * 参数信息和上面一样
         * 消费者监听器:一般配合着 RocketMQReplyListener<T, R> 使用
         */
        String res = rocketMQTemplate.sendAndReceive("MyTopic", "hello RocketMQ", String.class);
        return res;
    }
}

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.stereotype.Component;

// RocketMQReplyListener<T, R> 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive
// RocketMQListener<T> 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSend

@Slf4j
@Component
@RocketMQMessageListener(topic = "MyTopic", consumerGroup = "test_consumer")
public class TestRocketMQMessageListener implements RocketMQReplyListener<String, String> {

    @Override
    public String onMessage(String s) {
        log.info("接收到RocketMQ消息[topic={}] ======> {}", "test", s);
        return SendStatus.SEND_OK.name();
    }
}

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// RocketMQReplyListener<T, R> 是一个接口,需要返回值的监听器,两个泛型分别是接收消息的类型和返回值类型,对应的发送者rocketMQTemplate.sendAndReceive
// RocketMQListener<T> 无需返回值 T 为接收消息的类型,对应的发送者rocketMQTemplate.convertAndSend

@Slf4j
@Component
@RocketMQMessageListener(topic = "MyTopic2", consumerGroup = "test_consumer2")
public class TestRocketMQMessageListener2 implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("接收到RocketMQ消息[topic={}] ======> {}", "test", s);
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到