RabbitMQ

发布于:2025-05-14 ⋅ 阅读:(15) ⋅ 点赞:(0)

     MQ(message queue)即消息队列,RabbitMQ是Rabbit企业下的一个消息队列产品.RabbitMQ是一个实现了AMQP的消息队列服务,是当前主流的消息中间件之一.AMQP,高级消息队列协议,是一个通用的应用层协议,提供统一消息服务的协议,为面向消息的中间件设计.基于此协议的客户端与消息中间件可传递消息,并不受客户端或中间件,开发语言等条件的限制.

      MQ,本质上是个队列,队列中存放的内容是消息,消息可以是字符串或者是内嵌对象等.MQ多用于分布式系统之间的通信.系统之间的通信通常有两种:同步通信,即直接调用对方的服务,数据从一端发出后立即就可以到达另一端.异步通信,数据从一端发出后,先进入一个容器进行临时存储,当达到某种条件之后,再由这个容器发送给另一端.容器的一个具体实现就是MQ.RabbitMQ即使MQ的一种实现.MQ的主要作用是接收并且转发消息.在不同的应用场景之下可以展现不同的作用.

      MQ的主要工作是接收并转发消息,在不同的应用场景之下可以展现出不同的作用.MQ类似于仓库,里面存放的是消息,MQ负责存储和转发消息.当需要完成下面的功能时,可以选择使用MQ.1.异步解耦 2.流量削峰 3.消息分发  4.延迟通知.

      在Ubuntu环境安装RabbitMQ之后,下面介绍一下RabbitMQ的一些核心概念.

    RabbitMQ是一个消息中间件,是一个生产者消费者模型,负责接收,存储转发消息.上图中的Producer:生产者,RabbitMQ的客户端,向RabbitMQ发送消息.Consumer:消费者,RabbitMQ的客户端,从RabbitMQ接收消息.Brocker:用于收发消息.要注意的是,Brocker接收到的来自Producer的消息中,可以带有一定的标签,Brocker会根据标签进行路由,把消息发送给感兴趣的消费者,消费者在接收消息的过程中,标签会被丢掉,消费者只会收到消息,而不知道消息的生产者是谁.Connection和channel.Conncetion:连接.是客户端和RabbitMQ之间的一个TCP连接.这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息.channel:通道,信道.Channel是在Connection之上的一个抽象层,在RabbitMQ中,一个TCP连接可以有多个Channel,每个channel都是独立的虚拟连接.消息的发送和接收都是基于Channel的.通道的作用是将消息的读写操作复用在一个TCP连接上,这样可以减少建立和关闭连接的开销,提高性能. Virtual host:虚拟主机,这是一个虚拟的概念,为消息队列提供了一种逻辑上的隔离机制.对于RabbitMQ而言,一个Broker上可以存在多个Virtual Host.当多个不同的用户使用同一个Broker时,可以虚拟划分出多个vhost,每个用户在自己的virtual host创建exchange/queue等. Queue:队列,是RabbitMQ的内部对象,用于存储消息.多个消费者,可以订阅同一个队列.同样的,一个队列,可以被多个消费者订阅. Exchange:交换机.message到达broker的第一站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个或者多个Queue中.Exchange起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息.

    现在来总结一下,RabbitMQ的工作流程,1.Producer生产了一条消息.2.Producer连接到RabbitMQ,建立一个Connetion,并且开启一个信道(channel).3.Producer声明一个交换机(Exchange),路由消息.4.Producer声明一个队列(queue),存放信息.5.Producer发送消息至RabbitMQ Brocker.6.Rabbit MQ接收信息,并存入响应的队列中,如果未找到响应的队列,则根据生产者的配置,选择丢弃或者退回给生产者.

      AMQP是一种高级消息队列协议,AMQP定义了一套确定的消息交换功能,包括交换机,队列等.这些组件共同工作,使得生产者能够将消息发送到交换机,然后由队列接收并且等待消费者接收,AMQP还定义了一个网络协议,允许客户端应用通过该协议与消息代理和AMQP模型进行交互通信.

用户相关操作:

1.添加用户.

2.在用户详情界面的设置.

设置虚拟机权限

更新/删除用户

退出当前用户

虚拟机相关操作:
创建虚拟机创建虚拟机之后,会为当前用户设置虚拟机.

创建生产者,并发送消息.:
 

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory=new ConnectionFactory();
    connectionFactory.setHost("203.33.207.200");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("shiyu");
    connectionFactory.setPassword("shiyu");
    connectionFactory.setVirtualHost("maqiu");
    Connection connection=connectionFactory.newConnection();
    Channel channel=connection.createChannel();
//    这个方法的参数介绍
//    queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
//     queue,队列的名称   durable :可持久化    exclusive:是否独立,只是被一个消费者占用.
//    autoDelete:是否自动删除.arguments 参数
    channel.queueDeclare("hello",true,false,false,null);
    String message="hello,RabbitMQ";
