.Net Core 中RabbitMQ基本使用

发布于:2025-07-04 ⋅ 阅读:(18) ⋅ 点赞:(0)


前言

`

一、RabbitMQ 是什么?

简介

  • 开源消息代理/消息队列系统: RabbitMQ 的核心是一个消息代理。想象它是一个高效的邮局:应用程序(生产者)把消息(信件)发送给它,它负责将消息安全、可靠地路由和递送给一个或多个目标应用程序(消费者)。

  • 基于 AMQP: RabbitMQ 最初实现了 AMQP (Advanced Message Queuing Protocol) 0-9-1 协议,这是一个开放标准的应用层协议,专为可靠、异步、跨平台的消息传递而设计。这使得不同语言和平台编写的应用程序能够轻松通信。

  • 平台与语言无关: 由于实现了标准协议(并支持其他协议如 MQTT, STOMP),RabbitMQ 可以被几乎所有主流编程语言(Python, Java, .NET, Ruby, JavaScript, Go, PHP 等)使用。客户端库非常丰富。

  • 构建于 Erlang/OTP: RabbitMQ 是用 Erlang 语言编写的,并运行在强大的 OTP (Open Telecom Platform) 框架之上。Erlang/OTP 以其高并发、分布式、容错和热代码升级能力而闻名,这使得 RabbitMQ 天生就非常可靠、稳定且可扩展。

核心作用/解决的问题

  • RabbitMQ 主要用于解决分布式系统或复杂应用程序中的通信问题:

    • 解耦: 将消息的发送者(生产者)和接收者(消费者)分离。生产者不需要知道消费者的存在或状态,只需将消息发送到 RabbitMQ。消费者可以在需要时处理消息。这提高了系统的灵活性和可维护性。

    • 异步通信: 生产者发送消息后可以立即返回,无需等待消费者处理完成。消费者可以在自己方便的时候处理消息。这显著提高了系统的响应速度和吞吐量。

    • 缓冲/削峰填谷: 当生产者发送消息的速度超过消费者处理的速度时,RabbitMQ 的队列可以充当缓冲区,暂存消息,避免系统因瞬时压力过载而崩溃。消费者可以按照自己的节奏处理积压的消息。

    • 负载均衡: 可以启动多个消费者实例从同一个队列消费消息,RabbitMQ 会以轮询或其他方式将消息分发给这些消费者,实现工作负载的分配。

    • 可靠性: RabbitMQ 提供了强大的机制(如消息持久化、发送方确认、消费者确认)来确保消息在传输和处理过程中不丢失,即使在部分系统故障的情况下。

    • 灵活的路由: 通过 Exchange 和 Binding 机制,RabbitMQ 能够根据设定的规则(如路由键、主题匹配、头信息匹配等)将消息精确地投递到不同的队列。

核心概念与工作模型

  • 理解 RabbitMQ 的关键是理解其核心组件和消息流:

    • Producer(生产者): 创建并发送消息的应用程序。

    • Message(消息): 包含要传输的数据(有效载荷)和一些元数据(如路由键、头信息等)。

    • Exchange(交换机): 生产者将消息发送到交换机。交换机相当于邮局的分拣中心,它根据消息的路由键(Routing Key) 和自身的类型以及绑定规则,决定将消息投递到哪些队列。常见的交换机类型有:

      • Direct: 精确匹配路由键。
      • Fanout: 广播到所有绑定的队列(忽略路由键)。
      • Topic: 基于路由键模式匹配(通配符)。
      • Headers: 基于消息头信息匹配。
    • Binding(绑定): 连接交换机和队列的规则。它告诉交换机“哪些消息应该被路由到哪个队列”。绑定通常包含一个路由键(对于 Direct 和 Topic 交换机)或头信息匹配规则(对于 Headers 交换机)。

    • Queue(队列): 消息存储的地方,等待消费者处理。队列是 FIFO(先进先出)的,但可以配置优先级。消息会一直存储在队列中,直到被消费者成功接收并确认。

    • Consumer(消费者): 连接到队列并接收消息进行处理的应用程序。消费者可以主动拉取消息,也可以让 RabbitMQ 在有消息时推送过来。

  • 消息流简述:
    生产者 -> (发送消息到) -> 交换机 -> (根据绑定规则路由到) -> 一个或多个队列 <- (消费者从队列) <- 消费消息

二、安装环境

1.安装 Erlang(必需依赖)

  • RabbitMQ 基于 Erlang 编写,需先安装匹配版本:
    • 访问 Erlang下载页
    • 下载 Windows 安装包(选择与 RabbitMQ 兼容的版本)
    • 运行安装程序(默认设置即可)
    • 添加环境变量:找到系统环境变量→选中变量Path→编辑→添加Erlang安装bin目录(如:C:\Program Files\Erlang OTP\bin)
    • 验证安装:打开命令提示符,输入 erl -version,显示版本号即成功。

2.安装 RabbitMQ

  • 访问 RabbitMQ下载页

  • 在 Windows Installer 部分下载 .exe 安装包

  • 在这里插入图片描述

  • 运行安装程序:安装路径保持默认(C:\Program Files\RabbitMQ Server)

  • 添加环境变量::找到系统环境变量→选中变量Path→编辑→添加RabbitMQ安装sbin目录(如:C:\Program Files\RabbitMQ Server\rabbitmq_server-4.1.1\sbin)

  • 保证 Cookie 文件同步

    • 复制 文件(C:\Windows\System32\config\systemprofile.erlang.cookie)中的Cookie 字符串。
    • 同步Cookie 字符串到文件(C:\Users\雷家饭碗.erlang.cookie)中。

3.启动 RabbitMQ 服务

  • 方法1:通过服务管理器启动

    • 按 Win + R 输入 services.msc
    • 找到 RabbitMQ 服务 → 右键启动
  • 方法2:命令行启动

    • 以管理员身份打开cmd窗口
    • 输入命令启动RabbitMQ:net start RabbitMQ
    • 查看RabbitMQ服务状态:sc query RabbitMQ

4.访问管理控制台

  • 打开浏览器访问RabbitMQ控制台
  • 使用默认账号登录
    • 用户名: guest
    • 密码: guest

三、创建生产者

1.新建生产者发送消息控制台

2.安装 NuGet 包:

  1. 安装命令:
    Install-Package RabbitMQ.Client
    

3.生产者发送消息

  1. 代码示例:
    using RabbitMQ.Client;
    using System.Text;
    
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };
    
    using var connection = await factory.CreateConnectionAsync();
    string exchangeName = "exchangeName1";
    
    // 创建初始化通道(用于声明交换机和队列)
    using (var initChannel = await connection.CreateChannelAsync())
    {
        await initChannel.ExchangeDeclareAsync(
            exchange: exchangeName,
            type: ExchangeType.Direct,
            durable: true
        );
    }
    
    while (true)
    {
        using var channel = await connection.CreateChannelAsync();
    
        // 创建消息属性 - 7.1.2 正确方式
        var properties = new BasicProperties
        {
            Persistent = true,  // 设置持久化
            ContentType = "text/plain",
            Headers = new Dictionary<string, object>
            {
                { "timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }
            }
        };
    
        string message = DateTime.Now.TimeOfDay.ToString();
        var body = Encoding.UTF8.GetBytes(message);
    
        await channel.BasicPublishAsync(
            exchange: exchangeName,
            routingKey: "Key1",
            mandatory: false,
            basicProperties: properties,  // 使用创建的消息属性
            body: body
            
        );
    
        Console.WriteLine($" [x] Sent {message}");
        await Task.Delay(1000);  // 使用异步延迟
    }
    

4.关键点:

  • durable: true + Persistent: true 确保消息与服务重启后不丢失。
  • 默认交换机(Key1)使用路由键直连队列。

四、创建消费者

1.创建消费者接收消息控制台

2.安装 NuGet 包:

  1. 安装命令:
    Install-Package RabbitMQ.Client
    

3.消费者接收消息

  1. 代码示例:
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using RabbitMQ.Client.Exceptions;
    using System.Text;
    
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest",
        AutomaticRecoveryEnabled = true,
        NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
    };
    
    Console.WriteLine("正在连接到 RabbitMQ...");
    using var connection = await factory.CreateConnectionAsync();
    using var channel = await connection.CreateChannelAsync();
    
    const string exchangeName = "exchangeName1";
    const string queueName = "hello";
    const string routingKey = "Key1";
    
    // 1. 声明交换机
    await channel.ExchangeDeclareAsync(
        exchange: exchangeName,
        type: ExchangeType.Direct,
        durable: true
    );
    QueueDeclareOk queueResult = null;
    try
    {
        // 2. 尝试声明队列(使用所需参数)
        queueResult=await channel.QueueDeclareAsync(
            queue: queueName,
            durable: true, // 保持您需要的持久化设置
            exclusive: false,
            autoDelete: false,
            arguments: null
        );
    }
    catch (OperationInterruptedException ex) when (ex.Message.Contains("inequivalent arg"))
    {
        // 3. 处理参数不匹配异常
        Console.WriteLine($"队列参数不匹配: {ex.Message}");
    
        // 删除现有队列并重新创建
        Console.WriteLine("正在删除现有队列...");
        await channel.QueueDeleteAsync(queueName);
    
        Console.WriteLine("重新创建队列...");
        queueResult = await channel.QueueDeclareAsync(
            queue: queueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null
        );
    }
    
    // 4. 绑定队列到交换机
    await channel.QueueBindAsync(
        queue: queueName,
        exchange: exchangeName,
        routingKey: routingKey
    );
    
    Console.WriteLine("队列信息:");
    Console.WriteLine($" 名称: {queueResult.QueueName}");
    Console.WriteLine($" 消息数: {queueResult.MessageCount}");
    Console.WriteLine($" 消费者数: {queueResult.ConsumerCount}");
    
    // 设置服务质量(QoS)限制未确认消息数量
    await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
    
    // 创建异步消费者
    var consumer = new AsyncEventingBasicConsumer(channel);
    
    // 消息接收事件处理
    consumer.ReceivedAsync += async (model, ea) =>
    {
        try
        {
            // 获取消息内容
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
    
            // 获取消息属性
            var properties = ea.BasicProperties;
            var timestamp = properties.Timestamp.UnixTime;
            var timestampStr = DateTimeOffset.FromUnixTimeMilliseconds(timestamp).ToString("HH:mm:ss.fff");
    
            var headers = properties.Headers != null
                ? string.Join(", ", properties.Headers.Select(kv => $"{kv.Key}={kv.Value}"))
                : "无";
    
            Console.WriteLine($"[{DateTime.Now:HH:mm:ss.fff}] 收到消息:");
            Console.WriteLine($"  内容: {message}");
            Console.WriteLine($"  原始时间戳: {timestampStr}");
            Console.WriteLine($"  处理延迟: {DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - timestamp} ms");
            Console.WriteLine($"  头部: {headers}");
    
            // 模拟业务处理
            Console.WriteLine("处理中...");
            await Task.Delay(1000); // 模拟处理耗时
    
            // 业务处理成功后手动确认
            await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
            Console.WriteLine("√ 处理完成并确认");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"× 处理失败: {ex.Message}");
    
            // 处理失败:拒绝消息并重新入队
            await channel.BasicNackAsync(
                deliveryTag: ea.DeliveryTag,
                multiple: false,
                requeue: true // 重新入队以便重试
            );
    
            Console.WriteLine("! 消息已重新入队");
        }
    };
    
    // 启动消费者
    await channel.BasicConsumeAsync(
        queue: queueName,
        autoAck: false, // 关闭自动确认
        consumer: consumer
    );
    
    Console.WriteLine(" [*] 等待消息中... 按 Ctrl+C 退出");
    Console.WriteLine("--------------------------------------------------");
    
    // 保持程序运行直到收到退出信号
    var exitEvent = new ManualResetEvent(false);
    Console.CancelKeyPress += (sender, eventArgs) => {
        eventArgs.Cancel = true; // 防止进程立即退出
        Console.WriteLine("\n正在关闭消费者...");
        exitEvent.Set(); // 通知主线程可以退出
    };
    
    exitEvent.WaitOne(); // 阻塞直到收到退出信号
    

4.关键点:

  • autoAck: false + BasicAck() 避免消息处理失败时丢失。
  • 事件驱动模型EventingBasicConsumer)适合实时处理。

五、主要优势

  • 高可靠性: 通过持久化、确认机制保障消息不丢。
  • 高可用性: 支持集群和镜像队列,避免单点故障。
  • 灵活的路由: 多种交换器类型满足复杂路由需求。
  • 可扩展性: 易于横向扩展(添加节点、消费者)以应对增长。
  • 丰富的协议支持: 原生 AMQP 0-9-1,插件支持 MQTT, STOMP 等。
  • 庞大的社区和生态系统: 开源、成熟、文档完善、客户端库丰富、插件众多。
  • 管理界面: 提供易用的 Web UI 和命令行工具进行监控和管理。

六、最佳实践

场景 方案 关键配置
消息持久化 队列声明 + 消息属性 durable: true + Persistent: true
消费者高可用 手动确认 + 异常重试 autoAck: false + BasicNack
流量控制 预取计数(QoS) channel.BasicQos(prefetchCount: 1)
延迟任务 死信队列 + TTL x-message-ttl + x-dead-letter-exchange

总结

RabbitMQ 是一个功能强大、稳定可靠、灵活且应用广泛的开源消息代理。它通过异步消息传递,有效地解决了分布式系统中的解耦、异步、削峰、负载均衡和可靠通信等关键问题。其基于标准的协议支持和丰富的特性,使其成为构建现代、可扩展、高可用应用程序的基石之一。


网站公告

今日签到

点亮在社区的每一天
去签到