一、MQ的概念:
消息模型:
- ?
RocketMQ
主要由Producer
、Broker
、Consumer
三部分组成,其中Producer
负责生产消息,Consumer
负责消费消息,Broker
负责存储消息。Broker
在实际部署过程中对应一台服务器,每个Broker
可以存储多个Topic
的消息,每个Topic
的消息也可以分片存储于不同的Broker
。Message Queue
用于存储消息的物理地址,每个Topic
中的消息地址存储于多个Message Queue
中。ConsumerGroup
由多个Consumer
实例构成。 - ?
Producer
:- 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到
broker
服务器。RocketMQ
提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker
返回确认信息,单向发送不需要。
- 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到
- ?
Consumer
:- 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从
Broker
服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
- 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从
- ?
Broker
:- 负责存储消息,生产者发送的消息存储在这里而这里则是由多个队列组成,主要是担任中转、代理的角色。
MQ的优缺点:
优点:
- ?应用解耦,客户端不直接和服务端打交道
- ?适应应用的快速变更,服务器数量的变更
- ?流量削峰,突发的流量不会直接打到服务端
缺点:
- ?系统可用性降低,一旦
MQ
挂掉客户端发送不了消息和服务端拉去不到消息 - ?系统负责度提高,增加了一项技术自然复杂度提高了
- ?异步消息机制
- 1️⃣ 消息的顺序性
- 2️⃣ 消息的丢失
- 3️⃣ 消息的一致性
- 4️⃣ 消息重复使用
MQ的产品分类:
名称 | 开发语言 | 吞吐量 | 处理时间 | 架构 | 亮点 |
---|---|---|---|---|---|
ActiveMQ | Java | 万级 | ms级 | 主从 | 成熟度高 |
RabbitMQ | Erlang | 万级 | us级 | 主从 | 性能更好 |
RocketMQ | Java | 十万级 | ms级 | 分布式 | 扩展性强 |
kafka | Scala | 十万 | ms级 | 分布式 | 多应用于大数据 |
?RocketMQ完美解决MQ的缺点
二、环境搭建:
?下载:
?下载地址:https://rocketmq.apache.org/docs/quick-start/
?版本支持:
?安装:
JDK1.8
我想这是必要条件上传到
Liunx
后解压即可启动
Name Server
nohup sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
启动
Broker
nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log The broker[%s, 172.30.30.233:10911] boot success...
如果你在启动的时候出现这个错:
- 那么你需要配置下
JVM
的给与Broker
的内存大小,/bin/runbroker.sh
三、消息发送:
1️⃣ 导入客户端依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
2️⃣ 编写生成者Producer
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建mq对象
DefaultMQProducer producer = new DefaultMQProducer();
//2.设置组
producer.setProducerGroup("group");
//3.设置nameServer的地址
producer.setNamesrvAddr("43.142.107.50:9876");
//4.开启发送
producer.start();
//5.创建消息对象
Message message = new Message("topic1","发送的消息".getBytes(StandardCharsets.UTF_8));
//6.发送消息
SendResult sendResult = producer.send(message); //指定超时时间
System.out.println(sendResult);
//7.关闭连接
producer.shutdown();
}
}
3️⃣ 编写消费者Consumer
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 1.创建mq对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
//2.设置nameServer的地址
consumer.setNamesrvAddr("43.142.107.50:9876");
//3.设置topic以及flag
consumer.subscribe("topic1","*");
//4.注册一个监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("接收到的消息:" + new String(messageExt.getBody()));
System.out.println(messageExt);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//.开启发送
consumer.start();
}
}
?如果你启动
Producer
报如下错误,你只需要在发送消息的时候指定超时时间即可
? 如果任然出现和我个一样的错,并且你使用的是云服务器,那么你需要开发9876和10911端口,如果你使用的虚拟机那么你应该关闭防火墙