【Elasticsearch】Master选举流程

Elasticsearch中Discovery模块负责发现集群中的节点以及Master的选举,其默认的实现称为Zen Discovery。

在Elasticsearch的配置文件中,有一项为node.master,如果将该配置设为true,该节点即可参与Master选举,获得被选举为Master节点的资格。

Master选举算法

(1)Paxos算法

Paxos算法在分布式系统中是一种比较著名的选举算法,并且非常强大,但是它实现比较复杂,这里不过多讲解。

(2)Bully算法

Bully算法假设集群中所有的节点都有一个唯一的ID,通过对ID进行排序,选取ID最大的节点作为Master节点。

brain-split问题

一个集群建立起之后会选出一个master,负责管理整个集群,当master负载比较大时或者产生网络分区时,导致其他节点可能认为master节点已失效,从而选举新的节点,出现多个master的情况,这就是brain-split问题。 ES在选举master时,获得的投票数必须要达到quorum也就是参选人数需要过半,才能确认Master,quorum的数量可以在配置文件中配置discovery.zen.minimum_master_nodes,一般配置数量为集群中具有master资格的节点数除以2加1:

discovery.zen.minimum_master_nodes: 1

Elasticsearch Master选举流程

Elasticsearch基于Bully算法,选举流程如下:

1.Ping所有的节点,选举临时的Master

  • fullPingResponses

选举过程的实现位于ZenDiscovery类的findMaster方法中,在该方法中首先Ping集群中的所有节点,得到返回结果fullPingResponses,fullPingResponses是由集群中的节点组成的列表,但是不包含当前的节点,当前节点单独被添加到fullPingResponses中。接着,将discovery.zen.master_election.ignore_non_master_pings为true并且不具备Master资格的节点过滤掉,并放入pingResponses中。

  • activeMasters

activeMasters存储当前活跃的Master列表,它是通过遍历pingResponses,将每个节点认为的Master节点(并且不是当前节点)加入到activeMasters列表中。

  • masterCandidates

masterCandidates存储Master候选者列表,也就是具有Master资格的节点,它也是通过遍历pingResponses,判断每个节点是否具有Master资格得到的候选者列表。

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    private void innerJoinCluster() {
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
        nodeJoinController.startElectionContext();
        // 如果master为空
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            // 选举临时的Master
            masterNode = findMaster();
        }
        ......
    }

    private DiscoveryNode findMaster() {
        logger.trace("starting to ping");
        //ping所有的节点,得到节点列表,列表中不包含当前节点
        List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
        if (fullPingResponses == null) {
            logger.trace("No full ping responses");
            return null;
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            if (fullPingResponses.size() == 0) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.trace("full ping responses:{}", sb);
        }
        // 获取当前节点
        final DiscoveryNode localNode = transportService.getLocalNode();

        // add our selves
        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
        // 将当前节点加入节点列表
        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

        // 过滤节点,将discovery.zen.master_election.ignore_non_master_pings为true并且不具备Master资格的节点过滤掉
        final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
        // activeMasters存储当前活跃的Master列表
        List<DiscoveryNode> activeMasters = new ArrayList<>();
        // 遍历ping后得到的节点列表
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // 如果每个节点选举出的Master节点不为空并且不是当前节点
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                // 将推举出的master节点加入到activeMasters中
                activeMasters.add(pingResponse.master());
            }
        }

        // masterCandidates存储Master候选者列表
        List<ElectMastermasterCandidatesService.MasterCandidate> masterCandidates = new ArrayList<>();
        // 再次遍历集群中的节点列表
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // 如果节点具有Master资格
            if (pingResponse.node().isMasterNode()) {
                // 加入到masterCandidates候选者列表中
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }
        // 如果当前活跃的Master列表为空
        if (activeMasters.isEmpty()) {
            // 从候选者Master列表中判断是否达到法定人数
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                // 选举出Master节点
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                // 返回Master节点
                return winner.getNode();
            } else {
                // 如果没有足够的候选者,返回空
                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                            masterCandidates, electMaster.minimumMasterNodes());
                return null;
            }
        } else {
            // 判断当前节点是否在活跃的Master列表中
            assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
            // 选举Master节点
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
    }
}
具体选举过程

首先判断activeMasters是否为空,如果不为空,从activeMasters选举。如果为空,判断masterCandidates中候选者是否达到了法定人数 quorum,如果达到,从中选举Master节点,如果未到达,重新获取节点。

