日常工作,MQ的7种常用使用场景

发布于:2025-03-12 ⋅ 阅读:(68) ⋅ 点赞:(0)

目录

1. 异步处理

详细解释

运用场景

代码示例

2. 流量削峰

详细解释

运用场景

代码示例

3. 日志处理

详细解释

运用场景

代码示例

4. 数据同步

详细解释

运用场景

代码示例

5. 任务调度

详细解释

运用场景

代码示例

6. 分布式事务

详细解释

运用场景

代码示例

7. 系统集成

详细解释

运用场景

代码示例


以下为你详细介绍 MQ(消息队列)在日常工作中的 8 种常用使用场景:

1. 异步处理

详细解释

在一些业务流程中,存在部分操作耗时较长且不影响主流程的立即响应。通过将这些耗时操作放入消息队列,主流程可以继续执行,而耗时操作会在后台异步处理,从而提高系统的整体响应速度和吞吐量。

运用场景
  • 电商系统中,用户下单后,订单创建是主流程,而发送订单确认邮件、短信通知等操作可以异步处理。
  • 社交平台中,用户发布动态后,动态的点赞、评论计数更新等操作可以异步完成。
代码示例
// 生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AsyncProducer {
    private static final String QUEUE_NAME = "async_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "耗时操作的任务信息";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AsyncConsumer {
    private static final String QUEUE_NAME = "async_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                // 模拟耗时操作
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

2. 流量削峰

详细解释

在业务高峰期,系统会收到大量的请求,若直接处理这些请求,可能导致系统负载过高甚至崩溃。消息队列可以作为缓冲,将大量请求先存入队列,系统再按照自身处理能力从队列中逐步获取请求进行处理,实现流量的削峰填谷,保护系统稳定性。

运用场景
  • 电商大促活动,如双 11、618 等,大量用户同时下单,订单请求先进入消息队列,系统按一定速率处理订单。
  • 在线直播活动,大量观众同时发送弹幕、点赞等请求,消息队列可缓冲这些请求。
代码示例
// 生产者代码(模拟高并发请求)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TrafficPeakProducer {
    private static final String QUEUE_NAME = "traffic_peak_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 1000; i++) {
                String message = "Request " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码(按一定速率处理请求)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TrafficPeakConsumer {
    private static final String QUEUE_NAME = "traffic_peak_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    // 模拟处理时间,控制处理速率
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

3. 日志处理

详细解释

系统运行过程中会产生大量日志,如业务日志、操作日志等。将日志信息发送到消息队列,由专门的日志处理系统从队列中获取日志进行存储、分析和展示等操作,实现日志处理与业务系统的解耦,方便对日志进行统一管理。

运用场景
  • 大型互联网应用中,各个微服务产生的日志可通过消息队列汇总到日志中心进行处理。
  • 企业级系统中,将用户操作日志发送到消息队列,用于后续的审计和安全分析。
代码示例
// 生产者代码(日志发送)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class LogProducer {
    private static final String QUEUE_NAME = "log_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String logMessage = "This is a log message";
            channel.basicPublish("", QUEUE_NAME, null, logMessage.getBytes("UTF-8"));
            System.out.println(" [x] Sent log: '" + logMessage + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码(日志处理)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class LogConsumer {
    private static final String QUEUE_NAME = "log_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for log messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String logMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received log: '" + logMessage + "'");
                // 模拟日志存储操作
                // 这里可以将日志存储到文件、数据库等
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

4. 数据同步

详细解释

在分布式系统中,不同数据库、不同系统之间需要进行数据同步。消息队列可作为数据同步的桥梁,当数据发生变化时,将变化的消息发送到消息队列,接收方从队列中获取消息并更新相应数据,实现数据的异步同步。

运用场景
  • 电商系统中,订单数据在主数据库更新后,将订单变更消息发送到消息队列,库存系统从队列中获取消息更新库存数据。
  • 企业的数据仓库与业务系统之间的数据同步,通过消息队列保证数据的一致性。
代码示例
// 生产者代码(数据变更通知)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DataSyncProducer {
    private static final String QUEUE_NAME = "data_sync_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String dataChangeMessage = "Order data has been updated";
            channel.basicPublish("", QUEUE_NAME, null, dataChangeMessage.getBytes("UTF-8"));
            System.out.println(" [x] Sent data change message: '" + dataChangeMessage + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码(数据同步更新)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DataSyncConsumer {
    private static final String QUEUE_NAME = "data_sync_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for data change messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String dataChangeMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received data change message: '" + dataChangeMessage + "'");
                // 模拟数据同步更新操作
                // 这里可以更新库存、数据仓库等
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

5. 任务调度

详细解释

对于定时任务或按一定顺序执行的任务,可将任务信息放入消息队列,由任务调度系统从队列中获取任务并按规则调度执行,提高任务调度的灵活性和可扩展性。

运用场景
  • 数据处理系统中,每天凌晨执行数据备份、数据统计等任务,可将任务信息提前放入消息队列。
  • 定时提醒系统,如会议提醒、任务到期提醒等,将提醒任务放入消息队列进行调度。
代码示例
// 生产者代码(任务发布)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TaskSchedulerProducer {
    private static final String QUEUE_NAME = "task_scheduler_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String taskMessage = "Data backup task";
            channel.basicPublish("", QUEUE_NAME, null, taskMessage.getBytes("UTF-8"));
            System.out.println(" [x] Sent task: '" + taskMessage + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码(任务执行)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TaskSchedulerConsumer {
    private static final String QUEUE_NAME = "task_scheduler_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for tasks. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String taskMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received task: '" + taskMessage + "'");
                // 模拟任务执行
                // 这里可以执行数据备份、统计等任务
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

6. 分布式事务

详细解释

在分布式系统中,涉及多个服务之间的事务操作时,消息队列可用于实现最终一致性。通过将事务操作相关的消息发送到消息队列,各个服务根据消息进行相应操作,在一定时间内达到数据的一致性。

运用场景
  • 跨银行转账系统中,转出银行将转账消息发送到消息队列,转入银行从队列中获取消息并执行入账操作。
  • 电商系统中,订单服务和库存服务之间的事务处理,通过消息队列保证订单和库存数据的一致性。
代码示例
// 生产者代码(转出银行发送转账消息)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DistributedTransactionProducer {
    private static final String QUEUE_NAME = "distributed_transaction_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String transferMessage = "Transfer 1000 yuan from account A to account B";
            channel.basicPublish("", QUEUE_NAME, null, transferMessage.getBytes("UTF-8"));
            System.out.println(" [x] Sent transfer message: '" + transferMessage + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

// 消费者代码(转入银行处理转账消息)
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DistributedTransactionConsumer {
    private static final String QUEUE_NAME = "distributed_transaction_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for transfer messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String transferMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received transfer message: '" + transferMessage + "'");
                // 模拟转入银行入账操作
                // 这里可以更新账户余额等
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

7. 系统集成

详细解释

多个不同系统集成时,消息队列可作为系统间通信的桥梁。各系统将需交互的数据或操作指令以消息形式发送到消息队列,其他系统从队列中获取消息并处理,实现系统间的解耦和异步通信。

运用场景
  • 企业的 ERP 系统与 CRM 系统集成,ERP 系统创建新客户订单后,将订单消息发送到消息队列,CRM 系统从队列获取消息更新客户订单信息和销售数据。
  • 电商平台与物流系统集成,订单生成后将订单信息发送到消息队列,物流系统从队列获取信息进行发货处理。
代码示例
// 生产者代码(ERP系统发送订单消息)
import com.rabbitmq.client.Channel;

网站公告

今日签到

点亮在社区的每一天
去签到