//发送消息.
    channel.basicPublish("","hello",null,message.getBytes(StandardCharsets.UTF_8) );
    System.out.println("已经连接到RabbitMQ");
    channel.close();
    connection.close();
}

}

创建消费者,从RabbitMQ来获取信息. 

package RabbitMQ;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDemo {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("203.33.207.200");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("zhangxuan");
        connectionFactory.setPassword("zhangxuan");
        connectionFactory.setVirtualHost("maqiu");
        Connection connection=connectionFactory.newConnection();
        Channel channel=connection.createChannel();
        channel.queueDeclare("hello",true,false,false,null);
        DefaultConsumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("从RabbitMQ中接受的消息: "+new String(body));
            }

        };
        channel.basicConsume("hello",true,consumer);
        channel.close();
        connection.close();
    }
}

工作模式的介绍

1.简单模式

P:生产者,发送消息   C:消费者,消费消息.   Queue:消息队列,可以缓存消息,生产者向其中投递消息,消费者从其中取出消息.特点:一个生产者P,一个消费者C.消息只能被消费一次,也称能够为点对点模式.适用于消息只被单个消费者处理的场景.上面写的代码,就是实现了简单模式.

2.Work Queue(工作队列)

一个生产者P,多个消费者C1,C2,当Queue中有多个消息时,C1和C2会分别消费Queue中的消息,消费的消息不会重复.适用于,集群环境中做异步处理.下面代码可以体现工作队列.

6.通配符模式,通配符模式可以看作是路由模式的升级,路由模式的BindingKey是固定的,而通配符较为灵活支持通配符匹配.主要区别如下:1.topics模式使用的交换机类型为topic,路由模式的交换机类型是direct.2.topic类型的交换机在匹配规则上进行了扩展,BindingKey支持通配符匹配.如下图所示,可以看出topic模式中的RoutingKey是一系列由.分隔的单词,*表示一个单词,#表示多个单词.

生产者的代码如下:


import RabbitMQ.Constants.Constants;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class producer {
    public static void main(String[] args) throws IOException, TimeoutException {
//       1.建立连接
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection=connectionFactory.newConnection();
//        2.开启信道
        Channel channel=connection.createChannel();
//        设置交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true);
//        3.声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE3,true,false,false,null);
        channel.queueDeclare(Constants.TOPIC_QUEUE4,true,false,false,null);
//    4.绑定交换机和队列
        channel.queueBind(Constants.TOPIC_QUEUE3,Constants.TOPIC_EXCHANGE,"*.k.*");
        channel.queueBind(Constants.TOPIC_QUEUE4,Constants.TOPIC_EXCHANGE,"*.*.b");
        channel.queueBind(Constants.TOPIC_QUEUE4,Constants.TOPIC_EXCHANGE,"c.#");
//      5.发送消息
        String msg1="hello topic,my routingkey is *.a.*";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"ada.k.sfa",null,msg1.getBytes(StandardCharsets.UTF_8));
        String msg3="hello topic,my routimgkey is c.#";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.sdf",null,msg3.getBytes(StandardCharsets.UTF_8));
        String msg2="hello topic,my routingkey is *.*.b";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"ass.hh.b",null,msg2.getBytes(StandardCharsets.UTF_8));
//        6.释放资源
        channel.close();
        connection.close();
    }

}

消费者的代码如下:

package RabbitMQ.Topic;

import RabbitMQ.Constants.Constants;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
//       1.建立连接
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection=connectionFactory.newConnection();
//        2.开启信道
        Channel channel=connection.createChannel();
        channel.queueDeclare(Constants.TOPIC_QUEUE3,true,false,false,null);
//        3.消费消息
        DefaultConsumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到的消息是: "+new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE3,true,consumer);
        channel.close();
        connection.close();
    }
}
package RabbitMQ.Topic;

import RabbitMQ.Constants.Constants;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
//       1.建立连接
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection=connectionFactory.newConnection();
//        2.开启信道
        Channel channel=connection.createChannel();
        channel.queueDeclare(Constants.TOPIC_QUEUE4,true,false,false,null);
//        3.消费消息
        DefaultConsumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到的消息是: "+new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE4,true,consumer);
        channel.close();
        connection.close();
    }
}

7.RPC通信,远程过程调用,是一种通过网络从计算机上请求服务,而不需要连接底层网络的技术.RabbitMQ实现RPC通信的过程,大概是通过两个队列实现一个可回调的过程. 

大致流程,客户端发送一个消息到达指定队列,并在消息属性中设置replyTo字段,这个字段指定了一个回调队列,服务端进行处理之后,会把响应结果发送到这个队列.服务端在收到请求之后,会把响应的消息发送到replyTo指定的队列.客户端在回调队列上等待消息,一旦收单响应,会确定响应中的correlationid属性,确保它是所期望的响应.具体代码如下:
client的代码如下:

package RabbitMQ.rpc;
import RabbitMQ.Constants.Constants;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.*;
public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//        1.建立连接
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection=connectionFactory.newConnection();
//        2.开启信道
        Channel channel=connection.createChannel();