(1)从masterCandidates选举

具体的实现在ElectMasterService中:

判断是否到达法定人数

  • 判断masterCandidates是否为空,为空返回false
  • 判断discovery.zen.minimum_master_nodes配置的值(默认值为-1)是否小于1,确保单节点情况下的正常选主
  • 判断masterCandidates中具备master资格的节点数据是否大于等于minimum_master_nodes

选举临时Master

  • 首先对masterCandidates中的节点进行排序,优先使用版本号,版本号最高的排在前面,如果版本号一致,再跟进节点ID进行排序,节点ID小的排在前面
  • 返回排序后最前面的节点,也就是第一个节点作为Master节点
public class ElectMasterService extends AbstractComponent {
    
    // 是否达到法定人数
    public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
        // 如果为空返回false
        if (candidates.isEmpty()) {
            return false;
        }
        // minimumMasterNodes默认值为-1,确保单节点情况下正常选主
        if (minimumMasterNodes < 1) {
            return true;
        }
        assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
            "duplicates ahead: " + candidates;
        // 判断是否达到法定人数
        return candidates.size() >= minimumMasterNodes;
    }
    
    // 选举Master
    public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
        assert hasEnoughCandidates(candidates);
        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
        // 对节点进行排序
        sortedCandidates.sort(MasterCandidate::compare);
        // 获取排序后的第一个节点作为Master节点
        return sortedCandidates.get(0);
    }
    
    // 内部类,自定义比较函数在此类中实现
     public static class MasterCandidate {

        public static final long UNRECOVERED_CLUSTER_VERSION = -1;

        final DiscoveryNode node;

        final long clusterStateVersion;

        public MasterCandidate(DiscoveryNode node, long clusterStateVersion) {
            Objects.requireNonNull(node);
            assert clusterStateVersion >= -1 : "got: " + clusterStateVersion;
            assert node.isMasterNode();
            this.node = node;
            this.clusterStateVersion = clusterStateVersion;
        }

        public DiscoveryNode getNode() {
            return node;
        }

        public long getClusterStateVersion() {
            return clusterStateVersion;
        }

        @Override
        public String toString() {
            return "Candidate{" +
                "node=" + node +
                ", clusterStateVersion=" + clusterStateVersion +
                '}';
        }

        /**
         * 自定义比较函数,早期版本中对节点ID排序,现在优先使集群状态版本号排序,版本号高的放在前面,版本号一致的再对比节点ID,节点ID小的排前面
         */
        public static int compare(MasterCandidate c1, MasterCandidate c2) {
            // 比较版本号,注意这里c2在前
            int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
            if (ret == 0) {
                // 如果版本号一致比较节点的ID
                ret = compareNodes(c1.getNode(), c2.getNode());
            }
            return ret;
        }
    }
    
    /** 
     *  根据节点ID比较大小
     */
     private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
        // 判断是否具有master节点资格是为了给其他函数调用的,masterCandidates中的节点都已经具备了master节点资格
        // 如果o1具备Master节点资格,o2不具备,返回-1,也就是o1排前面
        if (o1.isMasterNode() && !o2.isMasterNode()) {
            return -1;
        }
        // 如果o1不具备master节点资格而o2有,返回1,o2排前面
        if (!o1.isMasterNode() && o2.isMasterNode()) {
            return 1;
        }
        // 比较节点的ID
        return o1.getId().compareTo(o2.getId());
    }
}

(2)从activeMasters选举

从activeMasters选举的过程比较简单,具体的实现也在ElectMasterService中。首先通过compareNodes方法对节点ID排序,然后取节点ID最小的作为临时的Master。

public class ElectMasterService extends AbstractComponent {
    public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
        // 取activeMasters中节点ID最小的作为Master,同样使用compareNodes进行排序
        return activeMasters.stream().min(ElectMasterService::compareNodes).get();
    }
    
     /** 
     *  根据节点ID比较大小
     */
     private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
        // 判断是否具有master节点资格是为了给其他函数调用的,masterCandidates中的节点都已经具备了master节点资格
        // 如果o1具备Master节点资格,o2不具备,返回-1,也就是o1排前面
        if (o1.isMasterNode() && !o2.isMasterNode()) {
            return -1;
        }
        // 如果o1不具备master节点资格而o2有,返回1,o2排前面
        if (!o1.isMasterNode() && o2.isMasterNode()) {
            return 1;
        }
        // 比较节点的ID
        return o1.getId().compareTo(o2.getId());
    }
}

