RocketMQ 中 TopicManager 的属性与方法深度剖析

发布于:2025-03-25 ⋅ 阅读:(93) ⋅ 点赞:(0)

在消息队列领域,RocketMQ 凭借其高吞吐量、低延迟、高可靠性等特性占据着重要地位。而 TopicManager 作为 RocketMQ 中负责管理主题(Topic)的核心组件,对整个消息系统的正常运转起着关键作用。本文将深入探讨 TopicManager 的属性和方法,帮助开发者更好地理解和运用这一组件。

1. TopicManager 概述

TopicManager 是 RocketMQ 中用于管理主题相关信息的核心类,它维护着系统中所有主题的配置信息,包括主题的名称、权限、队列数量等。通过 TopicManager,可以实现主题的创建、删除、查询等操作,为消息的生产和消费提供基础支持。

2. 核心属性解析

//继承于ConfigManager,他会把topic元数据信息持久化到本地
public class TopicConfigManager extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    //锁超时的时间
    private static final long LOCK_TIMEOUT_MILLIS = 3000;
    //调度topic队列的数量
    private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;
    //topicConfigTable的一把锁
    private transient final Lock topicConfigTableLock = new ReentrantLock();
    //topicConfigTable,topic的元数据 这个是一个ConcurrentHashMap,用来存储topic的配置信息
    private final ConcurrentMap<String, TopicConfig> topicConfigTable =
        new ConcurrentHashMap<String, TopicConfig>(1024);
    //数据版本号
    private final DataVersion dataVersion = new DataVersion();
    //broker控制的组件
    private transient BrokerController brokerController;

    
  //省略大量的代码

}

3. 重要方法详解

3.1构造函数

在TopicManager的构造函数中会有很多系统自带的Topic进行创建。

   public TopicConfigManager() {
    }

    public TopicConfigManager(BrokerController brokerController) {
        this.brokerController = brokerController;
        //搞了一个一个的代码块 每个代码块里会进行初始化一些数据
        //这里会进行初始化一些内置的topic信息
        {
            //SELF_TEST_TOPIC 用于自我测试的一个topic 快速进行测试mq是否可以跑通
            String topic = TopicValidator.RMQ_SYS_SELF_TEST_TOPIC;
            TopicConfig topicConfig = new TopicConfig(topic);
            TopicValidator.addSystemTopic(topic);
            topicConfig.setReadQueueNums(1);
            topicConfig.setWriteQueueNums(1);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }

        {
            //判断是否开启自动创建topic的机制 如果为true 会进行创建一个“TBW102”的topic
            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                String topic = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                //默认的读写的队列数量为8
                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                topicConfig.setPerm(perm);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
        }
        //BenchmarkTest topic的创建 会进行设置读写队列的数量为1024 是用于进行压力测试
        {
            String topic = TopicValidator.RMQ_SYS_BENCHMARK_TOPIC;
            TopicConfig topicConfig = new TopicConfig(topic);
            TopicValidator.addSystemTopic(topic);
            topicConfig.setReadQueueNums(1024);
            topicConfig.setWriteQueueNums(1024);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }

        //会进行创建当前这个broker所有集群名字的一个topic
        //是用于写入和消费当前broker集群自身一些元数据的topic
        {
            String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
            TopicConfig topicConfig = new TopicConfig(topic);
            TopicValidator.addSystemTopic(topic);
            int perm = PermName.PERM_INHERIT;
            if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
                perm |= PermName.PERM_READ | PermName.PERM_WRITE;
            }
            topicConfig.setPerm(perm);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }
        //会创建当前broker分组名字的一个topic 管理一些broker分组的元数据
        {
            String topic = this.brokerController.getBrokerConfig().getBrokerName();
            TopicConfig topicConfig = new TopicConfig(topic);
            TopicValidator.addSystemTopic(topic);
            int perm = PermName.PERM_INHERIT;
            if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
                perm |= PermName.PERM_READ | PermName.PERM_WRITE;
            }
            topicConfig.setReadQueueNums(1);
            topicConfig.setWriteQueueNums(1);
            topicConfig.setPerm(perm);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }
        //OFFSET_MOVED_EVENT offset迁移事件的topic
        {
            String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT;
            TopicConfig topicConfig = new TopicConfig(topic);
            TopicValidator.addSystemTopic(topic);
            topicConfig.setReadQueueNums(1);
            topicConfig.setWriteQueueNums(1);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }
        //SCHEDULE_TOPIC_XXXX 类似于定时调度的topic
        {
            String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            TopicConfig topicConfig = new TopicConfig(topic);
            TopicValidator.addSystemTopic(topic);
            topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
            topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }
        {
            //如果启用了追踪的topic之后会创建一个 trace topic名字 默认是不进行启用的
            if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
                String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                topicConfig.setReadQueueNums(1);
                topicConfig.setWriteQueueNums(1);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
        }
        // broker集群reply消息的topic
        {
            String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX;
            TopicConfig topicConfig = new TopicConfig(topic);
            TopicValidator.addSystemTopic(topic);
            topicConfig.setReadQueueNums(1);
            topicConfig.setWriteQueueNums(1);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }
    }

