【RocketMQ学习笔记篇一】

发布于:2022-08-07 ⋅ 阅读:(412) ⋅ 点赞:(0)

一、MQ的概念:

消息模型:
  • ?RocketMQ主要由 ProducerBrokerConsumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 BrokerMessage Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
  • ?Producer:
    • 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
  • ?Consumer:
    • 负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
  • ?Broker:
    • 负责存储消息,生产者发送的消息存储在这里而这里则是由多个队列组成,主要是担任中转、代理的角色。

在这里插入图片描述

MQ的优缺点:

优点:

  • ?应用解耦,客户端不直接和服务端打交道
  • ?适应应用的快速变更,服务器数量的变更
  • ?流量削峰,突发的流量不会直接打到服务端

缺点:

  • ?系统可用性降低,一旦 MQ 挂掉客户端发送不了消息和服务端拉去不到消息
  • ?系统负责度提高,增加了一项技术自然复杂度提高了
  • ?异步消息机制
    • 1️⃣ 消息的顺序性
    • 2️⃣ 消息的丢失
    • 3️⃣ 消息的一致性
    • 4️⃣ 消息重复使用
MQ的产品分类:
名称 开发语言 吞吐量 处理时间 架构 亮点
ActiveMQ Java 万级 ms级 主从 成熟度高
RabbitMQ Erlang 万级 us级 主从 性能更好
RocketMQ Java 十万级 ms级 分布式 扩展性强
kafka Scala 十万 ms级 分布式 多应用于大数据

?RocketMQ完美解决MQ的缺点

二、环境搭建:

?下载:

?下载地址:https://rocketmq.apache.org/docs/quick-start/

在这里插入图片描述

?版本支持:

在这里插入图片描述

?安装:
  • JDK1.8 我想这是必要条件

  • 上传到Liunx 后解压即可

  • 启动Name Server

     nohup sh bin/mqnamesrv &
     tail -f ~/logs/rocketmqlogs/namesrv.log
     The Name Server boot success...
    
  • 启动Broker

     nohup sh bin/mqbroker -n localhost:9876 &
     tail -f ~/logs/rocketmqlogs/broker.log 
     The broker[%s, 172.30.30.233:10911] boot success...
    
  • 如果你在启动的时候出现这个错:

在这里插入图片描述

  • 那么你需要配置下JVM的给与Broker的内存大小, /bin/runbroker.sh

在这里插入图片描述

三、消息发送:

1️⃣ 导入客户端依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.3</version>
</dependency>
2️⃣ 编写生成者Producer
public class Producer {
    public static void main(String[] args) throws Exception {

        // 1.创建mq对象
        DefaultMQProducer producer = new DefaultMQProducer();
        //2.设置组
        producer.setProducerGroup("group");
        //3.设置nameServer的地址
        producer.setNamesrvAddr("43.142.107.50:9876");
        //4.开启发送
        producer.start();
        //5.创建消息对象
        Message message = new Message("topic1","发送的消息".getBytes(StandardCharsets.UTF_8));
        //6.发送消息
        SendResult sendResult = producer.send(message); //指定超时时间
        System.out.println(sendResult);
        //7.关闭连接
        producer.shutdown();

    }
}
3️⃣ 编写消费者Consumer
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建mq对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
        //2.设置nameServer的地址
        consumer.setNamesrvAddr("43.142.107.50:9876");
        //3.设置topic以及flag
        consumer.subscribe("topic1","*");
        //4.注册一个监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("接收到的消息:" + new String(messageExt.getBody()));
                    System.out.println(messageExt);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //.开启发送
        consumer.start();
    }
}

?如果你启动Producer 报如下错误,你只需要在发送消息的时候指定超时时间即可

在这里插入图片描述

? 如果任然出现和我个一样的错,并且你使用的是云服务器,那么你需要开发9876和10911端口,如果你使用的虚拟机那么你应该关闭防火墙

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到