【RocketMQ 生产者和消费者】- 消费者重平衡(3)- 消费者 ID 对负载均衡的影响

发布于:2025-06-27 ⋅ 阅读:(17) ⋅ 点赞:(0)


本文章基于 RocketMQ 4.9.3

1. 前言

上两篇文章讲解了消费者重平衡的源码以及几种重平衡策略,这篇文章我们来看下消费者 ID 对重平衡的影响。


2. 消费者 ID 重复

public interface AllocateMessageQueueStrategy {

    List<MessageQueue> allocate(
        final String consumerGroup,
        final String currentCID,
        final List<MessageQueue> mqAll,
        final List<String> cidAll
    );

    
    String getName();
}

上面消费者重平衡接口可以看到传入的是消费者的 clientId,也就是客户端 id,这个 id 在前面消费者启动的时候就会设置,在当前版本会被替换为 PID + 当前时间,所以重平衡没什么问题。
在这里插入图片描述
在这里插入图片描述
但是像 4.7.1 这些之前的版本是没有当前时间的,只有当前进程名称。

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}

那么如果进程 PID 相同会有什么问题吗,我们可以看下消费者的注册过程。


3. 消费者连接存储

在这里插入图片描述
broker 处理消费者心跳是通过 heartBeat 方法处理的,可以看到在里面会创建一个 ClientChannelInfo,用于标识生产者或者消费者的连接,可以看到里面的 heartbearData.getClientID 就是客户端 ID。
在这里插入图片描述
创建好了 ClientChannelInfo 之后,传入到 registerConsumer 来处理,在里面会通过 updateChannel 方法更新消费者下面的连接集合 channelInfoTable,可以说消费者组信息里面的 channelInfoTable 存储了这个消费者组下面的消费者连接。

// 消费者组下面的连接信息
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
    new ConcurrentHashMap<Channel, ClientChannelInfo>(16);

由于 key 是 Channel,所以相同 ID 的消费者不会被覆盖。消费者连接的存储就到这,在负载均衡的时候消费者会通过 findConsumerIdList 方法查询出这个消费者组下面的所有消费者。


4. 查询消费者组下面的所有连接

/**
 * 从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合
 *
 * 1.获取topic下的所有consumer
 * 2.先通过topic随机找到一个broker
 * 3.然后通过这个broker找对应group下面的所有consumerId
 *
 * @param topic
 * @param group
 * @return
 */
public List<String> findConsumerIdList(final String topic, final String group) {
    // 随机选择一个当前topic所属的broker
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) {
        // 如果broker地址为null则请求nameserver更新topic路由信息
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    }

    if (null != brokerAddr) {
        try {
            // 根据brokerAddr和group 得到消费者客户端id列表, 所以可以看出来,一个group里面的客户端都需要订阅同一个topic
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
        } catch (Exception e) {
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        }
    }

    return null;
}

这个方法会随机选择一个 broker 集群的 master 地址,然后通过 getConsumerIdListByGroup 方法来发送请求获取这个消费者组下面的全部消费者客户端 id 集合。所以可以看得出来,一个消费者组下面的消费者都需要订阅同一个 topic,因为这里是以 group 维度去获取消费者的,如果这个消费者组里面有一个消费者订阅了 topicB,那么这个消费者注册的时候就会注册到 topicB 所在的 broker 集群,有可能这里就获取不到这个消费者了。

/**
 * 根据brokerAddr和group 得到消费者客户端id列表
 */
public List<String> getConsumerIdListByGroup(
    final String addr,
    final String consumerGroup,
    final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    MQBrokerException, InterruptedException {
    // 构建请求头
    GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    // 构建请求命令对象,Code为GET_CONSUMER_LIST_BY_GROUP
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
    // 发起同步调用
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            if (response.getBody() != null) {
                // 响应解码
                GetConsumerListByGroupResponseBody body =
                    GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
                // 返回客户端id列表
                return body.getConsumerIdList();
            }
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

看上面代码,获取消费者集合的请求 Code 是 GET_CONSUMER_LIST_BY_GROUP,获取到返回结果之后返回 consumerIdList 集合,客户端处理这个请求是通过 getConsumerListByGroup 来处理的。

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            // 返回指定group的所有客户端id集合
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            // 更新消费偏移量
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            // 查询消费偏移量
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}

/**
 * 返回指定group的所有客户端id集合
 */
public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    // 创建响应命令对象
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
    // 解析请求头
    final GetConsumerListByGroupRequestHeader requestHeader =
        (GetConsumerListByGroupRequestHeader) request
            .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
    // 从broker的consumerTable中获取指定group的消费者组信息
    ConsumerGroupInfo consumerGroupInfo =
        this.brokerController.getConsumerManager().getConsumerGroupInfo(
            requestHeader.getConsumerGroup());
    if (consumerGroupInfo != null) {
        // 获取所有客户端id集合
        List<String> clientIds = consumerGroupInfo.getAllClientId();
        if (!clientIds.isEmpty()) {
            GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
            body.setConsumerIdList(clientIds);
            response.setBody(body.encode());
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        } else {
            log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        }
    } else {
        log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
    }

    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
    return response;
}

