目录
本文来源:极客时间vip课程笔记
一、RocketMQ如何实现事务的
1.1、普通业务代码实现RocketMQ 的事务大致流程
首先我们一起通过普通业务代码来看 RocketMQ 的事务大致流程。
public class CreateOrderService { @Inject private OrderDao orderDao; // 注入订单表的DAO @Inject private ExecutorService executorService; //注入一个ExecutorService private TransactionMQProducer producer; // 初始化transactionListener 和 producer @Init public void init() throws MQClientException { TransactionListener transactionListener = createTransactionListener(); producer = new TransactionMQProducer("myGroup"); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); } // 创建订单服务的请求入口 @PUT @RequestMapping(...) public boolean createOrder(@RequestBody CreateOrderRequest request) { // 根据创建订单请求创建一条消息 Message msg = createMessage(request); // 发送事务消息 SendResult sendResult = producer.sendMessageInTransaction(msg, request); // 返回:事务是否成功 return sendResult.getSendStatus() == SendStatus.SEND_OK; } private TransactionListener createTransactionListener() { return new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { CreateOrderRequest request = (CreateOrderRequest ) arg; try { // 执行本地事务创建订单 orderDao.createOrderInDB(request); // 如果没抛异常说明执行成功,提交事务消息 return LocalTransactionState.COMMIT_MESSAGE; } catch (Throwable t) { // 失败则直接回滚事务消息 return LocalTransactionState.ROLLBACK_MESSAGE; } } // 反查本地事务