[toc]

ZooKeeper 3.5.5: cluster startup and leader election flow

1.QuorumPeerMain

The startup entry point of ZooKeeper is the main method of QuorumPeerMain. The path to the zoo.cfg configuration file must be passed in as a startup argument.

public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
LOG.error("Invalid arguments, exiting abnormally", e);
LOG.info(USAGE);
System.err.println(USAGE);
System.exit(2);
} catch (ConfigException e) {
LOG.error("Invalid config, exiting abnormally", e);
System.err.println("Invalid config, exiting abnormally");
System.exit(2);
} catch (DatadirException e) {
LOG.error("Unable to access datadir, exiting abnormally", e);
System.err.println("Unable to access datadir, exiting abnormally");
System.exit(3);
} catch (AdminServerException e) {
LOG.error("Unable to start AdminServer, exiting abnormally", e);
System.err.println("Unable to start AdminServer, exiting abnormally");
System.exit(4);
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
System.exit(1);
}
LOG.info("Exiting normally");
System.exit(0);
}

protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
//Parse the configuration file
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}

// Start and schedule the purge task
//Periodically purge old snapshots and transaction log files
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
//A quorum config goes through runFromConfig; standalone mode falls back to ZooKeeperServerMain
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}

initializeAndRun parses the configuration file; in standalone mode it delegates to ZooKeeperServerMain, otherwise it calls runFromConfig.
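For reference, here is a minimal zoo.cfg sketch for a three-node ensemble; the host names and paths are placeholders, not taken from the article. With several server.N entries the config counts as distributed and runFromConfig is taken; with no quorum section the process runs standalone.

# minimal illustrative zoo.cfg (placeholder hosts/paths)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# quorum definition: quorumPort:electionPort per member;
# without these lines the server runs standalone
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888

With a quorum section like this, runFromConfig below takes over.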

public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}

LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
//Initialize cnxnFactory; NIOServerCnxnFactory by default
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}
//Initialize secureCnxnFactory
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}
//new QuorumPeer() and apply the various configuration settings
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}

// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();

quorumPeer.start();
//QuorumPeer extends ZooKeeperThread; calling join() here blocks until the thread finishes
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}

runFromConfig creates the QuorumPeer, applies all of the configuration settings to it, and finally calls its initialize and start methods.

public void initialize() throws SaslException {
// init quorum auth server & learner
if (isQuorumSaslAuthEnabled()) {
Set<String> authzHosts = new HashSet<String>();
for (QuorumServer qs : getView().values()) {
authzHosts.add(qs.hostname);
}
authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(),
quorumServerLoginContext, authzHosts);
authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(),
quorumServicePrincipal, quorumLearnerLoginContext);
} else {
authServer = new NullQuorumAuthServer();
authLearner = new NullQuorumAuthLearner();
}
}

As the name suggests, this initializes the SASL authentication server and learner for the quorum; since it is unrelated to the main flow, it is not examined further here.

@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
//Load the database: restore data from snapshots and read currentEpoch and acceptedEpoch from files on disk
loadDataBase();
//Start the ServerCnxnFactory
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
//Start leader election
startLeaderElection();
//Start the QuorumPeer thread
super.start();
}

The start method mainly does the following:

  1. Load the database: restore data from snapshots and read currentEpoch and acceptedEpoch from files on disk (a small sketch of those files follows this list).
  2. Start the ServerCnxnFactory.
  3. startLeaderElection: begin leader election.
  4. Start the QuorumPeer thread.
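As a side note on step 1: the two epoch values are kept as small text files next to the snapshot data. Below is a minimal sketch of reading them, assuming the default layout where they sit under <dataDir>/version-2 with the standard names currentEpoch and acceptedEpoch; it is an illustration, not the actual loadDataBase code.

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch only: read the epoch files that loadDataBase() consults at startup.
// Assumes the default layout <dataDir>/version-2/{currentEpoch,acceptedEpoch}.
public class EpochFilesSketch {
    public static void main(String[] args) throws Exception {
        Path versionDir = Paths.get("/var/lib/zookeeper", "version-2"); // placeholder dataDir
        long currentEpoch = readEpoch(versionDir.resolve("currentEpoch"));
        long acceptedEpoch = readEpoch(versionDir.resolve("acceptedEpoch"));
        System.out.println("currentEpoch=" + currentEpoch + ", acceptedEpoch=" + acceptedEpoch);
    }

    // Each file contains a single decimal number written by the server.
    static long readEpoch(Path file) throws Exception {
        return Long.parseLong(new String(Files.readAllBytes(file)).trim());
    }
}

Back to the main flow: startLeaderElection is next.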
synchronized public void startLeaderElection() {
try {
//The initial state is LOOKING; initialize this server's own vote
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
//electionType defaults to 3; the algorithm selected by 0 is deprecated
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(getQuorumAddress().getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
//Create the election algorithm (an Election object)
this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;

switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
//electionAlgorithm defaults to 3, so the FastLeaderElection algorithm is used
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}

Next, look at the core run method.

public void run() {

...

try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
//If read-only mode is enabled, start a ReadOnlyZooKeeperServer
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");

// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk =
new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
//Run leader election
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
start_fle = Time.currentElapsedTime();
}
}
...
}

The run method loops over the four server states: LOOKING, OBSERVING, FOLLOWING, and LEADING.

2.FastLeaderElection

When a server is in the LOOKING state, it calls FastLeaderElection's lookForLeader to start a new election round. FastLeaderElection has two important inner classes, WorkerReceiver and WorkerSender; look at these two inner classes first.
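Before reading them, it helps to keep the plumbing in mind: votes travel through two in-memory queues (sendqueue and recvqueue), with one worker thread per direction bridging those queues and QuorumCnxManager. The sketch below only illustrates that producer/consumer layout; names such as NetworkLayer are invented here, and this is not the real Messenger code.

import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration of the queue/thread layout: the election logic only touches the two
// queues, while the worker threads move messages between them and the network layer.
class MessengerSketch {
    interface NetworkLayer { // stand-in for QuorumCnxManager
        void send(ByteBuffer msg);
        ByteBuffer poll(long timeout, TimeUnit unit) throws InterruptedException;
    }

    final LinkedBlockingQueue<ByteBuffer> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<ByteBuffer> recvqueue = new LinkedBlockingQueue<>();

    void start(NetworkLayer manager) {
        // "WorkerSender": drain sendqueue and push each message onto the wire
        new Thread(() -> {
            try {
                while (true) {
                    ByteBuffer m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (m != null) manager.send(m);
                }
            } catch (InterruptedException ignored) { }
        }, "sender-sketch").start();

        // "WorkerReceiver": publish incoming messages so the election loop can poll them
        new Thread(() -> {
            try {
                while (true) {
                    ByteBuffer m = manager.poll(3000, TimeUnit.MILLISECONDS);
                    if (m != null) recvqueue.offer(m);
                }
            } catch (InterruptedException ignored) { }
        }, "receiver-sketch").start();
    }
}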

2.1 WorkerReceiver

class WorkerReceiver extends ZooKeeperThread  {
// whether this worker should stop
volatile boolean stop;
// manages the connections between servers
QuorumCnxManager manager;

WorkerReceiver(QuorumCnxManager manager) {
super("WorkerReceiver");
this.stop = false;
this.manager = manager;
}

public void run() {

Message response;
while (!stop) {
// Sleeps on receive
try {
// Poll one election message (sent by another server) from recvQueue
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
// No message available; try again
if(response == null) continue;

...

/*
* If it is from a non-voting server (such as an observer or
* a non-voting follower), respond right away.
*/
if(!validVoter(response.sid)) {
//The sender is not in the current voting view; reply with this server's current vote via sendqueue
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());

sendqueue.offer(notmsg);
} else {
// Receive new message
if (LOG.isDebugEnabled()) {
LOG.debug("Receive new notification message. My id = "
+ self.getId());
}

// State of peer that sent this message
//Decode the state of the peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}

n.leader = rleader;
n.zxid = rzxid;
n.electionEpoch = relectionEpoch;
n.state = ackstate;
n.sid = response.sid;
n.peerEpoch = rpeerepoch;
n.version = version;
n.qv = rqv;
/*
* Print notification info
*/
if(LOG.isInfoEnabled()){
printNotification(n);
}

/*
* If this server is looking, then send proposed leader
*/
//This server is itself in the LOOKING state
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);

/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
//The sender is also LOOKING and its election epoch lags behind our logical clock
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())){
// Get this server's current vote to send back
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
// Construct a notification carrying this server's own vote
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
// Queue the message for sending
sendqueue.offer(notmsg);
}
} else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
// This server is not LOOKING but the sender is; send back what this server believes to be the leader
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
self.getId(),
response.sid,
Long.toHexString(current.getZxid()),
current.getId(),
Long.toHexString(self.getQuorumVerifier().getVersion()));
}

QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}
}

WorkerReceiver (a ZooKeeperThread, hence a Runnable) is the vote receiver. It continuously pulls election messages sent by other servers from QuorumCnxManager, converts each into a notification, and stores it in recvqueue. During reception, if an external vote's election round is lower than this server's, that external vote is ignored and this server's own vote is sent back immediately.
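For readability, here is a simplified view of the fields a received notification carries, inferred from the n.* assignments above; the real Notification class inside FastLeaderElection may differ in details.

// Simplified view of one notification, inferred from the assignments in WorkerReceiver.
class NotificationSketch {
    long leader;        // sid of the proposed leader
    long zxid;          // last zxid the sender has logged
    long electionEpoch; // the sender's election round (its logical clock)
    int  state;         // sender state as sent on the wire: 0 LOOKING, 1 FOLLOWING, 2 LEADING, 3 OBSERVING
    long sid;           // sid of the server that sent the notification
    long peerEpoch;     // epoch of the proposed leader
    int  version;       // notification protocol version
    String config;      // serialized QuorumVerifier (the sender's view of the membership)
}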

2.2 WorkerSender

class WorkerSender extends ZooKeeperThread {
volatile boolean stop;
QuorumCnxManager manager;

WorkerSender(QuorumCnxManager manager){
super("WorkerSender");
this.stop = false;
this.manager = manager;
}

public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;

process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}

/**
* Called by run() once there is a new message to send.
*
* @param m message to send
*/
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch,
m.configData);

manager.toSend(m.sid, requestBuffer);

}
}

WorkerSender, likewise a ZooKeeperThread, is the vote sender: it continuously takes pending votes from sendqueue and hands them to the underlying QuorumCnxManager.

2.3 sendNotifications

/**
* Send notifications to all peers upon a change in our vote
*/
private void sendNotifications() {
// Iterate over all voting members
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
// Construct the notification message
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
// Queue the message for sending
sendqueue.offer(notmsg);
}
}

sendNotifications iterates over all voting members and sends this server's vote to each of them. The send is not synchronous: a ToSend message is placed on sendqueue per recipient, and WorkerSender performs the actual send later.

2.4 totalOrderPredicate

/**
* Check if a pair (server id, zxid) succeeds our
* current vote.
*
* @param id Server identifier
* @param zxid Last zxid observed by the issuer of this vote
*/
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}

/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
// 1. If the epoch in the message is larger than ours, the server id in the message is the better leader
// 2. If the epochs are equal, compare zxid: the larger zxid wins
// 3. If both are equal, compare server ids: the larger id wins
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

Note: this function compares a received vote against this server's current proposal to decide whether the server proposed in the message is the better leader, comparing by epoch first, then zxid, then server id.
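A tiny self-contained demo of that ordering (the weight check is ignored): with equal epochs and zxids the higher server id wins, which is why a freshly started ensemble with empty data typically elects the member with the largest myid.

// Standalone illustration of the (epoch, zxid, sid) ordering used by totalOrderPredicate.
public class VoteOrderDemo {
    static boolean wins(long newId, long newZxid, long newEpoch,
                        long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        System.out.println(wins(1, 0x100, 2, 3, 0x500, 1)); // true: higher epoch always wins
        System.out.println(wins(1, 0x200, 1, 3, 0x100, 1)); // true: same epoch, higher zxid wins
        System.out.println(wins(3, 0x0, 0, 2, 0x0, 0));     // true: all equal except the higher sid
        System.out.println(wins(1, 0x0, 0, 2, 0x0, 0));     // false: lower sid loses the tie-break
    }
}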

2.5 termPredicate

/**
* Termination predicate. Given a set of votes, determines if have
* sufficient to declare the end of the election round.
*
* @param votes
* Set of votes
* @param vote
* Identifier of the vote received last
*/
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}

/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
//Count the acks and check whether the votes for the proposed leader exceed half of the voting members
return voteSet.hasAllQuorums();
}

This function determines whether leader election can end, i.e. whether more than half of the servers have chosen the same leader. It compares each received vote against the current vote, collects the servers whose votes match, and then checks whether that set forms a quorum.
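A simplified majority check conveys the same idea; the real code delegates to SyncedLearnerTracker, which also handles hierarchical quorums and a pending reconfiguration. The helper below is purely illustrative.

import java.util.HashMap;
import java.util.Map;

// Simplified quorum check: count how many received votes name our proposed leader
// and require a strict majority of the voting members.
public class MajorityCheckSketch {
    static boolean haveQuorum(Map<Long, Long> votesBySid, long proposedLeader, int votingMembers) {
        long acks = votesBySid.values().stream().filter(v -> v == proposedLeader).count();
        return acks > votingMembers / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> votes = new HashMap<>(); // sid -> leader proposed in that server's vote
        votes.put(1L, 3L);
        votes.put(2L, 3L);
        System.out.println(haveQuorum(votes, 3L, 3)); // true: 2 of 3 voters back server 3
        System.out.println(haveQuorum(votes, 2L, 3)); // false: only a minority backs server 2
    }
}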

2.6 checkLeader


/**
 * In the case there is a leader elected, and a quorum supporting
* this leader, we have to check if the leader has voted and acked
* that it is leading. We need this check to avoid that peers keep
* electing over and over a peer that has crashed and it is no
* longer leading.
*
* @param votes set of votes
* @param leader leader id
* @param electionEpoch epoch id
*/
protected boolean checkLeader(
Map<Long, Vote> votes,
long leader,
long electionEpoch){

boolean predicate = true;

/*
* If everyone else thinks I'm the leader, I must be the leader.
* The other two checks are just for the case in which I'm not the
* leader. If I'm not the leader and I haven't received a message
* from leader stating that it is leading, then predicate is false.
*/

if(leader != self.getId()){// this server is not the leader
if(votes.get(leader) == null) predicate = false;// no vote from the proposed leader yet
else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;// the elected leader has not acked that it is LEADING, so the other servers cannot trust it yet
} else if(logicalclock.get() != electionEpoch) {// the logical clock does not match the election epoch
predicate = false;
}

return predicate;
}

2.7 lookForLeader

public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

int notTimeout = finalizeWait;

synchronized(this){
// Bump the logical clock; every new election round increments it
logicalclock.incrementAndGet();
// Update the proposal: initially this server votes for itself
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}

LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// Send this server's vote to all other servers
sendNotifications();

/*
* Loop in which we exchange notifications until we find a leader
*/

while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){// this server is LOOKING and the election has not been stopped
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
// Take a notification from the recvqueue
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);

/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){// nothing received within the timeout: resend votes or reconnect
if(manager.haveDelivered()){// the manager has already delivered all queued messages
// resend notifications to all other servers
sendNotifications();
} else {// some messages have not been delivered yet
// try to connect to every other server
manager.connectAll();
}

/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if (validVoter(n.sid) && validVoter(n.leader)) {// both the sender and the proposed leader are in the voting view
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {// the sender's election epoch is ahead of our logical clock
// adopt the newer election epoch as our logical clock
logicalclock.set(n.electionEpoch);
recvset.clear();
//Compare the votes and update this server's proposal accordingly
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
//Broadcast the (possibly updated) proposal
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {// the sender's election epoch is behind our logical clock; ignore it
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {//same election epoch: compare the votes
// Update the proposal
updateProposal(n.leader, n.zxid, n.peerEpoch);
// Broadcast it
sendNotifications();
}

if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}

// recvset records every external vote this server has received in the current election round
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {// a quorum agrees on the proposed leader

// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){// drain notifications that arrived while deciding
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){// a better candidate showed up; put it back and keep electing
recvqueue.put(n);
break;
}
}

/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:// the sender reports LEADING
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock.get()){// same election epoch as our logical clock
// record the sender's vote in recvset
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// check whether leader election has completed
if(termPredicate(recvset, new Vote(n.version, n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
// set this server's final state
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
// build the final vote
Vote endVote = new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}

/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version, n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.version, n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid,
n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: " + n.state
+ " (n.state), " + n.sid + " (n.sid)");
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}

lookForLeader starts a new round of leader election: it increments the logical clock, votes for itself and broadcasts that vote, then keeps exchanging notifications and updating its proposal until a quorum agrees on a leader, at which point the server transitions to LEADING, FOLLOWING, or OBSERVING.