【RocketMQ 生产者和消费者】- 生产者发送故障延时策略

发布于:2025-05-28 ⋅ 阅读:(19) ⋅ 点赞:(0)


本文章基于 RocketMQ 4.9.3

1. 前言

在看生产者源码之前,先来看下 RocketMQ 的发送故障延时策略,具体的容错策略均在MQFaultStrategy 这个类中定义。

生产者发送消息的时候需要先查询出 topic 的路由信息,然后通过 selectOneMessageQueue 方法选择 messageQueueList 中的一个消息队列来发送消息。之前我们也说过,topic 下面的队列是可以分配到不同 broker 的,在开启了 sendLatencyFaultEnable 选项的前提下,RocketMQ 会在随机递增取模的基础上排除掉那些 not available 的 broker。

对于 broker 而言,消息发送可能会有一定的延时,RocketMQ 就会通过这个延时来判断 broker 到底有多少时间的不可用,比如如果上次请求的 latency 超过 550ms,就退避 3s;超过1000L,就退避 60s,退避意思是这个 broker 多少秒不可用。

那如果没有开启 sendLatencyFaultEnable,就直接从 messageQueueList 中通过 index++ % messageQueueList.size() 的方式选一个队列就行。


2. FaultItem

FaultItem 存储了各个 broker 的请求耗时以及什么时候开始才可用,这个类属性也只有三个,直接看下面代码。

class FaultItem implements Comparable<FaultItem> {
        // broker 名称
        private final String name;
        // 生产者发送到这个 broker 的耗时
        private volatile long currentLatency;
        // 这个 broker 什么时候开始才可用
        private volatile long startTimestamp;

        public FaultItem(final String name) {
            this.name = name;
        }
		...
}

3. LatencyFaultToleranceImpl 容错集合处理类

LatencyFaultToleranceImpl 容错集合处理类,在里面会通过 faultItemTable 来存储这个 broker 的延时信息。

private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

3.1 updateFaultItem 更新容错集合

/**
 * 更新容错集合
 * @param name                   broker 名称
 * @param currentLatency         耗时
 * @param notAvailableDuration   不可访问的时间间隔
 */
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    // 先获取下旧的容错项
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        // 如果不存在就新增一个新的进去
        final FaultItem faultItem = new FaultItem(name);
        // 设置耗时时间
        faultItem.setCurrentLatency(currentLatency);
        // 设置下一次能访问的时间
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        // 添加到集合中,这里就是为了防止并发
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            // 重新设置耗时时间和下一次能访问的时间
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        // 如果已经存在就直接更新耗时和下一次能访问的时间就行了
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

更新容错集合就是更新 faultItemTable 集合,传入的三个参数:

  • name:brokerName
  • currentLatency:生产者的延时时间
  • notAvailableDuration:broker 不可用的时间

同时在第 2 小节我们也介绍了 FaultItem 这个类,里面的 startTimestamp 需要根据传入的 notAvailableDuration 来计算,因此可以看到 startTimestamp 可以通过 System.currentTimeMillis() + notAvailableDuration 来算出,其他的逻辑就直接看注释即可。


3.2 isAvailable 判断 broker 是否可用

/**
 * 当前的 broker 是否可用,broker 添加到 faultItemTable 的时候会设置属性 startTimestamp 为下一次可以访问的时间
 * @param name
 * @return
 */
@Override
public boolean isAvailable(final String name) {
    // 获取当前这个 broker 的容错信息
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        // 判断这个 broker 是否可用,也就是判断当前时间是否超过这个 broker 下一次可访问的时间
        return faultItem.isAvailable();
    }
    return true;
}

/**
 * 当前时间是否超过这个 broker 下一次可访问的时间
 * @return
 */
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

传入参数 brokerName,然后从 faultItemTable 中获取这个 broker 的故障信息,判断是否可用也比较简单,直接判断当前时间是不是比 startTimestamp 要大就行。


3.3 pickOneAtLeast 至少选出一个故障 broker

当生产者发送消息的时候需要发送的 topic 下面的队列都是不可用的,这种情况下需要通过 pickOneAtLeast 方法选出一个 broker,也就是兜底方法,不让生产者发送不了消息了。

/**
 * 至少选出一个容错项
 * @return
 */
@Override
public String pickOneAtLeast() {
    final Enumeration<FaultItem> elements = this.faultItemTable.elements();
    List<FaultItem> tmpList = new LinkedList<FaultItem>();
    // 临时的 FaultItem 集合
    while (elements.hasMoreElements()) {
        final FaultItem faultItem = elements.nextElement();
        tmpList.add(faultItem);
    }

    if (!tmpList.isEmpty()) {
        // 随机打乱
        Collections.shuffle(tmpList);

        // 接着排序
        Collections.sort(tmpList);

        // 一半的数量
        final int half = tmpList.size() / 2;
        if (half <= 0) {
            // 意思是只有一个项
            return tmpList.get(0).getName();
        } else {
            // 这里是从前 1/2 中获取
            final int i = this.whichItemWorst.incrementAndGet() % half;
            return tmpList.get(i).getName();
        }
    }

    return null;
}