2.确立Master

上一步选举出的临时的Master有两种情况,临时Master就是当前节点或者临时Master不是当前节点。 在ES中,向节点发送join请求就是发送投票,被发送请求的节点将会得到一票。

(1)临时Master节点为当前节点

  • 等待足够多的具有Master资格的节点加入本节点,达到法定人数的投票数量时完成选主
  • 如果等待超时后还没有满足法定数量,选举失败,将会进行新一轮的选举
  • 如果选主成功,将会发布新的cluster state version

*ZenDiscovery*

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    private void innerJoinCluster() {
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
        nodeJoinController.startElectionContext();
        // 如果master为空
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            // 选举临时的Master
            masterNode = findMaster();
        }

        if (!joinThreadControl.joinThreadActive(currentThread)) {
            logger.trace("thread is no longer in currentJoinThread. Stopping.");
            return;
        }
        // 如果选举出的Master节点是当前节点
        if (transportService.getLocalNode().equals(masterNode)) {
            // 因为选举出的Master节点是当前节点,所以minimum_master_nodes数量-1,得到被选举为Master需要的最少节点数量,也就是法定人数
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); 
            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
            // 根据法定人数等待其他节点的加入(投票),等待完成选举
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                    new NodeJoinController.ElectionCallback() {// 回调函数
                        @Override
                        public void onElectedAsMaster(ClusterState state) {
                            synchronized (stateMutex) {
                               // 选举完成 joinThreadControl.markThreadAsDone(currentThread);
                            }
                        }

                        @Override
                        public void onFailure(Throwable t) {// 如果选举失败
                            logger.trace("failed while waiting for nodes to join, rejoining", t);
                            synchronized (stateMutex) {
                               // 重新选举 joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                            }
                        }
                    }

            );
        } else {
            ......
        }
    }
}

