高可用环境kafka消息未按顺序消费问题

发布于:2024-04-20 ⋅ 阅读:(22) ⋅ 点赞:(0)

目录

1、背景

2、问题排查

3、问题解决


1、背景

质检任务是异步执行,正常情况下任务状态扭转是    等待中》运行中》成功(失败)。在质量平台生成任务实例,此时状态是等待中,生成实例之后把具体的任务sql给到大数据平台执行,大数据平台会发运行中、成功、失败状态的kafka消息,正常情况下状态是顺序下发。

升级部署某个项目,生产环境突然出现很多任务,一直是运行中状态。

2、问题排查

(1)怀疑大数据平台,任务没有正常执行完成,所以任务一直是运行中

1、在yarn平台以及数据库中,都没有发现正在运行中的质量sql任务

2、排查质量平台服务器日志(kafka消息打印接收消息日志很必要,出问题利于排查),发现这个某个sqlId,正常返回了kafka消息,包括运行中、成功、失败等消息。

通过上面的排查,大数据平台没问题,正常执行了任务,正常按顺序给质量平台发了kafka消息

(2)排查质量平台处理kafka消息逻辑

kafka按顺序返回了状态,质量平台没按顺序消费,看质量平台代码如下。

    @Override
    @Async("zyslClThreadPool")
    public void procZyslJobData(StatusInfo statusInfo) {
        try {
            String zyslId = statusInfo.getTaskId();
            String sqlId = statusInfo.getSqlId();
          -dosomething();
            //运行中任务把检核状态更新成运行中
            if (StatusEnum.RUNNING.getCode().equals(statusInfo.getStatus().getCode())) {
                if (ZyslYxztEnum.WAITING_SUBMIT.getBm().equals(oldGxZysl.getYxzt())
               || ZyslYxztEnum.WAITING.getBm().equals(oldGxZysl.getYxzt())) {
                  //运行中
                oldGxZysl.setYxzt(ZyslYxztEnum.RUNNING.getBm());


                    gxZyslMapper.updateById(oldGxZysl);
                }
                //运行中状态
               oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.RUNNING.getBm());
                gxZyrzMapper.updateById(oldGxZyrzUpdate);
                return;
            }
            if (StatusEnum.FAILED.getCode().equals(statusInfo.getStatus().getCode())) {
                log.error("大数据job错误,执行任务失败:{},zyslid:{},参数:{}", statusInfo.getMessage(), zyslId,
                        JSON.toJSONString(statusInfo));
                //处理失败
                procFail(statusInfo, oldGxZyrzUpdate);
            }
            if (StatusEnum.FINISH.getCode().equals(statusInfo.getStatus().getCode())) {
                //正常sql和异常sql都执行完成
                oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.SUCCESS.getBm());
            }
            // 先更新状态,后处理事件
             dosomething2();
        } catch (Exception e) {
            log.error("大数据作业实例结果处理报错:{}", e.getMessage());
        }
    }
    
     public void  dosomething2(GxZyrz gxZyrz, SsZyxx zyxx) {
        if (Constants.CODE_SUCCESS.equals(gxZyrz.getYxzt())) {
              //成功状态查询es、发邮件等
             dosomething3();
        } else if (Constants.CODE_FAILED.equals(gxZyrz.getYxzt())) {
            gxZyrz.setYcs(0L);
            gxZyrz.setZyl(0L);
            gxZyrz.setSfgj("N");
        }
        gxZyrzMapper.updateById(gxZyrz);
    }

kafka消息被多线程异步处理了。

1、如果任务sql执行成功,kafka返回运行中、执行成功消息

线程1 - 处理待运行任务

线程2- 处理成功状态

通过代码分析,线程1更新任务之前会更新另外一张表状态,假如gxZyslMapper.updateById是2秒时间,   线程2更新状态之前会dosomething3()查询es、查询数据表、发邮件,肯定超过5秒,然后更新sql任务状态。

结论: 如果sql是执行成功,这种情况,应该不会出现线程2先把任务状态更新成成功,然后线程1把状态更新是运行中。

2、如果任务sql执行失败,kafka返回运行中、执行失败消息

线程1 - 处理待运行任务

线程2- 处理失败状态

通过代码分析,线程1更新任务之前会更新另外一张表状态,假如update是2秒时间,   线程2直接更新sql任务状态。

结论: 如果sql是失败成功,这种情况,如果运行中、运行失败状态消息时间建个在2秒内,应该会出现线程2先把任务状态更新成失败,然后线程1把状态更新是运行中。

代码分析之后,带着结论去现场验证,发现确实是失败状态任务状态被逆写了。

按这个结论,按理来讲部署的所有现场都会出现问题,为什么只有这个现场有问题呢?

大数据那边升级了代码,以前执行失败的任务,运行中和运行失败,他们发消息间隔至少耗时在5秒,改了逻辑之后直接失败的任务,发信息间隔在2秒内。这就验证了这个问题

3、问题解决

1、运行中状态,先更新sqlId对应的任务状态,然后更新别的数据表状态;2、更新运行中的状态不直接更新,带着状态更新    update zyrz set yxzt = '2' where id = 'xxx' and yxzt not in('0','1')