这里面获取的逻辑就是对 tmpList 随机打乱之后再排序,然后判断如果只有一个 broker ,就直接返回这个 brokerName,否则就从前 1/2 递增获取,不过这里没搞懂为什么要先打乱后排序,直接排序不可以吗,如果说是为了排序的时候少几个步骤也不需要刻意打乱吧?大家如果有知道的可以分析下。
在这里插入图片描述
而这个 FaultItem 的排序是按照 currentLatency 和 startTimestamp 排序的,也就是说如果这两个 broker 上一次请求的延时不同,小的就排在前面,因为延时少 broker 不可用的时间也少,如果延时一样就看 startTimestamp,就是 broker 可用的时间,最先可用的就返回。


4. MQFaultStrategy 故障策略类

这个就是核心的故障策略类了,生产者就是通过这个类下面的 selectOneMessageQueue 方法来选择消息队列的,所以就来看下这个类里面的一些方法和属性。


4.1 属性

private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

/**
 * 是否打开容错开关
 */
private boolean sendLatencyFaultEnable = false;

/**
 * 延迟级别
 */
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
/**
 * 不可用的时间
 */
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
  • latencyFaultTolerance:第 3 小节说的容错集合管理类。
  • sendLatencyFaultEnable:故障容错开关,就是第 1 小节说的,如果打开了才会根据故障容错集合排除掉那些不可用的 broker。
  • latencyMax:延迟级别,也是延时时间。
  • notAvailableDuration:不可用时间,和上面的延时时间一一对应。

关于 latencyMax 和 notAvailableDuration 的关系,举个例子,比如延时在 100ms - 550ms 之间的,故障时间就是 0L,如果延时在 550ms - 1s 之间的,故障时间就是 30s,看下面图就知道了。
在这里插入图片描述


4.2 updateFaultItem 更新延迟故障容错信息

/**
 * 更新延迟故障容错信息
 * @param brokerName broker 名称
 * @param currentLatency 发送耗时
 * @param isolation 是否是独立的
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    // 如果打开了容错开关
    if (this.sendLatencyFaultEnable) {
        // 计算出这个 broker 不可用的时间,如果是独立的,那么每一次不可用时间都是 30s
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        // 将这个 broker 不可用的信息存储到 latencyFaultTolerance 中
        // 下一次发送的时候根据 latencyFaultTolerance#faultItemTable 集合来判断这个 broker 是否可用了
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

这里就是更新故障容错信息的方法可以看到也是先判断了是否设置了 sendLatencyFaultEnable 为 true,如果设置了才允许更新,没有设置 sendLatencyFaultEnable 就直接返回,在方法 computeNotAvailableDuration 中计算出这个 broker 不可用的时间,如果是独立的,那么每一次不可用时间都是 30s,这个 isolation 在生产者发送消息的时候如果是正常发送成功之后更新的就传的是 false,如果是有异常出现,在 catch 方法中调用的就是 true。
在这里插入图片描述
在这里插入图片描述
计算出不可用时间之后,通过 3.1 小节的 updateFaultItem 更新容错集合信息。


4.3 computeNotAvailableDuration 计算不可用时间

/**
 * 获取这个 broker 的不可用的时间,从后往前遍历查表
 * @param currentLatency 耗时
 * @return
 */
private long computeNotAvailableDuration(final long currentLatency) {
    // 从后往前遍历去找
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            // 比如当前 currentLatency = 4000, 那么下标就是 5
            // 这里返回的不可用时间就是 180s,也就是 3 分钟
            return this.notAvailableDuration[i];
    }

    return 0;
}

在这里插入图片描述
这个方法就是计算不可用时间的,从后往前遍历 latencyMax 数组,当发现 latencyMax[i] >= currentLatency 的时候就返回 notAvailableDuration[i],上面图就是根据这个方法画出来的。


4.4 selectOneMessageQueue 选择一个消息队列

