ES Source Code, Part 5: The Write Path (From ES Down to Lucene — the Most Detailed Walkthrough You'll Find, Pure Substance)

Published on 2024-04-22

Today we spend the day digging into the guts of ES.

Create an index

*(screenshot: the create-index request)*

Index a document

*(screenshot: the index-document request)*
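For reference, roughly the same pair of operations through the 7.x Java high-level REST client (the host, document id, and body are made-up examples; `my_index` with 5 primary shards matches the index used later in this walkthrough):

```java
import java.util.Collections;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;

public class WriteDemo {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            // 1. Create the index: 5 primary shards, 1 replica each
            CreateIndexRequest create = new CreateIndexRequest("my_index");
            create.settings(Settings.builder()
                .put("index.number_of_shards", 5)
                .put("index.number_of_replicas", 1));
            client.indices().create(create, RequestOptions.DEFAULT);

            // 2. Index a document — this is the request whose journey the rest
            //    of this article traces down to Lucene
            IndexRequest index = new IndexRequest("my_index")
                .id("1")
                .source(Collections.singletonMap("message", "hello es"));
            client.index(index, RequestOptions.DEFAULT);
        }
    }
}
```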

Entry point: BaseRestHandler

BaseRestHandler is the entry point for REST requests — think of it as the controller layer in Spring MVC.

*(screenshot: BaseRestHandler.handleRequest)*

prepareRequest is an abstract method, overridden by the various Rest*Action classes. Since we are indexing a document here, execution lands in RestIndexAction.prepareRequest.

RestIndexAction.prepareRequest

This is the method that ultimately handles the document-indexing REST request.

```java
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    // 1. The index operation request
    IndexRequest indexRequest;
    // 2. Pull the {type} out of the REST request; nowadays this is almost always _doc
    final String type = request.param("type");
    if (type != null && type.equals(MapperService.SINGLE_MAPPING_NAME) == false) {
        deprecationLogger.deprecate(DeprecationCategory.TYPES, "index_with_types", TYPES_DEPRECATION_MESSAGE);
        indexRequest = new IndexRequest(request.param("index"), type, request.param("id"));
    } else {
        // 3. Create the index request object; the parameter is the index name
        indexRequest = new IndexRequest(request.param("index"));
        // 4. The document id
        indexRequest.id(request.param("id"));
    }

    // 5. Routing specified on the request. By default you don't set it; if you do,
    //    shard selection uses the supplied routing value in its hash calculation.
    indexRequest.routing(request.param("routing"));
    // 6. ES ships an ingest mechanism, similar to ETL data cleansing.
    /*
     * If the index request specifies a pipeline, the server first runs the data through
     * that pipeline. The request passes through a chain of processors that massage the
     * doc — for example, if the doc carries no timestamp, a processor can add one.
     * It can do far more than that; see the official docs for the details.
     * On the server, pipelines are stored separately, each under its own name.
     */
    indexRequest.setPipeline(request.param("pipeline"));
    // 7. Set the request body.
    /*
     * Param 1: a reference to the binary content, through which the request body bytes can be read
     * Param 2: the content type of those bytes — usually JSON
     */
    indexRequest.source(request.requiredContent(), request.getXContentType());
    // 8. Timeout — one minute by default
    indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
    // 9. Refresh policy.
    /*
     * Rough write flow: the request is first written into the ES indexing buffer, then the
     * translog. By default the buffer is refreshed once per second. What does a refresh do?
     * It hands the buffered data to Lucene's IndexWriter, which writes it into Lucene's
     * segment buffer (OS cache). refreshPolicy lets this request force a refresh.
     * When is the buffer refreshed? 1. a forced refresh; 2. it fills up (the indexing
     * buffer defaults to 10% of the heap); 3. the periodic once-per-second refresh.
     */
    indexRequest.setRefreshPolicy(request.param("refresh"));
    // 10. These two are legacy parameters; ES now has newer fields for concurrency control
    indexRequest.version(RestActions.parseVersion(request));
    indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
    // 11. Like a student number: every document carries a unique sequence number
    indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
    // 12. Like a class number: each ES shard has one primary and several replicas;
    //     whenever the primary changes, the primary term is incremented
    indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
    indexRequest.setRequireAlias(request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, indexRequest.isRequireAlias()));
    // 13. Operation type
    String sOpType = request.param("op_type");
    // 14. How many shard copies must be active before this index request may proceed.
    /*
     * By default one active copy is enough, i.e. the primary shard being online suffices.
     * If you set it to 2, the primary plus one replica must be online before the
     * request is processed.
     */
    String waitForActiveShards = request.param("wait_for_active_shards");
    if (waitForActiveShards != null) {
        indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
    }
    if (sOpType != null) {
        indexRequest.opType(sOpType);
    }

    // 15. Return a function.
    /*
     * It calls nodeClient.index:
     * param 1: the indexRequest built above
     * param 2: a RestStatusToXContentListener wrapping the RestChannel; through this
     *          listener the response is sent back to the REST caller
     */
    return channel ->
        client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing())));
}
```

To recap, this method does two things:

  1. Converts the RestRequest into an IndexRequest:
    1. sets the document id
    2. sets the routing
    3. sets the pipeline for data cleansing
    4. sets the timeout
    5. sets the refresh policy
    6. sets the document's seqNo — think student number
    7. sets the document's primaryTerm — think class number (all of these parameters matter later)

Here is the fully built indexRequest:

*(screenshot of the populated IndexRequest)*

  2. Returns a function that calls nodeClient.index(indexRequest).
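Sub-items 6 and 7 (if_seq_no / if_primary_term) are the modern replacement for the deprecated version/version_type concurrency control seen in step 10 of the code above. A minimal sketch of how a client uses them — the index name and the numbers are hypothetical, as if read back from an earlier GET of the document:

```java
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// Re-index document "1" only if it hasn't changed since we last read it.
// 42 and 3 are hypothetical values previously returned by a GET of the doc.
IndexRequest request = new IndexRequest("my_index")
    .id("1")
    .source("{\"user\":\"alice\"}", XContentType.JSON)
    .setIfSeqNo(42L)        // last seq_no we observed for this document
    .setIfPrimaryTerm(3L);  // primary term we observed it under

// If another writer bumped the document's seq_no in the meantime, the shard
// rejects this request with a version-conflict exception.
```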

action.accept => nodeClient.index

*(screenshot: BaseRestHandler invoking the RestChannelConsumer)*

This is where the function returned above finally runs:

*(screenshot: the lambda calling client.index)*

NodeClient.index

This first lands in the parent class, AbstractClient.index:

*(screenshot: AbstractClient.index)*

which creates an IndexAction instance and then calls execute:

*(screenshot: AbstractClient.execute)*

Two of the arguments deserve a look:

  1. action: the IndexAction just created

*(screenshot of the action instance)*

  2. request: still the same indexRequest

*(screenshot of the request instance)*

NodeClient.doExecute

*(screenshot: NodeClient.doExecute)*

doExecute then calls executeLocal.

NodeClient.executeLocal

*(screenshot: NodeClient.executeLocal)*

executeLocal calls transportAction(action), which uses IndexAction to look up the TransportIndexAction — in other words, we have found the entry point of the RPC handler for index operations:

*(screenshot: the transportAction lookup)*

Next, into TransportIndexAction.

TransportIndexAction.execute

*(screenshot: TransportIndexAction)*

Its body is empty, so the parent class's TransportAction.execute runs instead.

TransportAction.execute

*(screenshot: TransportAction.execute)*

This calls execute(task, request, actionListener):

*(screenshot: the overloaded execute)*

which ultimately lands in TransportBulkAction.

To recap so far:

  1. nodeClient uses the IndexAction RPC type to look up the corresponding RPC handler (TransportIndexAction)
  2. TransportIndexAction forwards the request to another RPC handler (TransportBulkAction)

TransportBulkAction.doExecute

*(screenshot: TransportBulkAction.doExecute)*

Notice that by now the indexRequest has already become a bulkRequest — it has been packaged up for you. What's inside the bulkRequest?

*(screenshot of the BulkRequest contents)*
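The wrapping itself is worth a quick look. In the 7.x codebase TransportIndexAction extends TransportSingleItemBulkWriteAction, whose helper does roughly the following (a paraphrased sketch from memory — check your exact version):

```java
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;

final class SingleItemWrapSketch {
    // Paraphrase of TransportSingleItemBulkWriteAction#toSingleItemBulkRequest (7.x):
    // a lone IndexRequest is repackaged as a one-item BulkRequest so the rest of
    // the write path only has to implement bulk semantics.
    static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest<?> request) {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add((DocWriteRequest<?>) request);            // the single item
        bulkRequest.setRefreshPolicy(request.getRefreshPolicy()); // refresh becomes the bulk's job
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
        return bulkRequest;
    }
}
```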

TransportBulkAction.doInternalExecute

```java
// Param 1: task
// Param 2: bulkRequest, the bulk index request
// Param 3: executorName, name of the executor
// Param 4: releasingListener — before onResponse/onFailure it first calls releasable#close
//          to update the fields inside the IndexingPressure object
protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
    // 1. Start timestamp
    final long startTime = relativeTime();
    // 2. Response array, sized to the number of requests in the bulk — responses map one-to-one
    final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
    // 3. Pipeline-related code
    boolean hasIndexRequestsWithPipelines = false;
    final Metadata metadata = clusterService.state().getMetadata();
    final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
    for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
        IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
        if (indexRequest != null) {
            // Each index request needs to be evaluated, because this method also modifies the IndexRequest
            boolean indexRequestHasPipeline = IngestService.resolvePipelines(actionRequest, indexRequest, metadata);
            hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
        }

        if (actionRequest instanceof IndexRequest) {
            IndexRequest ir = (IndexRequest) actionRequest;
            ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
            if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
                throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
            }
        }
    }

    if (hasIndexRequestsWithPipelines) {
        // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
        // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
        // this path is never taken.
        try {
            if (Assertions.ENABLED) {
                final boolean arePipelinesResolved = bulkRequest.requests()
                    .stream()
                    .map(TransportBulkAction::getIndexWriteRequest)
                    .filter(Objects::nonNull)
                    .allMatch(IndexRequest::isPipelineResolved);
                assert arePipelinesResolved : bulkRequest;
            }
            if (clusterService.localNode().isIngestNode()) {
                processBulkIndexIngestRequest(task, bulkRequest, executorName, listener);
            } else {
                ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
            }
        } catch (Exception e) {
            listener.onFailure(e);
        }
        return;
    }
    // end of the pipeline handling

    // Attempt to create all the indices that we're going to need during the bulk before we start.
    // Step 1: collect all the indices in the request
    // 4. Collect every index this bulk request touches.
    //    key: index name; value: the requireAlias flag — ignore aliases for now.
    final Map<String, Boolean> indices = bulkRequest.requests.stream()
        // delete requests should not attempt to create the index (if the index does not
        // exists), unless an external versioning is used
        .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
            || request.versionType() == VersionType.EXTERNAL
            || request.versionType() == VersionType.EXTERNAL_GTE)
        .collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2));

    // 5. Indices that cannot be created are collected here.
    // Step 2: filter the list of indices to find those that don't currently exist.
    final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
    // 6. Indices this bulk request will auto-create
    Set<String> autoCreateIndices = new HashSet<>();
    // 7. Cluster state object
    ClusterState state = clusterService.state();
    // 8. Walk the map of indices the bulk touches
    for (Map.Entry<String, Boolean> indexAndFlag : indices.entrySet()) {
        final String index = indexAndFlag.getKey();
        // Decide whether the index structure for "index" needs to be created
        boolean shouldAutoCreate = indexNameExpressionResolver.hasIndexAbstraction(index, state) == false;
        // We should only auto create if we are not requiring it to be an alias
        if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
            // Remember the index names whose structure must be created
            autoCreateIndices.add(index);
        }
    }

    // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
    // If every index structure already exists:
    if (autoCreateIndices.isEmpty()) {
        executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
    } else {
        // Some index structures are missing.
        // Counter initialized to the number of indices to auto-create.
        final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
        // Create each missing index structure.
        for (String index : autoCreateIndices) {
            // createIndex sends an RPC to the master, which performs the actual index creation;
            // success or failure, the ActionListener below runs on this node.
            createIndex(index, bulkRequest.timeout(), minNodeVersion, new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(CreateIndexResponse result) {
                    if (counter.decrementAndGet() == 0) {
                        // Only true once every pending index creation has completed
                        // (successfully or not).

                        // Grab the executor named executorName and run the bulk.
                        threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(listener) {

                            @Override
                            protected void doRun() {
                                // 9. Execute the bulk — this is the important part
                                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
                            }
                        });
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    final Throwable cause = ExceptionsHelper.unwrapCause(e);
                    if (cause instanceof IndexNotFoundException) {
                        indicesThatCannotBeCreated.put(index, (IndexNotFoundException) cause);
                    }
                    else if ((cause instanceof ResourceAlreadyExistsException) == false) {
                        // fail all requests involving this index, if create didn't work
                        for (int i = 0; i < bulkRequest.requests.size(); i++) {
                            DocWriteRequest<?> request = bulkRequest.requests.get(i);
                            if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                bulkRequest.requests.set(i, null);
                            }
                        }
                    }
                    if (counter.decrementAndGet() == 0) {
                        final ActionListener<BulkResponse> wrappedListener = ActionListener.wrap(listener::onResponse, inner -> {
                            inner.addSuppressed(e);
                            listener.onFailure(inner);
                        });
                        threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(wrappedListener) {
                            @Override
                            protected void doRun() {
                                executeBulk(task, bulkRequest, startTime, wrappedListener, responses, indicesThatCannotBeCreated);
                            }

                            @Override
                            public void onRejection(Exception rejectedException) {
                                rejectedException.addSuppressed(e);
                                super.onRejection(rejectedException);
                            }
                        });
                    }
                }
            });
        }
    }
}
```

To summarize what this method does:

  1. Walks every index name the bulk request touches

*(screenshot: collecting the index names)*

  2. Finds the index structures that don't exist yet and creates them via an RPC to the master.

*(screenshot: the createIndex call)*

  3. Calls transportBulkAction.executeBulk, the actual bulk execution. (The completion pattern wrapped around it is isolated in the sketch below.)
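Note the completion pattern in the code above: an AtomicInteger counts down as each asynchronous index creation returns, and the bulk runs exactly once, after the last one lands. Isolated into a minimal, self-contained sketch (names are mine, not from the ES source):

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

// Fan-out/fan-in: start N async tasks and run the completion logic exactly once,
// after the final task (success or failure) reports back.
final class CountdownExample {
    static void fanOut(ExecutorService executor, List<Runnable> tasks, Runnable onAllDone) {
        AtomicInteger counter = new AtomicInteger(tasks.size());
        for (Runnable task : tasks) {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    // decrementAndGet is atomic, so exactly one task observes 0
                    if (counter.decrementAndGet() == 0) {
                        onAllDone.run(); // in ES: threadPool.executor(...).execute(executeBulk)
                    }
                }
            });
        }
    }
}
```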

One object here deserves special attention: ClusterState, the cluster state.

*(screenshot of a ClusterState instance)*

  • nodes: information about every node in the cluster, as a map keyed by node ID.

*(screenshot of DiscoveryNodes)*

  • routingTable: the shard and replica allocation of every index — i.e. which nodes hold which shards. (A sketch of walking this structure follows this list.)

*(screenshot of the RoutingTable)*

There is one routingTable entry per index you have created.
my_index is the index I'm using; I created it with 5 shards, and accordingly there are exactly 5 IndexShardRoutingTable entries (each one is the routing table of a single shard — the concrete placement of one primary and its replicas, where every copy is represented by a ShardRouting object).
Each ShardRouting describes one shard copy: whether it is the primary or a replica, its shard id, the node it lives on, and its current state.

  • metadata: metadata about every index in the cluster — index names, settings, mappings, aliases, and so on.

*(screenshot of the Metadata object)*
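To make the routing table concrete, a minimal sketch of walking it from a ClusterState, assuming the 7.x APIs (IndexRoutingTable / IndexShardRoutingTable / ShardRouting):

```java
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;

final class RoutingTableTour {
    // Walk one index's routing table inside a ClusterState (7.x API).
    static void print(ClusterState state, String indexName) {
        IndexRoutingTable indexRoutingTable = state.routingTable().index(indexName);
        for (IndexShardRoutingTable shardTable : indexRoutingTable) { // one per shard id (5 for my_index)
            for (ShardRouting copy : shardTable) {                    // one per copy: primary + replicas
                System.out.printf("shard %d primary=%b node=%s state=%s%n",
                    copy.shardId().id(),
                    copy.primary(),         // is this copy the primary?
                    copy.currentNodeId(),   // node the copy is allocated to
                    copy.state());          // STARTED / INITIALIZING / RELOCATING / UNASSIGNED
            }
        }
    }
}
```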


TransportBulkAction.executeBulk

*(screenshot: TransportBulkAction.executeBulk)*

This creates a BulkOperation and calls its run method, which ultimately ends up in BulkOperation.doRun.

BulkOperation.doRun

```java
@Override
protected void doRun() {
    assert bulkRequest != null;

    // 1. Get the cluster state via the observer; setAndGetObservedState fetches it
    //    from clusterService and caches it in an internal field.
    final ClusterState clusterState = observer.setAndGetObservedState();

    // 2. Use the cluster state to check whether the cluster can currently serve requests.
    if (handleBlockExceptions(clusterState)) {
        return;
    }

    // 3. A helper for resolving the real index name behind a request (ES supports
    //    aliases, and this encapsulates the alias-resolution logic).
    //    Param 1: cluster state; param 2: index name resolver.
    final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);

    // 4. Cluster metadata (which indices exist, which shards they have, where the shards live).
    Metadata metadata = clusterState.metadata();

    // 5. Loop: validate each request and prepare for the per-shard grouping.
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
        //the request can only be null because we set it to null in the previous step, so it gets ignored
        if (docWriteRequest == null) {
            continue;
        }
        if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
            continue;
        }
        // Handle cases such as the index being unavailable: skip those write
        // requests and set the matching failure response.
        if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metadata)) {
            continue;
        }
        // Resolve the concrete index name for the current request.
        Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
        try {
            // The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether
            // an operation is allowed in index into a data stream, but this isn't done when resolve call is cached, so
            // the validation needs to be performed here too.
            // Cluster state -> metadata -> indices lookup -> the IndexAbstraction for this request.
            // Through the IndexAbstraction you can reach the index's metadata, e.g. its shards.
            IndexAbstraction indexAbstraction = clusterState.getMetadata().getIndicesLookup().get(concreteIndex.getName());
            if (indexAbstraction.getParentDataStream() != null &&
                // avoid valid cases when directly indexing into a backing index
                // (for example when directly indexing into .ds-logs-foobar-000001)
                concreteIndex.getName().equals(docWriteRequest.index()) == false &&
                docWriteRequest.opType() != DocWriteRequest.OpType.CREATE) {
                throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
            }

            switch (docWriteRequest.opType()) {
                // What's the difference between CREATE and INDEX?
                // CREATE checks whether the document already exists: if a document
                // with the same id exists, the CREATE request is rejected.
                // INDEX simply overwrites.
                case CREATE:
                case INDEX:
                    prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
                    prohibitCustomRoutingOnDataStream(docWriteRequest, metadata);
                    IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                    // Index metadata for the current request
                    final IndexMetadata indexMetadata = metadata.index(concreteIndex);
                    // The index's mapping
                    MappingMetadata mappingMd = indexMetadata.mappingOrDefault();
                    // The version the index was created with
                    Version indexCreated = indexMetadata.getCreationVersion();
                    // Resolve the routing; for our purposes this is simply the routing passed
                    // on the POST — the logic inside is again alias-related.
                    indexRequest.resolveRouting(metadata);
                    // If no document id was supplied, this generates one for the indexRequest.
                    indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
                    break;
                case UPDATE:
                    TransportUpdateAction.resolveAndValidateRouting(metadata, concreteIndex.getName(),
                        (UpdateRequest) docWriteRequest);
                    break;
                case DELETE:
                    docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
                    // check if routing is required, if so, throw error if routing wasn't specified
                    if (docWriteRequest.routing() == null && metadata.routingRequired(concreteIndex.getName())) {
                        throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
                    }
                    break;
                default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
            }
        } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
                docWriteRequest.id(), e);
            BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
            responses.set(i, bulkItemResponse);
            // make sure the request gets never processed again
            bulkRequest.requests.set(i, null);
        }
    }

    // 6. Group the requests by shard.
    // first, go over all the requests and create a ShardId -> Operations mapping
    Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
    for (int i = 0; i < bulkRequest.requests.size(); i++) {
        DocWriteRequest<?> request = bulkRequest.requests.get(i);
        if (request == null) {
            continue;
        }
        String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
        // Compute the shard id this index request will be written to.
        // operationRouting() returns an OperationRouting object that encapsulates the routing algorithm.
        // Param 1: clusterState; param 2: index name; param 3: document id; param 4: routing.
        ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
            request.routing()).shardId();
        // If the map has no entry for shardId yet, insert shardId -> new ArrayList.
        List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
        // Add the request to its shard's group. The request is wrapped once more: the
        // wrapper records the request's position inside the bulkRequest, which makes
        // it easy to slot the response into the right place later.
        shardRequests.add(new BulkItemRequest(i, request));
    }

    if (requestsByShard.isEmpty()) {
        // If grouping produced an empty map, respond to the caller immediately.
        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
            buildTookInMillis(startTimeNanos)));
        return;
    }

    // counter: initialized to the number of groups, i.e. the number of shards involved.
    final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
    // This node's nodeId.
    String nodeId = clusterService.localNode().getId();
    // Walk the per-shard groups.
    for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
        // Current shard id
        final ShardId shardId = entry.getKey();
        // Requests headed for this shard
        final List<BulkItemRequest> requests = entry.getValue();
        // Build the per-shard bulk request, wrapping the index requests that landed on this shard.
        BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                requests.toArray(new BulkItemRequest[requests.size()]));
        // Number of shard copies that must be active (default 1)
        bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
        // Timeout
        bulkShardRequest.timeout(bulkRequest.timeout());
        // The cluster state version the routing decision was based on
        bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
        if (task != null) {
            bulkShardRequest.setParentTask(nodeId, task.getId());
        }
        // Param 1: the per-shard bulk request; param 2: the response listener
        shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
            @Override
            public void onResponse(BulkShardResponse bulkShardResponse) {
                for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                    // we may have no response if item failed
                    if (bulkItemResponse.getResponse() != null) {
                        bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                    }
                    responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                }
                if (counter.decrementAndGet() == 0) {
                    finishHim();
                }
            }

            @Override
            public void onFailure(Exception e) {
                // create failures for all relevant requests
                for (BulkItemRequest request : requests) {
                    final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                    DocWriteRequest<?> docWriteRequest = request.request();
                    responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
                            new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
                }
                if (counter.decrementAndGet() == 0) {
                    finishHim();
                }
            }

            private void finishHim() {
                listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
                    buildTookInMillis(startTimeNanos)));
            }
        });
    }
    bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
}
```

That was a lot of code; what it does boils down to:

  1. Walk every DocWriteRequest in the bulkRequest; for writes without an `_id`, generate one automatically.
  2. Run the routing algorithm on the id to find the shard each DocWriteRequest lands on, and add it to that shard's request group. (The hash behind this step is sketched below.)

*(screenshot: the per-shard grouping)*

  3. Hand each shard's group of requests to TransportShardBulkAction — or rather, to its grandparent TransportReplicationAction, which serves as the processing entry point. TransportBulkAction groups the data by shardId, then lets TransportReplicationAction carry out the grouped inserts, updates, and deletes — a very clean separation of concerns.

*(screenshot: handing off to shardBulkAction)*
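Step 2's shard selection is ultimately just a murmur3 hash. A simplified sketch, assuming the default `index.routing_partition_size` of 1 (the real logic lives in OperationRouting and Murmur3HashFunction):

```java
import org.elasticsearch.cluster.routing.Murmur3HashFunction;

final class RoutingSketch {
    // Simplified shard selection: hash the routing value (or the _id when no
    // routing was supplied) and wrap it into the primary shard count.
    static int selectShard(String id, String routing, int numberOfShards) {
        String effectiveRouting = routing != null ? routing : id; // explicit routing overrides _id
        int hash = Murmur3HashFunction.hash(effectiveRouting);
        return Math.floorMod(hash, numberOfShards); // floorMod keeps negative hashes in range
    }
}
```

This is also why `number_of_shards` cannot be changed after index creation: the same document would hash to a different shard.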

A note on why TransportBulkAction hands off to TransportShardBulkAction at all:
TransportBulkAction groups the individual operations of a bulk request (index, update, delete) by target index and shard, ensuring each operation is routed to the correct shard.
TransportShardBulkAction executes those operations on a specific shard. It receives work already decomposed and dispatched by TransportBulkAction, and for each shard it performs the actual document indexing, update, or delete — including interacting with the Lucene index, handling document versioning, and applying any necessary mapping updates.

TransportReplicationAction.doExecute

*(screenshot: TransportReplicationAction.doExecute)*
*(screenshot: runReroutePhase)*

This creates a ReroutePhase (the class name says it: the routing gets redone, because the current node is not necessarily the one holding the primary shard) and calls its run method, which as usual ends up in doRun.

ReroutePhase.doRun

```java
@Override
public void onFailure(Exception e) {
    finishWithUnexpectedFailure(e);
}

@Override
protected void doRun() {
    setPhase(task, "routing");
    final ClusterState state = observer.setAndGetObservedState();
    final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
    if (blockException != null) {
        if (blockException.retryable()) {
            logger.trace("cluster is blocked, scheduling a retry", blockException);
            retry(blockException);
        } else {
            finishAsFailed(blockException);
        }
    } else {
        // 1. Index metadata of the index this shard belongs to
        final IndexMetadata indexMetadata = state.metadata().index(request.shardId().getIndex());
        if (indexMetadata == null) {
            // ensure that the cluster state on the node is at least as high as the node that decided that the index was there
            // 2. When can state.version() < request.routedBasedOnClusterVersion() happen?
            //    ReroutePhase is created from two call sites:
            //    1. locally, by TransportBulkAction via TransportShardBulkAction.execute
            //    2. by a remote RPC for "indices:data/write/bulk[s][p]", which also ends up
            //       creating a ReroutePhase and running this logic
            if (state.version() < request.routedBasedOnClusterVersion()) {
                logger.trace("failed to find index [{}] for request [{}] despite sender thinking it would be here. " +
                        "Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
                    request.shardId().getIndex(), request, state.version(), request.routedBasedOnClusterVersion());
                retry(new IndexNotFoundException("failed to find index as current cluster state with version [" + state.version() +
                    "] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]",
                    request.shardId().getIndexName()));
                return;
            } else {
                finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
                return;
            }
        }

        if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
            finishAsFailed(new IndexClosedException(indexMetadata.getIndex()));
            return;
        }

        if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
            // if the wait for active shard count has not been set in the request,
            // resolve it from the index settings
            request.waitForActiveShards(indexMetadata.getWaitForActiveShards());
        }
        assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
            "request waitForActiveShards must be set in resolveRequest";

        // 3. Routing information of the primary shard
        final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
        if (primary == null || primary.active() == false) {
            logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
                + "cluster state version [{}]", request.shardId(), actionName, request, state.version());
            retryBecauseUnavailable(request.shardId(), "primary shard is not active");
            return;
        }
        if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
            logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
                + "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
            retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
            return;
        }
        // 4. The node holding the primary shard
        final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
        if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
            // 5a. The primary shard lives on this node
            performLocalAction(state, primary, node, indexMetadata);
        } else {
            // 5b. The primary is owned by another data node, so the request
            //     must be forwarded over RPC
            performRemoteAction(state, primary, node);
        }
    }
}
```

This too is an extremely long stretch of code; in short:

  1. Read the primary shard's routing information from the cluster state and find the node that owns the primary. What happens next depends on whether that node is the current one.
  2. If the shard is owned by this machine, run performLocalAction.
  3. If it is owned by another machine, run performRemoteAction.

ReroutePhase.performLocalAction

*(screenshot: performLocalAction)*

Keep following performAction down and you'll see it issues the RPC "indices:data/write/bulk[p]" — which in this case is a local call. The rpcHandler is TransportReplicationAction.handlePrimaryRequest(), wired up in the constructor:

*(screenshot: the handler registration)*

Inside handlePrimaryRequest an AsyncPrimaryAction is created:

*(screenshot: new AsyncPrimaryAction)*

ReroutePhase.performRemoteAction

*(screenshot: performRemoteAction)*

performRemoteAction issues the RPC "indices:data/write/bulk", a genuine remote call sent to the node owning the primary shard. Which rpcHandler processes that request? TransportReplicationAction.handleOperationRequest(), which runs the reroute phase again — and this time it lands in performLocalAction 😄

To recap:

  1. Read the primary shard's routing from the cluster state and find its owning node; branch on whether that is the local node.
  2. Primary on this node: issue the RPC "indices:data/write/bulk[p]" — really a local call whose rpcHandler is TransportReplicationAction.handlePrimaryRequest(…); that method creates an AbstractRunnable implementation named AsyncPrimaryAction.
  3. Primary on another node: issue the RPC "indices:data/write/bulk", a true remote call to the owning node.

AsyncPrimaryAction.doRun

```java
@Override
protected void doRun() throws Exception {
    // 1. Shard id of the request
    final ShardId shardId = primaryRequest.getRequest().shardId();
    // 2. The local IndexShard for that shard id. This is a core object: it wraps the
    //    low-level shard operations — writes, queries, gets, flushes, and so on.
    final IndexShard indexShard = getIndexShard(shardId);
    // 3. The shard routing entry of that IndexShard
    final ShardRouting shardRouting = indexShard.routingEntry();
    // we may end up here if the cluster state used to route the primary is so stale that the underlying
    // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
    // the replica will take over and a replica will be assigned to the first node.
    if (shardRouting.primary() == false) {
        throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
    }
    final String actualAllocationId = shardRouting.allocationId().getId();
    // If the local IndexShard's allocationId is not the allocationId named in
    // the request, bail out with an exception.
    if (actualAllocationId.equals(primaryRequest.getTargetAllocationID()) == false) {
        throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]",
            primaryRequest.getTargetAllocationID(), actualAllocationId);
    }
    final long actualTerm = indexShard.getPendingPrimaryTerm();
    if (actualTerm != primaryRequest.getPrimaryTerm()) {
        // The shard's actual primaryTerm differs from the request's: a primary
        // election may have happened for this shardId in the meantime.
        throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]",
            primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
    }

    // 4. Acquire an operation permit.
    //    Param 1: indexShard, the primary shard object
    //    Param 2: the shardBulkRequest
    //    Param 3: a listener wrapping what to do once the permit is acquired
    acquirePrimaryOperationPermit(
            indexShard,
            primaryRequest.getRequest(),
            ActionListener.wrap(
                    releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
                    e -> {
                        if (e instanceof ShardNotInPrimaryModeException) {
                            onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
                        } else {
                            onFailure(e);
                        }
                    }));
}

// The primaryShardReference parameter wraps the primary shard object plus the permit releaser.
void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
    try {
        // 1. Cluster state
        final ClusterState clusterState = clusterService.state();
        // 2. From the cluster state, the index metadata
        final IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(primaryShardReference.routingEntry().index());

        final ClusterBlockException blockException = blockExceptions(clusterState, indexMetadata.getIndex().getName());
        if (blockException != null) {
            logger.trace("cluster is blocked, action failed on primary", blockException);
            throw blockException;
        }

        // If the primary was relocated to another node while the request was in flight:
        if (primaryShardReference.isRelocated()) {
            // Release the permit.
            primaryShardReference.close(); // release shard operation lock as soon as possible
            setPhase(replicationTask, "primary_delegation");
            // delegate primary phase to relocation target
            // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
            // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
            // Fetch the latest primary routing entry.
            final ShardRouting primary = primaryShardReference.routingEntry();
            assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
            final Writeable.Reader<Response> reader = TransportReplicationAction.this::newResponseInstance;
            // The node the primary moved to.
            DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
            // Forward the request.
            transportService.sendRequest(relocatingNode, transportPrimaryAction,
                new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
                    primaryRequest.getPrimaryTerm()), transportOptions,
                new ActionListenerResponseHandler<Response>(onCompletionListener, reader) {
                    @Override
                    public void handleResponse(Response response) {
                        setPhase(replicationTask, "finished");
                        super.handleResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        setPhase(replicationTask, "finished");
                        super.handleException(exp);
                    }
                });
        } else {
            // 3. The normal case: the primary is owned by this node.
            setPhase(replicationTask, "primary");
            // 4. Create the "primary write result" listener. It only fires once both the
            //    primary write and the replica writes have completed.
            final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
                // The onResponse path.
                adaptResponse(response, primaryShardReference.indexShard);
                // syncGlobalCheckpointAfterOperation defaults to true.
                if (syncGlobalCheckpointAfterOperation) {
                    try {
                        primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
                    } catch (final Exception e) {
                        // only log non-closed exceptions
                        if (ExceptionsHelper.unwrap(
                            e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
                            // intentionally swallow, a missed global checkpoint sync should not fail this operation
                            logger.info(
                                new ParameterizedMessage(
                                    "{} failed to execute post-operation global checkpoint sync",
                                    primaryShardReference.indexShard.shardId()), e);
                        }
                    }
                }

                primaryShardReference.close(); // release shard operation lock before responding to caller
                setPhase(replicationTask, "finished");
                onCompletionListener.onResponse(response);
            }, e -> handleException(primaryShardReference, e));

            // 5. Bundle up the parameters and create the ReplicationOperation, which in turn
            //    encapsulates the primary write plus the replica writes, and fires the
            //    listener above once both have completed.
            /*
             * Param 1: primaryRequest.getRequest() => the ShardBulkRequest
             * Param 2: primaryShardReference
             * Param 3: the write-result listener
             * Param 4: newReplicasProxy() => a ReplicasProxy instance encapsulating the RPC logic
             *          for syncing data to the replicas
             * Param 5: logger
             * Param 6: threadPool (owned by the transportReplicationAction)
             * Param 7: actionName, here "indices:data/write/bulk[s]"
             * Param 8: primaryRequest.getPrimaryTerm() => the primary's primaryTerm
             * Param 9: initialRetryBackoffBound, initially 50 ms — the starting retry interval
             *          when the RPC syncing data to a replica's node fails
             * Param 10: retryTimeout, the retry time limit for the replica sync RPC
             */
            new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
                responseListener.map(result -> result.finalResponseIfSuccessful),
                newReplicasProxy(), logger, threadPool, actionName, primaryRequest.getPrimaryTerm(), initialRetryBackoffBound,
                retryTimeout)
                .execute();
        }
    } catch (Exception e) {
        handleException(primaryShardReference, e);
    }
}
```

Summarizing again:

  1. Take the shard id from the request and use it to fetch the IndexShard object.

*(screenshot: getIndexShard)*

  2. Get the shard routing entry from the IndexShard.

*(screenshot: indexShard.routingEntry())*

  3. Acquire an operation permit.
  4. Fetch the cluster state, then use the cluster state and the routing entry to get the index metadata.

*(screenshot: getIndexSafe)*

  5. Create a ReplicationOperation and call its execute method.
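Before stepping into it, it helps to hold the overall shape in mind. A deliberately simplified, self-contained sketch of the protocol (names are my own — the real ReplicationOperation is asynchronous and listener-driven, not sequential like this):

```java
import java.util.List;

// Conceptual shape of the primary-first, then-replicas write protocol.
interface Shard { void write(String doc); }

final class MiniReplicationOperation {
    private final Shard primary;
    private final List<Shard> replicas;

    MiniReplicationOperation(Shard primary, List<Shard> replicas) {
        this.primary = primary;
        this.replicas = replicas;
    }

    void execute(String doc, Runnable respond) {
        primary.write(doc);        // 1. write on the primary shard first
        for (Shard replica : replicas) {
            replica.write(doc);    // 2. fan the same operation out to every replica (an RPC in ES)
        }
        respond.run();             // 3. answer the caller only after primary + replicas have acked
    }
}
```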

ReplicationOperation.execute

*(screenshot: ReplicationOperation.execute)*

Nothing crucial in the method itself; the call to watch is primary.perform.

PrimaryShardReference.perform

*(screenshot: PrimaryShardReference.perform)*

This delegates to an abstract method that is implemented by the subclass TransportWriteAction.

TransportWriteAction.shardOperationOnPrimary

*(screenshot: shardOperationOnPrimary)*

Likewise, dispatchedShardOperationOnPrimary is abstract and is implemented by TransportShardBulkAction — after all that wandering, we're back where we started...

TransportShardBulkAction.dispatchedShardOperationOnPrimary

*(screenshot: dispatchedShardOperationOnPrimary)*

Next comes TransportShardBulkAction.performOnPrimary.

TransportShardBulkAction.performOnPrimary

*(screenshot: performOnPrimary)*

TransportShardBulkAction.executeBulkItemRequest

```java
static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
                                     MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
                                     ActionListener<Void> itemDoneListener) throws Exception {
    // 1. Operation type of the current sub-request; for now we only care about INDEX and CREATE
    final DocWriteRequest.OpType opType = context.getCurrent().opType();

    final UpdateHelper.Result updateResult;
    if (opType == DocWriteRequest.OpType.UPDATE) {
        final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
        try {
            updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
        } catch (Exception failure) {
            // we may fail translating a update to index or delete operation
            // we use index result to communicate failure while translating update request
            final Engine.Result result =
                new Engine.IndexResult(failure, updateRequest.version());
            context.setRequestToExecute(updateRequest);
            context.markOperationAsExecuted(result);
            context.markAsCompleted(context.getExecutionResult());
            return true;
        }
        // execute translated update request
        switch (updateResult.getResponseResult()) {
            case CREATED:
            case UPDATED:
                IndexRequest indexRequest = updateResult.action();
                IndexMetadata metadata = context.getPrimary().indexSettings().getIndexMetadata();
                MappingMetadata mappingMd = metadata.mappingOrDefault();
                indexRequest.process(metadata.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
                context.setRequestToExecute(indexRequest);
                break;
            case DELETED:
                context.setRequestToExecute(updateResult.action());
                break;
            case NOOP:
                context.markOperationAsNoOp(updateResult.action());
                context.markAsCompleted(context.getExecutionResult());
                return true;
            default:
                throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
        }
    } else {
        // 2. Reaching here means the operation is one of index, create, or delete.
        context.setRequestToExecute(context.getCurrent());
        updateResult = null;
    }

    assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state
    // 3. The primary shard object
    final IndexShard primary = context.getPrimary();
    // 4. The request's version value — this is ES's optimistic locking
    final long version = context.getRequestToExecute().version();
    final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
    final Engine.Result result;
    if (isDelete) {
        final DeleteRequest request = context.getRequestToExecute();
        result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(),
            request.ifSeqNo(), request.ifPrimaryTerm());
    } else {
        // Pull the current index request off the context
        final IndexRequest request = context.getRequestToExecute();
        /*
         * Param 1: version
         * Param 2: versionType (default: INTERNAL)
         * Param 3: a SourceToParse (the internal engine indexes from this object)
         * Param 4: request.ifSeqNo(), the newer concurrency control
         * Param 5: request.ifPrimaryTerm(), the newer concurrency control
         * Param 6: request.getAutoGeneratedTimestamp(), the timestamp
         * Param 7: request.isRetry(), whether this is a retry
         */
        result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
                request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()),
            request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
    }
    // ... (the excerpt stops here; the remainder of the method handles the Engine.Result)
}
```

To summarize this part:

  1. Get the operation type of the current request.
  2. Get the primary shard object from the context — the write will go through it.

*(screenshot: context.getPrimary())*

  3. Build the SourceToParse.

*(screenshot: constructing the SourceToParse)*
*(screenshot: the populated SourceToParse)*

  4. Call the primary's applyIndexOperationOnPrimary to perform the low-level write.
  5. Handle the various possible results of that call.

IndexShard.applyIndexOperationOnPrimary

*(screenshot: applyIndexOperationOnPrimary)*

Stepping further in:

```java
private Engine.IndexResult applyIndexOperation(
    Engine engine, long seqNo, long opPrimaryTerm, long version,
    @Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm,
    long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
    SourceToParse sourceToParse) throws IOException {

    // Assert that the operation's primary term is no greater than the shard's current primary term
    assert opPrimaryTerm <= getOperationPrimaryTerm()
            : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";

    // Make sure writes are allowed for this operation origin
    ensureWriteAllowed(origin);

    Engine.Index operation;
    try {
        // Resolve the document type
        final String resolvedType = mapperService.resolveDocumentType(sourceToParse.type());
        final SourceToParse sourceWithResolvedType;

        // If the resolved type equals the original type, reuse sourceToParse;
        // otherwise build a new SourceToParse instance with the resolved type
        if (resolvedType.equals(sourceToParse.type())) {
            sourceWithResolvedType = sourceToParse;
        } else {
            sourceWithResolvedType = new SourceToParse(sourceToParse.index(), resolvedType, sourceToParse.id(),
                sourceToParse.source(), sourceToParse.getXContentType(), sourceToParse.routing());
        }

        // Prepare the index operation
        operation = prepareIndex(docMapper(resolvedType), sourceWithResolvedType,
            seqNo, opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry, ifSeqNo, ifPrimaryTerm);

        // Check for, and surface, a dynamic mapping update
        Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
        if (update != null) {
            return new Engine.IndexResult(update);
        }
    } catch (Exception e) {
        // Any exception during parsing or mapping update is treated as a document-level
        // failure, with no side effect of closing the shard
        verifyNotClosed(e);
        return new Engine.IndexResult(e, version, opPrimaryTerm, seqNo);
    }

    // Perform the index operation and return the result
    return index(engine, operation);
}
```

Stepping into the index method:

```java
private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
    // Mark the shard active — a new indexing operation has started
    active.set(true);
    final Engine.IndexResult result;

    // Before indexing, invoke the listeners' preIndex hooks for any pre-processing
    index = indexingOperationListeners.preIndex(shardId, index);
    try {
        // If trace logging is enabled, log the details of the index operation
        if (logger.isTraceEnabled()) {
            // Log type, id, seqNo, etc. — deliberately without stringifying the source, to avoid encoding errors
            logger.trace("index [{}][{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}]",
                index.type(), index.id(), index.seqNo(), routingEntry().allocationId(), index.primaryTerm(), getOperationPrimaryTerm(),
                index.origin());
        }
        // Perform the index operation: hand the document to the engine
        result = engine.index(index);

        // Log the post-indexing state if trace logging is enabled
        if (logger.isTraceEnabled()) {
            logger.trace("index-done [{}][{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}] " +
                    "result-seq# [{}] result-term [{}] failure [{}]",
                index.type(), index.id(), index.seqNo(), routingEntry().allocationId(), index.primaryTerm(), getOperationPrimaryTerm(),
                index.origin(), result.getSeqNo(), result.getTerm(), result.getFailure());
        }
    } catch (Exception e) {
        // If the index operation threw, log the exception
        if (logger.isTraceEnabled()) {
            logger.trace(new ParameterizedMessage(
                "index-fail [{}][{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}]",
                index.type(), index.id(), index.seqNo(), routingEntry().allocationId(), index.primaryTerm(), getOperationPrimaryTerm(),
                index.origin()
            ), e);
        }
        // Invoke the listeners' postIndex failure hook for any cleanup
        indexingOperationListeners.postIndex(shardId, index, e);
        // Rethrow so the caller can handle or record the exception
        throw e;
    }
    // On success, invoke the listeners' postIndex hook for any post-processing
    indexingOperationListeners.postIndex(shardId, index, result);
    // Return the indexing result
    return result;
}
```

Now look at the engine.index call.

InternalEngine

*(screenshots: stepping through InternalEngine.index down to the indexWriter call)*

And this is where it all bottoms out — moment of enlightenment 😄.
indexWriter is Lucene's API; if you want to learn Lucene, its source code is the place to go next.
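To make that landing point concrete, here is a minimal standalone Lucene write — roughly the operation ES's InternalEngine ultimately drives, with all the ES machinery stripped away (the field names and path are my own; ES actually stores documents under reserved fields such as _id and _source):

```java
import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class LuceneWriteDemo {
    public static void main(String[] args) throws Exception {
        Directory dir = FSDirectory.open(Paths.get("/tmp/lucene-demo"));
        try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
            Document doc = new Document();
            doc.add(new StringField("_id", "1", Field.Store.YES));              // exact-match id field
            doc.add(new TextField("message", "hello lucene", Field.Store.YES)); // analyzed text field
            // updateDocument deletes any doc with the same _id term, then adds the
            // new one — the Lucene-level equivalent of an ES index (upsert) operation
            writer.updateDocument(new Term("_id", "1"), doc);
            writer.commit(); // fsync the segment — ES defers this and relies on the translog instead
        }
    }
}
```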