3.2 createTopicInSendMessageMethod

在发送消息时,如果主题不存在,则创建该主题。该方法会根据默认主题的配置信息来创建新主题。当生产者发送消息到一个不存在的主题时,RocketMQ 会调用该方法自动创建主题,确保消息能够正常发送。

  /**
     * 在发送消息的方法中进行创建一个topic
     *  如果说你开启了自动创建topic的功能,发送消息的时候针对一个不存在的topic发送消息的时候
     *  就会进行调用此方法进行创建topic。
     *  但是此时这个topic是不存在的,会尝试把默认的topic:TBW102这个topic拿出来
     *  如果说没有开启自动创建topic,此时默认的topic只有read write 没有inherit
     *  如果说开启了自动创建topic 默认是有inherit,此时要创建的这个新的topic可以继承TBW102元数据的配置
     * @param topic topic的名称 发送到哪个topic中
     * @param defaultTopic 默认的topic
     * @param remoteAddress 远程机器的地址
     * @param clientDefaultTopicQueueNums 客户端默认的队列数量
     * @param topicSysFlag 是否是系统的topic的标识
     * @return
     */
    public TopicConfig createTopicInSendMessageMethod(final String topic,
                                                      final String defaultTopic,
                                                      final String remoteAddress,
                                                      final int clientDefaultTopicQueueNums,
                                                      final int topicSysFlag) {
        TopicConfig topicConfig = null;
        boolean createNew = false;

        try {
            //获取一把锁
            if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    //判断当前topic是否存在 存在就直接返回topic的元数据
                    topicConfig = this.topicConfigTable.get(topic);
                    if (topicConfig != null) {
                        return topicConfig;
                    }
                    //尝试获取默认topic的元数据
                    TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
                    if (defaultTopicConfig != null) {
                        //默认的topic是否是TBW102
                        if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                            //当前topic是否启用了自动创建的机制 未启动的设置对应的权限为读写
                            if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                                defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
                            }
                        }
                        //如果开启了自动创建Topic 就会有继承的权限 就会走下面的代码
                        //默认的topic的元数据是否是继承的
                        if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                            topicConfig = new TopicConfig(topic);
                            //创建topic的队列数量 客户端默认的queue数量和默认的topic的队列数量取最小值
                            int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());

                            if (queueNums < 0) {
                                queueNums = 0;
                            }
                            //把读写队列都设置成数量
                            topicConfig.setReadQueueNums(queueNums);
                            topicConfig.setWriteQueueNums(queueNums);
                            //获取默认topic的权限
                            int perm = defaultTopicConfig.getPerm();
                            //把权限的继承权限去掉
                            perm &= ~PermName.PERM_INHERIT;
                            topicConfig.setPerm(perm);
                            topicConfig.setTopicSysFlag(topicSysFlag);
                            //topicFilterType 这个也是通过默认的topic来进行获取
                            topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                        } else {
                            log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
                                defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
                        }
                    } else {
                        log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
                            defaultTopic, remoteAddress);
                    }
                    //这块topic的元数据不为空,证明已经创建出来了一个topic
                    if (topicConfig != null) {
                        log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
                            defaultTopic, topicConfig, remoteAddress);
                        //把新创建的topic添加到topicConfigTable中
                        this.topicConfigTable.put(topic, topicConfig);
                        //数据版本号进行新增
                        this.dataVersion.nextVersion();

                        createNew = true;
                        //持久化
                        this.persist();
                    }
                } finally {
                    this.topicConfigTableLock.unlock();
                }
            }
        } catch (InterruptedException e) {
            log.error("createTopicInSendMessageMethod exception", e);
        }

        if (createNew) {
            //如果是新增的topic 给注册到brokerController里
            // 传入了三个参数 checkOrderConfig 是否检查orderConfig  是否oneWay(单向通信) forceRegister 是否强制注册 为true
            // registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister)
            this.brokerController.registerBrokerAll(false
                    , true, true);
        }

        return topicConfig;
    }