/**
 * 根据 TopicPublishInfo#sendWhichQueue 选择一个消息队列
 * @param tpInfo
 * @param lastBrokerName
 * @return
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // 判断容错开关有没有打开
    if (this.sendLatencyFaultEnable) {
        try {
            // 1. 根据负载均衡策略选择一个 MessageQueue,index 是这个 MQ 的下标
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            // 从 index 开始遍历这个 topic 下面的队列,选择第一个可用的 MQ
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    // 这里是溢出判断
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 判断这个 MessageQueue 所在的 broker 是否是可用的
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    // 如果可用就直接返回这个 MessageQueue
                    return mq;
            }

            // 2. 如果上面没选出 broker,也就是说当前所有 broker 都是不可用的状态,下面会随机选择一个不可用的 broker,因为毕竟要
            // 选择一个 broker 来发送的
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // 获取这个 broker 下面的写队列个数,生产者是往队列里面写入的所以获取写队列数
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            // 如果写队列个数大于 0,就从 sendWhichQueue 开始负载均衡选择一个 MessageQueue 返回,不考虑 broker 可用了
            if (writeQueueNums > 0) {
                // 根据负载均衡选择一个 MessageQueue,到这里就说明所有 broker 都不可用,也就是目前所有的队列都不可用,所以这里
                // 可以随机选择一个队列,然后直接修改队列的 brokerName 和 queueId,这里修改的 brokerName 也是只对当前生产者有影响,
                // 对其他生产者没有影响,所以直接修改就行了
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                // 这里就是没找到这个 broker 的写队列个数或者说写队列个数为 0,说明这个 broker 可能不可用了或者这个 topic 没有存到
                // 这个 broker 上,这种情况下就从容错集合里面删掉这个 broker 的容错信息
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        // 3.上面两步都没有选出来,这里就默认根据负载均衡选择一个 MQ
        return tpInfo.selectOneMessageQueue();
    }

    // 没用开启容错开关,那么上一次给什么 broker 发,这一次避开这个 broker, 选其他 broker 发
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

这个就是生产者选择消息队列的方法,可以看到如果 sendLatencyFaultEnable 开关打开了,根据负载均衡策略选择一个 MessageQueue,index 是这个 MQ 的下标,sendWhichQueue 是 TopicPublishInfo 的属性,相当于轮循递增选择。

下面 for 循环从 0 开始遍历,注意这里从 0 开始遍历不是说从下标 0 开始选,而是从 index 开始选,只是确保能遍历到所有队列,选出一个可用的 MessageQueue。

如果上面没选出 broker,也就是说当前所有 broker 都是不可用的状态,下面会随机选择一个不可用的 broker,因为毕竟要选择一个 broker 来发送的,所以就通过 3.3 小节的 pickOneAtLeast 方法挑选一个 broker,然后获取这个 broker 的写队列数量,如果写队列个数大于 0,就从 sendWhichQueue 开始负载均衡选择一个 MessageQueue。

/**
 * 获取这个 broker 下面的队列信息的写队列个数
 * @param brokerName
 * @return
 */
public int getQueueIdByBroker(final String brokerName) {
    for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
        final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
        if (queueData.getBrokerName().equals(brokerName)) {
            return queueData.getWriteQueueNums();
        }
    }

    return -1;
}

如果在 topic 路由信息里面没找到这个 broker 的写队列数或者说写队列个数为 0,说明这个 broker 可能不可用了或者这个 topic 没有存到这个 broker 上,这种情况下就从容错集合里面删掉这个 broker 的容错信息。

如果上面都没有选出来,就从 topic 路由信息里面随机选一个队列,不考虑容错机制了,也就是不考虑 latencyFaultTolerance.pickOneAtLeast()

那如果没有开启容错机关,那么上一次给什么 broker 发,这一次避开这个 broker, 选其他 broker 发。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        // 根据 sendWhichQueue 递增选择下一个队列发送
        return selectOneMessageQueue();
    } else {
        // 如果传入了上一次的 broker
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            // 选择一个和上次 broker 不同的队列来发送
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        // 如果所有队列都是存到 lastBrokerName 的, 就还是根据 sendWhichQueue 递增选择下一个队列发送
        return selectOneMessageQueue();
    }
}

/**
 * 从 sendWhichQueue 开始顺序选择 MessageQueue
 * @return
 */
public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.incrementAndGet();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

但是上面这个方法找到 writeQueueNums 之后选择了一个消息队列,直接就把 notBestBroker 设置到这个队列里面,大家可能有疑问这里直接改这个队列的 broker 行不行,我的理解是没问题的,首先生产者的队列信息是存储到 DefaultMQProducerImpl#topicPublishInfoTable 集合中的,而这个集合的 topic 路由信息也来源于上一篇文章的定时任务,也就是从 NameServer 拉取的 topic 路由信息。

这里修改了队列的 broker 之后,由于 topicPublishInfoTable 是生产者独有的,所以消费者拉取消息的时候队列消息是不变的,比如原来是:Queue1 -> Broker1,生产者修改成 Broker2 之后对消费者没影响,因为消费者处理的是读队列,同时其他生产者也有自己的 topicPublishInfoTable,所以也没有影响,有影响的是这个生产者修改完之后后续就会向另外的 broker 发送消息,不过如果大家有不同看法也可以提出交流。


5. 小结

这篇文章我们探讨了生产者发送故障延时策略,同时这也是生产者发送消息的前置文章,我们介绍了 RocketMQ 针对消息发送延时的处理,对于发送延时每一个等级都会给 broker 设置一个不可用的时间,避免 broker 延时比较大的情况下还不断向这个 broker 发送消息,而如果要开启故障延时处理,就需要把 sendLatencyFaultEnable 打开,可以通过 set 方法来设置。

public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
    this.defaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable);
}





如有错误,欢迎指出!!!


网站公告

今日签到

点亮在社区的每一天
去签到