分布式事务之通过RocketMQ事务实现可靠消息最终一致性方案

发布于:2023-01-20 ⋅ 阅读:(198) ⋅ 点赞:(0)

分布式事务基础理论

基于上述的CAP和BASE理论,一般情况下会保证P和A,舍弃C,保证最终一致性。最终一致是指经过一段时间后,所有节点数据都将会达到一致。如订单的"支付中"状态,最终会变为“支付 成功”或者"支付失败",使订单状态与实际交易结果达成一致,但需要一定时间的延迟、等待。

概述案例

此方案的核心是将分布式事务拆分成多个本地事务,然后通过网络由消息队列协调完成所有事务,并实现最终一致性。以转账为例:

1. 消息发送方张三,即Bank1: 扣减余额30元,然后通过网络发送消息到MQ

2. 消息接收方李四,即Bank2: 通过网络从MQ中接收消息,然后增加余额30元

该解决方案容易理解,实现成本低,但是面临以下几个问题:

1.消息发送方执行本地事务与发送消息的原子性问题,也就是说如何保证本地事务执行成功,消息一定发送成功

begin transaction

1.数据库操作

2.发送消息

commit transation

这种情况下,貌似没有问题,如果发送消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但此时消息已经正常发送了,同样会导致不一致。

2.消息接收方接收消息与本地事务的原子性问题,也就是说如何保证接收消息成功后,本地事务一定执行成功

3.由于消息可能会重复发送,这就要求消息接收方必须实现幂等性

由于在生产环境中,消费方很有可能是个集群,若某一个消费节点超时但是消费成功,会导致集群同组 其他节点重复消费该消息。另外意外宕机后恢复,由于消费进度没有及时写入磁盘,会导致消费进度部 分丢失,从而导致消息重复消费。

解决方案:RocketMQ可靠消息

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便 利性支持。因此,我们通过RocketMQ就可以解决前面的问题。

1.消息发送方执行本地事务与发送消息的原子性问题,也就是说如何保证本地事务执行成功,消息一定发送成功

RocketMQ中的Broker 与 发送方 具备双向通信能力,使得 broker 天生可以作为一个事务协调者存在;并且RocketMQ 本身提供了存储机制,使得事务消息可以持久化保存;这些优秀的设计可以保证即使发生了异常,RocketMQ依然能够保证达成事务的最终一致性。

 1. 发送方发送一个事务消息给Broker,RocketMQ会将消息状态标记为“Prepared”,此时这条消息暂时不能被接收方消费。这样的消息称之为Half Message,即半消息。

2. Broker返回发送成功给发送方

3. 发送方执行本地事务,例如操作数据库

4. 若本地事务执行成功,发送commit消息给Broker,RocketMQ会将消息状态标记为“可消费”,此 时这条消息就可以被接收方消费;若本地事务执行失败,发送rollback消息给Broker,RocketMQ 将删除该消息。

5. 如果发送方在本地事务过程中,出现服务挂掉,网络闪断或者超时,那Broker将无法收到确认结 果

6. 此时RocketMQ将会不停的询问发送方来获取本地事务的执行状态(即事务回查)

7. 根据事务回查的结果来决定Commit或Rollback,这样就保证了消息发送与本地事务同时成功或同时失败。

以上主干流程已由RocketMQ实现,对于我们来说只需要分别实现本地事务执行的方法以及本地事务回查的方法即可,具体来说就是实现下面这个接口:

public interface TransactionListener {
/**
- 发送prepare消息成功后回调该方法用于执行本地事务
- @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
- @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,
这里能获取到
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:未知,需要回查
*/
LocalTransactionState executeLocalTransaction(final Message msg, final
Object arg);
/**
- @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:未知,需要回查
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

2.消息接收方接收消息与本地事务的原子性问题,也就是说如何保证接收消息成功后,本地事务一定执行成功

如果是出现了异常,RocketMQ会通过重试机制,每隔一段时间消费消息,然后再执行本地事务;如果 是超时,RocketMQ就会无限制的消费消息,不断的去执行本地事务,直到成功为止。

快速入门

通过RocketMQ可靠消息实现最终一致性,模拟两个账户的转账交易过程。两个账户分别在不同 的银行(张三在bank1、李四在bank2),bank1、bank2是两个相互独立的微服务。

为了实现幂等性,需要分别在bank1、bank2数据库中新增de_duplication表,即交易记录表(去重表)。

消息发送方bank1

1. Controller层代码以及定义一个类封装转账消息:

@RestController
@Slf4j
public class AccountInfoController {
    @Autowired
    private AccountInfoService accountInfoService;

    @GetMapping(value = "/transfer")
    public String transfer(){
        accountInfoService.updateAccountBalance(new AccountChangeEvent("1",100,System.currentTimeMillis()));
        return "转账成功";
    }
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccountChangeEvent implements Serializable {
    /**
     * 账号
     */
    private String accountNo;
    /**
     * 变动金额
     */
    private double amount;
    /**
     * 事务号,时间戳
     */
    private long txNo;

}

