概述
简介
RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ, 是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目, 具有高性能、高可靠、高实时、分布式特点。
历史
1.2010年:阿里巴巴B2B部门基于ActiveMQ的5.1版本也开发了自己的一款消息引擎,称为Napoli, 这款消息引擎在B2B里面广泛地被使用,不仅仅是在交易领域,在很多的后台异步解耦等方面也得到了广泛的应用。
2.2011年:业界出现了现在被很多大数据领域所推崇的Kafka消息引擎,阿里巴巴在研究了Kafka的整体机制和架构设计之后, 基于Kafka的设计使用Java进行了完全重写并推出了MetaQ 1.0版本,主要是用于解决顺序消息和海量堆积的问题。
3.2012年:阿里巴巴开源其自研的第三代分布式消息中间件——RocketMQ。经过几年的技术打磨, 阿里称基于RocketMQ技术,目前双十一当天消息容量可达到万亿级。
4.2016年11月:阿里将RocketMQ捐献给Apache软件基金会,正式成为孵化项目。 阿里称会将其打造成顶级项目。这是阿里迈出的一大步,因为加入到开源软件基金会需要经过评审方的考核与观察。
5.2017年2月20日:RocketMQ正式发布4.0版本,专家称新版本适用于电商领域,金融领域,大数据领域,兼有物联网领域的编程模型。
功能
1.是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点; 2.Producer、Consumer、队列都可以分布式;
3.Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费, 则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合;
4.能够保证严格的消息顺序;
5.提供丰富的消息拉取模式;
6.高效的订阅者水平扩展能力;
7.实时的消息订阅机制;
8.亿级消息堆积能力;
9.较少的依赖。
核心概念
生产者和消费者
生产者负责生产消息,一般由业务系统负责生产消息
消费者即后台系统,它负责消费消息
消息模型
消息模型主要有队列模型和发布订阅模型,RabbitMQ采用的是队列模型,RocketMQ采用发布订阅模型
主题
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位
队列
存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。 一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。
技术架构
NameServer
NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。
主要包括两个功能:
1.Broker管理:接受Broker集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查Broker是否还存活。
2.路由信息管理:每个NameServer中都保存着Broker集群的整个路由信息和用于客户端查询的队列信息。 Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息,从而进行消息的投递和消费。
1)路由发现
2)路由剔除
3)路由注册
4)客户端选择NameServer的策略
Broker
Broker充当着消息的中转角色,负责存储消息、转发消息。 Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。 Broker同时也存储着消息相关的元数据,包括 消费者组消费进度偏移offset、主题、队列等。
Producer
消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列 (先选择Broker,再选择队列)进行消息投递,投递的过程支持快速失败并且低延迟。
Consumer
消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理。 RocketMQ中的消息消费者都是以消费者组(ConsumerGroup)的形式出现的。 消费者组是同一类消费者的集合,这类Consumer消费的是相同Topic类型(可以是一个, 可以是多个,但是要相同)、并且是相同的tag(可以是一个,可以是多个,但是要相同)(保证订阅关系的一致性)。 消费者组使得在消息消费方面,容易实现
安装
系统要求
64位操作系统,推荐 Linux/Unix/macOS 64位
JDK 1.8+
安装jdk
查看jdk是否安装:java -version
下载网址:下载地址
下载:wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
解压:tar -zxvf jdk-17_linux-x64_bin.tar.gz -C /usr/local
修改环境变量【注意jdk版本可能不同】:sudo vim /etc/profile
export JAVA_HOME=/usr/local/jdk-17.0.11
export PATH=$PATH:$JAVA_HOME/bin
生效环境变量:source /etc/profile
安装rocketmq
源码包:源码包下载地址
二进制包: 二进制包下载地址
1.解压文件:unzip rocketmq-all-5.0.0-bin-release.zip -d /usr/local
2.修改文件名:mv rocketmq-all-5.0.0-bin-release rocketmq
3.修改环境变量:vim /etc/profile
export JAVA_HOME=/usr/local/jdk-17.0.11
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH:$JAVA_HOME/bin:$ROCKETMQ_HOME/bin
生效环境变量:source /etc/profile
4.修改runbroker.sh文件:vim /usr/local/rocketmq/bin/runbroker.sh
修改后:$JAVA ${JAVA_OPT} --add-exports=java.base/sun.nio.ch=ALL-UNNAMED $@
修改后:JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
修改后:JAVA_OPT="${JAVA_OPT} -Xmn256m -XX:+UseConcMarkSweepGC
1)添加启动参数
2)通过上述修改,将初始堆内存512M,最大堆内存设置为512M,新生代(Java中用于存储创建对象的部分)设置为256M,
修改完成后便可以正常启动broker以及查看日志。
启动rocketmq
1.启动NameServer:nohup sh mqnamesrv &
2.启动broker:nohup sh ./mqbroker -n localhost:9876 &
启动后生成日志,查看日志:cat /usr/local/rocketmq/bin/nohup.out
显示当前系统的java进程情况及进程id:jps
3.关闭namesrv服务:mqshutdown namesrv
4.关闭broker服务:mqshutdown broker
Topic命令
修改或创建一个Topic
示例:mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic
命令:mqadmin updateTopic -b | -c [-h] [-n ] [-o ] [-p ] [-r ] [-s ] -t [-u ] [-w ]
参数: -n: name server地址列表 -c: cluster 名称,表示topic 建在该集群 -t: 设置topic名称 -h: 打印help信息 -o: 设置topic是否为有序的 取值:true、false(默认) -p: 设置topic的权限
从broker和nameserver删除topic
示例:mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t testtopic
命令:mqadmin deleteTopic -c [-h] [-n ] -t
参数: -n: name server地址列表 -c: cluster 名称,表示topic 建在该集群 -t: 设置topic名称 -h: 打印help信息
从nameserver列出所有topic
示例:mqadmin topicList -n localhost:9876
命令:mqadmin topicList [-c] [-h] [-n ]
参数: -n: name server地址列表 -c: cluster 名称,表示topic 建在该集群 -h: 打印help信息
检查topic的状态信息
示例:mqadmin topicStatus -n localhost:9876 -t testtopic
命令:mqadmin topicStatus [-h] [-n ] -t
参数: -n: name server地址列表 -c: cluster 名称,表示topic 建在该集群 -t: 设置topic名称 -h: 打印help信息
清理未使用的topic
命令:mqadmin cleanUnusedTopic [-b ] [-c ] [-h] [-n ]
参数: -n: name server地址列表 -b: broker地址 -c: 集群名称 -h: 打印help信息