消息三剑客华山论剑:Kafka vs RabbitMQ vs RocketMQ

发布于:2025-09-10 ⋅ 阅读:(22) ⋅ 点赞:(0)

“选消息队列就像选交通工具:Kafka是货运专列,RabbitMQ是城市地铁,RocketMQ是全能高铁。选错工具?小心你的数据堵在五环!”

一、江湖地位速览

消息队列
Kafka
RabbitMQ
RocketMQ
大数据管道
日志收集
企业级中间件
复杂路由
金融交易
顺序消息

二、核心参数擂台赛

2.1 性能参数对比

public class MQBenchmark {
    // 吞吐量 (msg/s)
    private static final int KAFKA_THROUGHPUT = 150_000;
    private static final int RABBITMQ_THROUGHPUT = 20_000;
    private static final int ROCKETMQ_THROUGHPUT = 100_000;
    
    // 延迟 (ms)
    private static final double KAFKA_LATENCY = 5.2;
    private static final double RABBITMQ_LATENCY = 0.8;
    private static final double ROCKETMQ_LATENCY = 3.5;
}

2.2 架构复杂度

40% 25% 35% 架构复杂度 Kafka RabbitMQ RocketMQ

三、代码江湖见真章

3.1 Kafka生产者(日志采集)

public class LogProducer {
    private static final String BOOTSTRAP_SERVERS = "kafka1:9092,kafka2:9092";
    
    public void sendLog(String logData) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("acks", "all");
        
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("app_logs", logData);
            producer.send(record);
        }
    }
}

3.2 RabbitMQ消费者(订单处理)

public class OrderConsumer {
    private static final String EXCHANGE_NAME = "order_exchange";
    
    public void startConsuming() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("rabbitmq-host");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "payment.orders");
            
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                Order order = parseOrder(delivery.getBody());
                paymentService.process(order);
            };
            
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        }
    }
}

3.3 RocketMQ事务消息(金融交易)

public class TransactionProducer {
    private DefaultMQProducer producer;
    
    public void sendTransaction(TransferOrder order) throws Exception {
        Message msg = new Message("TRANSFER_TOPIC", 
            JSON.toJSONBytes(order));
        
        TransactionSendResult result = producer.sendMessageInTransaction(msg, 
            new LocalTransactionExecuter() {
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    return accountService.prepareTransfer(order) ? 
                        LocalTransactionState.COMMIT_MESSAGE :
                        LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }, null);
        
        if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
            alertService.notifyFailedTransaction(order);
        }
    }
}

四、武林争霸对比表

维度 Kafka RabbitMQ RocketMQ
吞吐量 15万+/秒 2万/秒 10万+/秒
延迟 5ms+ <1ms 3ms+
消息顺序 分区内有序 无序 严格顺序
事务支持 有限支持 完整支持
开发难度 高(需理解分区/副本) 低(AMQP标准) 中(有中文文档)
运维成本 高(需Zookeeper) 低(内置管理界面) 中(需NameServer)
最佳场景 日志收集/流处理 企业应用集成 金融交易/电商订单

五、实战选型指南针

业务需求
需要事务?
RocketMQ
需要低延迟?
RabbitMQ
海量数据?
Kafka
综合选择

5.1 电商系统架构示例

// 使用Kafka收集用户行为日志
kafkaProducer.send(new UserBehaviorLog(userId, action));

// 通过RabbitMQ处理库存变更
rabbitTemplate.convertAndSend("inventory", "stock.update", stockChange);

// RocketMQ处理支付订单
rocketMQTemplate.sendMessageInTransaction("PAY_ORDER_TOPIC", paymentOrder);

六、性能调优宝典

6.1 Kafka参数调优

props.put("linger.ms", 20);  // 适当增加批次等待时间
props.put("batch.size", 16384); // 增大批次大小
props.put("compression.type", "snappy"); // 启用压缩

6.2 RabbitMQ内存控制

// 设置队列最大内存 (50MB)
Map<String, Object> args = new HashMap<>();
args.put("x-max-length-bytes", 50 * 1024 * 1024);
channel.queueDeclare("image_queue", true, false, false, args);

6.3 RocketMQ刷盘策略

// 异步刷盘提升性能(适合允许少量数据丢失的场景)
DefaultMQProducer producer = new DefaultMQProducer("GROUP_NAME");
producer.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);

七、运维监控三件套

31% 48% 21% 监控工具使用率 Kafka Manager RabbitMQ Management RocketMQ Console
  1. Kafka Eagle:可视化监控平台
  2. Prometheus+Grafana:通用监控方案
  3. 阿里云专业版(RocketMQ商业支持)

八、血泪教训清单

  1. Kafka陷阱:分区数不是越多越好!
// 错误示范:创建1000个分区导致性能下降
new Topic("user_events", 1000, (short)3);
  1. RabbitMQ内存爆炸
// 必须设置队列最大长度
channel.queueDeclare("unlimited_queue", false, false, false, null); // 危险操作!
  1. RocketMQ顺序消费
// 必须使用MessageQueueOrderly模式
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 顺序处理逻辑
    }
});

九、未来趋势瞭望

  1. Serverless化:云原生消息服务
  2. 智能路由:基于AI的消息分发
  3. 统一协议:支持多协议转换网关

网站公告

今日签到

点亮在社区的每一天
去签到