一、Producer和Consumer

二、Connetion和Channel
Connection: 连接.是客户端和RabbitMQ服务器之间的一个TCP连接.这个连接是建立在消息传递的基础上的,它负责传输客户端和服务器之间的所有数据金额控制信息。
Channel: 通道,信道 Channel 是 在Connection之上的一个抽象层,在RabbitMQ,一个TCP连接可以有多个Channel,每个Channel 都是独立的虚拟连接 消息的发送和接收都是基于Channel的。通到的主要作用是将消息的读写操作复用到同一个TCP连接上,这样可以减少建立和关闭连接的开销,提升性能。
三、Virtual host
Virtual host: 虚拟主机 这是一个虚拟概念,它为消息队列提供了一种逻辑上的隔离机制. 对于
RabbitMQ而言,一个BrokerServer上可以存多个Virtual Host. 当多个不同的用户使用同一个RabbitMQ Server提供的服务是 可以划分多个vhost 每个用户在自己的vhost创建exchange/queue等
类似MySQL的"database", 是⼀个逻辑上的集合. ⼀个MySQL服务器可以有多个database.
四、Queue
Queue:队列,是RabbitMQ的内部对象,用户存储消息
多个消费者,可以订阅同一个队列
五、Exchange
AMQP(Advanced Message Queuing Protocol)是一种高级消息队列协议,AMQP定义了一套确定的消息交换功能,包括交换器(Exchange),队列(Queue)等 这些组件共同工作,使得生产者能够将消息发送到交换器,然后由队列接收并等待消费者接收,AMQP还定义了一个网络协议,允许客户端应用通过该协议与消息代理和AMQP模型进行交互通信

五、初识RabbitMQ
步骤:
a 引入pom依赖
b 生产者代码
c 消费者代码
5.1 引入pom依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
5.2 生产者代码
package rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立连接
// 创建一个ConnectionFactory实例来配置RabbitMQ连接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置RabbitMQ服务器的主机地址
connectionFactory.setHost("8.136.108.248");
// 设置RabbitMQ服务器的端口号
connectionFactory.setPort(5672);
// 设置登录RabbitMQ服务器的用户名
connectionFactory.setUsername("pinkboy");
// 设置登录RabbitMQ服务器的密码
connectionFactory.setPassword("123456");
// 设置RabbitMQ服务器的虚拟主机
connectionFactory.setVirtualHost("/");
// 使用ConnectionFactory创建一个新的连接
Connection connection = connectionFactory.newConnection();
//2、创建通道
Channel channel = connection.createChannel();
//3、声明交换机
//4、声明队列
/**
* 声明一个队列
*
* @param channel RabbitMQ的通道,用于执行队列操作
*
* 此处使用了queueDeclare方法来声明一个名为"hello"的队列该方法的参数分别表示:
* 1. 队列名称("hello"):指定要声明的队列的名称
* 2. true:表示该队列是持久化的,意味着即使RabbitMQ服务重启,队列也会被保留
* 3. false:表示该队列不是排他的,意味着该队列可以被所有通道共享
* 4. false:表示该队列不会在使用后自动删除,需要手动删除
* 5. null:表示不设置额外的参数
*
* 选择这些参数值的原因可能是希望创建一个持久化的、共享的队列,以便在不同的时间点和不同的消费者之间传递消息
*/
channel.queueDeclare("hello", true, false, false, null);
//5、发送消息
// 循环发送消息到 RabbitMQ 的 "hello" 队列中
for (int i = 0; i < 10; i++) {
// 构造消息内容
String msg = "hello rabbitmq~" + i;
// 发布消息到队列,使用空交换机名称表示使用默认交换机
// 第三个参数为消息的属性,null 表示使用默认属性
// 将消息内容转换为字节数组作为消息体发送
channel.basicPublish("", "hello", null, msg.getBytes());
}
System.out.println("消息发送成功!");
//6、释放资源
channel.close();
connection.close();
}
}
5.3 消费者代码
package rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerDemo {
public static void main(String[] args) throws Exception {
//1.创建连接
// 创建一个ConnectionFactory实例来配置RabbitMQ连接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置RabbitMQ服务器的主机地址
connectionFactory.setHost("8.136.108.248");
// 设置RabbitMQ服务器的端口号
connectionFactory.setPort(5672);
// 设置登录RabbitMQ服务器的用户名
connectionFactory.setUsername("pinkboy");
// 设置登录RabbitMQ服务器的密码
connectionFactory.setPassword("123456");
// 设置RabbitMQ服务器的虚拟主机
connectionFactory.setVirtualHost("/");
// 使用ConnectionFactory创建一个新的连接
Connection connection = connectionFactory.newConnection();
//2.创建Channel
Channel channel = connection.createChannel();
/**
*3.声明一个队列
*
* @param channel RabbitMQ的通道,用于执行队列操作
*
* 此处使用了queueDeclare方法来声明一个名为"hello"的队列该方法的参数分别表示:
* 1. 队列名称("hello"):指定要声明的队列的名称
* 2. true:表示该队列是持久化的,意味着即使RabbitMQ服务重启,队列也会被保留
* 3. false:表示该队列不是排他的,意味着该队列可以被所有通道共享
* 4. false:表示该队列不会在使用后自动删除,需要手动删除
* 5. null:表示不设置额外的参数
*
* 选择这些参数值的原因可能是希望创建一个持久化的、共享的队列,以便在不同的时间点和不同的消费者之间传递消息
*/
channel.queueDeclare("hello", true, false, false, null);
// 4.开始从名为"hello"的队列中消费消息
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
/**
* 处理接收到的消息
*
* @param consumerTag 消费者标签,用于标识消费者
* @param envelope 包含消息路由信息的信封
* @param properties 消息的属性,如内容类型、内容编码等
* @param body 消息的主体内容,以字节数组形式表示
* @throws IOException 如果处理消息时发生I/O错误
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 打印消息的主体内容
System.out.println("body:" + new String(body));
}
});
Thread.sleep(2000);
//5.关闭资源
channel.close();
connection.close();
}
}
观察运行结果:
运行生产者代码
观察RabbitMQ界面
生产者生产了10条消息
运行消费者代码
观察RabbitMQ界面
消费者消费了10条消息