在消息队列领域,RocketMQ 凭借其高吞吐量、低延迟、高可靠性等特性占据着重要地位。而 TopicManager
作为 RocketMQ 中负责管理主题(Topic)的核心组件,对整个消息系统的正常运转起着关键作用。本文将深入探讨 TopicManager
的属性和方法,帮助开发者更好地理解和运用这一组件。
1. TopicManager 概述
TopicManager
是 RocketMQ 中用于管理主题相关信息的核心类,它维护着系统中所有主题的配置信息,包括主题的名称、权限、队列数量等。通过 TopicManager
,可以实现主题的创建、删除、查询等操作,为消息的生产和消费提供基础支持。
2. 核心属性解析
//继承于ConfigManager,他会把topic元数据信息持久化到本地
public class TopicConfigManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
//锁超时的时间
private static final long LOCK_TIMEOUT_MILLIS = 3000;
//调度topic队列的数量
private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;
//topicConfigTable的一把锁
private transient final Lock topicConfigTableLock = new ReentrantLock();
//topicConfigTable,topic的元数据 这个是一个ConcurrentHashMap,用来存储topic的配置信息
private final ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>(1024);
//数据版本号
private final DataVersion dataVersion = new DataVersion();
//broker控制的组件
private transient BrokerController brokerController;
//省略大量的代码
}
3. 重要方法详解
3.1构造函数
在TopicManager的构造函数中会有很多系统自带的Topic进行创建。
public TopicConfigManager() {
}
public TopicConfigManager(BrokerController brokerController) {
this.brokerController = brokerController;
//搞了一个一个的代码块 每个代码块里会进行初始化一些数据
//这里会进行初始化一些内置的topic信息
{
//SELF_TEST_TOPIC 用于自我测试的一个topic 快速进行测试mq是否可以跑通
String topic = TopicValidator.RMQ_SYS_SELF_TEST_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
//判断是否开启自动创建topic的机制 如果为true 会进行创建一个“TBW102”的topic
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
//默认的读写的队列数量为8
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
//BenchmarkTest topic的创建 会进行设置读写队列的数量为1024 是用于进行压力测试
{
String topic = TopicValidator.RMQ_SYS_BENCHMARK_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1024);
topicConfig.setWriteQueueNums(1024);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
//会进行创建当前这个broker所有集群名字的一个topic
//是用于写入和消费当前broker集群自身一些元数据的topic
{
String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
int perm = PermName.PERM_INHERIT;
if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
perm |= PermName.PERM_READ | PermName.PERM_WRITE;
}
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
//会创建当前broker分组名字的一个topic 管理一些broker分组的元数据
{
String topic = this.brokerController.getBrokerConfig().getBrokerName();
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
int perm = PermName.PERM_INHERIT;
if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
perm |= PermName.PERM_READ | PermName.PERM_WRITE;
}
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
//OFFSET_MOVED_EVENT offset迁移事件的topic
{
String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT;
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
//SCHEDULE_TOPIC_XXXX 类似于定时调度的topic
{
String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
//如果启用了追踪的topic之后会创建一个 trace topic名字 默认是不进行启用的
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
// broker集群reply消息的topic
{
String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX;
TopicConfig topicConfig = new TopicConfig(topic);
TopicValidator.addSystemTopic(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
3.2 createTopicInSendMessageMethod
在发送消息时,如果主题不存在,则创建该主题。该方法会根据默认主题的配置信息来创建新主题。当生产者发送消息到一个不存在的主题时,RocketMQ 会调用该方法自动创建主题,确保消息能够正常发送。
/**
* 在发送消息的方法中进行创建一个topic
* 如果说你开启了自动创建topic的功能,发送消息的时候针对一个不存在的topic发送消息的时候
* 就会进行调用此方法进行创建topic。
* 但是此时这个topic是不存在的,会尝试把默认的topic:TBW102这个topic拿出来
* 如果说没有开启自动创建topic,此时默认的topic只有read write 没有inherit
* 如果说开启了自动创建topic 默认是有inherit,此时要创建的这个新的topic可以继承TBW102元数据的配置
* @param topic topic的名称 发送到哪个topic中
* @param defaultTopic 默认的topic
* @param remoteAddress 远程机器的地址
* @param clientDefaultTopicQueueNums 客户端默认的队列数量
* @param topicSysFlag 是否是系统的topic的标识
* @return
*/
public TopicConfig createTopicInSendMessageMethod(final String topic,
final String defaultTopic,
final String remoteAddress,
final int clientDefaultTopicQueueNums,
final int topicSysFlag) {
TopicConfig topicConfig = null;
boolean createNew = false;
try {
//获取一把锁
if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
//判断当前topic是否存在 存在就直接返回topic的元数据
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null) {
return topicConfig;
}
//尝试获取默认topic的元数据
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null) {
//默认的topic是否是TBW102
if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
//当前topic是否启用了自动创建的机制 未启动的设置对应的权限为读写
if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
}
}
//如果开启了自动创建Topic 就会有继承的权限 就会走下面的代码
//默认的topic的元数据是否是继承的
if (PermName.isInherited(defaultTopicConfig.getPerm())) {
topicConfig = new TopicConfig(topic);
//创建topic的队列数量 客户端默认的queue数量和默认的topic的队列数量取最小值
int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());
if (queueNums < 0) {
queueNums = 0;
}
//把读写队列都设置成数量
topicConfig.setReadQueueNums(queueNums);
topicConfig.setWriteQueueNums(queueNums);
//获取默认topic的权限
int perm = defaultTopicConfig.getPerm();
//把权限的继承权限去掉
perm &= ~PermName.PERM_INHERIT;
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
//topicFilterType 这个也是通过默认的topic来进行获取
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
} else {
log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
}
} else {
log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
defaultTopic, remoteAddress);
}
//这块topic的元数据不为空,证明已经创建出来了一个topic
if (topicConfig != null) {
log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
defaultTopic, topicConfig, remoteAddress);
//把新创建的topic添加到topicConfigTable中
this.topicConfigTable.put(topic, topicConfig);
//数据版本号进行新增
this.dataVersion.nextVersion();
createNew = true;
//持久化
this.persist();
}
} finally {
this.topicConfigTableLock.unlock();
}
}
} catch (InterruptedException e) {
log.error("createTopicInSendMessageMethod exception", e);
}
if (createNew) {
//如果是新增的topic 给注册到brokerController里
// 传入了三个参数 checkOrderConfig 是否检查orderConfig 是否oneWay(单向通信) forceRegister 是否强制注册 为true
// registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister)
this.brokerController.registerBrokerAll(false
, true, true);
}
return topicConfig;
}
如果是新创建的Topic,会进行org.apache.rocketmq.broker.BrokerController#registerBrokerAll方法,判断是否需要进行注册到NameServer中。
/**
* 调用此方法的场景:
* 场景一:topic元数据进行了变更 需要进行注册到nameServer服务器上
* @param checkOrderConfig 是否需要检查顺序消息的配置
* @param oneway 是否是单向调用
* @param forceRegister 是否强制刷新
*/
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
// 调用了topicConfigManager用本地的topic元数据构建了一个Mapper
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
//权限的判断 判断当前broker的权限是否是一个写入或者读的权限
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
//broker的权限不是可读的也不是可写的
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
//这块是正儿八经的要进行注册的代码的位置
//如果是强制刷新 或者需要注册
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), //broker集群的名字
this.getBrokerAddr(), //broker的地址
this.brokerConfig.getBrokerName(), //broker的名字
this.brokerConfig.getBrokerId(), //broker的id
this.brokerConfig.getRegisterBrokerTimeoutMills()) //注册broker的超时时间
) {
//真正发起注册请求
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
如果需要进行注册的话,就会调用注册方法 doRegisterBrokerAll
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
//这里就是向所有的NameServer进行注册
//所以这块的返回值就是list类型的
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(), //broker集群的名字
this.getBrokerAddr(), //broker的地址
this.brokerConfig.getBrokerName(), //broker分组的名字
this.brokerConfig.getBrokerId(), //broker的id
this.getHAServerAddr(), //broker高可用的地址
topicConfigWrapper, //topic 元数据的信息
Lists.newArrayList(), //filterServerList
oneway, //是否是单向通信
this.brokerConfig.getRegisterBrokerTimeoutMills(), //注册请求的超时时间
this.brokerConfig.isCompressedRegister()); //是否压缩注册
//如果说这里的返回值大于0 证明有注册成功的
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}
createTopicOfTranCheckMaxTime,createTopicInSendMessageBackMethod跟createTopicInSendMessageMethod方法类似,这里就不展开细说了。
3.3修改类的方法
updateTopicUnitFlag(更新topic 的topicSysFlag 修改topic的系统标识),updateTopicUnitSubFlag(更新topic的unitSubFlag),updateTopicConfig(更新topic的元数据),updateOrderTopicConfig(更新顺序消息的配置信息)
//更新topic 的topicSysFlag 修改topic的系统标识
public void updateTopicUnitFlag(final String topic, final boolean unit) {
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null) {
int oldTopicSysFlag = topicConfig.getTopicSysFlag();
if (unit) {
topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));
} else {
topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
}
log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
this.persist();
this.brokerController.registerBrokerAll(false, true, true);
}
}
//更新topic的unitSubFlag
public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null) {
int oldTopicSysFlag = topicConfig.getTopicSysFlag();
if (hasUnitSub) {
topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
} else {
topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitSubFlag(oldTopicSysFlag));
}
log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
topicConfig.getTopicSysFlag());
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
this.persist();
this.brokerController.registerBrokerAll(false, true, true);
}
}
//更新topic的元数据
public void updateTopicConfig(final TopicConfig topicConfig) {
TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
if (old != null) {
log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
} else {
log.info("create new topic [{}]", topicConfig);
}
this.dataVersion.nextVersion();
this.persist();
}
//更新顺序消息的配置信息
// 将orderKVTableFromNs中的topic中在TopicConfigTable中存在但不是顺序消息的话给设置成顺序消息
// TopicConfigTable中不属于orderKVTableFromNs中的topic中在TopicConfigTable中存在且是顺序消息的话给设置成非顺序消息
public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {
boolean isChange = false;
Set<String> orderTopics = orderKVTableFromNs.getTable().keySet();
for (String topic : orderTopics) {
TopicConfig topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null && !topicConfig.isOrder()) {
topicConfig.setOrder(true);
isChange = true;
log.info("update order topic config, topic={}, order={}", topic, true);
}
}
for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
String topic = entry.getKey();
if (!orderTopics.contains(topic)) {
TopicConfig topicConfig = entry.getValue();
if (topicConfig.isOrder()) {
topicConfig.setOrder(false);
isChange = true;
log.info("update order topic config, topic={}, order={}", topic, false);
}
}
}
if (isChange) {
this.dataVersion.nextVersion();
this.persist();
}
}
}
3.4 删除topic的配置信息
//删除topic的配置信息
public void deleteTopicConfig(final String topic) {
TopicConfig old = this.topicConfigTable.remove(topic);
if (old != null) {
log.info("delete topic config OK, topic: {}", old);
this.dataVersion.nextVersion();
this.persist();
} else {
log.warn("delete topic config failed, topic: {} not exists", topic);
}
}
3.5 selectTopicConfig
// 根据topic的名字查询元数据信息
public TopicConfig selectTopicConfig(final String topic) {
return this.topicConfigTable.get(topic);
}
4. 总结
TopicManager
在 RocketMQ 中扮演着至关重要的角色,通过其丰富的属性和方法,实现了对主题的高效管理。开发者在使用 RocketMQ 时,深入理解 TopicManager
的工作原理和使用方法,能够更好地利用 RocketMQ 的功能,构建出稳定、高效的消息系统。无论是主题的创建、查询还是删除,TopicManager
都提供了相应的接口,为开发者提供了便利。同时,通过合理使用其并发控制机制,能够确保在高并发场景下主题配置信息的一致性和安全性。希望本文能够帮助读者对 RocketMQ 中的 TopicManager
有更深入的认识和理解。