如果是新创建的Topic,会进行org.apache.rocketmq.broker.BrokerController#registerBrokerAll方法,判断是否需要进行注册到NameServer中。

 /**
     * 调用此方法的场景:
     * 场景一:topic元数据进行了变更 需要进行注册到nameServer服务器上
     * @param checkOrderConfig 是否需要检查顺序消息的配置
     * @param oneway  是否是单向调用
     * @param forceRegister 是否强制刷新
     */
    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        // 调用了topicConfigManager用本地的topic元数据构建了一个Mapper
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

        //权限的判断 判断当前broker的权限是否是一个写入或者读的权限
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            //broker的权限不是可读的也不是可写的
            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
            for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                TopicConfig tmp =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                        this.brokerConfig.getBrokerPermission());
                topicConfigTable.put(topicConfig.getTopicName(), tmp);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }
        //这块是正儿八经的要进行注册的代码的位置
        //如果是强制刷新 或者需要注册
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), //broker集群的名字
            this.getBrokerAddr(), //broker的地址
            this.brokerConfig.getBrokerName(), //broker的名字
            this.brokerConfig.getBrokerId(), //broker的id
            this.brokerConfig.getRegisterBrokerTimeoutMills()) //注册broker的超时时间
         ) {
            //真正发起注册请求
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

如果需要进行注册的话,就会调用注册方法 doRegisterBrokerAll

 private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
        //这里就是向所有的NameServer进行注册
        //所以这块的返回值就是list类型的
        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(), //broker集群的名字
            this.getBrokerAddr(), //broker的地址
            this.brokerConfig.getBrokerName(), //broker分组的名字
            this.brokerConfig.getBrokerId(), //broker的id
            this.getHAServerAddr(), //broker高可用的地址
            topicConfigWrapper, //topic 元数据的信息
            Lists.newArrayList(), //filterServerList
            oneway,  //是否是单向通信
            this.brokerConfig.getRegisterBrokerTimeoutMills(), //注册请求的超时时间
            this.brokerConfig.isCompressedRegister()); //是否压缩注册

        //如果说这里的返回值大于0 证明有注册成功的
        if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if (registerBrokerResult != null) {
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                }

                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    }

createTopicOfTranCheckMaxTime,createTopicInSendMessageBackMethod跟createTopicInSendMessageMethod方法类似,这里就不展开细说了。

3.3修改类的方法

