文章目录
前言
`
一、RabbitMQ 是什么?
简介
开源消息代理/消息队列系统: RabbitMQ 的核心是一个消息代理。想象它是一个高效的邮局:应用程序(生产者)把消息(信件)发送给它,它负责将消息安全、可靠地路由和递送给一个或多个目标应用程序(消费者)。
基于 AMQP: RabbitMQ 最初实现了 AMQP (Advanced Message Queuing Protocol) 0-9-1 协议,这是一个开放标准的应用层协议,专为可靠、异步、跨平台的消息传递而设计。这使得不同语言和平台编写的应用程序能够轻松通信。
平台与语言无关: 由于实现了标准协议(并支持其他协议如 MQTT, STOMP),RabbitMQ 可以被几乎所有主流编程语言(Python, Java, .NET, Ruby, JavaScript, Go, PHP 等)使用。客户端库非常丰富。
构建于 Erlang/OTP: RabbitMQ 是用 Erlang 语言编写的,并运行在强大的 OTP (Open Telecom Platform) 框架之上。Erlang/OTP 以其高并发、分布式、容错和热代码升级能力而闻名,这使得 RabbitMQ 天生就非常可靠、稳定且可扩展。
核心作用/解决的问题
RabbitMQ 主要用于解决分布式系统或复杂应用程序中的通信问题:
解耦: 将消息的发送者(生产者)和接收者(消费者)分离。生产者不需要知道消费者的存在或状态,只需将消息发送到 RabbitMQ。消费者可以在需要时处理消息。这提高了系统的灵活性和可维护性。
异步通信: 生产者发送消息后可以立即返回,无需等待消费者处理完成。消费者可以在自己方便的时候处理消息。这显著提高了系统的响应速度和吞吐量。
缓冲/削峰填谷: 当生产者发送消息的速度超过消费者处理的速度时,RabbitMQ 的队列可以充当缓冲区,暂存消息,避免系统因瞬时压力过载而崩溃。消费者可以按照自己的节奏处理积压的消息。
负载均衡: 可以启动多个消费者实例从同一个队列消费消息,RabbitMQ 会以轮询或其他方式将消息分发给这些消费者,实现工作负载的分配。
可靠性: RabbitMQ 提供了强大的机制(如消息持久化、发送方确认、消费者确认)来确保消息在传输和处理过程中不丢失,即使在部分系统故障的情况下。
灵活的路由: 通过 Exchange 和 Binding 机制,RabbitMQ 能够根据设定的规则(如路由键、主题匹配、头信息匹配等)将消息精确地投递到不同的队列。
核心概念与工作模型
理解 RabbitMQ 的关键是理解其核心组件和消息流:
Producer(生产者): 创建并发送消息的应用程序。
Message(消息): 包含要传输的数据(有效载荷)和一些元数据(如路由键、头信息等)。
Exchange(交换机): 生产者将消息发送到交换机。交换机相当于邮局的分拣中心,它根据消息的路由键(Routing Key) 和自身的类型以及绑定规则,决定将消息投递到哪些队列。常见的交换机类型有:
- Direct: 精确匹配路由键。
- Fanout: 广播到所有绑定的队列(忽略路由键)。
- Topic: 基于路由键模式匹配(通配符)。
- Headers: 基于消息头信息匹配。
Binding(绑定): 连接交换机和队列的规则。它告诉交换机“哪些消息应该被路由到哪个队列”。绑定通常包含一个路由键(对于 Direct 和 Topic 交换机)或头信息匹配规则(对于 Headers 交换机)。
Queue(队列): 消息存储的地方,等待消费者处理。队列是 FIFO(先进先出)的,但可以配置优先级。消息会一直存储在队列中,直到被消费者成功接收并确认。
Consumer(消费者): 连接到队列并接收消息进行处理的应用程序。消费者可以主动拉取消息,也可以让 RabbitMQ 在有消息时推送过来。
消息流简述:
生产者 -> (发送消息到) -> 交换机 -> (根据绑定规则路由到) -> 一个或多个队列 <- (消费者从队列) <- 消费消息
二、安装环境
1.安装 Erlang(必需依赖)
- RabbitMQ 基于 Erlang 编写,需先安装匹配版本:
- 访问 Erlang下载页
- 下载 Windows 安装包(选择与 RabbitMQ 兼容的版本)
- 运行安装程序(默认设置即可)
- 添加环境变量:找到系统环境变量→选中变量Path→编辑→添加Erlang安装bin目录(如:C:\Program Files\Erlang OTP\bin)
- 验证安装:打开命令提示符,输入 erl -version,显示版本号即成功。
2.安装 RabbitMQ
访问 RabbitMQ下载页
在 Windows Installer 部分下载 .exe 安装包
运行安装程序:安装路径保持默认(C:\Program Files\RabbitMQ Server)
添加环境变量::找到系统环境变量→选中变量Path→编辑→添加RabbitMQ安装sbin目录(如:C:\Program Files\RabbitMQ Server\rabbitmq_server-4.1.1\sbin)
保证 Cookie 文件同步:
- 复制 文件(C:\Windows\System32\config\systemprofile.erlang.cookie)中的Cookie 字符串。
- 同步Cookie 字符串到文件(C:\Users\雷家饭碗.erlang.cookie)中。
3.启动 RabbitMQ 服务
方法1:通过服务管理器启动
- 按 Win + R 输入 services.msc
- 找到 RabbitMQ 服务 → 右键启动
方法2:命令行启动
- 以管理员身份打开cmd窗口
- 输入命令启动RabbitMQ:net start RabbitMQ
- 查看RabbitMQ服务状态:sc query RabbitMQ
4.访问管理控制台
- 打开浏览器访问:RabbitMQ控制台
- 使用默认账号登录:
- 用户名: guest
- 密码: guest
三、创建生产者
1.新建生产者发送消息控制台
2.安装 NuGet 包:
- 安装命令:
Install-Package RabbitMQ.Client
3.生产者发送消息
- 代码示例:
using RabbitMQ.Client; using System.Text; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using var connection = await factory.CreateConnectionAsync(); string exchangeName = "exchangeName1"; // 创建初始化通道(用于声明交换机和队列) using (var initChannel = await connection.CreateChannelAsync()) { await initChannel.ExchangeDeclareAsync( exchange: exchangeName, type: ExchangeType.Direct, durable: true ); } while (true) { using var channel = await connection.CreateChannelAsync(); // 创建消息属性 - 7.1.2 正确方式 var properties = new BasicProperties { Persistent = true, // 设置持久化 ContentType = "text/plain", Headers = new Dictionary<string, object> { { "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } } }; string message = DateTime.Now.TimeOfDay.ToString(); var body = Encoding.UTF8.GetBytes(message); await channel.BasicPublishAsync( exchange: exchangeName, routingKey: "Key1", mandatory: false, basicProperties: properties, // 使用创建的消息属性 body: body ); Console.WriteLine($" [x] Sent {message}"); await Task.Delay(1000); // 使用异步延迟 }
4.关键点:
- durable: true + Persistent: true 确保消息与服务重启后不丢失。
- 默认交换机(Key1)使用路由键直连队列。
四、创建消费者
1.创建消费者接收消息控制台
2.安装 NuGet 包:
- 安装命令:
Install-Package RabbitMQ.Client
3.消费者接收消息
- 代码示例:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using System.Text; var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", AutomaticRecoveryEnabled = true, NetworkRecoveryInterval = TimeSpan.FromSeconds(10) }; Console.WriteLine("正在连接到 RabbitMQ..."); using var connection = await factory.CreateConnectionAsync(); using var channel = await connection.CreateChannelAsync(); const string exchangeName = "exchangeName1"; const string queueName = "hello"; const string routingKey = "Key1"; // 1. 声明交换机 await channel.ExchangeDeclareAsync( exchange: exchangeName, type: ExchangeType.Direct, durable: true ); QueueDeclareOk queueResult = null; try { // 2. 尝试声明队列(使用所需参数) queueResult=await channel.QueueDeclareAsync( queue: queueName, durable: true, // 保持您需要的持久化设置 exclusive: false, autoDelete: false, arguments: null ); } catch (OperationInterruptedException ex) when (ex.Message.Contains("inequivalent arg")) { // 3. 处理参数不匹配异常 Console.WriteLine($"队列参数不匹配: {ex.Message}"); // 删除现有队列并重新创建 Console.WriteLine("正在删除现有队列..."); await channel.QueueDeleteAsync(queueName); Console.WriteLine("重新创建队列..."); queueResult = await channel.QueueDeclareAsync( queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null ); } // 4. 绑定队列到交换机 await channel.QueueBindAsync( queue: queueName, exchange: exchangeName, routingKey: routingKey ); Console.WriteLine("队列信息:"); Console.WriteLine($" 名称: {queueResult.QueueName}"); Console.WriteLine($" 消息数: {queueResult.MessageCount}"); Console.WriteLine($" 消费者数: {queueResult.ConsumerCount}"); // 设置服务质量(QoS)限制未确认消息数量 await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false); // 创建异步消费者 var consumer = new AsyncEventingBasicConsumer(channel); // 消息接收事件处理 consumer.ReceivedAsync += async (model, ea) => { try { // 获取消息内容 var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); // 获取消息属性 var properties = ea.BasicProperties; var timestamp = properties.Timestamp.UnixTime; var timestampStr = DateTimeOffset.FromUnixTimeMilliseconds(timestamp).ToString("HH:mm:ss.fff"); var headers = properties.Headers != null ? string.Join(", ", properties.Headers.Select(kv => $"{kv.Key}={kv.Value}")) : "无"; Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 收到消息:"); Console.WriteLine($" 内容: {message}"); Console.WriteLine($" 原始时间戳: {timestampStr}"); Console.WriteLine($" 处理延迟: {DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - timestamp} ms"); Console.WriteLine($" 头部: {headers}"); // 模拟业务处理 Console.WriteLine("处理中..."); await Task.Delay(1000); // 模拟处理耗时 // 业务处理成功后手动确认 await channel.BasicAckAsync(ea.DeliveryTag, multiple: false); Console.WriteLine("√ 处理完成并确认"); } catch (Exception ex) { Console.WriteLine($"× 处理失败: {ex.Message}"); // 处理失败:拒绝消息并重新入队 await channel.BasicNackAsync( deliveryTag: ea.DeliveryTag, multiple: false, requeue: true // 重新入队以便重试 ); Console.WriteLine("! 消息已重新入队"); } }; // 启动消费者 await channel.BasicConsumeAsync( queue: queueName, autoAck: false, // 关闭自动确认 consumer: consumer ); Console.WriteLine(" [*] 等待消息中... 按 Ctrl+C 退出"); Console.WriteLine("--------------------------------------------------"); // 保持程序运行直到收到退出信号 var exitEvent = new ManualResetEvent(false); Console.CancelKeyPress += (sender, eventArgs) => { eventArgs.Cancel = true; // 防止进程立即退出 Console.WriteLine("\n正在关闭消费者..."); exitEvent.Set(); // 通知主线程可以退出 }; exitEvent.WaitOne(); // 阻塞直到收到退出信号
4.关键点:
- autoAck: false + BasicAck() 避免消息处理失败时丢失。
- 事件驱动模型(EventingBasicConsumer)适合实时处理。
五、主要优势
- 高可靠性: 通过持久化、确认机制保障消息不丢。
- 高可用性: 支持集群和镜像队列,避免单点故障。
- 灵活的路由: 多种交换器类型满足复杂路由需求。
- 可扩展性: 易于横向扩展(添加节点、消费者)以应对增长。
- 丰富的协议支持: 原生 AMQP 0-9-1,插件支持 MQTT, STOMP 等。
- 庞大的社区和生态系统: 开源、成熟、文档完善、客户端库丰富、插件众多。
- 管理界面: 提供易用的 Web UI 和命令行工具进行监控和管理。
六、最佳实践
场景 | 方案 | 关键配置 |
---|---|---|
消息持久化 | 队列声明 + 消息属性 | durable: true + Persistent: true |
消费者高可用 | 手动确认 + 异常重试 | autoAck: false + BasicNack |
流量控制 | 预取计数(QoS) | channel.BasicQos(prefetchCount: 1) |
延迟任务 | 死信队列 + TTL | x-message-ttl + x-dead-letter-exchange |
总结
RabbitMQ 是一个功能强大、稳定可靠、灵活且应用广泛的开源消息代理。它通过异步消息传递,有效地解决了分布式系统中的解耦、异步、削峰、负载均衡和可靠通信等关键问题。其基于标准的协议支持和丰富的特性,使其成为构建现代、可扩展、高可用应用程序的基石之一。