1.HelloWorld案例
简单队列
不使用SpringBoot 给MQ发送简单消息
package com.hrui.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hrui
* @date 2025/5/28 1:36
*/
public class RabbitMQSendMessage {
public static void main(String[] args) {
//1.建立连接
ConnectionFactory factory=new ConnectionFactory();
//1.1.设置连接参数 分别是:主机名host 端口port 用户名username 密码password 虚拟机vhost
factory.setHost("xx.xxx.xxx.xx");
factory.setPort(5672);
factory.setUsername("xxxx");
factory.setPassword("xxxxx");
factory.setVirtualHost("/");
Connection connection =null;
Channel channel =null;
try {
//1.2.建立连接
connection = factory.newConnection();
//2.创建频道(通道)Channel
channel = connection.createChannel();
//3.创建队列
String queueName="simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
//4.发送消息
String message = "hello world";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送成功");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}finally {
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (RuntimeException | IOException | TimeoutException e) {
throw new RuntimeException(e);
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
简单消息不需要使用交换机
简单消息消费者
package com.hrui.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hrui
* @date 2025/5/28 1:51
*/
public class RabbitMQReceiveMessage {
public static void main(String[] args) {
//1.建立连接
ConnectionFactory factory=new ConnectionFactory();
//1.1.设置连接参数 分别是:主机名host 端口port 用户名username 密码password 虚拟机vhost
factory.setHost("xx.xx.xx.xx");
factory.setPort(5672);
factory.setUsername("xxxx");
factory.setPassword("xxxxx");
factory.setVirtualHost("/");
//1.2.建立连接
Connection connection =null;
Channel channel =null;
try {
connection = factory.newConnection();
//2.创建频道(通道)Channel
channel = connection.createChannel();
//3.创建队列
String queueName="simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
//4.接收消息(订阅该队列中的消息)
//注意输出的时候是 等待接收消息..... 先输出 说明已经开启了异步
channel.basicConsume(queueName, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:"+new String(body));
}
});
System.out.println("等待接收消息.....");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
}
上面的简单消息发送其实用的是直连交换机 也就是默认交换机
2.使用SpringAMQP
简化消息发送和接收的API
application.properties中的配置
这里生产者和消费者在一起
spring.rabbitmq.host=192.168.1.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=登录账号 spring.rabbitmq.password=登录密码 spring.rabbitmq.virtual-host=/
2.1.Basic Queue简单队列模型
简单队列消息接收者
2.2.Work Queue工作队列模型
这里先把简单队列的代码注释掉
如果你消费者和生产者分开两个应用 就会出现一人一半消费的现象
我这里因为引用启动前就已经往MQ发送了消息
哪个消费者的@Rabbitlistner 先注册,就可能全部抢完
分开应用会看到一人一半的现象
工作模式其实也是简单模式
简单模式和工作队列模式 都不允许 同一条消息给每个消费者消费
事实上简单队列模式和工作队列模式用的是默认交换机""
MQ的工作流程是
生产者 → 交换机 → 队列 → 消费者
发布订阅模式允许同一消息发送给多个消费者.实现方式是加入了exchange(交换机)
2.3.发布、订阅模型-Fanout-扇形交换机 真正广播
Fanout是交换机的一种类型 Fanout交换机常用于广播
交换机只用于消息转发
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的Queue
上面的简单队列是因为我们开始写代码时候就声明了Queue
工作队列也是通过@Bean的方式声明了
那么发布、订阅模式可以自己在web页面中配置交换机和队列
也可以应用内声明 注意注意注意:交换机 队列的声明需要在消费者端声明 这也是下面出现问题的原因 配置和
@RabbitListener 要在同一应用中 才会去声明
把工作队列代码注释,先启动下
package com.hrui.rabbitmq.fanoutExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RestController;
/**
* @author hrui
* @date 2025/5/28 14:52
*/
@Configuration
public class FanoutExchangeTest {
//声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hrui.fanout");
}
@Bean
public Queue queue1(){
return new Queue("fanout.queue1");
}
@Bean
public Queue queue2(){
return new Queue("fanout.queue2");
}
//交换机和队列1绑定
@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
//交换机和队列2绑定
@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}
启动了 但是web页面上并没有看到 声明的交换机和队列
当我定义了两个消费者 启动之后才看到
package com.hrui.rabbitmq.fanoutExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author hrui
* @date 2025/5/28 15:21
*/
@Component
public class FanoutExchangeListener {
@RabbitListener(queues = "fanout.queue1")
public void receive1(String msg) {
System.out.println("queue1 收到消息:" + msg);
}
@RabbitListener(queues = "fanout.queue2")
public void receive2(String msg) {
System.out.println("queue2 收到消息:" + msg);
}
}
发消息
package com.hrui.rabbitmq.fanoutExchange;
import jakarta.annotation.PostConstruct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RestController;
/**
* @author hrui
* @date 2025/5/28 14:52
*/
@Configuration
public class FanoutExchangeTest {
//声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hrui.fanout");
}
@Bean
public Queue queue1(){
return new Queue("fanout.queue1");
}
@Bean
public Queue queue2(){
return new Queue("fanout.queue2");
}
//交换机和队列1绑定
@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
//交换机和队列2绑定
@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() throws InterruptedException {
//第一个参数是交换机 第二个参数是路由键(这里先用""空字符串) 第三个参数是消息
rabbitTemplate.convertAndSend("hrui.fanout","","hello fanout");
}
}
接收消息的上面已经写了
运行结果
2.4.发布、订阅模型-Direct 条件广播
Direct 按一定规则 交换机按一定队则将对应消息发送到对应(指定)的队列
说明Direct 也是可以适配Fanout 比Fanout更加灵活
上面是在消费者中声明交换机 声明队列的方式
其实还有另外一种更加直接的方式就是用
@RabbitListener注解来接收消息同时来声明交换机 队列 路由Key 就是 Echange Queue RoutingKey
package com.hrui.rabbitmq.directExchange;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author hrui
* @date 2025/5/28 15:53
*/
@Component
public class DirectExchangeListener {
// 消费者1监听 direct.queue1,绑定 routingKey = red, blue
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "hrui.queue3"),
exchange = @Exchange(name = "hrui.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到Direct消息: [" + msg + "]");
}
// 消费者2监听 direct.queue2,绑定 routingKey = red, yellow
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "hrui.queue4"),
exchange = @Exchange(name = "hrui.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到Direct消息: [" + msg + "]");
}
}
都可以接收到red 1只能接收到blue 2只能接收到 yellow
发送消息
package com.hrui.rabbitmq.directExchange;
import jakarta.annotation.PostConstruct;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;
/**
* @author hrui
* @date 2025/5/28 16:44
*/
@RestController
public class SendDirectExchange {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("hrui.direct", "red", "direct exchange msg red"+i);
rabbitTemplate.convertAndSend("hrui.direct", "blue", "direct exchange msg blue"+i);
rabbitTemplate.convertAndSend("hrui.direct", "yellow", "direct exchange msg yellow"+i);
}
}
}
2.5.发布、订阅模型-Topic 条件广播
Topic和Direct 条件广播非常相似 只不过上面 Direct是用的例如 red blue
而Topic要求必须多个单词 例如 china.bews jspan.news
2.6.消息转换器
消息接收时候
返送时候用的Map接收时候也用Map
发送时候用的Map接收时候也用Map那么 发送时候用List 接收用List