最近有个同学问了我一个非常有意思的问题, 今天我根据这个问题来给大家好好分析一下。
前提知识
我们都知道, 每个 Broker 都可以配置多个监听器, 用来用于网络分流。相关知识请看:一文搞懂Kafka中的listeners和advertised.listeners以及其他通信配置
然后, 我们客户端中需要配置bootstrap.servers=xxxx:port
来连接到集群中。然后当 Kafka 集群 Broker 数量很多的时候,我们不可能在bootstrap.servers
配置所有的地址
所以 Kafka 是允许你只配置其中部分地址的, 它会通过自身的元信息更新机制,去获取 Kafka 集群中的所有地址。然后如果需要去跟某一台 Broker 发起连接的话,就去元信息里面获取地址。
问题图述
那么问题来了, 既然一台 Broker 能够配置多个 Listener, 也就意味着有多个地址, 那么客户端在跟具体的 Broker 发起请求的话, 应该选择哪一个 Listener?是遍历吗?
上图客户端部分获取到的 Broker 列表 EndPoint 应该是什么呢?
问题源码探究
首先, 客户端(生产者、消费者)去获取集群元信息是通过元信息更新器 MetadataUpdater
具体的元信息更新器流程请看 客户端发起元信息更新请求.
我们重点看一下, 获取元信息返回之后,是如何解析 Broker 集群列表的,确定一下是不是把集群所有的 EndPoint 都获取了,还是只获取了一部分。
解析返回的元信息
直接定位到关键代码<font size=2 color=gray>Metadata#handleMetadataResponse</font>
private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse, boolean isPartialUpdate, long nowMs) {
//省略部分
Map<Integer, Node> nodes = metadataResponse.brokersById();
if (isPartialUpdate)
return this.cache.mergeWith(metadataResponse.clusterId(), nodes, partitions,
unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(),
(topic, isInternal) -> !topics.contains(topic) && retainTopic(topic, isInternal, nowMs));
else
return new MetadataCache(metadataResponse.clusterId(), nodes, partitions,
unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller());
}
复制代码
源码调试
本次启动的 Kafka 集群网络相关配置如下
①. server0.properties
listeners = PLAINTEXT://localhost:9090
复制代码
②. server1.properties
listeners = PLAINTEXT://localhost:9091,TEXT://localhost:9099
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,TEXT:PLAINTEXT
复制代码
③. server2.properties
listeners = PLAINTEXT://localhost:9090
复制代码
上面的配置, 只有 server1. 中的监听器配置了 2 个。
PLAINTEXT://localhost:9091
TEXT://localhost:9099
然后启动一个 KafkaProducer 客户端, bootstrap.servers=localhost:9099
.
通过 Debug 发现, KafkaProducer 客户端获取到的元信息集群列表只有
localhost:9099 (id: 1 rack: null)
也就是说,客户端拿到的信息是在对应 Broker 处理请求那里就已经做好的筛选了。为了搞清楚为什么这里只拿到了一个 EndPoint 信息, 我们需要去看看 Broker 是如何处理请求的。注意:这里发出去的请求是 UPDATE_METADATA
<font color=blue ><b>所以, 从客户端发出 UPDATE_METADATA 请求之后, 服务端是如何处理的呢?</b></font>
处理 handleTopicMetadataRequest 请求
先看一张服务端网络模型架构图
在 Kafka 启动的时候, 会根据 Listener 配置,启动对应个数的 Acceptor 和 Processor
比如在我们这个例子中, 有 2 个 Listener 配置, 那么就如下图所示(简化)
Acceptor:是专门用来监听连接过来的新链接请求的。
Kafka 启动的时候会创建对应个数的 Acceptor,这个 Acceptor 持有很多的信息, 比如 ChannelBuilder, 这个 ChannelBuilder 持有 ListenerName, 如下图所示
也就是说, 不管哪个客户端从哪个监听器访问到服务端, 都是可以确定它对应的监听器名称的。
比如, 配置了下面 2 个监听器, 如果我客户端通过localhost:9099
访问到了 Broker, 那么跟这个客户端建立链接的 Acceptor 就是监听器名为:TEXT 的那个。
PLAINTEXT://localhost:9091
TEXT://localhost:9099
知道这么一个前提之后, 我们再来分析如何处理 handleTopicMetadataRequest 请求
服务端接受请求入队
当对应的 Processor 监听到请求过来的时候,会将请求解析一下并组装成 Request,然后入队
<font color=gray size=2>Processor#processCompletedReceives</font>
我们可以看到, 在组装 Request 的时候, 是有把listenerName
传入的。
所以:<font color=red ><b>Request 持有 ListenerName。</b></font>
处理元信息更新请求 handleTopicMetadataRequest
<font color=gray size=2>KafkaApis#handleTopicMetadataRequest</font>
关键代码
从代码中可以看到, 我们拿到了所有的 Brokers 的 EndPoint, 包括多个监听器也都拿到了。
例子中, server1 配置了 2 个 listener, 这里就有 2 个 EndPoint。
但是真正把数据发往会客户端的时候, 是有根据listenerName
做过滤的!
brokers.flatMap(_.getNode(request.context.listenerName)).asJava,
这个listenerName
也就是我们上面一开始分析过的, Processor 对应着监听器。 所以我们这个例子中, 监听器名是 TEXT
。
过滤完了之后,是不是发现只有一个复合要求,也就是他自己有一个 TEXT 监听器。
假如我们客户端发起请求的时候,bootstrap.servers=localhost:9091
, 是不是就命中的 PLAINTEXT 监听器。
因为这个例子中每个 Broker 都配置了 PLAINTEXT 监听器, 所以最终会返回 3 个 EndPoint。
结论
客户端对服务端发起请求的时候, 会根据命中的服务端的监听器, 然后根据这个命中的服务端监听器名 listenerName,过滤集群中其他 Broker 同样是配置了这个监听器名称的 EndPoint。
同样用一张图来回答一下最开始的问题图述的问题
因为图片里面的 case,是从 listener2 监听到的请求, 那么所有 Broker 的 EndPoint 也要用 Listener2 的监听器名称来进行过滤,也就会得到图中的结果。
关注公众号【编程程序V】,分享更多Java技术前沿文章,Java学习面试资源。
问题
如果客户端 bootstrap.servers 配置了多个地址,并且这些地址对应的监听器名字还不一样会有啥后果?
举个例子:
服务端配置:
server1
listeners = PLAINTEXT1://IP1:9090,PLAINTEXT3://IP1:9092
server2
listeners = PLAINTEXT1://IP2:9090,PLAINTEXT2://IP2:9091,PLAINTEXT3://IP2:9092
server3
listeners = PLAINTEXT1://IP3:9090,PLAINTEXT2://IP3:9091,PLAINTEXT3://IP3:9092
客户端配置
bootstrap.servers=IP1:9090,IP2:9090,IP3:9091
首先,客户端发起请求的时候,是去bootstrap.servers
获取一个最小负载的 IP, 然后去获取元数据。
比如第一次更新的时候,我们去 IP1:9090
请求元数据了。拿到的 listenerName=PLAINTEXT1
这个时候我们拿到的 Brokers 是 {IP1:9090、IP2:9090、IP3:9090} .
当后续更新的时候,如果去IP3:9091
获取数据的时候,拿到的 listenerName=PLAINTEXT2
这个时候我们拿到的 Brokers 是 {IP2:9091、IP3:9091} .
所以:客户端配置bootstrap.servers
的时候,尽量配置的地址都是属于同一个 ListenerName 。