
【Elasticsearch】Write Flow

Elasticsearch supports both single-document and bulk writes. A single-document write is an Index request and a bulk write is a Bulk request; both share the same processing logic, and requests are uniformly wrapped as a BulkRequest.

(Figure: single-document write flow in a three-node cluster; Node1 is the coordinating node and the primary of shard 0 is on Node3)

Single-document write flow:

1. The client sends a write request to Node1.

2. Node1 uses the document's id to determine that the document belongs to shard 0. The primary of shard 0 is not on the current node, so Node1 acts as the coordinating node: the routing table in the cluster state shows that the primary of shard 0 is on Node3, and the request is forwarded there.

3. The primary shard on Node3 executes the write (provided enough shard copies are active). If the write succeeds, Node3 forwards the request to Node1 and Node2, which both hold replicas of shard 0, and waits for their responses. Once every replica reports success, Node3 responds to the coordinating node, and the coordinating node Node1 reports the successful write to the client.
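For reference, a minimal client-side sketch of the single-document write that kicks off this flow (a hedged example assuming the 6.x Java high-level REST client; the index name and document are illustrative):

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

// A single-document write; internally it is wrapped as a one-item BulkRequest
// and routed to the primary shard as described above.
IndexRequest request = new IndexRequest("my_index", "_doc", "1")
        .source("{\"user\":\"alice\",\"message\":\"hello\"}", XContentType.JSON);
IndexResponse response = client.index(request); // client: an already-connected RestHighLevelClient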

1. Active shard count

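The per-request setter for this value is defined on ReplicationRequest, which write requests extend: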
public final Request waitForActiveShards(ActiveShardCount waitForActiveShards) {
    this.waitForActiveShards = waitForActiveShards;
    return (Request) this;
}

ES's default write-consistency policy is quorum: a majority of a shard's copies must be available for a write to proceed. It is computed as:

quorum = int((primary + number_of_replicas) / 2) + 1

Here primary is the number of primary shards and number_of_replicas is the number of replicas per primary. For an index in which each primary shard has 3 replicas, quorum = (1 + 3) / 2 + 1 = 3, so at least 3 copies of a shard must be active for a write to succeed. If two replicas fail, only the primary and one replica remain active; with just 2 active copies, writes to that shard are rejected.
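The arithmetic above in code, as a quick sanity check (a sketch, not ES source):

static int quorum(int primaries, int replicas) {
    return (primaries + replicas) / 2 + 1; // integer division
}
// quorum(1, 3) == 3; with two replicas down only 2 copies are active,
// and 2 < 3, so the write is rejected.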

The required number of active shard copies can be configured with the index setting index.write.wait_for_active_shards, or passed as a parameter on the request. The default is 1: the write proceeds as long as the primary itself is active.
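For example, setting it per request with the Java API (a hedged sketch; the index name is illustrative):

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;

IndexRequest request = new IndexRequest("my_index", "_doc", "1");
// require at least 2 active copies (e.g. the primary plus one replica) before writing
request.waitForActiveShards(ActiveShardCount.from(2));
// or require every copy to be active
request.waitForActiveShards(ActiveShardCount.ALL);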

See the official documentation for details:

https://www.elastic.co/guide/en/elasticsearch/reference/6.1/docs-index_.html#index-wait-for-active-shards

2. Routing algorithm

In the common case, routing is computed as:

shard_num = hash(_routing) % num_primary_shards

_routing: by default, the document's id

num_primary_shards: the number of primary shards
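A simplified sketch of this computation (hash-and-modulo only; the real implementation in OperationRouting#generateShardId also accounts for index.number_of_routing_shards and routing partitions, but Murmur3 is the hash ES actually uses):

import org.elasticsearch.cluster.routing.Murmur3HashFunction;

// _routing defaults to the document id
static int shardNum(String routing, int numPrimaryShards) {
    return Math.floorMod(Murmur3HashFunction.hash(routing), numPrimaryShards);
}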

Coordinating node flow

  • Parameter checks: the request is rejected if validation fails
  • On write, if the target index does not exist yet, it is created automatically; this happens in TransportBulkAction's doExecute method
  • The coordinating node starts processing in TransportBulkAction's executeBulk method, then enters the doRun method of TransportBulkAction's inner class BulkOperation
  • In doRun, the cluster state is checked; if the cluster is abnormal, e.g. the master node is missing, the write request blocks waiting for a master
  • The cluster metadata (metaData) is fetched from the cluster state
  • Iterate over the bulkRequest: resolve each request's concrete index (concreteIndex) and check the request type. For a write, the request is cast to IndexRequest, the index's metadata is looked up in metaData by concreteIndex, its mapping and creation version are fetched, and the mapping, id, and other fields are validated; if the id is empty, one is generated automatically
  • Iterate over the bulkRequest again, repackaging the requests into per-shard request lists so that all requests routed to the same shard are wrapped in a single request. In this pass the routing algorithm computes which shard each document belongs to, yielding a shard id, and requests bound for the same shard are merged into the same shardRequests list
  • Iterate over requestsByShard, which maps each shard to all of its write requests. Each shard's requests are wrapped in a BulkShardRequest; after waiting for enough active shards, the request is executed against the shard and the response awaited in a listener. Responses are likewise per shard and are copied into the overall Response
  • Forwarding requests to shards is implemented in TransportReplicationAction. Before forwarding, the latest cluster state is fetched again, and the routing table in it gives the node holding the target shard's primary; if the primary is on the current node the request executes locally, otherwise it is forwarded to that node

TransportBulkAction

The doRun method of BulkOperation, an inner class of TransportBulkAction, parses the user's request:

public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {

    @Override
    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
        ......

        if (needToCheck()) {
            // collect all indices referenced by the request
            final Set<String> indices = bulkRequest.requests.stream()
                .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
                        || request.versionType() == VersionType.EXTERNAL
                        || request.versionType() == VersionType.EXTERNAL_GTE)
                .map(DocWriteRequest::index)
                .collect(Collectors.toSet());
            final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
            Set<String> autoCreateIndices = new HashSet<>();
            ClusterState state = clusterService.state();
            // iterate over the indices
            for (String index : indices) {
                boolean shouldAutoCreate;
                try {
                    // should this index be auto-created?
                    shouldAutoCreate = shouldAutoCreate(index, state);
                } catch (IndexNotFoundException e) {
                    shouldAutoCreate = false;
                    indicesThatCannotBeCreated.put(index, e);
                }
                if (shouldAutoCreate) {
                    // mark the index for auto-creation
                    autoCreateIndices.add(index);
                }
            }
           
            ......
            
        } else {
            executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
        }
    }
    private final class BulkOperation extends AbstractRunnable {
        // process the bulk request
        protected void doRun() throws Exception {
            // check the cluster state; if there is no master node, block waiting for one (possibly until timeout)
            final ClusterState clusterState = observer.setAndGetObservedState();
            if (handleBlockExceptions(clusterState)) {
                return;
            }
            final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
            // fetch the cluster metadata
            MetaData metaData = clusterState.metaData();
            // iterate over the requests
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);
                //the request can only be null because we set it to null in the previous step, so it gets ignored
                if (docWriteRequest == null) {
                    continue;
                }
                if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metaData)) {
                    continue;
                }
                // resolve the concrete index
                Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
                try {
                    // dispatch on the operation type
                    switch (docWriteRequest.opType()) {
                        case CREATE:
                        case INDEX: // index (write) operation
                            // cast to IndexRequest
                            IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                            // fetch the index metadata
                            final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                            // fetch the mapping metadata
                            MappingMetaData mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                            // version the index was created with
                            Version indexCreated = indexMetaData.getCreationVersion();
                            // resolve routing
                            indexRequest.resolveRouting(metaData);
                            // validate mapping, id, etc. (auto-generates the id if empty)
                            indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
                            break;
                        case UPDATE: // update operation
                            TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
                            break;
                        case DELETE: // delete operation
                            docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.parent(), docWriteRequest.routing(), docWriteRequest.index()));
                            // check if routing is required, if so, throw error if routing wasn't specified
                            if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
                                throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
                            }
                            break;
                        default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                    }
                } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);
                    BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
                    responses.set(i, bulkItemResponse);
                    // make sure the request gets never processed again
                    bulkRequest.requests.set(i, null);
                }
            }

            // repackage the user's requests into per-shard request lists
            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest request = bulkRequest.requests.get(i);
                if (request == null) {
                    continue;
                }
                // resolve the concrete index name
                String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
                // use the routing algorithm to compute which shard the document belongs to
                ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
                // look up the shard id in requestsByShard; if absent, create an empty ArrayList for it
                List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
                // add the request to shardRequests so that all documents routed to the same shard share one shard request
                shardRequests.add(new BulkItemRequest(i, request));
            }
            // if requestsByShard is empty, respond immediately
            if (requestsByShard.isEmpty()) {
                listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
                return;
            }

            final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
            // id of the local node
            String nodeId = clusterService.localNode().getId();
            // iterate over requestsByShard
            for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
                // the shard id
                final ShardId shardId = entry.getKey();
                // all requests bound for this shard
                final List<BulkItemRequest> requests = entry.getValue();
                // wrap the requests in a BulkShardRequest
                BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                        requests.toArray(new BulkItemRequest[requests.size()]));
                // propagate the required active-shard count to the shard request
                bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                bulkShardRequest.timeout(bulkRequest.timeout());
                if (task != null) {
                    bulkShardRequest.setParentTask(nodeId, task.getId());
                }
                // execute the request against the shard, awaiting the response in a listener
                shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                    @Override
                    public void onResponse(BulkShardResponse bulkShardResponse) {
                        for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                            // we may have no response if item failed
                            if (bulkItemResponse.getResponse() != null) {
                                bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                            }
                            responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        // create failures for all relevant requests
                        for (BulkItemRequest request : requests) {
                            final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                            DocWriteRequest docWriteRequest = request.request();
                            responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
                                    new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    private void finishHim() {
                        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
                    }
                });
            }
        }
    }
}

TransportReplicationAction

The doRun method of ReroutePhase, an inner class of TransportReplicationAction, forwards the request to the shard:

public abstract class TransportReplicationAction<
            Request extends ReplicationRequest<Request>,
            ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
            Response extends ReplicationResponse
        > extends TransportAction<Request, Response> {
        
    final class ReroutePhase extends AbstractRunnable {
        @Override
        protected void doRun() {
            setPhase(task, "routing");
            // fetch the cluster state
            final ClusterState state = observer.setAndGetObservedState();
            if (handleBlockExceptions(state)) {
                return;
            }

            // resolve the index from the request
            final String concreteIndex = concreteIndex(state);
            // fetch the index metadata
            final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
            // if the metadata is null, retry
            if (indexMetaData == null) {
                retry(new IndexNotFoundException(concreteIndex));
                return;
            }
            if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
                throw new IndexClosedException(indexMetaData.getIndex());
            }

            // resolve the shard id and the required active-shard count for the request
            resolveRequest(indexMetaData, request);
            assert request.shardId() != null : "request shardId must be set in resolveRequest";
            assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
            // locate the primary shard
            final ShardRouting primary = primary(state);
            if (retryIfUnavailable(state, primary)) {
                return;
            }
            final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
            // if the primary is on the local node, execute locally
            if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
                performLocalAction(state, primary, node, indexMetaData);
            } else {
                // otherwise forward to the node holding the primary
                performRemoteAction(state, primary, node);
            }
        }

    }
            
}

Primary shard node flow

After the node holding the primary shard receives the request from the coordinating node, it performs the write; the entry point is the execute method of ReplicationOperation. Once the write succeeds, the request is forwarded to the replica shards, and the node waits for their responses before replying to the coordinating node.

  • After receiving the request from the coordinating node, the primary first runs its own checks, e.g. that it really is the primary and that the index is not closed
  • Check whether the request needs to be executed with a delay
  • Check whether the primary shard has been relocated
  • Check that enough shards are active; the write proceeds as long as the primary is available
  • The concrete write entry point is the execute method of ReplicationOperation
  • From there control reaches TransportShardBulkAction's performOnPrimary method (several intermediate steps omitted) and, skipping a few more calls, ends in InternalEngine's index method, where the data is written to Lucene
  • After the Lucene write, the operation is appended to the translog; if the Lucene write fails, the translog must be rolled back
  • Flushing the translog to disk is governed by the configured translog durability policy
  • Once the primary finishes writing, it forwards the request to the replicas, which perform the write while the primary waits for their responses
  • After all replica responses arrive, finish runs and a message is returned to the coordinating node reporting which operations succeeded and which failed
  • If a replica write fails, the node holding the primary sends a shardFailed request to the master

ReplicationOperation

The execute method of ReplicationOperation is the entry point for the primary-shard write:

public class ReplicationOperation<
            Request extends ReplicationRequest<Request>,
            ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
            PrimaryResultT extends ReplicationOperation.PrimaryResult<ReplicaRequest>
        > {
    
    // entry point for the primary-shard write
    public void execute() throws Exception {
        // check the active shard count
        final String activeShardCountFailure = checkActiveShardCount();
        final ShardRouting primaryRouting = primary.routingEntry();
        final ShardId primaryId = primaryRouting.shardId();
        if (activeShardCountFailure != null) {
            finishAsFailed(new UnavailableShardsException(primaryId,
                "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
            return;
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
        // perform the write on the primary
        primaryResult = primary.perform(request);
        // update the shard's local checkpoint
        primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
        // obtain the request to send to the replicas
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
            }
            // the global checkpoint
            final long globalCheckpoint = primary.globalCheckpoint();
            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
            markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
            // forward the request to the replica shards
            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
        }

        successfulShards.incrementAndGet();  // mark primary as successful
        decPendingAndFinishIfNeeded();
    }
}

TransportShardBulkAction

The primary shard performs the write in TransportShardBulkAction's performOnPrimary method:

public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
    @Override
    public WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(
            BulkShardRequest request, IndexShard primary) throws Exception {
        return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer());
    }
    
    // the primary-shard write
    public static WritePrimaryResult<BulkShardRequest, BulkShardResponse> performOnPrimary(
            BulkShardRequest request,
            IndexShard primary,
            UpdateHelper updateHelper,
            LongSupplier nowInMillisSupplier,
            MappingUpdatePerformer mappingUpdater) throws Exception {
        // fetch the index metadata
        final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
        Translog.Location location = null;
        // iterate over the items
        for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
            if (isAborted(request.items()[requestIndex].getPrimaryResponse()) == false) {
                // execute the individual write request
                location = executeBulkItemRequest(metaData, primary, request, location, requestIndex,
                    updateHelper, nowInMillisSupplier, mappingUpdater);
            }
        }
        // build the response
        BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
        BulkItemRequest[] items = request.items();
        for (int i = 0; i < items.length; i++) {
            responses[i] = items[i].getPrimaryResponse();
        }
        BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
        return new WritePrimaryResult<>(request, response, location, null, primary, logger);
    }
}

Because the call stack is fairly deep, the intermediate methods are omitted and we jump straight to InternalEngine's index method. The omitted call chain is:

executeBulkItemRequest (TransportShardBulkAction)
-> executeIndexRequest (TransportShardBulkAction)
-> executeIndexRequestOnPrimary (TransportShardBulkAction)
-> applyIndexOperationOnPrimary (IndexShard)
-> applyIndexOperation (IndexShard)
-> index (IndexShard)
-> index (InternalEngine)

InternalEngine

InternalEngine's index method writes the data into Lucene, then appends the operation to the translog:

public class InternalEngine extends Engine {
    @Override
    public IndexResult index(Index index) throws IOException {
        assert Objects.equals(index.uid().field(), uidField) : index.uid().field();
        final boolean doThrottle = index.origin().isRecovery() == false;
        try (ReleasableLock releasableLock = readLock.acquire()) {
            ensureOpen();
            assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
            assert assertVersionType(index);
            try (Releasable ignored = acquireLock(index.uid());
            
                ......
                
                final IndexResult indexResult;
                if (plan.earlyResultOnPreFlightError.isPresent()) {
                    indexResult = plan.earlyResultOnPreFlightError.get();
                    assert indexResult.hasFailure();
                } else if (plan.indexIntoLucene) {
                    // write the document into Lucene via the Lucene indexing API
                    indexResult = indexIntoLucene(index, plan);
                } else {
                    indexResult = new IndexResult(
                            plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
                }
                if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                    final Translog.Location location;
                    if (indexResult.hasFailure() == false) {
                        // append the whole write operation to the translog
                        location = translog.add(new Translog.Index(index, indexResult));
                    } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                        location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
                    } else {
                        location = null;
                    }
                    indexResult.setTranslogLocation(location);
                }
                ......
                
                return indexResult;
            }
        } catch (RuntimeException | IOException e) {
            try {
                maybeFailEngine("index", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        }
    }
}

GlobalCheckpointSyncAction

The maybeSyncTranslog method of GlobalCheckpointSyncAction controls flushing the translog to disk:

public class GlobalCheckpointSyncAction extends TransportReplicationAction<
        GlobalCheckpointSyncAction.Request,
        GlobalCheckpointSyncAction.Request,
        ReplicationResponse> {
        
    private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
        final Translog translog = indexShard.getTranslog();
        if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
                translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
            indexShard.getTranslog().sync();
        }
    }
}
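The durability policy checked here is configurable per index. A hedged sketch of switching an index to asynchronous translog fsync (the index name and interval are illustrative; with the default request durability, the translog is fsynced before each write is acknowledged):

import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.common.settings.Settings;

UpdateSettingsRequest settingsRequest = new UpdateSettingsRequest("my_index")
        .settings(Settings.builder()
                .put("index.translog.durability", "async") // fsync on an interval instead of per request
                .put("index.translog.sync_interval", "5s"));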

Replica shard node flow

The replica write flow is essentially the same as the primary's; when the write completes, the replica sends its response back to the primary.

References:

Elasticsearch源码解析与优化实战【张超】

Elasticsearch源码分析-写入解析

Elasticsearch分布式一致性原理剖析(三)-Data篇
