霸王餐返利app的分布式架构设计:基于事件驱动的订单处理系统

发布于:2025-09-13 ⋅ 阅读:(18) ⋅ 点赞:(0)

霸王餐返利app的分布式架构设计:基于事件驱动的订单处理系统

大家好,我是阿可,微赚淘客系统及省赚客APP创始人,是个冬天不穿秋裤,天冷也要风度的程序猿!

在霸王餐返利app的业务场景中,订单处理涉及用户下单、商家确认、返利计算、资金结算等多个环节,传统单体架构面临耦合度高、扩展性差、故障传播快等问题。基于此,我们采用分布式架构+事件驱动模式设计订单处理系统,通过事件解耦服务、异步化处理流程,提升系统吞吐量与稳定性。以下从架构整体设计、核心组件实现、代码示例三方面展开说明。
在这里插入图片描述

一、架构整体设计:事件驱动的分布式分层模型

系统采用四层分布式架构,分别为接入层、业务服务层、事件中间件层、数据存储层,各层通过事件实现松耦合通信,架构图如下(文字描述):

  1. 接入层:负责用户请求转发,采用Nginx+Gateway实现负载均衡与接口鉴权;
  2. 业务服务层:拆分为订单服务、商家服务、返利服务、支付服务等微服务,每个服务专注于单一领域;
  3. 事件中间件层:基于RocketMQ实现事件发布与订阅,确保事件可靠投递;
  4. 数据存储层:采用MySQL分库分表存储订单数据,Redis缓存热点数据,Elasticsearch存储订单日志。

核心设计原则:通过事件驱动实现服务解耦,每个服务仅关注自身业务逻辑,通过订阅事件触发流程,例如用户下单后,订单服务发布“订单创建事件”,商家服务、返利服务订阅该事件分别执行确认订单、计算返利操作。

二、核心组件实现:事件定义与服务交互

2.1 事件模型设计

事件是系统通信的核心载体,需包含事件ID、事件类型、业务数据、时间戳等字段。基于Java实现通用事件模型,代码如下:

package cn.juwatech.bawangcan.event;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.UUID;

/**
 * 通用事件模型
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class BaseEvent {
    // 事件唯一标识
    private String eventId;
    // 事件类型(如ORDER_CREATED、MERCHANT_CONFIRMED)
    private String eventType;
    // 业务数据(存储订单ID、用户ID等关键信息)
    private Map<String, Object> bizData;
    // 事件创建时间戳
    private Long createTime;

    // 构建事件的静态方法
    public static BaseEvent build(String eventType, Map<String, Object> bizData) {
        BaseEvent event = new BaseEvent();
        event.setEventId(UUID.randomUUID().toString().replace("-", ""));
        event.setEventType(eventType);
        event.setBizData(bizData);
        event.setCreateTime(System.currentTimeMillis());
        return event;
    }
}

2.2 事件发布与订阅组件

基于RocketMQ封装事件发布者与订阅者,统一事件处理入口,代码如下:

2.2.1 事件发布者(生产者)
package cn.juwatech.bawangcan.event.producer;

import cn.juwatech.bawangcan.event.BaseEvent;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;

/**
 * 事件生产者:负责发布事件到RocketMQ
 */
@Component
public class EventProducer {
    @Value("${rocketmq.producer.group}")
    private String producerGroup;
    @Value("${rocketmq.namesrv.addr}")
    private String namesrvAddr;

    private DefaultMQProducer producer;

    @PostConstruct
    public void init() throws Exception {
        producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        // 重试次数
        producer.setRetryTimesWhenSendFailed(3);
        producer.start();
    }

    /**
     * 发布事件
     * @param topic 事件主题(如bawangcan_order_topic)
     * @param event 事件对象
     */
    public void publishEvent(String topic, BaseEvent event) throws Exception {
        Message message = new Message(
                topic,
                event.getEventType(),
                event.getEventId(),
                JSON.toJSONString(event).getBytes(StandardCharsets.UTF_8)
        );
        producer.send(message);
    }

    @PreDestroy
    public void destroy() {
        if (producer != null) {
            producer.shutdown();
        }
    }
}
2.2.2 事件订阅者(消费者)
package cn.juwatech.bawangcan.event.consumer;

import cn.juwatech.bawangcan.event.BaseEvent;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;