2. 实现dao数据访问层,一共四个功能

@Mapper
@Component
public interface AccountInfoDao {
    /**
     * 修改某账号的余额
     * @param accountNo 账号
     * @param amount 变动金额
     * @return
     */
        @Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}")
                int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);


    /**
     * 查询某账号信息
     * @param accountNo 账号
     * @return
     */
    @Select("select * from account_info where where account_no=#{accountNo}")
    AccountInfo findByIdAccountNo(@Param("accountNo") String accountNo);

    /**
     * 查询某事务记录是否已执行
     * @param txNo 事务编号
     * @return
     */
    @Select("select count(1) from de_duplication where tx_no = #{txNo}")
    int isExistTx(long txNo);

    /**
     * 保存某事务执行记录
     * @param txNo 事务编号
     * @return
     */
    @Insert("insert into de_duplication values(#{txNo},now());")
    int addTx(long txNo);
}

3. 实现发送转账消息

@Component
public class BankMessageProducer {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendAccountChangeEvent(AccountChangeEvent accountChangeEvent) {
        //构造消息
        JSONObject jsonObject=new JSONObject();
        jsonObject.put("accountChange",accountChangeEvent);
        //这里Message需要导入spring的包
        Message<String> msg= MessageBuilder.withPayload( jsonObject.toJSONString()).build();

        //发送消息
        rocketMQTemplate.sendMessageInTransaction("producer_ensure_transfer",
                "topic_ensure_transfer",
                msg, null);

    }
}

业务流程1(发送转账消息)

controller接收修改金额请求  调用service修改金额方法 传入一个AccountChangeEvent对象,用当前时间代表事务号

service层方法调用BankMessageProducer消息发送类中的方法

 由于RocketMQTemplate发送消息需要一个message对象 所以要先创建一个JSONObject将AccountChangeEvent对象写入然后转换为JSON字符串封装到Message对象中,将其发送,使用sendMessageInTransaction发送事务消息

4. 实现业务层代码

分别实现了发送事务消息与本地事务扣减金额,注意 doUpdateAccountBalance的本地事务若执行成功,就会在交易记录去重表(de_duplication)保存数据。

@Transactional(isolation = Isolation.SERIALIZABLE)(最高隔离级别,不允许事务并发执行,而必须串行化执行,最安全,不可能出现更新、脏读、不可重复读、幻读)

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    private BankMessageProducer bankMessageProducer;

    @Autowired
    private AccountInfoDao accountInfoDao;
     /**
     * 更新帐号余额-发送消息
     * @param accountChange
     */
    @Override
    public void updateAccountBalance(AccountChangeEvent accountChange) {
        bankMessageProducer.sendAccountChangeEvent(accountChange);
    }
    /**
     * 更新帐号余额-本地事务
     * @param accountChange
     */
    @Override
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public void doUpdateAccountBalance(AccountChangeEvent accountChange) {
        accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount()*-1);
        accountInfoDao.addTx(accountChange.getTxNo());
    }
}

修改账户金额完成后保存该事务执行的记录到de_duplication表中

5. 实现RocketMQ事务消息监听器

RocketMQLocalTransactionListener

其中有两个功能:

(1) executeLocalTransaction,该方法执行本地事务,会被RocketMQ自动调用

在这个方法中进行事务处理 如果失败了就选择回滚事务

return RocketMQLocalTransactionState.ROLLBACK;

成功就提交事务

return RocketMQLocalTransactionState.COMMIT;