在 getConsumerListByGroup 方法中会先通过消费者组获取这个消费者组信息 consumerGroupInfo,然后通过 getAllClientId 方法获取这个这个消费者组下面的消费者集合。

public List<String> getAllClientId() {
    List<String> result = new ArrayList<>();

    Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();

    while (it.hasNext()) {
        Entry<Channel, ClientChannelInfo> entry = it.next();
        ClientChannelInfo clientChannelInfo = entry.getValue();
        result.add(clientChannelInfo.getClientId());
    }

    return result;
}

可以看到获取消费者 id 就是直接遍历 channelInfoTable 集合,然后获取消费者的 clientId,所以如果两个消费者的 clientId 相同,那么这里 result 里面就会存储两个相同的 clientId。


5. AllocateMessageQueueAveragelyByCircle 策略

下面就以 AllocateMessageQueueAveragelyByCircle 为例子来看下如果 cidAll 里面有重复的 clientId 会发生什么,再来回顾下这个分配策略的源码。

/**
 * 平均分配策略,环形分配
 * 按照消费者的顺序进行一轮一轮的分配,直到分配完所有消息队列。例如有消费者A、B,有5个消息队列1、2、3、4、5。第一轮A分配1,B分配2;
 * 第二轮A分配3,B分配4;第二轮A分配5。因此A分配到1、3、5,B分配到2、4。
 * @param consumerGroup current consumer group
 * @param currentCID current consumer id
 * @param mqAll message queue set in current topic
 * @param cidAll consumer set in current consumer group
 * @return
 */
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    // 参数校验
    if (currentCID == null || currentCID.length() < 1) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if (mqAll == null || mqAll.isEmpty()) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if (cidAll == null || cidAll.isEmpty()) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }

    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
            consumerGroup,
            currentCID,
            cidAll);
        return result;
    }
    // 索引
    int index = cidAll.indexOf(currentCID);
    // 获取每个分配轮次轮次中属于该消费者的对应的消息队列
    for (int i = index; i < mqAll.size(); i++) {
        if (i % cidAll.size() == index) {
            result.add(mqAll.get(i));
        }
    }
    return result;
}

由于两个消费者 ID 是一样的,所以求出来的 index 也是相同的,所以分配的队列也是一样的,我们可以用一个 main 方法模拟下。

public static void main(String[] args) {
    AllocateMessageQueueAveragelyByCircle averagelyByCircle = new AllocateMessageQueueAveragelyByCircle();
    List<MessageQueue> queues = new ArrayList<MessageQueue>();
    for(int i = 0; i < 8; i++){
        queues.add(new MessageQueue("topic-test", "broker-a", i));
    }
    System.out.println(averagelyByCircle.allocate("group", "0", queues, Arrays.asList("0", "2", "2")));
    System.out.println(averagelyByCircle.allocate("group", "2", queues, Arrays.asList("0", "2", "2")));
    System.out.println(averagelyByCircle.allocate("group", "2", queues, Arrays.asList("0", "2", "2")));
}

在这里插入图片描述

上面的 main 方法创建了 8 个队列,然后消费者组下面有 3 个消费者,id 是 [0,2,2],可以看到,消费者-0 的队列是 [0,3,6],两个消费者-2 分配的队列是 [1,4,7],这是因为两个消费者-2 求出来的下标都是 1,所以从 1 开始分配,因此 i % cidAll.size() == index 这个判断条件也都是一样的,所以最终分配的队列都是一样的。
在这里插入图片描述
对于消费者来说一个消息队列分配给了多个消费者,这样就有可能导致重复消费问题,而对于消息队列来说由于消息队列 2 和消息队列 5 没有分配给其他消费者,因此这个消息队列上面的消息不会被消费,导致消息堆积。


6. AllocateMessageQueueAveragely 策略

上面我们看了 AllocateMessageQueueAveragelyByCircle 策略,下面再来看下 AllocateMessageQueueAveragely,因为这个策略是默认的分配策略,也就是用户没有自己设定的情况下用的就是 AllocateMessageQueueAveragely。

