Elasticsearch writes come in two forms: single-document writes and batch writes. A single-document write is called an Index request and a batch write a Bulk request; both share the same processing logic, and requests are uniformly wrapped as a BulkRequest.
The write flow for a single document:
1. The client sends a write request to Node1.
2. Node1 uses the document's id to determine that the document belongs to shard 0. Since shard 0's primary is not on the current node, Node1 acts as the coordinating node: from the routing table in the cluster state it learns that shard 0's primary sits on Node3, so the request is forwarded to Node3.
3. The primary shard on Node3 performs the write (provided there are enough active shards). If the write succeeds, the request is forwarded to Node1 and Node2, which both hold replicas of shard 0. Node3 waits for the responses from all replica shards; once every shard has written successfully, Node3 responds to the coordinating node, and the coordinating node Node1 then reports success to the client.
1. Active shard count
public final Request waitForActiveShards(ActiveShardCount waitForActiveShards) {
    this.waitForActiveShards = waitForActiveShards;
    return (Request) this;
}
ES's default write-consistency strategy is quorum: a majority of the shards must be available at write time. It is computed as: quorum = int((primary + number_of_replicas) / 2) + 1
Here primary is the number of primary shards and number_of_replicas the number of replica shards. Suppose an index in which each primary shard has 3 replicas; then quorum = int((1+3)/2) + 1 = 3, so a write to a primary of this index succeeds only when at least 3 shards are active. If two replica shards fail, the active shards are 1 primary plus 1 replica, i.e. 2 in total, so writes to that shard are rejected.
The active-shard requirement can be configured through the index setting index.write.wait_for_active_shards, or passed as a request parameter. The default is 1: the write may proceed as long as the primary shard is available.
See the official documentation for more details.
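The quorum arithmetic above can be sketched directly. This is a minimal illustrative helper, not part of the Elasticsearch codebase; the class name QuorumCalc is made up:

```java
// Illustrative sketch of quorum = int((primary + number_of_replicas) / 2) + 1.
// QuorumCalc is a made-up name, not an Elasticsearch class.
public class QuorumCalc {

    // Java integer division already truncates, matching int(...) in the formula.
    static int quorum(int primaries, int replicas) {
        return (primaries + replicas) / 2 + 1;
    }

    public static void main(String[] args) {
        // 1 primary + 3 replicas: at least 3 shards must be active
        System.out.println(quorum(1, 3)); // prints 3
        // with only 2 of the 4 shards active, 2 < 3, so the write is rejected
    }
}
```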
2. Routing algorithm
In the common case, the target shard is computed as:
shard_num = hash(_routing) % num_primary_shards
_routing: by default, the document's id
num_primary_shards: the number of primary shards
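The routing formula can be sketched as follows. Elasticsearch actually hashes the routing value with Murmur3 (in OperationRouting); String.hashCode() below is only a stand-in, and RoutingSketch is a made-up name:

```java
// Sketch of shard_num = hash(_routing) % num_primary_shards.
// String.hashCode() stands in for the Murmur3 hash ES really uses.
public class RoutingSketch {

    static int shardNum(String routing, int numPrimaryShards) {
        int hash = routing.hashCode();
        // floorMod keeps the result in [0, numPrimaryShards) even for negative hashes
        return Math.floorMod(hash, numPrimaryShards);
    }

    public static void main(String[] args) {
        // by default _routing is the document id, so the same id
        // always lands on the same shard
        System.out.println(shardNum("doc-1", 5));
        System.out.println(shardNum("doc-1", 5)); // same value: routing is deterministic
    }
}
```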
Coordinating node flow
- Validate the parameters and reject the request if anything is wrong.
- On write, if the target index does not exist yet, auto-create it; this happens in TransportBulkAction's doExecute method.
- The coordinating node starts processing the request in TransportBulkAction's executeBulk method, then enters the doRun method of TransportBulkAction's inner class BulkOperation.
- In doRun, the cluster state is checked; if the cluster is in an abnormal state, e.g. no master node exists, the write request blocks waiting for a master.
- The cluster metadata (metaData) is fetched from the cluster state.
- Iterate over bulkRequest, resolving each request's concreteIndex and inspecting the request type. For a write, the request is cast to an IndexRequest, the index's metadata is looked up in metaData by concreteIndex, and the index's mapping and creation version are fetched; the mapping, ID, and related fields are validated, and an ID is generated automatically if it is empty.
- Iterate over bulkRequest a second time, regrouping every request into per-shard request lists, so that all requests routed to the same shard are wrapped in one request. During this pass the routing algorithm computes which shard each document belongs to, producing a shard ID, and requests bound for the same shard are merged into the same shardRequests list.
- Iterate over requestsByShard, which records every write request per shard. Each shard's requests are wrapped in a BulkShardRequest, the code waits for enough active shards, and the request is then executed against the shard with the response awaited in a listener; responses are likewise per-shard and are copied into the overall Response.
- Forwarding the request to a shard is implemented in TransportReplicationAction. Before forwarding, the latest cluster state is fetched again, and the routing table in that state is used to find the node holding the target shard's primary. If the primary lives on the current node, the request is executed locally; otherwise it is forwarded to the appropriate node.
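The regrouping step described above can be sketched like this; GroupByShard and the use of hashCode as the routing hash are illustrative assumptions, not the ES implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the coordinator regrouping documents into per-shard lists,
// mirroring the requestsByShard map built in BulkOperation.doRun.
public class GroupByShard {

    static Map<Integer, List<String>> groupByShard(List<String> docIds, int numPrimaryShards) {
        Map<Integer, List<String>> byShard = new HashMap<>();
        for (String id : docIds) {
            // stand-in for the real routing hash
            int shard = Math.floorMod(id.hashCode(), numPrimaryShards);
            // create the list for this shard on first sight, then append
            byShard.computeIfAbsent(shard, s -> new ArrayList<>()).add(id);
        }
        return byShard;
    }

    public static void main(String[] args) {
        // documents routed to the same shard end up in one list,
        // so each shard later receives a single bulk-shard request
        System.out.println(groupByShard(Arrays.asList("a", "b", "c", "d"), 2));
    }
}
```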
TransportBulkAction
The doRun method of BulkOperation inside TransportBulkAction parses the user request:
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
......
if (needToCheck()) {
// Collect all indices referenced by the request
final Set<String> indices = bulkRequest.requests.stream()
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE)
.map(DocWriteRequest::index)
.collect(Collectors.toSet());
final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
Set<String> autoCreateIndices = new HashSet<>();
ClusterState state = clusterService.state();
// Iterate over the indices
for (String index : indices) {
boolean shouldAutoCreate;
try {
// Should this index be auto-created?
shouldAutoCreate = shouldAutoCreate(index, state);
} catch (IndexNotFoundException e) {
shouldAutoCreate = false;
indicesThatCannotBeCreated.put(index, e);
}
if (shouldAutoCreate) {
// Auto-create the index
autoCreateIndices.add(index);
}
}
......
} else {
executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
}
}
private final class BulkOperation extends AbstractRunnable {
// Process the request
protected void doRun() throws Exception {
// Check the cluster state; if no master node exists, block waiting for one, possibly until timeout
final ClusterState clusterState = observer.setAndGetObservedState();
if (handleBlockExceptions(clusterState)) {
return;
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
// Get the cluster metadata
MetaData metaData = clusterState.metaData();
// Iterate over the requests
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);
//the request can only be null because we set it to null in the previous step, so it gets ignored
if (docWriteRequest == null) {
continue;
}
if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metaData)) {
continue;
}
// Resolve the concrete index
Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
try {
// Dispatch on the operation type
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX: // index (write) operation
// Cast to IndexRequest
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
// Get the index metadata
final IndexMetaData indexMetaData = metaData.index(concreteIndex);
// Get the mapping metadata
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
// Get the version the index was created with
Version indexCreated = indexMetaData.getCreationVersion();
// Resolve routing
indexRequest.resolveRouting(metaData);
// Validate the mapping, ID, etc.; an ID is auto-generated if absent
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
break;
case UPDATE: // update operation
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
break;
case DELETE: // delete operation
docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.parent(), docWriteRequest.routing(), docWriteRequest.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
}
break;
default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(i, null);
}
}
// Regroup the user's requests into per-shard request lists
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest request = bulkRequest.requests.get(i);
if (request == null) {
continue;
}
// Resolve the concrete index name
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
// Use the routing algorithm to compute the target shard for this document, yielding the shard ID
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
// Look up the list for this shard ID in requestsByShard; if absent, create an empty ArrayList keyed by the shard ID
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
// Add the current request so that all documents routed to the same shard share one shard-level request
shardRequests.add(new BulkItemRequest(i, request));
}
// Nothing to do if no shard-level requests were built
if (requestsByShard.isEmpty()) {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
return;
}
final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
// ID of the local node
String nodeId = clusterService.localNode().getId();
// Iterate over requestsByShard
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
// The shard ID
final ShardId shardId = entry.getKey();
// All requests targeting this shard
final List<BulkItemRequest> requests = entry.getValue();
// Wrap the requests in a BulkShardRequest
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[requests.size()]));
// Require enough active shards before the write executes
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
bulkShardRequest.timeout(bulkRequest.timeout());
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
}
// Execute the request against the shard and await the response in the listener
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
DocWriteRequest docWriteRequest = request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
}
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
}
});
}
}
}
}
TransportReplicationAction
The doRun method of TransportReplicationAction's inner class ReroutePhase forwards the request to the shard:
public abstract class TransportReplicationAction<
Request extends ReplicationRequest<Request>,
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse
> extends TransportAction<Request, Response> {
final class ReroutePhase extends AbstractRunnable {
@Override
protected void doRun() {
setPhase(task, "routing");
// Get the cluster state
final ClusterState state = observer.setAndGetObservedState();
if (handleBlockExceptions(state)) {
return;
}
// Get the index named in the request
final String concreteIndex = concreteIndex(state);
// Get the index metadata
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
// If the metadata is missing
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));
return;
}
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
}
// Resolve the shard ID and waitForActiveShards for this request
resolveRequest(indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
// Find the primary shard's routing entry
final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
// If the primary is on the local node, execute locally
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node, indexMetaData);
} else {
// Forward to the node holding the primary
performRemoteAction(state, primary, node);
}
}
}
}
Primary shard node flow
After the node holding the primary shard receives the request from the coordinating node, it performs the write; the entry point is ReplicationOperation's execute method. Once the write succeeds, the request is forwarded to the replica shards, and the node waits for their responses before replying to the coordinating node.
- On receiving the request from the coordinating node, the primary first runs its own validation: whether this shard really is the primary, whether the index is closed, and so on.
- Decide whether the request must be delayed.
- Check whether the primary shard has been relocated.
- Check whether enough shards are active; the write proceeds as long as the primary is available.
- The concrete write entry point is ReplicationOperation's execute method.
- Execution then reaches TransportShardBulkAction's performOnPrimary method (with some intermediate steps omitted) and, skipping further calls, finally InternalEngine's index method, where the data is written to Lucene.
- After the Lucene write, the operation is appended to the translog; if the Lucene write fails, the translog must be rolled back accordingly.
- Translog flushing to disk is governed by the configured translog flush policy.
- When the primary's write completes, the request is forwarded to the replica shards, which perform the write while the primary waits for their responses.
- After all replica responses arrive, finish runs and a message reporting the successful and failed operations is returned to the coordinating node.
- If a replica write fails, the node holding the primary sends a shard-failed request to the master.
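The ordering in the steps above (Lucene first, translog second) can be caricatured as below. This is a deliberately simplified sketch with made-up names; the real engine also records a translog no-op for some failed operations, as the InternalEngine code later shows:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the primary-shard write ordering: the document goes
// into Lucene first and is appended to the translog only if Lucene succeeds.
public class PrimaryWriteSketch {

    static final List<String> lucene = new ArrayList<>();
    static final List<String> translog = new ArrayList<>();

    // returns true on success; a Lucene failure leaves the translog untouched
    static boolean index(String doc, boolean luceneFails) {
        if (luceneFails) {
            return false;
        }
        lucene.add(doc);
        translog.add(doc);
        return true;
    }

    public static void main(String[] args) {
        index("doc-1", false); // succeeds: lands in Lucene and the translog
        index("doc-2", true);  // fails: neither store is modified
        System.out.println(lucene.size() + " " + translog.size());
    }
}
```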
ReplicationOperation
ReplicationOperation's execute method is the entry point for the primary-shard write:
public class ReplicationOperation<
Request extends ReplicationRequest<Request>,
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
PrimaryResultT extends ReplicationOperation.PrimaryResult<ReplicaRequest>
> {
// Entry point for the primary-shard write
public void execute() throws Exception {
// Check the number of active shards
final String activeShardCountFailure = checkActiveShardCount();
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
if (activeShardCountFailure != null) {
finishAsFailed(new UnavailableShardsException(primaryId,
"{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
return;
}
totalShards.incrementAndGet();
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
// Perform the write on the primary
primaryResult = primary.perform(request);
// Update the shard's local checkpoint
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
// Build the request for the replicas
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
}
// Global checkpoint
final long globalCheckpoint = primary.globalCheckpoint();
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
// Forward the request to the replica shards
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
}
successfulShards.incrementAndGet(); // mark primary as successful
decPendingAndFinishIfNeeded();
}
}
TransportShardBulkAction
The primary shard performs the write in TransportShardBulkAction's performOnPrimary method:
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
@Override
public WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(
BulkShardRequest request, IndexShard primary) throws Exception {
return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer());
}
// Primary-shard write
public static WritePrimaryResult<BulkShardRequest, BulkShardResponse> performOnPrimary(
BulkShardRequest request,
IndexShard primary,
UpdateHelper updateHelper,
LongSupplier nowInMillisSupplier,
MappingUpdatePerformer mappingUpdater) throws Exception {
// Get the index metadata
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
Translog.Location location = null;
// Iterate over the request items
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
if (isAborted(request.items()[requestIndex].getPrimaryResponse()) == false) {
// Execute the individual write request
location = executeBulkItemRequest(metaData, primary, request, location, requestIndex,
updateHelper, nowInMillisSupplier, mappingUpdater);
}
}
// Build the responses
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
BulkItemRequest[] items = request.items();
for (int i = 0; i < items.length; i++) {
responses[i] = items[i].getPrimaryResponse();
}
BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
return new WritePrimaryResult<>(request, response, location, null, primary, logger);
}
}
Because the call stack is fairly deep, the intermediate methods are skipped here and we jump straight to InternalEngine's index method; the omitted chain is:
executeBulkItemRequest(TransportShardBulkAction)—> executeIndexRequest(TransportShardBulkAction)—> executeIndexRequestOnPrimary(TransportShardBulkAction)—> applyIndexOperationOnPrimary(IndexShard)—> applyIndexOperation(IndexShard)—> index(IndexShard)—> index(InternalEngine)
InternalEngine
InternalEngine's index method writes the data into Lucene and then appends the write operation to the translog:
public class InternalEngine extends Engine {
@Override
public IndexResult index(Index index) throws IOException {
assert Objects.equals(index.uid().field(), uidField) : index.uid().field();
final boolean doThrottle = index.origin().isRecovery() == false;
try (ReleasableLock releasableLock = readLock.acquire()) {
ensureOpen();
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
assert assertVersionType(index);
try (Releasable ignored = acquireLock(index.uid());
......
final IndexResult indexResult;
if (plan.earlyResultOnPreFlightError.isPresent()) {
indexResult = plan.earlyResultOnPreFlightError.get();
assert indexResult.hasFailure();
} else if (plan.indexIntoLucene) {
// Write the document into Lucene via Lucene's indexing API
indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(
plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
if (indexResult.hasFailure() == false) {
// Append the whole write operation to the translog
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
} else {
location = null;
}
indexResult.setTranslogLocation(location);
}
......
return indexResult;
}
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
}
}
GlobalCheckpointSyncAction
GlobalCheckpointSyncAction's maybeSyncTranslog method controls flushing the translog to disk:
public class GlobalCheckpointSyncAction extends TransportReplicationAction<
GlobalCheckpointSyncAction.Request,
GlobalCheckpointSyncAction.Request,
ReplicationResponse> {
private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
final Translog translog = indexShard.getTranslog();
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
indexShard.getTranslog().sync();
}
}
}
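The condition in maybeSyncTranslog boils down to a simple predicate, sketched here with a made-up DurabilitySketch class (the Durability names mirror Translog.Durability):

```java
// Sketch of the fsync decision above: with REQUEST durability, sync whenever
// the last-synced global checkpoint lags behind the shard's global checkpoint.
public class DurabilitySketch {

    enum Durability { REQUEST, ASYNC }

    static boolean shouldSync(Durability durability, long lastSyncedGlobalCheckpoint, long globalCheckpoint) {
        return durability == Durability.REQUEST && lastSyncedGlobalCheckpoint < globalCheckpoint;
    }

    public static void main(String[] args) {
        System.out.println(shouldSync(Durability.REQUEST, 5, 7)); // true: sync now
        System.out.println(shouldSync(Durability.ASYNC, 5, 7));   // false: a background timer syncs instead
    }
}
```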
Replica shard node flow
The replica shard's write flow is essentially the same as the primary's; when its write completes, it sends a response back to the primary.
Reference:
Elasticsearch源码解析与优化实战 (Zhang Chao)