目录
通配符模式
概述
通配符模式是一种灵活的消息传递模式,可以根据消息的路由键(routing key)和绑定(binding)模式来实现精确的消息过滤和匹配。在RabbitMQ中,路由键由生产者定义,用于标识消息的目的地;而绑定则由消费者定义,用于指定消息的接收规则。
路由模式的升级版, 在routingKey的基础上,增加了通配符的功能, 使之更加灵活.
Topics和Routing的基本原理相同,即:⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列. 类似于正则表达式的⽅式来定义Routingkey的模式.
不同之处是:routingKey的匹配⽅式不同,Routing模式是相等匹配,topics模式是通配符匹配.
应用场景
RabbitMQ的通配符模式在需要根据消息的特定属性进行路由和过滤的场景中非常有用。例如,在一个日志系统中,可以使用通配符模式来将不同级别的日志消息路由到不同的队列中,以便进行不同的处理和分析。
优势
通配符模式的优势在于它可以灵活地匹配消息,使得消息可以根据不同的条件进行过滤和选择。通过合理地定义绑定和路由键,可以实现复杂的消息过滤和路由策略,提高系统的灵活性和性能。
代码案例
引入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.21.0</version> </dependency>
常量类
public class Constants {
public static final String HOST = "47.98.109.138";
public static final int PORT = 5672;
public static final String USER_NAME = "study";
public static final String PASSWORD = "study";
public static final String VIRTUAL_HOST = "aaa";
//通配符模式
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic_queue1";
public static final String TOPIC_QUEUE2 = "topic_queue2";
}
编写生产者代码
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 通配符模式生产者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
//4. 声明队列
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
//5. 绑定交换机和队列
channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
//6. 发送消息
String msg = "hello topic, my routingkey is ae.a.f....";
channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes()); //转发到Q1
String msg_b = "hello topic, my routingkey is ef.a.b....";
channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2
String msg_c = "hello topic, my routingkey is c.ef.d....";
channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2
System.out.println("消息发送成功");
//7. 释放资源
channel.close();
connection.close();
}
}
编写消费者1代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
//4. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
//从队列中收到消息, 就会执行的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
}
}
编写消费者2代码
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME);//账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2. 开启信道
Channel channel = connection.createChannel();
//3. 声明队列
channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);
//4. 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
//从队列中收到消息, 就会执行的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:"+ new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);
}
}
运行代码
观察管理界面可以看到两个队列都各自收到了2条消息,与预期符合。
两个消费者都各自从两个不同的队列中取出并消费了2条消息,与预期符合。