/**
 * 负载均衡分配队列, 平均分配
 * @param consumerGroup     当前消费者组
 * @param currentCID        当前消费者的 clientID
 * @param mqAll             当前 topic 的所有队列
 * @param cidAll            当前消费者组的所有消费者的 clientID
 * @return
 */
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    // 参数校验
    if (currentCID == null || currentCID.length() < 1) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if (mqAll == null || mqAll.isEmpty()) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if (cidAll == null || cidAll.isEmpty()) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }

    // 分配结果
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
        // 如果这个消费者不是传入的消费者组下的, 有可能是刚启动没注册到 broker
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
            consumerGroup,
            currentCID,
            cidAll);
        // 就不分配队列处理了
        return result;
    }

    // 假设现在有 8 个队列, 要负载均衡给 3 个消费者 [0, 1, 2], 而当前消费者[1]的 index = 1
    // 那么在分配的时候三个消费者分配的队列标号就是 [0, 1, 2], [3, 4, 5], [6, 7]

    // 这里就是求出来当前消费者 ID 所在的位置 = 1
    int index = cidAll.indexOf(currentCID);
    // 队列数 % 消费者数 = 剩余队列数 = 2
    int mod = mqAll.size() % cidAll.size();
    // 这里求平均数, 求出来的结果就是 8 / 3 + 1 = 3
    //      1.如果队列数小于消费者数, 平均数就是 1
    //      2.如果队列数大于消费者数, 并且当前队列的下标在 (0, mod) 这个范围, 那么平均数就是 mqAll.size() / cidAll.size() + 1
    //      3.如果队列数大于消费者数, 并且当前队列的下标在 [mod, cidAll.size()) 这个范围, 那么平均数就是 mqAll.size() / cidAll.size()
    int averageSize =
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
            + 1 : mqAll.size() / cidAll.size());
    // 计算当前消费者从哪里开始分配队列, 这里求出来的就是 1 * 3 = 3
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    // 这里求出来当前消费者要分配多少个队列, 比如当前就是分配 3 个队列
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        // 开始分配队列, 分配的起始位是是 startIndex, 分配的队列数量是 range
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}

由于两个消费者 id 是一样的,所以 index 也是一样的,同样的 averageSize 也是一样的,startIndex 也是一样的,因此求出的队列也都是一样的。根据上一篇文章我们说过的消费者重平衡策略,这两个消费者分配到的队列是:[3,4,5]。
在这里插入图片描述


7. 示例

所以如果集合里面有多个消费者 id 相同,会导致这些消费者分配到的消息队列都一样,而且消息队列集合里面也有一些队列是没办法分配到消费者的,会导致消息堆积,现在我们可以模拟下真实情况,比如我们可以启动两个消费者消费同一个 topic,看看这两个消费者的分配队列输出情况,下面先来看下代码。

首先在 AllocateMessageQueueAveragely#allocate 的最后一行代码中加上打印日志。
在这里插入图片描述

然后我们启动消费者,消费者 ID 如果我们没有设置,默认就是 PID + 当前时间戳,不过可以通过 rocketmq.client.name 设置实例名称,如果不设置默认就是 DEFAULT。
在这里插入图片描述

不设置的情况下就会更改,如果设置了 instanceName,那么就用我们自己设置的。
在这里插入图片描述
所以我们启动两个消费者,一个是 Consumer-A,一个是 Consumer-B,下面是消费者的代码:

public class ConsumerA {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

public class ConsumerB {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroupConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody(), StandardCharsets.UTF_8));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

依次启动 NameServer、Broker、ConsumerA,ConsumerB,同时启动消费者 A 的时候设置参数 -Drocketmq.client.name=ConsumerA,另一个消费者 B 启动设置 -Drocketmq.client.name=ConsumerB,看消费者的输出:
在这里插入图片描述
在这里插入图片描述
可以看到,两个消费者平均分配了 [0,1,2,3] 四个队列,大家可能觉得奇怪,为什么输出 clientId 的时候前面还有一个 ip,那是因为我们设置的是 instanceName,真正获取 clientId 的时候还得拼接上 ip,不过我们是同一台机器启动的,所以 ip 也是相同的,只需要关注 instanceName 即可。

下面我们将两个消费者的 rocketmq.client.name=ConsumerA,也就是实例名称相同,下面是两个消费者的输出。
在这里插入图片描述
在这里插入图片描述

可以看到,这两个消费者 id 相同的情况下分配到的队列 id 都是一样的,而且可以发现 queueId = 2、3 的这两个队列没有分配给任何一个消费者,这也证明了我们上面的推测。


8. 小结

好了,这篇文章我们讲述了消费者 clientId 对消费者重平衡的影响,相同的 clientId 会导致消息队列分配不均衡,导致有一些队列没办法分配到消费者。





如有错误,欢迎指出!!!!


网站公告

今日签到

点亮在社区的每一天
去签到