(2)临时Master节点不是当前节点

  • 当前节点停止接收其他节点的join请求
  • 最终当选的Master会先发布集群状态,之后确认其他节点的join请求。
  • 当前节点向Master节点发送join请求,并且等待Master的回复(默认为1分钟),如果失败重新发送join请求(默认重试3次),如果回复成功,从集群中获取Master节点,判断与临时Master是否一致,如果不一致重新选举。
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    private void innerJoinCluster() {
    
        ......
        
        // 如果选举出的Master节点是当前节点
        if (transportService.getLocalNode().equals(masterNode)) {
           ......
           
        } else { // 选举出的Master节点不是当前节点
            // 停止其他节点的join请求
            nodeJoinController.stopElectionContext(masterNode + " elected");

            // 因为当前节点不是当选的临时master节点,临时Master节点当做最终的Master节点,向Master节点发出join请求
            final boolean success = joinElectedMaster(masterNode);

            synchronized (stateMutex) {
                if (success) {// 如果成功
                    // 从集群中获取Master节点
                    DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                    if (currentMasterNode == null) {
                        // 如果master为空,重新选举
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    } else if (currentMasterNode.equals(masterNode) == false) { //如果当选的master节点不是之前选出的临时Master节点
                        
                        // 停止当前线程并且重新join joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                    }
                    //选举
                    joinThreadControl.markThreadAsDone(currentThread);
                } else {
                    // 如果请求失败,重新发出join请求
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            }
        }
    }
}

投票与得票的实现

在确定Master节点的时候,如果推选的临时Master为当前节点,有一步是调用waitToBeElectedAsMaster方法,等待当前节点被推举为真正的Master节点,借助waitToBeElectedAsMaster方法看一下投票与得票的实现。

NodeJoinController

public class NodeJoinController extends AbstractComponent {

   public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
        // 创建一个计数器
        final CountDownLatch done = new CountDownLatch(1);
        // 创建回调函数
        final ElectionCallback wrapperCallback = new ElectionCallback() {
            @Override
            public void onElectedAsMaster(ClusterState state) {
                // 选举成功,计数器减1
                done.countDown();
                callback.onElectedAsMaster(state);
            }

            @Override
            public void onFailure(Throwable t) {
                // 如果选举失败,计数器减1
                done.countDown();
                callback.onFailure(t);
            }
        };

        ElectionContext myElectionContext = null;

        try {
            synchronized (this) {
                // 判断ElectionContext是否为空
                assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";
                myElectionContext = electionContext;
                // 设置回调函数和法定人数,为选举做准备
                electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback);
                // 这里判断是否到达法定人数,如果达到,被选为Master
                checkPendingJoinsAndElectIfNeeded();
            }

            try {
                // 选主成功或失败都会将计数器减1,这里等待选举结束(设置了超时时间,如果在这段时间内选举还没有结束,放弃等待)
                if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
                    // callback handles everything
                    return;
                }
            } catch (InterruptedException e) {

            }
            if (logger.isTraceEnabled()) {
                // 再次收集投票数量
                final int pendingNodes = myElectionContext.getPendingMasterJoinsCount();
                logger.trace("timed out waiting to be elected. waited [{}]. pending master node joins [{}]", timeValue, pendingNodes);
            }
            // 停止选举
            failContextIfNeeded(myElectionContext, "timed out waiting to be elected");
        } catch (Exception e) {
            logger.error("unexpected failure while waiting for incoming joins", e);
            if (myElectionContext != null) {
                failContextIfNeeded(myElectionContext, "unexpected failure while waiting for pending joins [" + e.getMessage() + "]");
            }
        }
    }
    
    /**
     * 检查发送请求的数量(投票数)是否达到成为Master的条件
     */
    private synchronized void checkPendingJoinsAndElectIfNeeded() {
        assert electionContext != null : "election check requested but no active context";
        // 获取投票数
        final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
        // 判断投票数是否达到法定人数
        if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
            if (logger.isTraceEnabled()) {
                logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
                    electionContext.requiredMasterJoins);
            }
        } else {// 如果达到了法定人数
            if (logger.isTraceEnabled()) {
                logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
                    electionContext.requiredMasterJoins);
            }
            // 成为Master节点
            electionContext.closeAndBecomeMaster();
            electionContext = null; // clear this out so future joins won't be accumulated
        }
    }
    
    // 选举为Master
    public synchronized void closeAndBecomeMaster() {
            assert callback != null : "becoming a master but the callback is not yet set";
            assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master but pending joins of "
                + getPendingMasterJoinsCount() + " are not enough. needs [" + requiredMasterJoins + "];";

            innerClose();

            Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
            final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";

            tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
            tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
            // 这里更新cluster state version
            masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
        }
        
    // 是否达到法定人数
    public synchronized boolean isEnoughPendingJoins(int pendingMasterJoins) {
            final boolean hasEnough;
            // 如果法定人数小于0
            if (requiredMasterJoins < 0) {
                hasEnough = false;
            } else {
                assert callback != null : "requiredMasterJoins is set but not the callback";
                // 判断投票数是否达到法定人数
                hasEnough = pendingMasterJoins >= requiredMasterJoins;
            }
            return hasEnough;
        }
    
    class ElectionContext {
        private ElectionCallback callback = null;
        private int requiredMasterJoins = -1;
        private final Map<DiscoveryNode, List<MembershipAction.JoinCallback>> joinRequestAccumulator = new HashMap<>();

        final AtomicBoolean closed = new AtomicBoolean();

        public synchronized void onAttemptToBeElected(int requiredMasterJoins, ElectionCallback callback) {
            ensureOpen();
            assert this.requiredMasterJoins < 0;
            assert this.callback == null;
            this.requiredMasterJoins = requiredMasterJoins;
            this.callback = callback;
        }

       ......

        // 获取投票数
        public synchronized int getPendingMasterJoinsCount() {
            int pendingMasterJoins = 0;
            // 节点收到的join连接被存储在joinRequestAccumulator,遍历joinRequestAccumulator.
            for (DiscoveryNode node : joinRequestAccumulator.keySet()) {
                // 如果发送请求的节点具有master节点资格
                if (node.isMasterNode()) {
                    // 投票数+1
                    pendingMasterJoins++;
                }
            }
            return pendingMasterJoins;
        }
        
        ......
    }
}

参考:

Elasticsearch源码解析与优化实战【张超】

Elasticsearch分布式一致性原理剖析(一)-节点篇

【Elasticsearch】Master选举流程 | SHAN (shan-ml.github.io)

请登录后发表评论

    没有回复内容