I. Main Text
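Topics under a namespace are managed in bundles. Each namespace is a hash ring, and each bundle owns the topics whose hash falls within one range of that ring, which makes the topics easier to manage. As the number of topics in a bundle grows, the load on the broker that owns the bundle grows with it, so Pulsar also provides a bundle split mechanism that can be triggered automatically or manually. This article walks through the source code to see what happens on the server side when a bundle split is triggered manually.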
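To make the hash ring idea concrete, here is a minimal sketch of how a topic name could be mapped to a bundle range. It is not Pulsar's code: the class and method names are made up for illustration, CRC32 is assumed as the hash function, and each range is assumed to be lower-inclusive and upper-exclusive, except that the last bundle also includes 0xffffffff.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class BundleLookupSketch {

    /** Hashes a topic name onto the 32-bit ring. CRC32 is an assumption of this sketch. */
    static long hashOf(String topicName) {
        CRC32 crc = new CRC32();
        crc.update(topicName.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // unsigned 32-bit value held in a long
    }

    /** Returns the bundle range (e.g. "0x00000000_0x40000000") that covers the given hash. */
    static String findBundle(long[] boundaries, long hash) {
        for (int i = 0; i < boundaries.length - 1; i++) {
            boolean last = (i == boundaries.length - 2);
            // Lower bound inclusive; upper bound exclusive, except for the last bundle.
            if (hash >= boundaries[i]
                    && (hash < boundaries[i + 1] || (last && hash == boundaries[i + 1]))) {
                return String.format("0x%08x_0x%08x", boundaries[i], boundaries[i + 1]);
            }
        }
        throw new IllegalArgumentException("hash outside the ring: " + hash);
    }

    public static void main(String[] args) {
        // A namespace with four bundles, i.e. five boundaries.
        long[] boundaries = {0x00000000L, 0x40000000L, 0x80000000L, 0xc0000000L, 0xffffffffL};
        String topic = "persistent://public/default/my-topic";
        System.out.println(topic + " -> " + findBundle(boundaries, hashOf(topic)));
    }
}
```

Splitting a bundle only inserts new boundaries into this array; the topic hashes themselves do not change.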
How to trigger a bundle split manually
bin/pulsar-admin namespaces bundles public/default
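# Output (this capture appears to have been taken after the split below, since 0x08000000 is already a boundary and numBundles is 17)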
{
"boundaries" : [ "0x00000000", "0x08000000", "0x10000000", "0x20000000", "0x30000000", "0x40000000", "0x50000000", "0x60000000", "0x70000000", "0x80000000", "0x90000000", "0xa0000000", "0xb0000000", "0xc0000000", "0xd0000000", "0xe0000000", "0xf0000000", "0xffffffff" ],
"numBundles" : 17
}
# Split one specific bundle
bin/pulsar-admin namespaces split-bundle --bundle 0x00000000_0x10000000 public/default
II. Source Code Walkthrough
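Pulsar's admin operations all go over HTTP, because several kinds of clients (plain HTTP, the client library, and the CLI) have to be supported. On the server side these operations are handled in the admin module (shown in a screenshot in the original post); the bundle split discussed here lives in the Namespaces class.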
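Since the CLI ends up as an HTTP call, the request can be sketched with the JDK's own HTTP client types. The REST path and the `unload` query parameter below are assumptions, based on the `unload` parameter visible in the handler and the usual layout of the admin API, and the broker address is a placeholder. Check both against the admin API documentation for your Pulsar version. The sketch only builds the request and does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class SplitBundleRequestSketch {

    /** Builds (but does not send) an admin request asking a broker to split one bundle. */
    static HttpRequest buildSplitRequest(String adminUrl, String tenant, String namespace,
                                         String bundleRange, boolean unload) {
        // Assumed REST path; verify it against the admin API docs of your Pulsar version.
        URI uri = URI.create(String.format("%s/admin/v2/namespaces/%s/%s/%s/split?unload=%b",
                adminUrl, tenant, namespace, bundleRange, unload));
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // "http://localhost:8080" is a placeholder for the broker's web service address.
        HttpRequest request = buildSplitRequest(
                "http://localhost:8080", "public", "default", "0x00000000_0x10000000", false);
        System.out.println(request.method() + " " + request.uri());
    }
}
```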
Start tracing from splitNamespaceBundle in Namespaces:
public void splitNamespaceBundle(
        ....
        @QueryParam("splitAlgorithmName") String splitAlgorithmName, // name of the bundle split algorithm
        @ApiParam("splitBoundaries") List<Long> splitBoundaries) {
    // Validate the format of the tenant and namespace names
    validateNamespaceName(tenant, namespace);
    // Run the split asynchronously
    internalSplitNamespaceBundleAsync(bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries)
            .thenAccept(__ -> {
                ....
            })
            .exceptionally(ex -> {
                ....
            });
}
The outermost layer only validates parameters, so we continue into internalSplitNamespaceBundleAsync:
protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundleName,
                                                                    boolean authoritative,
                                                                    boolean unload,
                                                                    String splitAlgorithmName,
                                                                    List<Long> splitBoundaries) {
    return validateSuperUserAccessAsync() // permission check
            .thenAccept(__ -> {
                checkNotNull(bundleName, "BundleRange should not be null");
                log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);
                // Split algorithms supported by this cluster, read from the broker configuration
                // (four are listed by default)
                List<String> supportedNamespaceBundleSplitAlgorithms =
                        pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
                // parameter checks omitted here
                ....
                }
            })
            .thenCompose(__ -> {
                // parameter checks omitted here
                ....
            })
            .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) // make sure policies are not read-only
            .thenCompose(__ -> getBundleRangeAsync(bundleName)) // resolve the range of the bundle to split
            .thenCompose(bundleRange -> {
                return getNamespacePoliciesAsync(namespaceName)
                        .thenCompose(policies ->
                                // 1. Check that the bundle range is valid
                                // 2. Check whether this broker owns the bundle; if not, redirect
                                validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, authoritative, false))
                        // The core call is NamespaceService.splitAndOwnBundle
                        .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
                                pulsar().getNamespaceService()
                                        .getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
                                splitBoundaries));
            });
}
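The method above is one long CompletableFuture chain: each stage runs only if the previous one succeeded, so a failed check skips the actual split. The following is a minimal, self-contained sketch of that pattern. The class, the helper methods, and the shortened algorithm list are made up for illustration; none of them are Pulsar APIs.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncPipelineSketch {

    // Illustrative list only; in Pulsar the supported names come from the broker configuration.
    static final List<String> SUPPORTED = List.of("range_equally_divide", "topic_count_equally_divide");

    /** Stage 1: fail fast if an algorithm name is given but not supported. */
    static CompletableFuture<Void> checkAlgorithm(String name) {
        if (name != null && !SUPPORTED.contains(name)) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("Unsupported split algorithm: " + name));
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Stage 2: stand-in for resolving the bundle (hypothetical, always succeeds here). */
    static CompletableFuture<String> resolveBundle(String bundleName) {
        return CompletableFuture.completedFuture(bundleName);
    }

    /** Chains the stages; later stages are skipped when an earlier one fails. */
    static CompletableFuture<String> split(String bundleName, String algorithm) {
        return checkAlgorithm(algorithm)
                .thenCompose(__ -> resolveBundle(bundleName))
                .thenApply(bundle -> "split " + bundle);
    }

    public static void main(String[] args) {
        System.out.println(split("0x00000000_0x10000000", "range_equally_divide").join());
        split("0x00000000_0x10000000", "no_such_algorithm")
                // Dependent stages wrap the failure, so unwrap the cause if present.
                .exceptionally(ex -> "rejected: "
                        + (ex.getCause() != null ? ex.getCause().getMessage() : ex.getMessage()))
                .thenAccept(System.out::println);
    }
}
```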
Next comes the splitAndOwnBundle method of NamespaceService. NamespaceService is one of Pulsar's more important classes, but for now we just keep following the bundle split logic:
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
                                                 NamespaceBundleSplitAlgorithm splitAlgorithm,
                                                 List<Long> boundaries) {
    // If the extensible load manager is enabled, delegate the split to it
    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
        return ExtensibleLoadManagerImpl.get(loadManager.get())
                .splitNamespaceBundleAsync(bundle, splitAlgorithm, boundaries);
    }
    final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
    final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
    // Core flow
    splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, boundaries);
    return unloadFuture;
}
void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                   boolean unload,
                                   AtomicInteger counter,
                                   CompletableFuture<Void> completionFuture,
                                   NamespaceBundleSplitAlgorithm splitAlgorithm,
                                   List<Long> boundaries) {
    // Build the split options for this bundle
    BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
    // Ask the chosen split algorithm for the split boundaries
    splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
        CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
        if (ex == null) {
            ....
            try {
                // Perform the bundle split
                bundleFactory.splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries)
                        .thenAccept(splitBundles -> {
                            ....
                            Objects.requireNonNull(splitBundles.getLeft());
                            Objects.requireNonNull(splitBundles.getRight());
                            checkArgument(splitBundles.getRight().size() == splitBoundaries.size() + 1,
                                    "bundle has to be split in " + (splitBoundaries.size() + 1) + " bundles");
                            NamespaceName nsname = bundle.getNamespaceObject();
                            ....
                            try {
                                // This broker tries to acquire ownership of each newly split bundle
                                for (NamespaceBundle sBundle : splitBundles.getRight()) {
                                    Objects.requireNonNull(ownershipCache.tryAcquiringOwnership(sBundle));
                                }
                                // The split is computed; now update the bundle metadata to match
                                updateNamespaceBundles(nsname, splitBundles.getLeft()).thenCompose(__ ->
                                        updateNamespaceBundlesForPolicies(nsname, splitBundles.getLeft()))
                                        .thenRun(() -> {
                                            bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
                                            updateFuture.complete(splitBundles.getRight());
                                        }).exceptionally(ex1 -> {
                                            ....
                                        });
                            } catch (Exception e) {
                                ....
                            }
                        });
            } catch (Exception e) {
                ....
            }
        } else {
            updateFuture.completeExceptionally(ex);
        }
        updateFuture.whenCompleteAsync((r, t) -> {
            if (t != null) {
                // Retry only on a metadata version conflict, and only while the retry budget lasts
                if ((t.getCause() instanceof MetadataStoreException.BadVersionException)
                        && (counter.decrementAndGet() >= 0)) {
                    pulsar.getExecutor().schedule(() -> pulsar.getOrderedExecutor()
                            .execute(() -> splitAndOwnBundleOnceAndRetry(
                                    bundle, unload, counter, completionFuture, splitAlgorithm, boundaries)),
                            100, MILLISECONDS);
                } else if (t instanceof IllegalArgumentException) {
                    completionFuture.completeExceptionally(t);
                } else {
                    // Retry enough, or meet other exception
                    String msg2 = format(" %s not success update nsBundles, counter %d, reason %s",
                            bundle.toString(), counter.get(), t.getMessage());
                    LOG.warn(msg2);
                    completionFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2));
                }
                return;
            }
            // Mark the old bundle as inactive
            getOwnershipCache().updateBundleState(bundle, false)
                    .thenRun(() -> {
                        // update bundled_topic cache for load-report-generation
                        pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
                        loadManager.get().setLoadReportForceUpdateFlag();
                        // release old bundle from ownership cache
                        pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
                        completionFuture.complete(null);
                        if (unload) {
                            // Unload new split bundles, in background. This will not
                            // affect the split operation which is already safely completed
                            r.forEach(this::unloadNamespaceBundle);
                        }
                        onNamespaceBundleSplit(bundle);
                    })
                    .exceptionally(e -> {
                        ....
                    });
        }, pulsar.getOrderedExecutor());
    });
}
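The retry handling above follows a common pattern: a shared counter holds the remaining retry budget, only one specific failure type is retried, and the next attempt is scheduled after a short delay instead of blocking a thread. Below is a minimal sketch of that pattern, not Pulsar's implementation. `VersionConflictException` is a hypothetical stand-in for the metadata store's bad-version error, and the retry budget of 3 is arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetryOnConflictSketch {

    /** Hypothetical stand-in for a metadata-store version conflict. */
    static class VersionConflictException extends RuntimeException {
        VersionConflictException(String msg) { super(msg); }
    }

    /** Runs the operation once; on a version conflict, schedules another attempt after 100 ms. */
    static void attempt(Supplier<CompletableFuture<String>> operation, AtomicInteger retriesLeft,
                        CompletableFuture<String> result, ScheduledExecutorService scheduler) {
        operation.get().whenComplete((value, error) -> {
            // Dependent stages wrap failures in CompletionException, so look at the cause.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (cause == null) {
                result.complete(value);
            } else if (cause instanceof VersionConflictException && retriesLeft.decrementAndGet() >= 0) {
                scheduler.schedule(() -> attempt(operation, retriesLeft, result, scheduler),
                        100, TimeUnit.MILLISECONDS);
            } else {
                // Budget exhausted, or a failure that is not worth retrying.
                result.completeExceptionally(cause);
            }
        });
    }

    /** Demo: the operation hits a conflict twice, then succeeds on the third attempt. */
    static String runDemo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger calls = new AtomicInteger();
            Supplier<CompletableFuture<String>> flaky = () -> calls.incrementAndGet() <= 2
                    ? CompletableFuture.failedFuture(new VersionConflictException("bad version"))
                    : CompletableFuture.completedFuture("done after " + calls.get() + " attempts");
            CompletableFuture<String> result = new CompletableFuture<>();
            attempt(flaky, new AtomicInteger(3), result, scheduler);
            return result.orTimeout(5, TimeUnit.SECONDS).join();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Retrying only on a version conflict makes sense because that error means another writer updated the same metadata concurrently, so recomputing the split against fresh data can succeed, whereas an invalid argument would fail the same way every time.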
This method does a lot, but the actual split is delegated to splitBundles in NamespaceBundleFactory, so we keep following the logic there:
public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> splitBundles(
        NamespaceBundle targetBundle, int argNumBundles, List<Long> splitBoundaries) {
    // Check whether this bundle can still be split
    checkArgument(canSplitBundle(targetBundle),
            "%s bundle can't be split further since range not larger than 1", targetBundle);
    ....
    NamespaceName nsname = targetBundle.getNamespaceObject();
    final int numBundles = argNumBundles;
    return bundlesCache.get(nsname).thenApply(sourceBundle -> {
        final int lastIndex = sourceBundle.partitions.length - 1;
        // Allocate a new array for the ring boundaries, since the split adds boundaries to the ring
        final long[] partitions = new long[sourceBundle.partitions.length + (numBundles - 1)];
        int pos = 0;
        int splitPartition = -1;
        final Range<Long> range = targetBundle.getKeyRange();
        for (int i = 0; i < lastIndex; i++) {
            // Walk all bundles of the namespace to find the target bundle to split
            if (sourceBundle.partitions[i] == range.lowerEndpoint()
                    && (range.upperEndpoint() == sourceBundle.partitions[i + 1])) {
                splitPartition = i;
                long minVal = sourceBundle.partitions[i];
                partitions[pos++] = minVal;
                if (splitBoundaries == null || splitBoundaries.size() == 0) {
                    long maxVal = sourceBundle.partitions[i + 1];
                    // numBundles is the number of pieces to split into; the range of the
                    // original bundle is divided evenly among the new bundles
                    long segSize = (maxVal - minVal) / numBundles;
                    long curPartition = minVal + segSize;
                    for (int j = 0; j < numBundles - 1; j++) {
                        partitions[pos++] = curPartition;
                        curPartition += segSize;
                    }
                } else {
                    for (long splitBoundary : splitBoundaries) {
                        partitions[pos++] = splitBoundary;
                    }
                }
            } else {
                partitions[pos++] = sourceBundle.partitions[i];
            }
        }
        partitions[pos] = sourceBundle.partitions[lastIndex];
        if (splitPartition != -1) {
            // keep version of sourceBundle
            // Build the new bundles from the updated boundary array
            NamespaceBundles splitNsBundles =
                    new NamespaceBundles(nsname, this, sourceBundle.getLocalPolicies(), partitions);
            List<NamespaceBundle> splitBundles = splitNsBundles.getBundles().subList(splitPartition,
                    (splitPartition + numBundles));
            return new ImmutablePair<>(splitNsBundles, splitBundles);
        }
        return null;
    });
}
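The boundary arithmetic in splitBundles is easier to see on plain arrays. The sketch below follows the same logic without any Pulsar types. The class and method names are made up for illustration, and the caller is assumed to pass exactly numBundles - 1 explicit boundaries when splitBoundaries is not empty.

```java
import java.util.Arrays;
import java.util.List;

public class SplitRangeSketch {

    /**
     * Returns a new boundary array in which the range [lower, upper] is cut into numBundles pieces.
     * With no explicit splitBoundaries the range is divided evenly.
     */
    static long[] split(long[] partitions, long lower, long upper,
                        int numBundles, List<Long> splitBoundaries) {
        long[] result = new long[partitions.length + numBundles - 1];
        int pos = 0;
        boolean found = false;
        for (int i = 0; i < partitions.length - 1; i++) {
            result[pos++] = partitions[i];
            if (partitions[i] == lower && partitions[i + 1] == upper) {
                found = true;
                if (splitBoundaries == null || splitBoundaries.isEmpty()) {
                    long segSize = (upper - lower) / numBundles;
                    for (int j = 1; j < numBundles; j++) {
                        result[pos++] = lower + j * segSize; // evenly spaced new boundaries
                    }
                } else {
                    for (long boundary : splitBoundaries) {
                        result[pos++] = boundary; // caller-provided boundaries
                    }
                }
            }
        }
        result[pos] = partitions[partitions.length - 1];
        if (!found) {
            throw new IllegalArgumentException("no bundle with the given range");
        }
        return result;
    }

    public static void main(String[] args) {
        long[] before = {0x00000000L, 0x10000000L, 0x20000000L};
        long[] after = split(before, 0x00000000L, 0x10000000L, 2, null);
        // The midpoint 0x08000000 becomes a new boundary.
        for (long boundary : after) {
            System.out.printf("0x%08x%n", boundary);
        }
    }
}
```

Splitting 0x00000000_0x10000000 in two inserts the midpoint 0x08000000, which matches the extra boundary in the CLI output shown earlier.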
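That is essentially the end of this call path, which covers how the broker splits one range of the hash ring into several ranges. A few questions are left for the reader to think about:
- Does a bundle split involve any data migration?
- There are four bundle split algorithms. How do they differ?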