【RocketMq源码篇-01】环境搭建、基本使用、可视化界面

发布于:2025-05-01 ⋅ 阅读:(12) ⋅ 点赞:(0)

RocketMq源码核心篇整体栏目


内容 链接地址
【一】环境搭建、基本使用、可视化界面 https://zhenghuisheng.blog.csdn.net/article/details/147481401

如需转载,请附上链接:https://blog.csdn.net/zhenghuishengq/article/details/147481401

一,RocketMq源码分析

在现在流行的mq消息中间件中,rocketMq和kafka如今是占市面主流,其二者底层思想相互借鉴,又由于rocketMq底层源码是通过java代码实现,因此这里优先考虑研究rocketMq的底层,能把rocketMq中间件熟练掌握后,那么kafka自然也能够融汇贯通。从本系列开始正式迈入rocketMq源码篇章,在保证会用的同时,也能够知道其底层原理和内部实现,同时也能够学习内部优秀的源码设计和风格。

在rocketmq中,5.x版本是grpc协议实现,4.x版本还是基于netty协议,并且5.x内部模块相对于4.x版本重构了很多,文档等也相对较少,因此为了深度的学习rocketMq的底层源码,我们先从4.x的版本学起,这里推荐使用4.9.4版本,其源码地址如下:https://github.com/apache/rocketmq/tree/release-4.9.4

也可以直接使用我gitee仓库现成的源码,也包含一些案例和注释:https://gitee.com/zhenghuisheng/rocketmq-source-code-learning

可视化界面源码,里面的jar包也能直接使用:https://gitee.com/zhenghuisheng/rocketPageHome

1. docker安装rocketMq

docker拉取rocketmq的镜像,这里推荐4.9.4版本,因此这里还是拉取4.9.4的镜像

docker pull apache/rocketmq:4.9.4

启动nameServer,同时设置初始的堆内存大小

//运行新的nameServer    
docker run -d --name rmqnamesrv -p 9876:9876 -e "JAVA_OPT_EXT=-Xms512m -Xmx512m -Xmn256m" apache/rocketmq:4.9.4 sh mqnamesrv

在执行启动broker命令前,需要在配置文件中先设置一些参数,在 /root/rocketmq/broker.conf 配置文件中,这个文件就是linux环境下的原生文件,用于挂载映射到容器内部,没有这个文件的话需要创建一下,下面的namesrvAddr和brokerIP1里面的ip,需要替换成自己服务器的ip

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
namesrvAddr=159.75.102.237:9876
brokerIP1=159.75.102.237

启动broker,这里是4.x的版本启动方式,同时调整了初始的堆栈大小,因为默认的是4g内存,像我总共只有2核4g的服务器,刚启动就直接把内存给撑爆了,因此需要适当的去调整一下

//运行broker    
docker run -d --name rmqbroker -p 10911:10911 -p 10909:10909 -e "NAMESRV_ADDR=159.75.102.237:9876" -e "JAVA_OPT_EXT=-Xms512m -Xmx512m -Xmn256m -Duser.home=/home/rocketmq" -v /root/rocketmq/broker.conf:/opt/rocketmq/conf/broker.conf apache/rocketmq:4.9.4 sh mqbroker -c /opt/rocketmq/conf/broker.conf -n 159.75.102.237:9876

执行完这两条命令之后,执行在运行容器的命令即可,broker和nameserver全部启动起来了

docker ps

在这里插入图片描述

然后去对应的服务器把防火墙端口打开,分别是nameServer的9876端口和broker对应的10911、10909端口

2. rocketMq基本使用

2.1,创建topic主题

创建topic主题,可以直接在容器内部手动的去创建主题,进入broker容器内部,然后先创建一个测试的主题,如我这边新建一个 zhsTopic 的主题

# 进入broker容器内部
docker exec -it rmqbroker bash
cd /home/rocketmq/rocketmq-4.9.4/bin
export NAMESRV_ADDR=159.75.102.237:9876
# 创建 topic,绑定到默认集群和 broker 上
sh mqadmin updateTopic -n 159.75.102.237:9876 -c DefaultCluster -t zhsTopic

后续出现 create topic to 172.17.0.3:10911 success. 表示主题创建成功

在这里插入图片描述

2.2,java基础代码测试

引入项目依赖,我调试的源码版本是4.9.4,所以后续都通过这个

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
	<version>4.9.4</version>
</dependency>        
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>

接下来通过java程序模拟两个客户端,消费者的代码如下,setNamesrvAddr需要设置成对应的ip和开放的端口号,主题设置成自定义的主题

/**
 *
 * @Author zhenghuisheng
 * @Date:2025/4/15 19:24
 */
