文章目录
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
- 【RocketMQ 生产者和消费者】- 消费者启动源码
- 【RocketMQ 生产者和消费者】- 消费者重平衡(1)
- 【RocketMQ 生产者和消费者】- 消费者重平衡(2)- 分配策略
上两篇文章讲解了消费者重平衡的源码以及几种重平衡策略,这篇文章我们来看下消费者 ID 对重平衡的影响。
2. 消费者 ID 重复
public interface AllocateMessageQueueStrategy {
List<MessageQueue> allocate(
final String consumerGroup,
final String currentCID,
final List<MessageQueue> mqAll,
final List<String> cidAll
);
String getName();
}
上面消费者重平衡接口可以看到传入的是消费者的 clientId,也就是客户端 id,这个 id 在前面消费者启动的时候就会设置,在当前版本会被替换为 PID + 当前时间,所以重平衡没什么问题。
但是像 4.7.1 这些之前的版本是没有当前时间的,只有当前进程名称。
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
那么如果进程 PID 相同会有什么问题吗,我们可以看下消费者的注册过程。
3. 消费者连接存储
broker 处理消费者心跳是通过 heartBeat 方法处理的,可以看到在里面会创建一个 ClientChannelInfo,用于标识生产者或者消费者的连接,可以看到里面的 heartbearData.getClientID 就是客户端 ID。
创建好了 ClientChannelInfo 之后,传入到 registerConsumer 来处理,在里面会通过 updateChannel 方法更新消费者下面的连接集合 channelInfoTable
,可以说消费者组信息里面的 channelInfoTable 存储了这个消费者组下面的消费者连接。
// 消费者组下面的连接信息
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
由于 key 是 Channel,所以相同 ID 的消费者不会被覆盖。消费者连接的存储就到这,在负载均衡的时候消费者会通过 findConsumerIdList 方法查询出这个消费者组下面的所有消费者。
4. 查询消费者组下面的所有连接
/**
* 从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合
*
* 1.获取topic下的所有consumer
* 2.先通过topic随机找到一个broker
* 3.然后通过这个broker找对应group下面的所有consumerId
*
* @param topic
* @param group
* @return
*/
public List<String> findConsumerIdList(final String topic, final String group) {
// 随机选择一个当前topic所属的broker
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
// 如果broker地址为null则请求nameserver更新topic路由信息
this.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.findBrokerAddrByTopic(topic);
}
if (null != brokerAddr) {
try {
// 根据brokerAddr和group 得到消费者客户端id列表, 所以可以看出来,一个group里面的客户端都需要订阅同一个topic
return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
} catch (Exception e) {
log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
}
}
return null;
}
这个方法会随机选择一个 broker 集群的 master 地址,然后通过 getConsumerIdListByGroup 方法来发送请求获取这个消费者组下面的全部消费者客户端 id 集合。所以可以看得出来,一个消费者组下面的消费者都需要订阅同一个 topic,因为这里是以 group 维度去获取消费者的,如果这个消费者组里面有一个消费者订阅了 topicB,那么这个消费者注册的时候就会注册到 topicB 所在的 broker 集群,有可能这里就获取不到这个消费者了。
/**
* 根据brokerAddr和group 得到消费者客户端id列表
*/
public List<String> getConsumerIdListByGroup(
final String addr,
final String consumerGroup,
final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException {
// 构建请求头
GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
// 构建请求命令对象,Code为GET_CONSUMER_LIST_BY_GROUP
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
// 发起同步调用
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
// 响应解码
GetConsumerListByGroupResponseBody body =
GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
// 返回客户端id列表
return body.getConsumerIdList();
}
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
看上面代码,获取消费者集合的请求 Code 是 GET_CONSUMER_LIST_BY_GROUP,获取到返回结果之后返回 consumerIdList 集合,客户端处理这个请求是通过 getConsumerListByGroup 来处理的。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
// 返回指定group的所有客户端id集合
return this.getConsumerListByGroup(ctx, request);
case RequestCode.UPDATE_CONSUMER_OFFSET:
// 更新消费偏移量
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
// 查询消费偏移量
return this.queryConsumerOffset(ctx, request);
default:
break;
}
return null;
}
/**
* 返回指定group的所有客户端id集合
*/
public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
// 创建响应命令对象
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
// 解析请求头
final GetConsumerListByGroupRequestHeader requestHeader =
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
// 从broker的consumerTable中获取指定group的消费者组信息
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(
requestHeader.getConsumerGroup());
if (consumerGroupInfo != null) {
// 获取所有客户端id集合
List<String> clientIds = consumerGroupInfo.getAllClientId();
if (!clientIds.isEmpty()) {
GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
body.setConsumerIdList(clientIds);
response.setBody(body.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
} else {
log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
} else {
log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
return response;
}
在 getConsumerListByGroup 方法中会先通过消费者组获取这个消费者组信息 consumerGroupInfo,然后通过 getAllClientId 方法获取这个这个消费者组下面的消费者集合。
public List<String> getAllClientId() {
List<String> result = new ArrayList<>();
Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, ClientChannelInfo> entry = it.next();
ClientChannelInfo clientChannelInfo = entry.getValue();
result.add(clientChannelInfo.getClientId());
}
return result;
}
可以看到获取消费者 id 就是直接遍历 channelInfoTable 集合,然后获取消费者的 clientId,所以如果两个消费者的 clientId 相同,那么这里 result 里面就会存储两个相同的 clientId。
5. AllocateMessageQueueAveragelyByCircle 策略
下面就以 AllocateMessageQueueAveragelyByCircle 为例子来看下如果 cidAll 里面有重复的 clientId 会发生什么,再来回顾下这个分配策略的源码。
/**
* 平均分配策略,环形分配
* 按照消费者的顺序进行一轮一轮的分配,直到分配完所有消息队列。例如有消费者A、B,有5个消息队列1、2、3、4、5。第一轮A分配1,B分配2;
* 第二轮A分配3,B分配4;第二轮A分配5。因此A分配到1、3、5,B分配到2、4。
* @param consumerGroup current consumer group
* @param currentCID current consumer id
* @param mqAll message queue set in current topic
* @param cidAll consumer set in current consumer group
* @return
*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
// 参数校验
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// 索引
int index = cidAll.indexOf(currentCID);
// 获取每个分配轮次轮次中属于该消费者的对应的消息队列
for (int i = index; i < mqAll.size(); i++) {
if (i % cidAll.size() == index) {
result.add(mqAll.get(i));
}
}
return result;
}
由于两个消费者 ID 是一样的,所以求出来的 index 也是相同的,所以分配的队列也是一样的,我们可以用一个 main 方法模拟下。
public static void main(String[] args) {
AllocateMessageQueueAveragelyByCircle averagelyByCircle = new AllocateMessageQueueAveragelyByCircle();
List<MessageQueue> queues = new ArrayList<MessageQueue>();
for(int i = 0; i < 8; i++){
queues.add(new MessageQueue("topic-test", "broker-a", i));
}
System.out.println(averagelyByCircle.allocate("group", "0", queues, Arrays.asList("0", "2", "2")));
System.out.println(averagelyByCircle.allocate("group", "2", queues, Arrays.asList("0", "2", "2")));
System.out.println(averagelyByCircle.allocate("group", "2", queues, Arrays.asList("0", "2", "2")));
}
上面的 main 方法创建了 8 个队列,然后消费者组下面有 3 个消费者,id 是 [0,2,2],可以看到,消费者-0 的队列是 [0,3,6],两个消费者-2 分配的队列是 [1,4,7],这是因为两个消费者-2 求出来的下标都是 1,所以从 1 开始分配,因此 i % cidAll.size() == index
这个判断条件也都是一样的,所以最终分配的队列都是一样的。
对于消费者来说一个消息队列分配给了多个消费者,这样就有可能导致重复消费问题,而对于消息队列来说由于消息队列 2 和消息队列 5 没有分配给其他消费者,因此这个消息队列上面的消息不会被消费,导致消息堆积。
6. AllocateMessageQueueAveragely 策略
上面我们看了 AllocateMessageQueueAveragelyByCircle 策略,下面再来看下 AllocateMessageQueueAveragely,因为这个策略是默认的分配策略,也就是用户没有自己设定的情况下用的就是 AllocateMessageQueueAveragely。
/**
* 负载均衡分配队列, 平均分配
* @param consumerGroup 当前消费者组
* @param currentCID 当前消费者的 clientID
* @param mqAll 当前 topic 的所有队列
* @param cidAll 当前消费者组的所有消费者的 clientID
* @return
*/
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
// 参数校验
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
// 分配结果
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
// 如果这个消费者不是传入的消费者组下的, 有可能是刚启动没注册到 broker
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
// 就不分配队列处理了
return result;
}
// 假设现在有 8 个队列, 要负载均衡给 3 个消费者 [0, 1, 2], 而当前消费者[1]的 index = 1
// 那么在分配的时候三个消费者分配的队列标号就是 [0, 1, 2], [3, 4, 5], [6, 7]
// 这里就是求出来当前消费者 ID 所在的位置 = 1
int index = cidAll.indexOf(currentCID);
// 队列数 % 消费者数 = 剩余队列数 = 2
int mod = mqAll.size() % cidAll.size();
// 这里求平均数, 求出来的结果就是 8 / 3 + 1 = 3
// 1.如果队列数小于消费者数, 平均数就是 1
// 2.如果队列数大于消费者数, 并且当前队列的下标在 (0, mod) 这个范围, 那么平均数就是 mqAll.size() / cidAll.size() + 1
// 3.如果队列数大于消费者数, 并且当前队列的下标在 [mod, cidAll.size()) 这个范围, 那么平均数就是 mqAll.size() / cidAll.size()
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
// 计算当前消费者从哪里开始分配队列, 这里求出来的就是 1 * 3 = 3
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 这里求出来当前消费者要分配多少个队列, 比如当前就是分配 3 个队列
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
// 开始分配队列, 分配的起始位是是 startIndex, 分配的队列数量是 range
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
由于两个消费者 id 是一样的,所以 index 也是一样的,同样的 averageSize 也是一样的,startIndex 也是一样的,因此求出的队列也都是一样的。根据上一篇文章我们说过的消费者重平衡策略,这两个消费者分配到的队列是:[3,4,5]。
7. 示例
所以如果集合里面有多个消费者 id 相同,会导致这些消费者分配到的消息队列都一样,而且消息队列集合里面也有一些队列是没办法分配到消费者的,会导致消息堆积,现在我们可以模拟下真实情况,比如我们可以启动两个消费者消费同一个 topic,看看这两个消费者的分配队列输出情况,下面先来看下代码。
首先在 AllocateMessageQueueAveragely#allocate 的最后一行代码中加上打印日志。
然后我们启动消费者,消费者 ID 如果我们没有设置,默认就是 PID + 当前时间戳,不过可以通过 rocketmq.client.name
设置实例名称,如果不设置默认就是 DEFAULT。
不设置的情况下就会更改,如果设置了 instanceName,那么就用我们自己设置的。
所以我们启动两个消费者,一个是 Consumer-A,一个是 Consumer-B,下面是消费者的代码:
public class ConsumerA {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
public class ConsumerB {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
依次启动 NameServer、Broker、ConsumerA,ConsumerB,同时启动消费者 A 的时候设置参数 -Drocketmq.client.name=ConsumerA
,另一个消费者 B 启动设置 -Drocketmq.client.name=ConsumerB
,看消费者的输出:
可以看到,两个消费者平均分配了 [0,1,2,3] 四个队列,大家可能觉得奇怪,为什么输出 clientId 的时候前面还有一个 ip,那是因为我们设置的是 instanceName,真正获取 clientId 的时候还得拼接上 ip,不过我们是同一台机器启动的,所以 ip 也是相同的,只需要关注 instanceName 即可。
下面我们将两个消费者的 rocketmq.client.name=ConsumerA,也就是实例名称相同,下面是两个消费者的输出。
可以看到,这两个消费者 id 相同的情况下分配到的队列 id 都是一样的,而且可以发现 queueId = 2、3 的这两个队列没有分配给任何一个消费者,这也证明了我们上面的推测。
8. 小结
好了,这篇文章我们讲述了消费者 clientId 对消费者重平衡的影响,相同的 clientId 会导致消息队列分配不均衡,导致有一些队列没办法分配到消费者。
如有错误,欢迎指出!!!!