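In Elasticsearch, the Discovery module discovers the nodes in a cluster and elects the master. Its default implementation is called Zen Discovery. Elasticsearch 7.0 replaced it with a new cluster coordination layer, so everything below applies to earlier versions.
The Elasticsearch configuration file has a setting called node.master, which defaults to true. When it is true, the node takes part in master election and is eligible to be elected master.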
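Master election algorithms
(1) Paxos
Paxos is a well-known and powerful consensus algorithm for distributed systems, and it can be used to elect a leader. It is complex to implement, however, so it is not covered in detail here.
(2) Bully
The Bully algorithm assumes that every node in the cluster has a unique ID. The nodes are ranked by ID, and the node with the largest ID becomes master. Elasticsearch's variant differs, as its code below shows. It ranks candidates by cluster state version first and then prefers the smallest node ID.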
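The following minimal Java sketch shows only the Bully selection rule. It assumes each live node is identified by a numeric ID. BullyElection and electMaster are hypothetical names used for illustration, not Elasticsearch code. The message exchange of the full algorithm (election, answer and coordinator messages) is left out.

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: only the "largest ID wins" rule of the Bully algorithm.
final class BullyElection {
    private BullyElection() {}

    // Returns the largest ID among the live nodes, or empty if no node is alive.
    static Optional<Long> electMaster(Collection<Long> liveNodeIds) {
        return liveNodeIds.stream().max(Long::compare);
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(3L, 7L, 5L);
        System.out.println(electMaster(ids).orElseThrow()); // prints 7
    }
}
```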
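The split-brain problem
Once a cluster has formed, it elects a master that manages the whole cluster. If the master becomes overloaded, or the network partitions, other nodes may conclude that the master has failed and elect a new one. The cluster then ends up with more than one master. This is the split-brain problem. To prevent it, ES only confirms a master once the votes reach a quorum, meaning more than half of the master-eligible nodes. The quorum is set with discovery.zen.minimum_master_nodes in the configuration file. It is usually floor(N / 2) + 1, where N is the number of master-eligible nodes. For example, three master-eligible nodes give a quorum of 2. The value 1 shown below is only appropriate for a cluster with a single master-eligible node: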
discovery.zen.minimum_master_nodes: 1
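A small Java sketch can check the quorum arithmetic. Quorum and its methods are hypothetical helpers, not Elasticsearch APIs. The sketch assumes only the floor(N / 2) + 1 rule stated above. It shows that when N master-eligible nodes are split into two partitions, at most one side can reach the quorum.

```java
// Hypothetical helpers for the quorum rule; not Elasticsearch APIs.
final class Quorum {
    private Quorum() {}

    // Recommended minimum_master_nodes for n master-eligible nodes: floor(n / 2) + 1.
    static int minimumMasterNodes(int masterEligibleNodes) {
        if (masterEligibleNodes < 1) {
            throw new IllegalArgumentException("need at least one master-eligible node");
        }
        return masterEligibleNodes / 2 + 1; // integer division rounds down
    }

    // A partition can elect a master only if it holds at least a quorum of master-eligible nodes.
    static boolean canElect(int nodesInPartition, int quorum) {
        return nodesInPartition >= quorum;
    }

    public static void main(String[] args) {
        int quorum = minimumMasterNodes(3); // 3 / 2 + 1 = 2
        // Split three master-eligible nodes into partitions of 1 and 2.
        System.out.println(canElect(1, quorum) + " " + canElect(2, quorum)); // prints "false true"
    }
}
```

Because the quorum is strictly more than half of N, two disjoint partitions would need at least 2 × quorum > N nodes between them, so both sides can never elect a master at the same time.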
Elasticsearch master election flow
Elasticsearch's master election is based on the Bully algorithm. The flow is as follows:
1. Ping all nodes and elect a temporary master
- fullPingResponses
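The election is implemented in the findMaster method of the ZenDiscovery class. The method first pings every node in the cluster and gets back fullPingResponses. This list contains the nodes in the cluster but not the local node, so the local node is added to fullPingResponses separately. Next, if discovery.zen.master_election.ignore_non_master_pings is true, nodes that are not master-eligible are filtered out. The remaining responses go into pingResponses.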
- activeMasters
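activeMasters holds the masters that are currently active. It is built by iterating over pingResponses. For each node, the master that node reports is added to activeMasters, provided it reports one and it is not the local node.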
- masterCandidates
masterCandidates holds the master candidates, that is, the master-eligible nodes. It is also built by iterating over pingResponses and checking whether each node is master-eligible.
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {
    private void innerJoinCluster() {
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
        nodeJoinController.startElectionContext();
        // loop while no master has been found
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            // elect a temporary master
            masterNode = findMaster();
        }
        ......
    }

    private DiscoveryNode findMaster() {
        logger.trace("starting to ping");
        // ping all nodes; the resulting list does not contain the local node
        List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
        if (fullPingResponses == null) {
            logger.trace("No full ping responses");
            return null;
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            if (fullPingResponses.size() == 0) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.trace("full ping responses:{}", sb);
        }
        // the local node
        final DiscoveryNode localNode = transportService.getLocalNode();
        // add our selves
        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
        // add the local node to the list
        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));
        // if discovery.zen.master_election.ignore_non_master_pings is true, drop the nodes that are not master-eligible
        final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
        // activeMasters: the masters that are currently active
        List<DiscoveryNode> activeMasters = new ArrayList<>();
        // iterate over the ping responses
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // the node reports a master, and that master is not the local node
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                // add the reported master to activeMasters
                activeMasters.add(pingResponse.master());
            }
        }
        // masterCandidates: the master candidates
        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
        // iterate over the ping responses again
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // the node is master-eligible
            if (pingResponse.node().isMasterNode()) {
                // add it to masterCandidates
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }
        // no active master is known
        if (activeMasters.isEmpty()) {
            // check whether the candidates reach the quorum
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                // elect the master from the candidates
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                // return the elected master
                return winner.getNode();
            } else {
                // not enough candidates: return null so that the caller pings again
                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                    masterCandidates, electMaster.minimumMasterNodes());
                return null;
            }
        } else {
            // the local node was excluded from activeMasters above, so it must not appear there
            assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
            // choose among the active masters
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
    }
}
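The way findMaster builds its lists can be modelled without any Elasticsearch classes. In this sketch, the record Ping and the class PingLists are hypothetical stand-ins for ZenPing.PingResponse and DiscoveryNode. Each Ping carries a node ID, a master-eligible flag, and the master that the node currently reports. Only the filtering and list-building rules are taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a ping response: who answered, whether it is master-eligible,
// and which master it currently reports (null if none).
record Ping(String nodeId, boolean masterEligible, String seenMaster) {}

final class PingLists {
    private PingLists() {}

    // Like filterPingResponses: drop non-master-eligible nodes only when the flag is set.
    static List<Ping> filter(List<Ping> full, boolean ignoreNonMasterPings) {
        List<Ping> out = new ArrayList<>();
        for (Ping p : full) {
            if (!ignoreNonMasterPings || p.masterEligible()) {
                out.add(p);
            }
        }
        return out;
    }

    // Masters reported by the nodes, excluding the local node (duplicates are kept).
    static List<String> activeMasters(List<Ping> pings, String localNodeId) {
        List<String> out = new ArrayList<>();
        for (Ping p : pings) {
            if (p.seenMaster() != null && !p.seenMaster().equals(localNodeId)) {
                out.add(p.seenMaster());
            }
        }
        return out;
    }

    // Every master-eligible node that answered, including the local node.
    static List<String> masterCandidates(List<Ping> pings) {
        List<String> out = new ArrayList<>();
        for (Ping p : pings) {
            if (p.masterEligible()) {
                out.add(p.nodeId());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Ping> full = List.of(
            new Ping("n1", true, null),   // the local node reports no master, as in findMaster
            new Ping("n2", true, "n3"),
            new Ping("n3", true, "n3"),
            new Ping("d1", false, "n3")); // a data-only node
        List<Ping> pings = filter(full, true);
        System.out.println(activeMasters(pings, "n1") + " " + masterCandidates(pings)); // prints [n3, n3] [n1, n2, n3]
    }
}
```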
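The election in detail
findMaster first checks whether activeMasters is empty. If it is not empty, the temporary master is chosen from activeMasters. If it is empty, findMaster checks whether masterCandidates reaches the quorum. If it does, the master is elected from masterCandidates. If it does not, findMaster returns null and the node pings the cluster again.
(1) Electing from masterCandidates
The implementation is in ElectMasterService:
Checking the quorum
- If masterCandidates is empty, return false
- If discovery.zen.minimum_master_nodes (default -1) is less than 1, return true, so that a single node can still elect a master
- Otherwise, check whether the number of master-eligible nodes in masterCandidates is at least minimum_master_nodes
Electing the temporary master
- Sort the nodes in masterCandidates by cluster state version, highest first; nodes with the same version are then sorted by node ID, smallest first
- Return the first node after sorting as the master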
public class ElectMasterService extends AbstractComponent {
    // whether the candidates reach the quorum
    public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
        // no candidates at all
        if (candidates.isEmpty()) {
            return false;
        }
        // minimumMasterNodes defaults to -1, so a single node can still elect a master
        if (minimumMasterNodes < 1) {
            return true;
        }
        assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
            "duplicates ahead: " + candidates;
        // quorum check
        return candidates.size() >= minimumMasterNodes;
    }

    // elect the master
    public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
        assert hasEnoughCandidates(candidates);
        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
        // sort the candidates
        sortedCandidates.sort(MasterCandidate::compare);
        // the first candidate after sorting becomes the master
        return sortedCandidates.get(0);
    }

    // nested class that implements the custom comparison
    public static class MasterCandidate {
        public static final long UNRECOVERED_CLUSTER_VERSION = -1;
        final DiscoveryNode node;
        final long clusterStateVersion;

        public MasterCandidate(DiscoveryNode node, long clusterStateVersion) {
            Objects.requireNonNull(node);
            assert clusterStateVersion >= -1 : "got: " + clusterStateVersion;
            assert node.isMasterNode();
            this.node = node;
            this.clusterStateVersion = clusterStateVersion;
        }

        public DiscoveryNode getNode() {
            return node;
        }

        public long getClusterStateVersion() {
            return clusterStateVersion;
        }

        @Override
        public String toString() {
            return "Candidate{" +
                "node=" + node +
                ", clusterStateVersion=" + clusterStateVersion +
                '}';
        }

        /**
         * Custom comparison. Early versions sorted by node ID only; now candidates are sorted by cluster
         * state version first (higher first) and, when the versions are equal, by node ID (smaller first).
         */
        public static int compare(MasterCandidate c1, MasterCandidate c2) {
            // compare cluster state versions; note that c2 comes first, so higher versions sort first
            int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
            if (ret == 0) {
                // same version: compare node IDs
                ret = compareNodes(c1.getNode(), c2.getNode());
            }
            return ret;
        }
    }

    /**
     * Compares two nodes: master-eligible nodes first, then by node ID.
     */
    private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
        // The master-eligibility checks serve other callers; every node in masterCandidates is already master-eligible.
        // o1 is master-eligible and o2 is not: return -1, so o1 sorts first
        if (o1.isMasterNode() && !o2.isMasterNode()) {
            return -1;
        }
        // o1 is not master-eligible but o2 is: return 1, so o2 sorts first
        if (!o1.isMasterNode() && o2.isMasterNode()) {
            return 1;
        }
        // compare the node IDs
        return o1.getId().compareTo(o2.getId());
    }
}
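The ordering used by electMaster can be reproduced with a standard Comparator. This sketch assumes every candidate is master-eligible, as in masterCandidates, so compareNodes reduces to a string comparison of node IDs. Candidate and TempMasterElection are hypothetical names, not Elasticsearch classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for ElectMasterService.MasterCandidate (all candidates assumed master-eligible).
record Candidate(String nodeId, long clusterStateVersion) {}

final class TempMasterElection {
    private TempMasterElection() {}

    // Higher cluster state version first; on a tie, the lexicographically smaller node ID first.
    static final Comparator<Candidate> ORDER =
        Comparator.comparingLong(Candidate::clusterStateVersion).reversed()
                  .thenComparing(Candidate::nodeId);

    // Sorts a copy of the candidates and returns the first one, like electMaster.
    static Candidate elect(List<Candidate> candidates) {
        List<Candidate> sorted = new ArrayList<>(candidates);
        sorted.sort(ORDER);
        return sorted.get(0);
    }

    public static void main(String[] args) {
        List<Candidate> candidates = List.of(
            new Candidate("node-b", 10), new Candidate("node-a", 9), new Candidate("node-c", 10));
        System.out.println(elect(candidates).nodeId()); // prints node-b
    }
}
```

In the example, node-a has the smallest ID but an older cluster state, so it loses. Preferring the most recent cluster state keeps a node with stale metadata from becoming master.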
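(2) Electing from activeMasters
Electing from activeMasters is simpler, and it is also implemented in ElectMasterService. Using compareNodes as the ordering (master-eligible nodes first, then node ID), it takes the node with the smallest node ID as the temporary master.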
public class ElectMasterService extends AbstractComponent {
    public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
        // take the active master with the smallest node ID, again ordered by compareNodes
        return activeMasters.stream().min(ElectMasterService::compareNodes).get();
    }

    ......
    // compareNodes is the same method as in the previous excerpt
}
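tieBreakActiveMasters amounts to taking the minimum under compareNodes. In this sketch, the record NodeRef is a hypothetical stand-in for DiscoveryNode that keeps only the two properties compareNodes inspects. ActiveMasterTieBreak is likewise an illustrative name.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for DiscoveryNode: only what compareNodes looks at.
record NodeRef(String id, boolean masterEligible) {}

final class ActiveMasterTieBreak {
    private ActiveMasterTieBreak() {}

    // Same ordering idea as compareNodes: master-eligible nodes first (false < true for the
    // negated flag), then the smaller node ID first.
    static final Comparator<NodeRef> COMPARE_NODES =
        Comparator.comparing((NodeRef n) -> !n.masterEligible())
                  .thenComparing(NodeRef::id);

    static NodeRef tieBreak(Collection<NodeRef> activeMasters) {
        return activeMasters.stream().min(COMPARE_NODES).orElseThrow();
    }

    public static void main(String[] args) {
        List<NodeRef> active = List.of(
            new NodeRef("node-c", true), new NodeRef("node-a", true), new NodeRef("node-b", true));
        System.out.println(tieBreak(active).id()); // prints node-a
    }
}
```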
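2. Confirming the master
The temporary master chosen in the previous step is either the local node or some other node. In ES, a node casts a vote by sending a join request, and the node that receives the request gains one vote.
(1) The temporary master is the local node
- Wait until enough master-eligible nodes have joined the local node; the election completes when the votes reach the quorum
- If the quorum has still not been reached when the wait times out, the election fails and a new round starts
- If the election succeeds, the new master publishes a new cluster state with a new cluster state version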
ZenDiscovery
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {
    private void innerJoinCluster() {
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
        nodeJoinController.startElectionContext();
        // loop while no master has been found
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            // elect a temporary master
            masterNode = findMaster();
        }
        if (!joinThreadControl.joinThreadActive(currentThread)) {
            logger.trace("thread is no longer in currentJoinThread. Stopping.");
            return;
        }
        // the elected master is the local node
        if (transportService.getLocalNode().equals(masterNode)) {
            // The local node votes for itself, so it needs minimum_master_nodes - 1 joins from other nodes (the quorum)
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1);
            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
            // wait for enough other nodes to join (vote) to complete the election
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                new NodeJoinController.ElectionCallback() { // callback
                    @Override
                    public void onElectedAsMaster(ClusterState state) {
                        synchronized (stateMutex) {
                            // election completed
                            joinThreadControl.markThreadAsDone(currentThread);
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) { // the election failed
                        logger.trace("failed while waiting for nodes to join, rejoining", t);
                        synchronized (stateMutex) {
                            // start a new election
                            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        }
                    }
                }
            );
        } else {
            ......
        }
    }
}
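The vote arithmetic in the branch above is small enough to state directly. RequiredJoins is a hypothetical helper, not part of Elasticsearch. It assumes, as the code does, that the local node counts as one vote for itself. It also assumes that only joins from master-eligible nodes count, as getPendingMasterJoinsCount does further down.

```java
// Hypothetical helper mirroring the arithmetic in innerJoinCluster and isEnoughPendingJoins.
final class RequiredJoins {
    private RequiredJoins() {}

    // The local node is one vote for itself, so it needs minimum_master_nodes - 1 joins;
    // values of 1 or less (including the default -1) mean no joins are required.
    static int requiredJoins(int minimumMasterNodes) {
        return Math.max(0, minimumMasterNodes - 1);
    }

    // True once the joins from master-eligible nodes reach the required number.
    static boolean isElected(int masterEligibleJoins, int minimumMasterNodes) {
        return masterEligibleJoins >= requiredJoins(minimumMasterNodes);
    }

    public static void main(String[] args) {
        // Three master-eligible nodes, minimum_master_nodes = 2: one other node must join.
        System.out.println(requiredJoins(2) + " " + isElected(1, 2)); // prints "1 true"
    }
}
```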
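(2) The temporary master is not the local node
- The local node stops accepting join requests from other nodes
- The master that is finally elected publishes the cluster state first and only then acknowledges the other nodes' join requests
- The local node sends a join request to the master and waits for its reply (1 minute by default). If the request fails, it is resent (3 retries by default). Once the join succeeds, the node reads the master from its cluster state and checks whether it is the temporary master; if not, a new election starts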
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {
    private void innerJoinCluster() {
        ......
        // the elected master is the local node
        if (transportService.getLocalNode().equals(masterNode)) {
            ......
        } else { // the elected master is not the local node
            // stop accepting join requests from other nodes
            nodeJoinController.stopElectionContext(masterNode + " elected");
            // The local node is not the temporary master, so it treats the temporary master as the final master and sends it a join request
            final boolean success = joinElectedMaster(masterNode);
            synchronized (stateMutex) {
                if (success) { // the join succeeded
                    // read the master from the cluster state
                    DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                    if (currentMasterNode == null) {
                        // no master is set: start a new election
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    } else if (currentMasterNode.equals(masterNode) == false) { // the cluster's master is not the temporary master
                        // stop the current thread and rejoin
                        joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                    }
                    // mark the current join thread as done
                    joinThreadControl.markThreadAsDone(currentThread);
                } else {
                    // the join request failed: start a new join thread (ping and elect again)
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            }
        }
    }
}
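The non-master path can be sketched as a plain function. Here sendJoin and currentMaster are hypothetical stand-ins for joinElectedMaster and clusterState().getNodes().getMasterNode(). The attempt limit is a parameter. The real code distinguishes markThreadAsDoneAndStartNew from stopRunningThreadAndRejoin, but the sketch collapses both into a single REJOIN outcome. It also does not model which failures Elasticsearch actually retries.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical model of the "temporary master is another node" branch; not Elasticsearch code.
final class JoinElectedMaster {
    enum Outcome { JOINED, REJOIN }

    private JoinElectedMaster() {}

    /**
     * @param tempMaster    the temporary master returned by findMaster
     * @param sendJoin      sends one join request and reports whether it succeeded
     * @param maxAttempts   maximum number of join attempts
     * @param currentMaster the master recorded in the local cluster state after joining (may be null)
     */
    static Outcome join(String tempMaster, Predicate<String> sendJoin, int maxAttempts,
                        Supplier<String> currentMaster) {
        boolean success = false;
        for (int attempt = 0; attempt < maxAttempts && !success; attempt++) {
            success = sendJoin.test(tempMaster);
        }
        if (!success) {
            return Outcome.REJOIN; // the join failed: ping and elect again
        }
        String master = currentMaster.get();
        if (master == null || !master.equals(tempMaster)) {
            return Outcome.REJOIN; // no master set, or the master changed in the meantime
        }
        return Outcome.JOINED;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The first attempt fails, the second succeeds, and the cluster state agrees on "m1".
        Outcome outcome = join("m1", m -> ++calls[0] >= 2, 3, () -> "m1");
        System.out.println(outcome + " after " + calls[0] + " attempts"); // prints "JOINED after 2 attempts"
    }
}
```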
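How votes are cast and counted
When the temporary master is the local node, confirming the master includes a call to waitToBeElectedAsMaster. This method waits until the local node is actually elected master. The walk-through of waitToBeElectedAsMaster below shows how votes are cast and counted.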
NodeJoinController
public class NodeJoinController extends AbstractComponent {
    public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
        // a latch with a count of 1
        final CountDownLatch done = new CountDownLatch(1);
        // wrap the caller's callback
        final ElectionCallback wrapperCallback = new ElectionCallback() {
            @Override
            public void onElectedAsMaster(ClusterState state) {
                // elected: release the latch
                done.countDown();
                callback.onElectedAsMaster(state);
            }

            @Override
            public void onFailure(Throwable t) {
                // the election failed: also release the latch
                done.countDown();
                callback.onFailure(t);
            }
        };
        ElectionContext myElectionContext = null;
        try {
            synchronized (this) {
                // an election context must already exist
                assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";
                myElectionContext = electionContext;
                // set the callback and the quorum for this election
                electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback);
                // if the joins received so far already reach the quorum, become master now
                checkPendingJoinsAndElectIfNeeded();
            }
            try {
                // success and failure both release the latch; wait for either, but give up after the timeout
                if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
                    // callback handles everything
                    return;
                }
            } catch (InterruptedException e) {
                // interrupted: fall through and fail the election below
            }
            if (logger.isTraceEnabled()) {
                // count the pending votes again for the log message
                final int pendingNodes = myElectionContext.getPendingMasterJoinsCount();
                logger.trace("timed out waiting to be elected. waited [{}]. pending master node joins [{}]", timeValue, pendingNodes);
            }
            // give up on this election
            failContextIfNeeded(myElectionContext, "timed out waiting to be elected");
        } catch (Exception e) {
            logger.error("unexpected failure while waiting for incoming joins", e);
            if (myElectionContext != null) {
                failContextIfNeeded(myElectionContext, "unexpected failure while waiting for pending joins [" + e.getMessage() + "]");
            }
        }
    }

    /**
     * Checks whether the join requests received so far (the votes) are enough to become master.
     */
    private synchronized void checkPendingJoinsAndElectIfNeeded() {
        assert electionContext != null : "election check requested but no active context";
        // count the votes
        final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
        // compare the votes with the quorum
        if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
            if (logger.isTraceEnabled()) {
                logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
                    electionContext.requiredMasterJoins);
            }
        } else { // the quorum is reached
            if (logger.isTraceEnabled()) {
                logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
                    electionContext.requiredMasterJoins);
            }
            // become the master
            electionContext.closeAndBecomeMaster();
            electionContext = null; // clear this out so future joins won't be accumulated
        }
    }

    class ElectionContext {
        private ElectionCallback callback = null;
        private int requiredMasterJoins = -1;
        private final Map<DiscoveryNode, List<MembershipAction.JoinCallback>> joinRequestAccumulator = new HashMap<>();
        final AtomicBoolean closed = new AtomicBoolean();

        public synchronized void onAttemptToBeElected(int requiredMasterJoins, ElectionCallback callback) {
            ensureOpen();
            assert this.requiredMasterJoins < 0;
            assert this.callback == null;
            this.requiredMasterJoins = requiredMasterJoins;
            this.callback = callback;
        }

        ......

        // whether the votes reach the quorum
        public synchronized boolean isEnoughPendingJoins(int pendingMasterJoins) {
            final boolean hasEnough;
            // requiredMasterJoins is still -1: onAttemptToBeElected has not been called yet
            if (requiredMasterJoins < 0) {
                hasEnough = false;
            } else {
                assert callback != null : "requiredMasterJoins is set but not the callback";
                // compare the votes with the quorum
                hasEnough = pendingMasterJoins >= requiredMasterJoins;
            }
            return hasEnough;
        }

        // become the master
        public synchronized void closeAndBecomeMaster() {
            assert callback != null : "becoming a master but the callback is not yet set";
            assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master but pending joins of "
                + getPendingMasterJoinsCount() + " are not enough. needs [" + requiredMasterJoins + "];";
            innerClose();
            Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
            final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";
            tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
            tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
            // submit the state update; this is where the new cluster state (and its version) is produced
            masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
        }

        // count the votes
        public synchronized int getPendingMasterJoinsCount() {
            int pendingMasterJoins = 0;
            // received join requests are stored in joinRequestAccumulator, keyed by the sending node
            for (DiscoveryNode node : joinRequestAccumulator.keySet()) {
                // only joins from master-eligible nodes count as votes
                if (node.isMasterNode()) {
                    pendingMasterJoins++;
                }
            }
            return pendingMasterJoins;
        }

        ......
    }
}
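The latch-plus-timeout pattern of waitToBeElectedAsMaster can be imitated in a self-contained class, together with the vote counting in getPendingMasterJoinsCount. MiniElection is a hypothetical, heavily simplified model with three rules. Joins are keyed by node ID, so a repeated join from the same node counts once, just as the Map keys of joinRequestAccumulator do. Joins from nodes that are not master-eligible are ignored. A timeout simply returns false instead of failing an election context.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of NodeJoinController's vote counting; not Elasticsearch code.
final class MiniElection {
    private final int requiredJoins;
    // IDs of master-eligible nodes that sent a join request (one vote each).
    private final Set<String> masterEligibleJoiners = new HashSet<>();
    // Released once, when the quorum is reached.
    private final CountDownLatch done = new CountDownLatch(1);
    private boolean elected = false;

    MiniElection(int requiredJoins) {
        this.requiredJoins = requiredJoins;
    }

    // Called for every incoming join request; joins from non-master-eligible nodes are not votes.
    synchronized void onJoin(String nodeId, boolean masterEligible) {
        if (masterEligible) {
            masterEligibleJoiners.add(nodeId); // a repeated join from the same node is ignored
        }
        checkAndElect();
    }

    // Releases the latch the first time the number of votes reaches requiredJoins.
    private synchronized void checkAndElect() {
        if (!elected && masterEligibleJoiners.size() >= requiredJoins) {
            elected = true;
            done.countDown();
        }
    }

    // Returns true if elected within the timeout, false on timeout or interruption.
    boolean waitToBeElected(long timeoutMillis) {
        checkAndElect(); // joins received earlier may already be enough
        try {
            // Wait outside the lock so that onJoin calls from other threads can proceed.
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniElection election = new MiniElection(2);
        Thread voters = new Thread(() -> {
            election.onJoin("node-2", true);
            election.onJoin("data-1", false);
            election.onJoin("node-3", true);
        });
        voters.start();
        System.out.println("elected: " + election.waitToBeElected(1_000));
        voters.join();
    }
}
```

As in NodeJoinController, which leaves its synchronized block before calling done.await, the sketch checks the votes under the lock but waits outside it. A join arriving on another thread can therefore still update the count and release the waiting thread.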
References:
Elasticsearch源码解析与优化实战 (Elasticsearch Source Code Analysis and Optimization in Practice), Zhang Chao