/**
 * 事件消费者:订阅事件并触发业务处理
 */
@Component
public class OrderEventConsumer {
    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;
    @Value("${rocketmq.namesrv.addr}")
    private String namesrvAddr;

    private DefaultMQPushConsumer consumer;

    @PostConstruct
    public void init() throws Exception {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        // 订阅订单相关主题,监听所有标签(事件类型)
        consumer.subscribe("bawangcan_order_topic", "*");
        // 注册事件处理监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
                        BaseEvent event = JSON.parseObject(msgBody, BaseEvent.class);
                        // 根据事件类型分发处理
                        handleEvent(event);
                    } catch (Exception e) {
                        // 消费失败,返回重试状态
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                // 消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

    /**
     * 事件分发处理
     */
    private void handleEvent(BaseEvent event) {
        switch (event.getEventType()) {
            case "ORDER_CREATED":
                // 处理订单创建事件(如通知商家)
                handleOrderCreatedEvent(event);
                break;
            case "MERCHANT_CONFIRMED":
                // 处理商家确认事件(如计算返利)
                handleMerchantConfirmedEvent(event);
                break;
            default:
                throw new IllegalArgumentException("未知事件类型:" + event.getEventType());
        }
    }

    private void handleOrderCreatedEvent(BaseEvent event) {
        // 业务逻辑:获取订单ID,调用商家服务发送确认通知
        String orderId = (String) event.getBizData().get("orderId");
        // 省略商家通知逻辑...
        System.out.println("订单创建事件处理完成,订单ID:" + orderId);
    }

    private void handleMerchantConfirmedEvent(BaseEvent event) {
        // 业务逻辑:获取订单ID,调用返利服务计算返利
        String orderId = (String) event.getBizData().get("orderId");
        // 省略返利计算逻辑...
        System.out.println("商家确认事件处理完成,订单ID:" + orderId);
    }

    @PreDestroy
    public void destroy() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}

2.3 订单服务核心逻辑

订单服务作为核心服务,负责订单创建、状态更新,并发布相关事件,代码如下:

package cn.juwatech.bawangcan.service;

import cn.juwatech.bawangcan.event.BaseEvent;
import cn.juwatech.bawangcan.event.producer.EventProducer;
import cn.juwatech.bawangcan.mapper.OrderMapper;
import cn.juwatech.bawangcan.model.Order;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.Map;

/**
 * 订单服务
 */
@Service
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private EventProducer eventProducer;

    /**
     * 创建订单
     * @param order 订单信息
     * @return 订单ID
     */
    @Transactional(rollbackFor = Exception.class)
    public String createOrder(Order order) throws Exception {
        // 1. 保存订单到数据库
        orderMapper.insert(order);
        String orderId = order.getOrderId();

        // 2. 发布“订单创建事件”
        Map<String, Object> bizData = new HashMap<>();
        bizData.put("orderId", orderId);
        bizData.put("userId", order.getUserId());
        bizData.put("merchantId", order.getMerchantId());
        BaseEvent event = BaseEvent.build("ORDER_CREATED", bizData);
        eventProducer.publishEvent("bawangcan_order_topic", event);

        return orderId;
    }

    /**
     * 更新订单状态(由商家服务调用)
     */
    public void updateOrderStatus(String orderId, String status) {
        LambdaUpdateWrapper<Order> wrapper = new LambdaUpdateWrapper<>();
        wrapper.eq(Order::getOrderId, orderId)
                .set(Order::getStatus, status);
        orderMapper.update(null, wrapper);
    }
}

三、关键技术亮点与问题解决

  1. 事件幂等性处理:由于RocketMQ可能存在重复投递,每个事件处理逻辑需实现幂等。例如在handleOrderCreatedEvent方法中,先查询订单状态,仅当状态为“未处理”时执行后续操作;
  2. 分布式事务保障:采用“本地消息表+事务消息”方案,订单服务在保存订单时,同时将事件存入本地消息表,事务提交后再发布事件,确保订单创建与事件发布的一致性;
  3. 系统可观测性:通过SkyWalking实现分布式追踪,每个事件携带traceId,可追踪从订单创建到返利结算的完整链路;同时通过Prometheus监控各服务的事件处理延迟、成功率等指标。

本文著作权归聚娃科技省赚客app开发者团队,转载请注明出处!


网站公告

今日签到

点亮在社区的每一天
去签到