(2) checkLocalTransaction,该方法实现事务回查,利用了交易记录去重表(de_duplication),会被RocketMQ自动调用

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_ensure_transfer")
public class TransferTransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Autowired
    private AccountInfoService accountInfoService;

    @Autowired
    private AccountInfoDao accountInfoDao;

    /**
     * 执行本地事务
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        //1.接收并解析消息
        final JSONObject jsonObject = JSON.parseObject(new String((byte[])
                msg.getPayload()));
        AccountChangeEvent accountChangeEvent =
                JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.
                        class);

        //2.执行本地事务
        Boolean isCommit = true;
        try {
            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
        }catch (Exception e){
            isCommit = false;
        }

        //3.返回执行结果
        if(isCommit){
            return RocketMQLocalTransactionState.COMMIT;
        }else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }


    /**
     * 事务回查
    * @param msg
    * @return
        */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        System.out.println("事务回查");
        //1.接收并解析消息
        final JSONObject jsonObject = JSON.parseObject(new String((byte[])
                msg.getPayload()));
        AccountChangeEvent accountChangeEvent =
                JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.
                        class);

        //2.查询de_duplication表
        int isExistTx = accountInfoDao.isExistTx(accountChangeEvent.getTxNo());

        //3.根据查询结果返回值
        if(isExistTx>0){
            return RocketMQLocalTransactionState.COMMIT;
        }else {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

rocketMQ逻辑结构:

业务流程2(消息成功接受回调,执行本地事务)

监听器监听到broker传回的半消息发送成功后执行本地事务  设置一个boolean值代表事务执行成功与否,如果抓取到异常则说明本地事务执行失败,失败则进行回滚,否则就提交事务

业务流程3(事务回查)

由于在执行executeLocalTransaction这个方法的时候可能会出现异常而导致执行事务的结果并没有发送给rocketMQ,如果是超时异常,数据库回滚,但此时消息已经正常发送了,同样会导致不一致,破坏执行本地事务与发送消息这两个操作的原子性问题,所以rocketMQ会自动调用checkLocalTransaction这个方法会不断的检查事务执行结果  而这个方法是根据de_duplication表中的数据来判断事务是否成功,因为在业务流程2中修改bank1数据的时候也会同步记录到de_duplication表中 根据结果返回提交或者回滚消息给rocketMQ

至此 bank1这里的业务步骤就执行完毕了

消息接受方bank2

1.定义监听器

业务流程1(监听消息)

一旦监听到bank1服务发送的消息 就会执行onMessage方法

该方法会先解析JSON字符串,获取其中的accountChangeEvent对象

接着执行本地事务更新 调用service层方法

@Component
@RocketMQMessageListener(topic = "topic_ensure_transfer", consumerGroup =
        "consumer_ensure_transfer")
public class EnsureMessageConsumer implements RocketMQListener<String> {

    @Autowired
    private AccountInfoService accountInfoService;

    @Override
    public void onMessage(String message) {
        System.out.println("消费消息:"+message);
        //1.解析消息
        final JSONObject jsonObject = JSON.parseObject(message);
        AccountChangeEvent accountChangeEvent =
                JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.
                        class);
        //2.执行本地事务
        accountChangeEvent.setAccountNo("2");
        accountInfoService.updateAccountBalance(accountChangeEvent);
    }
}

2.service层代码

业务流程2(执行本地事务)

最高级事务级别,先看下de_duplication中是否有事务数据 如果有的话证明已经修改过bank2的数据库,则不做修改,以此避免了接口幂等性问题

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    private AccountInfoDao accountInfoDao;

    @Override
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public void updateAccountBalance(AccountChangeEvent accountChange) {
        System.out.println("bank2执行本地事务");
        int isExistsTx = accountInfoDao.isExistTx(accountChange.getTxNo());
        if(isExistsTx==0){
            accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount());
            accountInfoDao.addTx(accountChange.getTxNo());

        }
    }
}

总结:

bank1和bank2都成功

bank1执行本地事务失败,则bank2接收不到转账消息。

bank1执行完本地事务后,不返回任何信息,则Broker会进行事务回查,直到获取事务结果消息。

bank2执行本地事务失败,会进行重试消费。

可靠消息最终一致性事务适合执行周期长且实时性要求不高的场景。引入该机制后,同步的事务操作变为基于消息执行的异步操作, 避免了分布式事务中的同步阻塞操作的影响,并实现了两个服务的解耦。

本文含有隐藏内容,请 开通VIP 后查看