updateTopicUnitFlag(更新topic 的topicSysFlag 修改topic的系统标识),updateTopicUnitSubFlag(更新topic的unitSubFlag),updateTopicConfig(更新topic的元数据),updateOrderTopicConfig(更新顺序消息的配置信息)

  //更新topic 的topicSysFlag 修改topic的系统标识
    public void updateTopicUnitFlag(final String topic, final boolean unit) {

        TopicConfig topicConfig = this.topicConfigTable.get(topic);
        if (topicConfig != null) {
            int oldTopicSysFlag = topicConfig.getTopicSysFlag();
            if (unit) {
                topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));
            } else {
                topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
            }

            log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
                topicConfig.getTopicSysFlag());

            this.topicConfigTable.put(topic, topicConfig);

            this.dataVersion.nextVersion();

            this.persist();
            this.brokerController.registerBrokerAll(false, true, true);
        }
    }

    //更新topic的unitSubFlag
    public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {
        TopicConfig topicConfig = this.topicConfigTable.get(topic);
        if (topicConfig != null) {
            int oldTopicSysFlag = topicConfig.getTopicSysFlag();
            if (hasUnitSub) {
                topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
            } else {
                topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitSubFlag(oldTopicSysFlag));
            }

            log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
                topicConfig.getTopicSysFlag());

            this.topicConfigTable.put(topic, topicConfig);

            this.dataVersion.nextVersion();

            this.persist();
            this.brokerController.registerBrokerAll(false, true, true);
        }
    }

    //更新topic的元数据
    public void updateTopicConfig(final TopicConfig topicConfig) {
        TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        if (old != null) {
            log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
        } else {
            log.info("create new topic [{}]", topicConfig);
        }

        this.dataVersion.nextVersion();

        this.persist();
    }

    //更新顺序消息的配置信息
    // 将orderKVTableFromNs中的topic中在TopicConfigTable中存在但不是顺序消息的话给设置成顺序消息
    // TopicConfigTable中不属于orderKVTableFromNs中的topic中在TopicConfigTable中存在且是顺序消息的话给设置成非顺序消息
    public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {

        if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {
            boolean isChange = false;
            Set<String> orderTopics = orderKVTableFromNs.getTable().keySet();
            for (String topic : orderTopics) {
                TopicConfig topicConfig = this.topicConfigTable.get(topic);
                if (topicConfig != null && !topicConfig.isOrder()) {
                    topicConfig.setOrder(true);
                    isChange = true;
                    log.info("update order topic config, topic={}, order={}", topic, true);
                }
            }

            for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
                String topic = entry.getKey();
                if (!orderTopics.contains(topic)) {
                    TopicConfig topicConfig = entry.getValue();
                    if (topicConfig.isOrder()) {
                        topicConfig.setOrder(false);
                        isChange = true;
                        log.info("update order topic config, topic={}, order={}", topic, false);
                    }
                }
            }

            if (isChange) {
                this.dataVersion.nextVersion();
                this.persist();
            }
        }
    }

3.4 删除topic的配置信息

//删除topic的配置信息
    public void deleteTopicConfig(final String topic) {
        TopicConfig old = this.topicConfigTable.remove(topic);
        if (old != null) {
            log.info("delete topic config OK, topic: {}", old);
            this.dataVersion.nextVersion();
            this.persist();
        } else {
            log.warn("delete topic config failed, topic: {} not exists", topic);
        }
    }

3.5 selectTopicConfig

   // 根据topic的名字查询元数据信息
    public TopicConfig selectTopicConfig(final String topic) {
        return this.topicConfigTable.get(topic);
    }    

4. 总结

TopicManager 在 RocketMQ 中扮演着至关重要的角色,通过其丰富的属性和方法,实现了对主题的高效管理。开发者在使用 RocketMQ 时,深入理解 TopicManager 的工作原理和使用方法,能够更好地利用 RocketMQ 的功能,构建出稳定、高效的消息系统。无论是主题的创建、查询还是删除,TopicManager 都提供了相应的接口,为开发者提供了便利。同时,通过合理使用其并发控制机制,能够确保在高并发场景下主题配置信息的一致性和安全性。希望本文能够帮助读者对 RocketMQ 中的 TopicManager 有更深入的认识和理解。


网站公告

今日签到

点亮在社区的每一天
去签到