public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo-consumer-group");
        consumer.setNamesrvAddr("159.75.102.237:9876");
        consumer.subscribe("zhsTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("收到消息:%s%n", new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("消费者已启动");
    }
}

生产者的代码如下,同样也是设置对应的ip和端口以及主题,启动成功后手动销毁结束一下

/**
 *
 * @Author zhenghuisheng
 * @Date:2025/4/15 19:24
 */
public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo-producer-group");
        producer.setNamesrvAddr("159.75.102.237:9876");
        producer.start();
        Message msg = new Message("zhsTopic", "TagA", "Hello RocketMQ!".getBytes());
        SendResult sendResult = producer.send(msg);
        System.out.printf("发送结果:%s%n", sendResult);
        producer.shutdown();
    }
}

最后先启动消费者,然后再启动生产者,其对应的日志如下

生产者日志:发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000013EF818B4AAC26B8EE9230000, offsetMsgId=9F4B66ED00002A9F0000000000000178, messageQueue=MessageQueue [topic=zhsTopic, brokerName=broker-a, queueId=3], queueOffset=0]
21:15:27.970 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[159.75.102.237:9876] result: true
21:15:27.973 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[159.75.102.237:10911] result: true

消费者日志:

消费者已启动
收到消息:Hello RocketMQ!

2.3,结合springboot使用

一般整合springboot时,都要引入对应的starer让spring自动注入到容器中,因此这里也引入对应的starer,这道理版本选择2.2.3即可

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

然后消费者代码如下,直接@Service或者@Component都可以,使用注解 RocketMQMessageListener 即可监听消息并且消费

/**
 *
 * @Author zhenghuisheng
 * @Date:2025/4/21 21:22
 */
@Service
@RocketMQMessageListener(topic = IndexConstant.CUSTOM_TOPIC, consumerGroup = "demo-consumer-group")
public class MQConsumerService implements RocketMQListener<String> {

    public void onMessage(String message) {
        System.out.println("收到消息: " + message);
    }

}

常量如下,定义上面要取的topic主题

@Data
public class IndexConstant implements Serializable {
    public static final String CUSTOM_TOPIC = "zhsTopic";
}

生产者就比较简单,直接使用这个 RocketMQTemplate 模板即可,内部是使用刚刚引入的starer中的注解

/**
 *
 * @Author zhenghuisheng
 * @Date:2025/4/21 21:21
 */
@Slf4j
@Service
public class MQProducerService {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String message) {
        log.info("发送的主题为,{} --- 消息为,{}", IndexConstant.CUSTOM_TOPIC, message);
        rocketMQTemplate.convertAndSend(IndexConstant.CUSTOM_TOPIC, message);
    }
}

最后定义一个controller发送消息即可,消息从接口处传入

/**
 *
 * @Author zhenghuisheng
 * @Date:2025/4/21 21:28
 */
@RestController
@RequestMapping("/mq")
@Slf4j
public class MqController {

    @Resource
    private MQProducerService mqProducerService;

    @GetMapping("/sendMessage")
    public R<String> sendMessage(String message) {
        mqProducerService.sendMessage(message);
        return R.ok(message);
    }
}

3. 安装可视化界面

在使用rocketmq时,可以用官方推荐的可视化界面进行查看消息的消费情况以及集群等问题,对相关问题排查也比较友好,因此这里选择使用官方推荐的 https://github.com/apache/rocketmq-dashboard/tags ,这里目前有两个版本,2.0版本是为了rocketMq5.0版本之后使用的,会涉及到一些grpc等,因此先使用下面的1.0版本

在这里插入图片描述

可以将代码拉到本地,解压后本地先进行一些配置修改,如yml中 namesrvAddr 地址,以及修改一些默认的端口号等

rocketmq.config.namesrvAddr=
server.port=8888

随后本地通过maven打一个jar包,将这个jar包发到服务器上面,随后可以看到此时已经build success

在这里插入图片描述

随后将本地打出来的包发布到服务器上,也可以在本地执行一下,是可以运行的

在这里插入图片描述

我把它放在/opt/rocketmq目录下面,由于java的可移植性,windows打出的包linux也可以运行

在这里插入图片描述

随后执行服务启动命令,java -jar rocketmq-dashboard-1.0.0.jar 即可,端口号是改成了8888

在这里插入图片描述

随后打开ip+端口号,像8888端口号的话,需要开启防火墙,开放端口号,直接进入这个可视化界面这样就完成可视化几面的安装

在这里插入图片描述

可以看到刚刚新建的主题zhsTopic, 可以在这里查看状态,手动的发消息等

在这里插入图片描述

以及消费者状态,看每个topic主题消费的情况等,以及生产者等等

在这里插入图片描述

这个可视化界面在后续用到时也可以详细的描述他的功能


网站公告

今日签到

点亮在社区的每一天
去签到