Flink CDC 3.0 Starrocks建表失败会导致任务卡主!

发布于:2024-02-28 ⋅ 阅读:(75) ⋅ 点赞:(0)

Flink CDC 3.0 Starrocks建表失败会导致任务卡主!

现象

StarRocks建表失败,然后任务自动重启,重启完毕后数据回放,jobMaster打印下面日志后,整个任务会卡主

There are already processing requests. Wait for processing

原因分析

前提概要:可以先阅读CDC表变更处理流程然后再读下面会更加清晰

涉及类包括SchemaRegistrySchemaOperatorStarRocksMetadataApplier

SchemaRegistry->handleEventFromOperator方法执行建表失败后会导致任务重启,但是jobMaster不会重启,因此SchemaRegistry.requestHandler.pendingSchemaChanges无法删除导致任务卡主!

public void flushSuccess(TableId tableId, int sinkSubtask) {
    flushedSinkWriters.add(sinkSubtask);
    if (flushedSinkWriters.equals(activeSinkWriters)) {
        LOG.info(
                "All sink subtask have flushed for table {}. Start to apply schema change.",
                tableId.toString());
        PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
        //执行表结构变更操作!
        applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
        waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));

        if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
            //异常会跳过删除pendingSchame!
            startNextSchemaChangeRequest();
        }
    }
}
//删除pendingSchemaChanges中已经完成的pendingSchame
private void startNextSchemaChangeRequest() {
    this.pendingSchemaChanges.remove(0);
    this.flushedSinkWriters.clear();
    ...
}

public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
        SchemaChangeRequest request) {
    //历史pendingSchame未删除导致,卡主
    if (pendingSchemaChanges.isEmpty()) {
        LOG.info(
                "Received schema change event request from table {}. Start to buffer requests for others.",
                request.getTableId().toString());
        if (request.getSchemaChangeEvent() instanceof CreateTableEvent
                && schemaManager.schemaExists(request.getTableId())) {
            return CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(false)));
        }
        CompletableFuture<CoordinationResponse> response =
                CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true)));
        schemaManager.applySchemaChange(request.getSchemaChangeEvent());
        pendingSchemaChanges.add(new PendingSchemaChange(request, response));
        pendingSchemaChanges.get(0).startToWaitForReleaseRequest();
        return response;
    } else {
        LOG.info("There are already processing requests. Wait for processing.");
        CompletableFuture<CoordinationResponse> response = new CompletableFuture<>();
        pendingSchemaChanges.add(new PendingSchemaChange(request, response));
        return response;
    }
}

解决办法

  1. 让建表执行成功
  2. catch住异常,将schame删除后再异常重启(未验证)
本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到