//        这里使用默认的交换机,不再进行声明
//        声明队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);
//        发送请求
        String msg="hello,rpc";
//        设置唯一的标识符
        String correlationID= UUID.randomUUID().toString();
//        设置请求的相关属性
        AMQP.BasicProperties props=new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
//        发送消息
        channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes(StandardCharsets.UTF_8));
//        接收响应,这里使用阻塞队列来存储响应的信息.
        final BlockingQueue<String> response=new ArrayBlockingQueue<>(1);
//        对接收到的信息进行处理
        DefaultConsumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                 String respMsg=new String(body);
                System.out.println("接收到的消息是:"+respMsg);
//                如果correlationId保持一致,存入队列
                if(correlationID.equals(properties.getCorrelationId())){
//                    加入队列即可
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,consumer);
        String result=response.take();
        System.out.println("[RPC Client 响应结果]: "+result);
        channel.close();
        connection.close();
    }
}

server端的代码如下:

package RabbitMQ.rpc;

import RabbitMQ.Constants.Constants;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
//        1.建立连接
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection=connectionFactory.newConnection();
//        2.开启信道
        Channel channel=connection.createChannel();
//        这里使用默认的交换机,不再进行声明
//        声明队列
//        设置消费者的预取数量.一次消费一条消息
        channel.basicQos(1);
//      接收请求
        DefaultConsumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                在接受到请求之后,就需要进行响应
                String request=new String(body);
                System.out.println("接收到请求:"+request);
//                进行响应
                String response="针对request:"+request+"响应成功";
                AMQP.BasicProperties basicProperties=new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
//                进行响应
                channel.basicPublish("",Constants.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes(StandardCharsets.UTF_8));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);
        channel.close();
        connection.close();
    }
}

7.Publisher Confirms(发布确认)

作为消息中间件,都会面临消息丢失的情况.消息的丢失由下面三个方面:

1.生产者问题.因为应用程序的故障,网络抖动等各种原因.生产者没有成功向brocker发送消息.

2.消息中间件问题.生产者成功发送给brocker,但是brocker没有把消息保存好,导致消息丢失.

3.消费者问题.Broker发送消息到消费者,消费者在消费消息时,因为没有处理好,导致brocker将消费失败的消息从队列中删除了. 

针对上述三种情况,RabbitMQ也给出了相应的解决方案.问题1可以通过发布确认机制解决,问题2可以通过持久化机制解决.问题3可以采用消息应答机制. 发布确认 属于RabbitMQ的七大工作模式之一.生产者将信道设置为confirm(确认)模式,当信道进入confirm模式,所有在该信道上发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个包含唯一ID的确认给生产者,这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliveryTag包含了确认消息的序号,此外broker也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都已经得到了处理.

1.单独确认,即broker收到一条消息返回一条确认信息.

private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
        Connection connection=createConnection();
//      1.  开启信道
        Channel channel=connection.createChannel();
//      2.设置信道为confirm模式
        channel.confirmSelect();
//       3.声明队列
        channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1,true,false,false,null);
//        4.发送消息
//        这里发送两百条消息,在开始和结束的时候即使,比较一下三种的效率
        long start=System.currentTimeMillis();
//        因为是单独确认,所以就是发布一条消息,就要确认一条
        for(int i=0;i<MESSAGE_COUNT;i++){
            String msg="hello publisher confirms"+i;
            channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1,null,msg.getBytes(StandardCharsets.UTF_8));
//               这里将等待时间设置为5s
            channel.waitForConfirmsOrDie(5000);
        }
        long end=System.currentTimeMillis();
        System.out.printf("单独确认策略,消息条数: %d,耗时: %d ms\n",MESSAGE_COUNT,end-start);
    }

2.批量确认

 private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {
//        批量确认就是返回一个确认序号,以确定确认序号之前的消息都收到了
        Connection connection=createConnection();
        Channel channel=connection.createChannel();
//        设置信道为confirm模式
        channel.confirmSelect();
//        声明队列
        channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2,true,false,false,null);
        Long start=System.currentTimeMillis();
//        发送消息,并进行确认.每一百条消息进行一次确认
        int batchSize=100;
//        这个变量用来记录已经发送的消息条数
        int outstandingMessageCount=0;
        for(int i=0;i<MESSAGE_COUNT;i++){
            String msg="hello publlisher confirms"+i;
//            发送消息
            channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2,null,msg.getBytes(StandardCharsets.UTF_8));
            outstandingMessageCount++;
            if(outstandingMessageCount==batchSize){
                channel.waitForConfirmsOrDie(5000);
                outstandingMessageCount=0;
            }

        }
        if(outstandingMessageCount>0){
            channel.waitForConfirmsOrDie(5000);
        }
        long end=System.currentTimeMillis();
        System.out.printf("批量确认策略,消息条数: %d,耗时: %d ms\n",MESSAGE_COUNT,end-start);
    }

3. 


网站公告

今日签到

点亮在社区的每一天
去签到