How Flink supports automatic Kafka disaster-recovery failover

Published: 2025-06-28

Background

When Flink consumes messages from Kafka, we specify the addresses of the Kafka brokers to connect to, the starting consumer offsets, and so on. Once these are fixed, a Kafka outage makes the Flink job fail because it can no longer reach the brokers. The problem we want to solve here: if the Kafka cluster in data center A goes down, and data center B hosts a disaster-recovery mirror of the same topics, how can Flink automatically switch over and consume from data center B? Likewise, once data center A recovers, how can consumption automatically switch back to data center A? The whole process should happen automatically, without anyone manually changing the Kafka addresses.

Technical implementation

Flink's Kafka consumer is implemented by FlinkKafkaConsumerBase. This class has a built-in capability: it can automatically discover, and start consuming, Kafka topics that match a given pattern. The key code is:

private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
        discoveryLoopThread =
                new Thread(
                        () -> {
                            try {
                                // --------------------- partition discovery loop ---------------------

                                // throughout the loop, we always eagerly check if we are still
                                // running before performing the next operation, so that we can
                                // escape the loop as soon as possible

                                while (running) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug(
                                                "Consumer subtask {} is trying to discover new partitions ...",
                                                getRuntimeContext().getIndexOfThisSubtask());
                                    }

                                    final List<KafkaTopicPartition> discoveredPartitions;
                                    try {
                                        discoveredPartitions =
                                                partitionDiscoverer.discoverPartitions();
                                    } catch (AbstractPartitionDiscoverer.WakeupException
                                            | AbstractPartitionDiscoverer.ClosedException e) {
                                        // the partition discoverer may have been closed or woken up
                                        // before or during the discovery;
                                        // this would only happen if the consumer was canceled;
                                        // simply escape the loop
                                        break;
                                    }

                                    // no need to add the discovered partitions if we were closed
                                    // during the meantime
                                    if (running && !discoveredPartitions.isEmpty()) {
                                        kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                                    }

                                    // do not waste any time sleeping if we're not running anymore
                                    if (running && discoveryIntervalMillis != 0) {
                                        try {
                                            Thread.sleep(discoveryIntervalMillis);
                                        } catch (InterruptedException iex) {
                                            // may be interrupted if the consumer was canceled
                                            // midway; simply escape the loop
                                            break;
                                        }
                                    }
                                }
                            } catch (Exception e) {
                                discoveryLoopErrorRef.set(e);
                            } finally {
                                // calling cancel will also let the fetcher loop escape
                                // (if not running, cancel() was already called)
                                if (running) {
                                    cancel();
                                }
                            }
                        },
                        "Kafka Partition Discovery for "
                                + getRuntimeContext().getTaskNameWithSubtasks());

        discoveryLoopThread.start();
    }

As shown above, the consumer starts a dedicated thread that periodically checks whether new topics matching the subscription pattern have appeared, and if so, adds them to the set of consumed partitions. You may well wonder: we are discussing how Flink fails over between Kafka clusters for disaster recovery, so why bring up automatic topic discovery?
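For context, here is roughly how that discovery loop is switched on in user code: subscribe with a topic pattern and set a non-negative discovery interval via the flink.partition-discovery.interval-millis property. This is a minimal sketch against the legacy FlinkKafkaConsumer API; the broker address, group id, topic pattern, and interval are illustrative.

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PatternDiscoveryJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-a:9092"); // illustrative address
        props.setProperty("group.id", "demo-group");            // illustrative group id
        // a non-negative interval turns on the discovery loop shown above:
        // every 30 seconds it looks for new topics/partitions matching the pattern
        props.setProperty("flink.partition-discovery.interval-millis", "30000");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(
                        Pattern.compile("orders-.*"), // subscribe by regex, not fixed topic names
                        new SimpleStringSchema(),
                        props);

        env.addSource(consumer).print();
        env.execute("pattern-discovery-demo");
    }
}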
The point is that the very same idea applies to disaster-recovery failover. We can start a thread of our own that continuously probes the connectivity of the Kafka cluster we are currently consuming from. If the cluster becomes unreachable, we treat that as a disaster-recovery event: Flink should switch over and consume from the Kafka cluster in data center B, and switch back once data center A recovers. The only remaining question is how to determine the offsets to resume from on the other cluster.
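A minimal sketch of such a health-check loop follows. The broker addresses, the probe interval, and the switchTo() hook are all hypothetical; connectivity is probed here with Kafka's AdminClient#describeCluster, which is one reasonable choice rather than something the Flink consumer itself provides.

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;

public class KafkaClusterHealthChecker implements Runnable {

    // hypothetical addresses for the primary (room A) and DR (room B) clusters
    private static final String BROKERS_A = "kafka-a:9092";
    private static final String BROKERS_B = "kafka-b:9092";

    private volatile boolean running = true;
    private volatile String activeBrokers = BROKERS_A;

    @Override
    public void run() {
        while (running) {
            if (!isReachable(activeBrokers)) {
                // the active cluster is down: fail over to the other room
                activeBrokers = activeBrokers.equals(BROKERS_A) ? BROKERS_B : BROKERS_A;
                switchTo(activeBrokers);
            } else if (!activeBrokers.equals(BROKERS_A) && isReachable(BROKERS_A)) {
                // room A has recovered: switch back to it
                activeBrokers = BROKERS_A;
                switchTo(activeBrokers);
            }
            try {
                TimeUnit.SECONDS.sleep(10); // probe interval, tune as needed
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    // probe connectivity by asking the cluster to describe itself
    private boolean isReachable(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("request.timeout.ms", "5000");
        try (AdminClient admin = AdminClient.create(props)) {
            admin.describeCluster().nodes().get(5, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    // hypothetical hook: restart the Kafka source against the new cluster,
    // resuming from offsets derived from the last processed record
    private void switchTo(String brokers) {
        // left to the job, see the note below
    }
}

Note that offsets are not directly comparable across two clusters, so a switchTo() implementation would typically resume on the target cluster from the timestamp of the last processed record, for example by looking up the matching offsets with KafkaConsumer#offsetsForTimes.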

