Source Code Walkthrough: The Server-Side Flow of a Manually Triggered Bundle Split

Published: 2024-04-16

1. Overview

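Topics in a namespace are managed in groups called bundles. Each namespace's hash space forms a ring, and each bundle owns one contiguous range of that ring: a topic belongs to the bundle whose range contains the hash of its name. Grouping topics this way makes them easier to manage. As more and more topics land in one bundle, however, the broker that owns that bundle comes under increasing load. Pulsar therefore provides bundle splitting, which can be triggered either automatically or manually. This article walks through the source code to show what happens on the server side when a bundle split is triggered manually.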
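The lookup described above can be sketched in Java. The sketch hashes a topic name into the unsigned 32-bit key space and then binary-searches the boundary list for the bundle whose `[lower, upper)` range contains the hash. Two parts are assumptions. First, the hash function: I believe the broker builds its `NamespaceBundleFactory` with a CRC32 hash, but treat that as unconfirmed here. Second, the class and method names are hypothetical. The boundary values come from the CLI listing in the next section.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.CRC32;

// Sketch: map a topic name onto a namespace's bundle ring.
// ASSUMPTIONS: CRC32 as the hash function; all names here are hypothetical.
class BundleLookupSketch {

    // Hash a topic name into the unsigned 32-bit key space [0x00000000, 0xffffffff].
    static long hash(String topic) {
        CRC32 crc = new CRC32();
        crc.update(topic.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // already an unsigned 32-bit value held in a long
    }

    // Index i of the bundle [boundaries[i], boundaries[i + 1]) that contains key.
    // The last bundle also includes its upper boundary, 0xffffffff.
    static int findBundle(long[] boundaries, long key) {
        int pos = Arrays.binarySearch(boundaries, key);
        // When key is not a boundary, binarySearch returns -(insertionPoint) - 1,
        // and the bundle's lower boundary sits just before the insertion point.
        int idx = pos >= 0 ? pos : -pos - 2;
        return Math.min(idx, boundaries.length - 2);
    }

    static String bundleName(long[] boundaries, int idx) {
        return String.format("0x%08x_0x%08x", boundaries[idx], boundaries[idx + 1]);
    }

    public static void main(String[] args) {
        // Boundaries as printed by `pulsar-admin namespaces bundles public/default`.
        long[] boundaries = {
            0x00000000L, 0x08000000L, 0x10000000L, 0x20000000L, 0x30000000L, 0x40000000L,
            0x50000000L, 0x60000000L, 0x70000000L, 0x80000000L, 0x90000000L, 0xa0000000L,
            0xb0000000L, 0xc0000000L, 0xd0000000L, 0xe0000000L, 0xf0000000L, 0xffffffffL
        };
        String topic = "persistent://public/default/my-topic";
        int idx = findBundle(boundaries, hash(topic));
        System.out.println(topic + " -> bundle " + bundleName(boundaries, idx));
    }
}
```

Binary search works here because the boundary array is sorted. This also explains why a split only has to insert new boundaries into that array.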

How to trigger a bundle split manually

bin/pulsar-admin namespaces bundles public/default

// Output:
{
  "boundaries" : [ "0x00000000", "0x08000000", "0x10000000", "0x20000000", "0x30000000", "0x40000000", "0x50000000", "0x60000000", "0x70000000", "0x80000000", "0x90000000", "0xa0000000", "0xb0000000", "0xc0000000", "0xd0000000", "0xe0000000", "0xf0000000", "0xffffffff" ],
  "numBundles" : 17
}

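// Split a specific bundle. The --bundle value must be an existing bundle range of the namespace.
// Note: the listing above already has a boundary at 0x08000000, so it appears to have been
// captured after 0x00000000_0x10000000 had been split in half.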
bin/pulsar-admin namespaces split-bundle --bundle 0x00000000_0x10000000 public/default
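The `--bundle` argument is a `lower_upper` pair of hexadecimal boundaries. The broker validates the range before splitting; the ownership check traced later includes "check that the bundle range is valid". Below is a hypothetical helper that parses the argument and checks that its two ends are adjacent boundaries, which is what it takes for the range to name an existing bundle. It is an illustration, not the broker's validation code.

```java
import java.util.Arrays;

// Hypothetical helper: parse a "0xLOWER_0xUPPER" bundle argument and check
// that it names an existing bundle (its two ends are adjacent boundaries).
class BundleRangeSketch {

    static long[] parse(String bundle) {
        String[] parts = bundle.split("_");
        if (parts.length != 2 || !parts[0].startsWith("0x") || !parts[1].startsWith("0x")) {
            throw new IllegalArgumentException("Invalid bundle range: " + bundle);
        }
        // NumberFormatException (a subclass of IllegalArgumentException) on bad hex digits.
        long lower = Long.parseLong(parts[0].substring(2), 16);
        long upper = Long.parseLong(parts[1].substring(2), 16);
        if (lower >= upper) {
            throw new IllegalArgumentException("Lower boundary must be below upper: " + bundle);
        }
        return new long[] {lower, upper};
    }

    static boolean isExistingBundle(long[] boundaries, String bundle) {
        long[] range = parse(bundle);
        int i = Arrays.binarySearch(boundaries, range[0]);
        return i >= 0 && i + 1 < boundaries.length && boundaries[i + 1] == range[1];
    }

    public static void main(String[] args) {
        // The first few boundaries of the listing above.
        long[] boundaries = {0x00000000L, 0x08000000L, 0x10000000L, 0x20000000L};
        for (String b : new String[] {"0x00000000_0x10000000", "0x00000000_0x08000000"}) {
            System.out.println(b + " exists: " + isExistingBundle(boundaries, b));
        }
    }
}
```

Against the listed boundaries, `0x00000000_0x08000000` passes this check and `0x00000000_0x10000000` does not.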

2. Source Code Analysis

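Pulsar exposes its admin operations over HTTP so that several kinds of clients can use them: raw HTTP calls, the admin client libraries, and the CLI. On the server side these requests are handled in the broker's admin module, shown in the figure below. The bundle split discussed here is implemented in the Namespaces class.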
[Figure: the broker's admin module, which contains the Namespaces class]
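Because the CLI command boils down to one HTTP call, it can help to see that call built with `java.net.http`. The sketch below only constructs the request and never sends it. Three details are my assumptions about the v2 admin REST API rather than something shown in this article: the path `/admin/v2/namespaces/{tenant}/{namespace}/{bundle}/split`, the `PUT` method, and the broker address `http://localhost:8080`. The query parameter names `unload` and `splitAlgorithmName` match the JAX-RS annotations in the handler traced next, and the optional `splitBoundaries` list travels as a JSON body. Authentication headers are omitted.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

// Sketch only: builds (does not send) the HTTP request behind a bundle split.
// ASSUMPTIONS: the broker URL, the v2 REST path, and the PUT method.
class SplitRequestSketch {

    static HttpRequest build(String brokerUrl, String tenant, String namespace, String bundle,
                             boolean unload, String splitAlgorithmName, List<Long> splitBoundaries) {
        String uri = String.format("%s/admin/v2/namespaces/%s/%s/%s/split?unload=%b&splitAlgorithmName=%s",
                brokerUrl, tenant, namespace, bundle, unload, splitAlgorithmName);
        // The optional split boundaries travel as a JSON array in the request body.
        String body = splitBoundaries.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(",", "[", "]"));
        return HttpRequest.newBuilder(URI.create(uri))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:8080", "public", "default",
                "0x00000000_0x10000000", false, "range_equally_divide", List.of());
        System.out.println(request.method() + " " + request.uri());
    }
}
```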

Let's start tracing from splitNamespaceBundle in Namespaces:

    public void splitNamespaceBundle(
            ....
            @QueryParam("splitAlgorithmName") String splitAlgorithmName, // name of the bundle split algorithm to use
            @ApiParam("splitBoundaries") List<Long> splitBoundaries) {
        // validate the namespace name format and check that the namespace exists
        validateNamespaceName(tenant, namespace);
        // perform the split asynchronously
        internalSplitNamespaceBundleAsync(bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries)
                .thenAccept(__ -> {
                    ....
                })
                .exceptionally(ex -> {
                    ....
                });
    }

The outermost layer only validates parameters and hands off the work, so let's follow internalSplitNamespaceBundleAsync next:

    protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundleName,
                                                                        boolean authoritative,
                                                                        boolean unload,
                                                                        String splitAlgorithmName,
                                                                        List<Long> splitBoundaries) {
        return validateSuperUserAccessAsync()   // permission check: super user only
                .thenAccept(__ -> {
                    checkNotNull(bundleName, "BundleRange should not be null");
                    log.info("[{}] Split namespace bundle {}/{}", clientAppId(), namespaceName, bundleName);
                    // split algorithms this broker supports; the list comes from the broker setting
                    // supportedNamespaceBundleSplitAlgorithms, which contains four algorithms by default
                    List<String> supportedNamespaceBundleSplitAlgorithms =
                            pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
                    // parameter checks omitted here
                    ....
                })
                .thenCompose(__ -> {
                    // parameter checks omitted here
                    ....
                })
                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())   // fail if namespace policies are read-only
                .thenCompose(__ -> getBundleRangeAsync(bundleName)) // resolve the range of the bundle to split
                .thenCompose(bundleRange -> {
                    return getNamespacePoliciesAsync(namespaceName)
                            .thenCompose(policies ->
                                    // 1. check that the bundle range is valid
                                    // 2. check that this broker owns the bundle; if not, redirect the request
                                    validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
                                            authoritative, false))
                            // the core method is NamespaceService.splitAndOwnBundle
                            .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
                                    pulsar().getNamespaceService()
                                            .getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
                                    splitBoundaries));
                });
    }
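The method above follows a common CompletableFuture pattern. Synchronous argument checks run in `thenAccept`, where a thrown exception fails the whole chain. Asynchronous steps are chained with `thenCompose`, so each one runs only if every earlier stage succeeded. The sketch below reproduces that shape with stand-in steps. Every name in it is hypothetical, and the four algorithm names are, to my knowledge, the default contents of `supportedNamespaceBundleSplitAlgorithms`; treat that list as an assumption.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Sketch of the validate-then-compose pattern used by internalSplitNamespaceBundleAsync.
// Every name here is hypothetical; the async steps are stand-ins, not Pulsar calls.
class SplitChainSketch {

    // ASSUMPTION: the default contents of supportedNamespaceBundleSplitAlgorithms.
    static final List<String> SUPPORTED = List.of(
            "range_equally_divide", "topic_count_equally_divide",
            "specified_positions_divide", "flow_or_qps_equally_divide");

    // Stand-in for an async permission check such as validateSuperUserAccessAsync().
    static CompletableFuture<Void> checkAccess(boolean superUser) {
        return superUser
                ? CompletableFuture.<Void>completedFuture(null)
                : CompletableFuture.<Void>failedFuture(new SecurityException("not a super user"));
    }

    static CompletableFuture<String> split(boolean superUser, String bundle, String algorithm) {
        return checkAccess(superUser)
                // Synchronous argument checks: throwing here fails the whole chain.
                .thenAccept(__ -> {
                    if (bundle == null) {
                        throw new IllegalArgumentException("BundleRange should not be null");
                    }
                    if (algorithm != null && !SUPPORTED.contains(algorithm)) {
                        throw new IllegalArgumentException("Unsupported split algorithm: " + algorithm);
                    }
                })
                // Async step standing in for the bundle lookup and splitAndOwnBundle.
                .thenCompose(__ -> CompletableFuture.supplyAsync(
                        () -> "split " + bundle + " using " + algorithm));
    }

    public static void main(String[] args) {
        System.out.println(split(true, "0x00000000_0x10000000", "range_equally_divide").join());
        try {
            split(true, "0x00000000_0x10000000", "no_such_algorithm").join();
        } catch (CompletionException e) {
            // Exceptions from earlier stages arrive wrapped in CompletionException.
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

In this design, a failure in any step skips all later steps. The single `exceptionally` handler in `splitNamespaceBundle` is therefore enough to report every error back to the caller.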

Next we enter NamespaceService.splitAndOwnBundle. NamespaceService is one of Pulsar's more important classes; for now we only follow the bundle-split logic:

    public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
                                                     NamespaceBundleSplitAlgorithm splitAlgorithm,
                                                     List<Long> boundaries) {
        // if the extensible load manager (ExtensibleLoadManagerImpl) is enabled, it handles the split itself
        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
            return ExtensibleLoadManagerImpl.get(loadManager.get())
                    .splitNamespaceBundleAsync(bundle, splitAlgorithm, boundaries);
        }

        final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
        // retry budget for metadata version conflicts (see the retry logic below)
        final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
        // core flow
        splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, boundaries);

        return unloadFuture;
    }


    void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                       boolean unload,
                                       AtomicInteger counter,
                                       CompletableFuture<Void> completionFuture,
                                       NamespaceBundleSplitAlgorithm splitAlgorithm,
                                       List<Long> boundaries) {
        // build the split options: target bundle, any user-supplied boundaries, broker config
        BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);

        // let the chosen split algorithm compute the split boundaries
        splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
            CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
            if (ex == null) {
                ....
                try {
                    // compute the new layout: the namespace's full bundle set plus the new child bundles
                    bundleFactory.splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries)
                            .thenAccept(splitBundles -> {
                                ....
                                Objects.requireNonNull(splitBundles.getLeft());
                                Objects.requireNonNull(splitBundles.getRight());
                                checkArgument(splitBundles.getRight().size() == splitBoundaries.size() + 1,
                                        "bundle has to be split in " + (splitBoundaries.size() + 1) + " bundles");
                                NamespaceName nsname = bundle.getNamespaceObject();
                                ....
                                try {
                                    // this broker tries to take ownership of every new child bundle
                                    for (NamespaceBundle sBundle : splitBundles.getRight()) {
                                        Objects.requireNonNull(ownershipCache.tryAcquiringOwnership(sBundle));
                                    }
                                    // the split is computed, so persist the new layout: first the namespace
                                    // bundles metadata, then the namespace policies; finally drop the cached layout
                                    updateNamespaceBundles(nsname, splitBundles.getLeft()).thenCompose(__ ->
                                        updateNamespaceBundlesForPolicies(nsname, splitBundles.getLeft()))
                                            .thenRun(() -> {
                                                bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
                                                updateFuture.complete(splitBundles.getRight());
                                    }).exceptionally(ex1 -> {
                                        ....
                                    });
                                } catch (Exception e) {
                                    ....
                                }
                            });
                } catch (Exception e) {
                    ....
                }
            } else {
                updateFuture.completeExceptionally(ex);
            }

            updateFuture.whenCompleteAsync((r, t) -> {
                if (t != null) {
                    // retry (after 100 ms, at most BUNDLE_SPLIT_RETRY_LIMIT times) only when the metadata
                    // update hit a version conflict, i.e. the metadata was modified concurrently
                    if ((t.getCause() instanceof MetadataStoreException.BadVersionException)
                            && (counter.decrementAndGet() >= 0)) {
                        pulsar.getExecutor().schedule(() -> pulsar.getOrderedExecutor()
                                .execute(() -> splitAndOwnBundleOnceAndRetry(
                                        bundle, unload, counter, completionFuture, splitAlgorithm, boundaries)),
                                100, MILLISECONDS);
                    } else if (t instanceof IllegalArgumentException) {
                        completionFuture.completeExceptionally(t);
                    } else {
                        // retries exhausted, or a different exception occurred
                        String msg2 = format(" %s not success update nsBundles, counter %d, reason %s",
                            bundle.toString(), counter.get(), t.getMessage());
                        LOG.warn(msg2);
                        completionFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2));
                    }
                    return;
                }

                // mark the old (parent) bundle as inactive in the ownership cache
                getOwnershipCache().updateBundleState(bundle, false)
                        .thenRun(() -> {
                            // update bundled_topic cache for load-report-generation
                            pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
                            loadManager.get().setLoadReportForceUpdateFlag();
                            // release old bundle from ownership cache
                            pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
                            completionFuture.complete(null);
                            if (unload) {
                                // Unload new split bundles, in background. This will not
                                // affect the split operation which is already safely completed
                                r.forEach(this::unloadNamespaceBundle);
                            }
                            onNamespaceBundleSplit(bundle);
                        })
                        .exceptionally(e -> {
                            ....
                        });
            }, pulsar.getOrderedExecutor());
        });
    }
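The retry branch above is a form of optimistic concurrency. If another writer changed the namespace metadata first (`BadVersionException`), the whole attempt is repeated after 100 ms until the counter runs out. Any other error fails the operation immediately. Below is a standalone sketch of that loop. `VersionConflictException` is a hypothetical stand-in for `MetadataStoreException.BadVersionException`. For brevity the sketch does not reproduce the separate handling of `IllegalArgumentException` or the wrapping in `ServiceUnitNotReadyException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch of the retry loop in splitAndOwnBundleOnceAndRetry: retry only on a
// version conflict, at most `counter` more times, 100 ms apart.
// VersionConflictException is a hypothetical stand-in for BadVersionException.
class RetryOnConflictSketch {

    static class VersionConflictException extends RuntimeException {
        VersionConflictException(String message) { super(message); }
    }

    static <T> void runWithRetry(Supplier<CompletableFuture<T>> attempt, AtomicInteger counter,
                                 CompletableFuture<T> result, ScheduledExecutorService scheduler) {
        attempt.get().whenComplete((value, t) -> {
            if (t == null) {
                result.complete(value);
                return;
            }
            // Failures from dependent stages usually arrive wrapped in CompletionException.
            Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
            if (cause instanceof VersionConflictException && counter.decrementAndGet() >= 0) {
                // Someone else updated the metadata first: repeat the whole attempt later.
                scheduler.schedule(() -> runWithRetry(attempt, counter, result, scheduler),
                        100, TimeUnit.MILLISECONDS);
            } else {
                // Retries exhausted, or an error that retrying cannot fix.
                result.completeExceptionally(cause);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Fails with a version conflict twice, then succeeds.
        Supplier<CompletableFuture<String>> flaky = () -> calls.incrementAndGet() <= 2
                ? CompletableFuture.<String>failedFuture(new VersionConflictException("bad version"))
                : CompletableFuture.completedFuture("split done");
        CompletableFuture<String> result = new CompletableFuture<>();
        runWithRetry(flaky, new AtomicInteger(3), result, scheduler);
        System.out.println(result.get(5, TimeUnit.SECONDS) + " after " + calls.get() + " attempts");
        scheduler.shutdown();
    }
}
```

Each retry re-runs the split algorithm and re-reads the bundle layout, not just the final write. This makes sense because a conflicting writer may have changed the layout the split was computed from.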

This method does quite a lot, but the new bundle layout is actually computed by NamespaceBundleFactory.splitBundles, so let's follow NamespaceBundleFactory next:

    public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> splitBundles(
            NamespaceBundle targetBundle, int argNumBundles, List<Long> splitBoundaries) {
        // check that the bundle can still be split (its range must be larger than 1)
        checkArgument(canSplitBundle(targetBundle),
                "%s bundle can't be split further since range not larger than 1", targetBundle);
        ....
        NamespaceName nsname = targetBundle.getNamespaceObject();

        final int numBundles = argNumBundles;

        return bundlesCache.get(nsname).thenApply(sourceBundle -> {
            final int lastIndex = sourceBundle.partitions.length - 1;
            // allocate a new boundary array: splitting one bundle into numBundles pieces
            // adds numBundles - 1 boundaries to the ring
            final long[] partitions = new long[sourceBundle.partitions.length + (numBundles - 1)];
            int pos = 0;
            int splitPartition = -1;
            final Range<Long> range = targetBundle.getKeyRange();
            for (int i = 0; i < lastIndex; i++) {
                // walk all bundles of the namespace to find the target bundle
                if (sourceBundle.partitions[i] == range.lowerEndpoint()
                        && (range.upperEndpoint() == sourceBundle.partitions[i + 1])) {
                    splitPartition = i;
                    long minVal = sourceBundle.partitions[i];
                    partitions[pos++] = minVal;
                    if (splitBoundaries == null || splitBoundaries.size() == 0) {
                        long maxVal = sourceBundle.partitions[i + 1];
                        // no explicit boundaries: divide the old range evenly into numBundles pieces
                        // (the integer-division remainder ends up in the last piece)
                        long segSize = (maxVal - minVal) / numBundles;
                        long curPartition = minVal + segSize;
                        for (int j = 0; j < numBundles - 1; j++) {
                            partitions[pos++] = curPartition;
                            curPartition += segSize;
                        }
                    } else {
                        // explicit boundaries: insert them as given
                        for (long splitBoundary : splitBoundaries) {
                            partitions[pos++] = splitBoundary;
                        }
                    }

                } else {
                    partitions[pos++] = sourceBundle.partitions[i];
                }
            }
            partitions[pos] = sourceBundle.partitions[lastIndex];
            if (splitPartition != -1) {
                // keep version of sourceBundle
                // build the new NamespaceBundles from the updated boundaries and slice out the child bundles
                NamespaceBundles splitNsBundles =
                        new NamespaceBundles(nsname, this, sourceBundle.getLocalPolicies(), partitions);
                List<NamespaceBundle> splitBundles = splitNsBundles.getBundles().subList(splitPartition,
                        (splitPartition + numBundles));
                return new ImmutablePair<>(splitNsBundles, splitBundles);
            }

            return null;
        });
    }
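The heart of splitBundles is a plain array rewrite: copy every boundary, and when the target bundle `[lower, upper)` is reached, insert `numBundles - 1` extra boundaries. Those boundaries are either evenly spaced or the explicit ones supplied. Below is a standalone sketch of that loop with plain `long[]` values instead of Pulsar types. The class name is hypothetical. Unlike the original, which returns `null`, the sketch throws if the target bundle is not found.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Standalone sketch of the boundary rewrite in NamespaceBundleFactory.splitBundles:
// replace bundle [lower, upper) with numBundles child bundles. Names are hypothetical.
class SplitPartitionsSketch {

    static long[] split(long[] partitions, long lower, long upper, int numBundles,
                        List<Long> splitBoundaries) {
        if (upper - lower <= 1) {
            throw new IllegalArgumentException("bundle can't be split further since range not larger than 1");
        }
        if (numBundles < 2) {
            throw new IllegalArgumentException("a split needs at least 2 child bundles");
        }
        boolean explicit = splitBoundaries != null && !splitBoundaries.isEmpty();
        if (explicit && splitBoundaries.size() + 1 != numBundles) {
            throw new IllegalArgumentException("numBundles must equal splitBoundaries.size() + 1");
        }
        int lastIndex = partitions.length - 1;
        // One bundle becomes numBundles bundles, so numBundles - 1 boundaries are added.
        long[] result = new long[partitions.length + numBundles - 1];
        int pos = 0;
        boolean found = false;
        for (int i = 0; i < lastIndex; i++) {
            result[pos++] = partitions[i]; // every existing lower boundary is kept
            if (partitions[i] == lower && partitions[i + 1] == upper) {
                found = true;
                if (explicit) {
                    for (long boundary : splitBoundaries) {
                        result[pos++] = boundary;
                    }
                } else {
                    // Even division; the integer-division remainder goes to the last child.
                    long segSize = (upper - lower) / numBundles;
                    long cur = lower + segSize;
                    for (int j = 0; j < numBundles - 1; j++) {
                        result[pos++] = cur;
                        cur += segSize;
                    }
                }
            }
        }
        if (!found) {
            throw new IllegalArgumentException("no bundle with range [" + lower + ", " + upper + ")");
        }
        result[pos] = partitions[lastIndex]; // the final upper boundary
        return result;
    }

    static String hex(long[] partitions) {
        return Arrays.stream(partitions)
                .mapToObj(p -> String.format("0x%08x", p))
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        long[] before = {0x00000000L, 0x10000000L, 0x20000000L, 0xffffffffL};
        long[] after = split(before, 0x00000000L, 0x10000000L, 2, null);
        // prints: 0x00000000, 0x08000000, 0x10000000, 0x20000000, 0xffffffff
        System.out.println(hex(after));
    }
}
```

Splitting `0x00000000_0x10000000` in half adds exactly the `0x08000000` boundary that appears in the CLI listing earlier. The other bundles keep their ranges, and topics that hashed into them are unaffected.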

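That is essentially the whole path. In short, the broker cuts one range of the namespace's hash ring into several ranges, takes ownership of the resulting bundles, persists the new layout to the metadata store, and retires the old bundle. A few questions are left for the reader to think about: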

  1. Does a bundle split involve any data migration?
  2. There are four bundle split algorithms. How do they differ?
