[toc]

ZooKeeper 3.5.5 standalone server flow

During ZooKeeperServerMain's startup, a NIOServerCnxnFactory is created and started:

```java
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
cnxnFactory.startup(zkServer);
```

ServerCnxnFactory.createFactory creates a NIOServerCnxnFactory by default; configure is called on it first, then startup.
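
Worth knowing: the factory class is chosen via the zookeeper.serverCnxnFactory system property, falling back to NIOServerCnxnFactory when it is unset. A minimal sketch, assuming the 3.5.x class names (the Netty variant also needs the Netty classes on the classpath):

```java
import org.apache.zookeeper.server.ServerCnxnFactory;

public class FactoryDemo {
    public static void main(String[] args) throws Exception {
        // When the property is unset, createFactory() returns a NIOServerCnxnFactory.
        System.setProperty("zookeeper.serverCnxnFactory",
                "org.apache.zookeeper.server.NettyServerCnxnFactory");
        ServerCnxnFactory factory = ServerCnxnFactory.createFactory();
        System.out.println(factory.getClass().getName());
    }
}
```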

1. configure

Start with NIOServerCnxnFactory.configure:

```java
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
if (secure) {
throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
}
// Configure SASL (security-related); skipped in this walkthrough.
configureSaslLogin();

// As the name suggests, the per-IP maximum number of client connections, default 60
maxClientCnxns = maxcc;
// Timeout for connections that have no session yet: read from a system property
// via Integer.getInteger, default 10000 ms (first time I've seen this Integer method)
sessionlessCnxnTimeout = Integer.getInteger(
ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
// We also use the sessionlessCnxnTimeout as expiring interval for
// cnxnExpiryQueue. These don't need to be the same, but the expiring
// interval passed into the ExpiryQueue() constructor below should be
// less than or equal to the timeout.
// Initialize the queue and thread; per their names, both deal with expiration
cnxnExpiryQueue =
new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
expirerThread = new ConnectionExpirerThread();
// Number of CPU cores
int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
// Number of selector threads: sqrt(cores/2), so 4 threads on a 32-core machine;
// why this many is examined later
numSelectorThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
Math.max((int) Math.sqrt((float) numCores/2), 1));
if (numSelectorThreads < 1) {
throw new IOException("numSelectorThreads must be at least 1");
}
// Worker thread count = 2 * cores
numWorkerThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
workerShutdownTimeoutMS = Long.getLong(
ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

LOG.info("Configuring NIO connection handler with "
+ (sessionlessCnxnTimeout/1000) + "s sessionless connection"
+ " timeout, " + numSelectorThreads + " selector thread(s), "
+ (numWorkerThreads > 0 ? numWorkerThreads : "no")
+ " worker threads, and "
+ (directBufferBytes == 0 ? "gathered writes." :
("" + (directBufferBytes/1024) + " kB direct buffers.")));
// Create the selector threads
for(int i=0; i<numSelectorThreads; ++i) {
selectorThreads.add(new SelectorThread(i));
}
// Open the server socket on the client port (2181 by default)
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
// Create the accept thread
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
```

configure mainly does four things:

  1. Create the ExpiryQueue and the ConnectionExpirerThread.
  2. Create the SelectorThreads.
  3. Bind and listen on the client port (2181 by default).
  4. Create the AcceptThread.
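
Several of these knobs are ordinary JVM system properties read through Integer.getInteger, which parses a system property as an int and returns the supplied default when the property is absent or malformed. A small demo using the property names from the source above:

```java
public class NioTuningDemo {
    public static void main(String[] args) {
        System.setProperty("zookeeper.nio.numSelectorThreads", "8");
        // Property set: the parsed value wins.
        int selectors = Integer.getInteger("zookeeper.nio.numSelectorThreads", 4);
        // Property unset: the default is returned.
        int workers = Integer.getInteger("zookeeper.nio.numWorkerThreads", 16);
        System.out.println(selectors + " " + workers); // 8 16
    }
}
```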

1.1 ExpiryQueue

```java
public class ExpiryQueue<E> {

// Map from element (NIOServerCnxn) to its expiry time
private final ConcurrentHashMap<E, Long> elemMap =
new ConcurrentHashMap<E, Long>();
/**
* The maximum number of buckets is equal to max timeout/expirationInterval,
* so the expirationInterval should not be too small compared to the
* max timeout that this expiry queue needs to maintain.
*/

// Map from expiry time to the Set<NIOServerCnxn> in that bucket
private final ConcurrentHashMap<Long, Set<E>> expiryMap =
new ConcurrentHashMap<Long, Set<E>>();

// Next expiration time
private final AtomicLong nextExpirationTime = new AtomicLong();
// Expiration interval
private final int expirationInterval;

public ExpiryQueue(int expirationInterval) {
this.expirationInterval = expirationInterval;
nextExpirationTime.set(roundToNextInterval(Time.currentElapsedTime()));
}

private long roundToNextInterval(long time) {
return (time / expirationInterval + 1) * expirationInterval;
}

// Update an element's expiry time
public Long update(E elem, int timeout) {
// Current expiry time of the element, if any
Long prevExpiryTime = elemMap.get(elem);
long now = Time.currentElapsedTime();
// Compute the new expiry time: now + timeout, rounded up to the next interval
Long newExpiryTime = roundToNextInterval(now + timeout);

if (newExpiryTime.equals(prevExpiryTime)) {
// No change, so nothing to update
return null;
}

// Below: store the new expiry time and elem into expiryMap and elemMap

// First add the elem to the new expiry time bucket in expiryMap.
Set<E> set = expiryMap.get(newExpiryTime);
if (set == null) {
// Construct a ConcurrentHashSet using a ConcurrentHashMap
set = Collections.newSetFromMap(
new ConcurrentHashMap<E, Boolean>());
// Put the new set in the map, but only if another thread
// hasn't beaten us to it
Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
if (existingSet != null) {
set = existingSet;
}
}
set.add(elem);

// Map the elem to the new expiry time. If a different previous
// mapping was present, clean up the previous expiry bucket.
prevExpiryTime = elemMap.put(elem, newExpiryTime);
if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
Set<E> prevSet = expiryMap.get(prevExpiryTime);
if (prevSet != null) {
prevSet.remove(elem);
}
}
return newExpiryTime;
}

public Set<E> poll() {
// Remove and return the elements whose expiry time has passed
long now = Time.currentElapsedTime();
long expirationTime = nextExpirationTime.get();
if (now < expirationTime) {
return Collections.emptySet();
}

Set<E> set = null;
long newExpirationTime = expirationTime + expirationInterval;
if (nextExpirationTime.compareAndSet(
expirationTime, newExpirationTime)) {
set = expiryMap.remove(expirationTime);
}
if (set == null) {
return Collections.emptySet();
}
return set;
}
```

Looking at ExpiryQueue's two core methods, update and poll, its job is clear: it buckets elements by expiry time and hands out the NIOServerCnxns whose expiry time has arrived.
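
The rounding in roundToNextInterval is the heart of it: deadlines are rounded up to the next multiple of expirationInterval, so nearby deadlines share one bucket in expiryMap and poll() can expire them in batches. A standalone illustration of the arithmetic:

```java
public class BucketDemo {
    // Same arithmetic as ExpiryQueue.roundToNextInterval: round UP to the next boundary.
    static long roundToNextInterval(long time, long interval) {
        return (time / interval + 1) * interval;
    }

    public static void main(String[] args) {
        long interval = 10_000; // expirationInterval, ms
        System.out.println(roundToNextInterval(123_456 + 4_000, interval)); // 130000
        System.out.println(roundToNextInterval(123_456 + 6_000, interval)); // 130000 -- same bucket
        System.out.println(roundToNextInterval(123_456 + 7_000, interval)); // 140000 -- next bucket
    }
}
```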

1.2 ConnectionExpirerThread

```java
private class ConnectionExpirerThread extends ZooKeeperThread {
ConnectionExpirerThread() {
super("ConnnectionExpirer");
}

public void run() {
try {
while (!stopped) {
long waitTime = cnxnExpiryQueue.getWaitTime();
if (waitTime > 0) {
Thread.sleep(waitTime);
continue;
}
for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
conn.close();
}
}

} catch (InterruptedException e) {
LOG.info("ConnnectionExpirerThread interrupted");
}
}
}
```

```java
public class ZooKeeperThread extends Thread {

private static final Logger LOG = LoggerFactory
.getLogger(ZooKeeperThread.class);

private UncaughtExceptionHandler uncaughtExceptionalHandler = new UncaughtExceptionHandler() {

@Override
public void uncaughtException(Thread t, Throwable e) {
handleException(t.getName(), e);
}
};

public ZooKeeperThread(String threadName) {
super(threadName);
setUncaughtExceptionHandler(uncaughtExceptionalHandler);
}

/**
* This will be used by the uncaught exception handler and just log a
* warning message and return.
*
* @param thName
* - thread name
* @param e
* - exception object
*/
protected void handleException(String thName, Throwable e) {
LOG.warn("Exception occurred from thread {}", thName, e);
}
}
```

ConnectionExpirerThread is an inner class of NIOServerCnxnFactory that extends ZooKeeperThread. ZooKeeperThread itself just sets the thread name and installs an UncaughtExceptionHandler.
ConnectionExpirerThread's job is to take the expired NIOServerCnxns from cnxnExpiryQueue and close them.
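
The run() loop leans on cnxnExpiryQueue.getWaitTime(), which the ExpiryQueue listing above omits; in 3.5.x it is essentially the sketch below (paraphrased, worth verifying against your source tree): the time remaining until the next bucket boundary, or 0 when a poll is due.

```java
// Sketch of ExpiryQueue.getWaitTime(), as used by the expirer loop above.
public long getWaitTime() {
    long now = Time.currentElapsedTime();
    long expirationTime = nextExpirationTime.get();
    return (now < expirationTime) ? (expirationTime - now) : 0L;
}
```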

1.3 AcceptThread

```java
private class AcceptThread extends AbstractSelectThread {
private final ServerSocketChannel acceptSocket;
private final SelectionKey acceptKey;
private final RateLogger acceptErrorLogger = new RateLogger(LOG);
private final Collection<SelectorThread> selectorThreads;
private Iterator<SelectorThread> selectorIterator;
private volatile boolean reconfiguring = false;

public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
Set<SelectorThread> selectorThreads) throws IOException {
super("NIOServerCxnFactory.AcceptThread:" + addr);
this.acceptSocket = ss;
// Register for OP_ACCEPT on the selector
this.acceptKey =
acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
this.selectorThreads = Collections.unmodifiableList(
new ArrayList<SelectorThread>(selectorThreads));
selectorIterator = this.selectorThreads.iterator();
}

public void run() {
try {
while (!stopped && !acceptSocket.socket().isClosed()) {
try {
select();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
} finally {
closeSelector();
// This will wake up the selector threads, and tell the
// worker thread pool to begin shutdown.
if (!reconfiguring) {
NIOServerCnxnFactory.this.stop();
}
LOG.info("accept thread exitted run method");
}
}

private void select() {
try {
selector.select();

Iterator<SelectionKey> selectedKeys =
selector.selectedKeys().iterator();
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();

if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
// An incoming connection is pending: accept it via doAccept()
if (!doAccept()) {
pauseAccept(10);
}
} else {
LOG.warn("Unexpected ops in accept select "
+ key.readyOps());
}
}
} catch (IOException e) {
LOG.warn("Ignoring IOException while selecting", e);
}
}

private boolean doAccept() {
boolean accepted = false;
SocketChannel sc = null;
try {
sc = acceptSocket.accept();
accepted = true;
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
// Enforce the per-IP connection limit
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
throw new IOException("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
}

LOG.debug("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);

// Round-robin assign this connection to a selector thread
// Take the next SelectorThread from the iterator, restarting the
// iterator once it is exhausted
if (!selectorIterator.hasNext()) {
selectorIterator = selectorThreads.iterator();
}
SelectorThread selectorThread = selectorIterator.next();
if (!selectorThread.addAcceptedConnection(sc)) {
throw new IOException(
"Unable to add connection to selector queue"
+ (stopped ? " (shutdown in progress)" : ""));
}
acceptErrorLogger.flush();
} catch (IOException e) {
// accept, maxClientCnxns, configureBlocking
acceptErrorLogger.rateLimitLog(
"Error accepting new connection: " + e.getMessage());
fastCloseSock(sc);
}
return accepted;
}
}
```

AcceptThread's main job is to accept incoming connections and hand each one off, round-robin, to a SelectorThread.
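
Isolated from the I/O details, the assignment policy is a restart-on-exhaustion round robin over an immutable list. A hypothetical stand-in (not ZooKeeper code) for the selectorIterator logic:

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical equivalent of the selectorIterator handling in doAccept().
// Single-threaded use only -- which holds here, since only the one
// AcceptThread ever touches the iterator.
class RoundRobin<T> {
    private final List<T> items;
    private Iterator<T> it;

    RoundRobin(List<T> items) {
        this.items = items;
        this.it = items.iterator();
    }

    T next() {
        if (!it.hasNext()) {    // exhausted: restart from the first element
            it = items.iterator();
        }
        return it.next();
    }
}
```

configure guarantees at least one selector thread, so the equivalent of calling next() on an empty list can never happen here.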

1.4 SelectorThread

```java
class SelectorThread extends AbstractSelectThread {
private final int id;
private final Queue<SocketChannel> acceptedQueue;
private final Queue<SelectionKey> updateQueue;

public SelectorThread(int id) throws IOException {
super("NIOServerCxnFactory.SelectorThread-" + id);
this.id = id;
acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
updateQueue = new LinkedBlockingQueue<SelectionKey>();
}

public boolean addAcceptedConnection(SocketChannel accepted) {
if (stopped || !acceptedQueue.offer(accepted)) {
return false;
}
wakeupSelector();
return true;
}

public boolean addInterestOpsUpdateRequest(SelectionKey sk) {
if (stopped || !updateQueue.offer(sk)) {
return false;
}
wakeupSelector();
return true;
}

...
}
```

SelectorThread likewise extends AbstractSelectThread. It owns two queues, acceptedQueue and updateQueue, and exposes two methods for enqueuing into them; note that both wake the selector after a successful offer.
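
The wakeupSelector() call matters because the selector thread normally sits blocked in select(); without the wakeup, a freshly accepted connection would wait for the next unrelated I/O event before being registered. A self-contained demonstration of the mechanism:

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class WakeupDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        Selector selector = Selector.open();
        Thread selectorThread = new Thread(() -> {
            try {
                selector.select(); // blocks: nothing is registered with this selector
                System.out.println("select() returned after wakeup()");
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        selectorThread.start();
        Thread.sleep(200);  // let the thread block (wakeup() also covers the race:
                            // if it runs first, the next select() returns immediately)
        selector.wakeup();  // what addAcceptedConnection() relies on
        selectorThread.join();
        selector.close();
    }
}
```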

```java
public void run() {
try {
// While running, loop over these three steps
while (!stopped) {
try {
select();
processAcceptedConnections();
processInterestOpsUpdateRequests();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
// On exit from the loop, close the remaining connections.
// Close connections still pending on the selector. Any others
// with in-flight work, let drain out of the work queue.
for (SelectionKey key : selector.keys()) {
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
if (cnxn.isSelectable()) {
cnxn.close();
}
cleanupSelectionKey(key);
}
SocketChannel accepted;
while ((accepted = acceptedQueue.poll()) != null) {
fastCloseSock(accepted);
}
updateQueue.clear();
} finally {
closeSelector();
// This will wake up the accept thread and the other selector
// threads, and tell the worker thread pool to begin shutdown.
NIOServerCnxnFactory.this.stop();
LOG.info("selector thread exitted run method");
}
}

private void select() {
try {
selector.select();

Set<SelectionKey> selected = selector.selectedKeys();
ArrayList<SelectionKey> selectedList =
new ArrayList<SelectionKey>(selected);
Collections.shuffle(selectedList);
Iterator<SelectionKey> selectedKeys = selectedList.iterator();
while(!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selected.remove(key);

if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// Handle connections that are readable or writable
if (key.isReadable() || key.isWritable()) {
handleIO(key);
} else {
LOG.warn("Unexpected ops in select " + key.readyOps());
}
}
} catch (IOException e) {
LOG.warn("Ignoring IOException while selecting", e);
}
}

/**
* Schedule I/O for processing on the connection associated with
* the given SelectionKey. If a worker thread pool is not being used,
* I/O is run directly by this thread.
*/
private void handleIO(SelectionKey key) {
// Wrap the key in an IOWorkRequest and hand it to the worker pool
IOWorkRequest workRequest = new IOWorkRequest(this, key);
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

// Stop selecting this key while processing on its
// connection
// Stop selecting on this key while its I/O is being processed
cnxn.disableSelectable();
key.interestOps(0);
touchCnxn(cnxn);
workerPool.schedule(workRequest);
}

/**
* Iterate over the queue of accepted connections that have been
* assigned to this thread but not yet placed on the selector.
*/
private void processAcceptedConnections() {
// Take accepted SocketChannels off acceptedQueue and register them for OP_READ
SocketChannel accepted;
while (!stopped && (accepted = acceptedQueue.poll()) != null) {
SelectionKey key = null;
try {
key = accepted.register(selector, SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(accepted, key, this);
key.attach(cnxn);
addCnxn(cnxn);
} catch (IOException e) {
// register, createConnection
cleanupSelectionKey(key);
fastCloseSock(accepted);
}
}
}

/**
* Iterate over the queue of connections ready to resume selection,
* and restore their interest ops selection mask.
*/
private void processInterestOpsUpdateRequests() {
// Take keys off updateQueue and restore their READ/WRITE interest ops
SelectionKey key;
while (!stopped && (key = updateQueue.poll()) != null) {
if (!key.isValid()) {
cleanupSelectionKey(key);
}
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
if (cnxn.isSelectable()) {
key.interestOps(cnxn.getInterestOps());
}
}
}
```

SelectorThread registers connections for READ/WRITE events and dispatches their I/O. Note the gating pattern: handleIO clears a key's interest ops and marks the connection unselectable while a worker processes it, and processInterestOpsUpdateRequests later restores the ops, so the same connection is never handed to two workers at once.

2. start

```java
public void start() {
stopped = false;

if (workerPool == null) {
workerPool = new WorkerService(
"NIOWorker", numWorkerThreads, false);
}
for(SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// ensure thread is started once and only once
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
```

NIOServerCnxnFactory's start method creates the WorkerService and starts the SelectorThreads, the acceptThread, and the expirerThread.

2.1 WorkerService

```java
private final ArrayList<ExecutorService> workers =
new ArrayList<ExecutorService>();
public WorkerService(String name, int numThreads,
boolean useAssignableThreads) {
this.threadNamePrefix = (name == null ? "" : name) + "Thread";
this.numWorkerThreads = numThreads;
this.threadsAreAssignable = useAssignableThreads;
start();
}

public void start() {
if (numWorkerThreads > 0) {
if (threadsAreAssignable) {
for(int i = 1; i <= numWorkerThreads; ++i) {
workers.add(Executors.newFixedThreadPool(
1, new DaemonThreadFactory(threadNamePrefix, i)));
}
} else {
workers.add(Executors.newFixedThreadPool(
numWorkerThreads, new DaemonThreadFactory(threadNamePrefix)));
}
}
stopped = false;
}
```

WorkerService's constructor mainly initializes the workers pool(s). With useAssignableThreads=false, as used here, a single fixed pool with numThreads threads is created; otherwise one single-thread pool per worker.

```java
public static abstract class WorkRequest {
/**
* Must be implemented. Is called when the work request is run.
*/
public abstract void doWork() throws Exception;

/**
* (Optional) If implemented, is called if the service is stopped
* or unable to schedule the request.
*/
public void cleanup() {
}
}
```

The nested class WorkRequest defines doWork and an optional cleanup method.
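
A hypothetical usage sketch (not from the ZooKeeper sources) of submitting a WorkRequest; it assumes WorkerService's single-argument schedule overload, which delegates to schedule(request, 0). IOWorkRequest in 2.2 is the real subclass:

```java
import org.apache.zookeeper.server.WorkerService;

public class WorkRequestDemo {
    public static void main(String[] args) throws InterruptedException {
        // name, numThreads, useAssignableThreads -- as in the constructor above
        WorkerService pool = new WorkerService("Demo", 4, false);
        pool.schedule(new WorkerService.WorkRequest() {
            @Override
            public void doWork() {
                System.out.println("ran on " + Thread.currentThread().getName());
            }
        });
        Thread.sleep(100); // crude: let the daemon worker run before stopping the service
        pool.stop();
    }
}
```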

```java
private class ScheduledWorkRequest implements Runnable {
private final WorkRequest workRequest;

ScheduledWorkRequest(WorkRequest workRequest) {
this.workRequest = workRequest;
}

@Override
public void run() {
try {
// Check if stopped while request was on queue
if (stopped) {
workRequest.cleanup();
return;
}
workRequest.doWork();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
workRequest.cleanup();
}
}
}
```

The inner class ScheduledWorkRequest implements Runnable; its run method executes workRequest.doWork, or workRequest.cleanup if the service has already stopped or doWork throws.

```java
public void schedule(WorkRequest workRequest, long id) {
if (stopped) {
workRequest.cleanup();
return;
}

ScheduledWorkRequest scheduledWorkRequest =
new ScheduledWorkRequest(workRequest);

// If we have a worker thread pool, use that; otherwise, do the work
// directly.
int size = workers.size();
if (size > 0) {
try {
// make sure to map negative ids as well to [0, size-1]
int workerNum = ((int) (id % size) + size) % size;
ExecutorService worker = workers.get(workerNum);
worker.execute(scheduledWorkRequest);
} catch (RejectedExecutionException e) {
LOG.warn("ExecutorService rejected execution", e);
workRequest.cleanup();
}
} else {
// When there is no worker thread pool, do the work directly
// and wait for its completion
scheduledWorkRequest.run();
}
}
```

The key method is schedule: it wraps the workRequest in a ScheduledWorkRequest and hands it to a worker pool chosen by the id. If there is no worker pool at all (numWorkerThreads == 0), the work runs synchronously on the calling thread; if the executor rejects the task, cleanup is invoked instead.
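
One detail worth unpacking: `((int) (id % size) + size) % size` maps any long id, negative ones included, onto [0, size-1], because Java's % keeps the sign of the dividend:

```java
public class WorkerIndexDemo {
    // Same mapping as in schedule(): always lands in [0, size-1].
    static int workerNum(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    public static void main(String[] args) {
        System.out.println(workerNum(7L, 4));  // 3
        System.out.println(workerNum(-7L, 4)); // 1  (plain -7 % 4 would be -3)
    }
}
```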

2.2 IOWorkRequest

```java
private class IOWorkRequest extends WorkerService.WorkRequest {
private final SelectorThread selectorThread;
private final SelectionKey key;
private final NIOServerCnxn cnxn;

IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
this.selectorThread = selectorThread;
this.key = key;
this.cnxn = (NIOServerCnxn) key.attachment();
}

public void doWork() throws InterruptedException {
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}

if (key.isReadable() || key.isWritable()) {
cnxn.doIO(key);

// Check if we shutdown or doIO() closed this connection
if (stopped) {
cnxn.close();
return;
}
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
// Refresh the connection's expiry time
touchCnxn(cnxn);
}

// Mark this connection as once again ready for selection
cnxn.enableSelectable();
// Push an update request on the queue to resume selecting
// on the current set of interest ops, which may have changed
// as a result of the I/O operations we just performed.
if (!selectorThread.addInterestOpsUpdateRequest(key)) {
cnxn.close();
}
}

@Override
public void cleanup() {
cnxn.close();
}
}
```

IOWorkRequest is an inner class of NIOServerCnxnFactory. Its doWork delegates to org.apache.zookeeper.server.NIOServerCnxn#doIO to perform the I/O, refreshes the connection's expiry time via touchCnxn, and re-arms the key for selection.

3. NIOServerCnxn

```java

// lenBuffer: 4 bytes, used to read the length prefix
private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);

// incomingBuffer initially points at lenBuffer
private ByteBuffer incomingBuffer = lenBuffer;

void doIO(SelectionKey k) throws InterruptedException {
try {
if (isSocketOpen() == false) {
LOG.warn("trying to do i/o on a null socket for session:0x"
+ Long.toHexString(sessionId));

return;
}
if (k.isReadable()) {
// Initially incomingBuffer == lenBuffer, so this first reads the 4-byte length
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
// incomingBuffer is full; when it is lenBuffer, the 4-byte length has been read
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
if (incomingBuffer == lenBuffer) { // start of next request
incomingBuffer.flip();
// Parse the length and reallocate incomingBuffer to that size
isPayload = readLength(k);
incomingBuffer.clear();
} else {
// continuation
isPayload = true;
}
if (isPayload) { // not the case for 4letterword
// Read and process the payload
readPayload();
}
else {
// four letter words take care
// need not do anything else
return;
}
}
}
if (k.isWritable()) {
// Write path; skipped in this walkthrough
handleWrite(k);

if (!initialized && !getReadInterest() && !getWriteInterest()) {
throw new CloseRequestException("responded to info probe");
}
}
} catch (CancelledKeyException e) {
LOG.warn("CancelledKeyException causing close of session 0x"
+ Long.toHexString(sessionId));
if (LOG.isDebugEnabled()) {
LOG.debug("CancelledKeyException stack trace", e);
}
close();
} catch (CloseRequestException e) {
// expecting close to log session closure
close();
} catch (EndOfStreamException e) {
LOG.warn(e.getMessage());
// expecting close to log session closure
close();
} catch (IOException e) {
LOG.warn("Exception causing close of session 0x"
+ Long.toHexString(sessionId) + ": " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("IOException stack trace", e);
}
close();
}
}

private boolean readLength(SelectionKey k) throws IOException {
// Read the length and allocate a fresh incomingBuffer
int len = lenBuffer.getInt();
if (!initialized && checkFourLetterWord(sk, len)) {
return false;
}
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error " + len);
}
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
incomingBuffer = ByteBuffer.allocate(len);
return true;
}

private void readPayload() throws IOException, InterruptedException {
if (incomingBuffer.remaining() != 0) {
// Read the payload
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
}
// Payload fully read
if (incomingBuffer.remaining() == 0) {
packetReceived();
incomingBuffer.flip();
if (!initialized) {
readConnectRequest();
} else {
// Process the request
readRequest();
}
lenBuffer.clear();
// Reset incomingBuffer back to lenBuffer for the next frame
incomingBuffer = lenBuffer;
}
}

private void readRequest() throws IOException {
// Hand the request to ZooKeeperServer
zkServer.processPacket(this, incomingBuffer);
}
```

NIOServerCnxn's doIO method handles reading and writing. Setting the write path aside for now: a read first consumes the 4-byte length prefix, then the payload, and finally hands the request to ZooKeeperServer.
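
So every packet on the wire is a 4-byte big-endian length followed by that many payload bytes. A sketch of building such a frame (hypothetical payload bytes; real clients serialize jute records here):

```java
import java.nio.ByteBuffer;

public class FrameDemo {
    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};                  // hypothetical request bytes
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length);                // what lenBuffer reads (big-endian)
        frame.put(payload);                          // what incomingBuffer then reads
        frame.flip();
        System.out.println(frame.remaining() + " bytes: 4-byte length + payload");
    }
}
```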

4. ZooKeeperServer

```java
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
// Deserialize the RequestHeader from the stream: just two fields, xid and type
h.deserialize(bia, "header");
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
// Handle auth packets; skipped in this walkthrough
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if(ap != null) {
try {
authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
} catch(RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication succeeded for scheme: " + scheme);
}
LOG.info("auth success " + cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
LOG.warn("No authentication provider for scheme: "
+ scheme + " has "
+ ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: " + scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
}
return;
} else {
// Handle SASL packets; skipped in this walkthrough
if (h.getType() == OpCode.sasl) {
Record rsp = processSasl(incomingBuffer,cnxn);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
return;
}
else {
// Wrap everything in a Request and call submitRequest
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
setLocalSessionFlag(si);
submitRequest(si);
}
}
cnxn.incrOutstandingRequests(h);
}
```

```java
public void submitRequest(Request si) {
// If firstProcessor is not set yet, wait for the processor chain to be initialized
if (firstProcessor == null) {
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
```

submitRequest feeds the request into the request processor chain, waiting first, if necessary, for the chain to finish initializing.

```java
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
```

As the setup method shows, ZooKeeperServer's request processor chain is PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.

5. RequestProcessor

ZooKeeper has a whole family of request processors, all rooted at the parent interface RequestProcessor. [Class-hierarchy diagram not reproduced here.]

The roles of the individual subclasses, briefly:

  • AckRequestProcessor: forwards the previous stage's request to the Leader as an ACK.
  • CommitProcessor: matches incoming committed requests against locally submitted ones, because the response to a local state-changing request arrives as a committed request.
  • FinalRequestProcessor: usually the last processor in a chain.
  • FollowerRequestProcessor: forwards state-changing requests to the Leader.
  • ObserverRequestProcessor: like FollowerRequestProcessor, forwards state-changing requests to the Leader.
  • PrepRequestProcessor: usually the first processor in a chain.
  • ProposalRequestProcessor: forwards requests to AckRequestProcessor and SyncRequestProcessor.
  • ReadOnlyRequestProcessor: the first processor in ReadOnlyZooKeeperServer's chain; passes read-only requests to the next processor and discards state-changing ones.
  • SendAckRequestProcessor: sends ACK packets.
  • SyncRequestProcessor: persists requests to the transaction log and takes snapshots (see 5.2).
  • ToBeAppliedRequestProcessor: maintains the toBeApplied list; its next processor must be a FinalRequestProcessor, and that FinalRequestProcessor must handle requests synchronously.
  • UnimplementedRequestProcessor: handles requests of unknown type.

```java
public interface RequestProcessor {
@SuppressWarnings("serial")
public static class RequestProcessorException extends Exception {
public RequestProcessorException(String msg, Throwable t) {
super(msg, t);
}
}

void processRequest(Request request) throws RequestProcessorException;

void shutdown();
}
```

RequestProcessor declares two methods and one nested exception class.

5.1 PrepRequestProcessor

PrepRequestProcessor, as its name suggests, does the preparatory work and runs first in the processing chain.

```java
public void processRequest(Request request) {
submittedRequests.add(request);
}
```

processRequest simply puts the request on a queue; a dedicated thread processes it asynchronously.

```java
@Override
public void run() {
try {
while (true) {
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
pRequest(request);
}
} catch (RequestProcessorException e) {
if (e.getCause() instanceof XidRolloverException) {
LOG.info(e.getCause().getMessage());
}
handleException(this.getName(), e);
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
}

protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.setHdr(null);
request.setTxn(null);

try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = new CreateTTLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
break;
case OpCode.deleteContainer:
case OpCode.delete:
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
break;
case OpCode.setData:
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = new ReconfigRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
break;
case OpCode.check:
CheckVersionRequest checkRequest = new CheckVersionRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
break;
case OpCode.multi:
MultiTransactionRecord multiRequest = new MultiTransactionRecord();
try {
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
} catch(IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<Txn>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null;

//Store off current pending change records in case we need to rollback
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

for(Op op: multiRequest) {
Record subrequest = op.toRequestRecord();
int type;
Record txn;

/* If we've already failed one of the ops, don't bother
* trying the rest as we know it's going to fail and it
* would be confusing in the logfiles.
*/
if (ke != null) {
type = OpCode.error;
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
}

/* Prep the request and convert to a Txn */
else {
try {
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
type = request.getHdr().getType();
txn = request.getTxn();
} catch (KeeperException e) {
ke = e;
type = OpCode.error;
txn = new ErrorTxn(e.code().intValue());

if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info("Got user-level KeeperException when processing {} aborting" +
" remaining multi ops. Error Path:{} Error:{}",
request.toString(), e.getPath(), e.getMessage());
}

request.setException(e);

/* Rollback change records from failed multi-op */
rollbackPendingChanges(zxid, pendingChanges);
}
}

//FIXME: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
txn.serialize(boa, "request") ;
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

txns.add(new Txn(type, bb.array()));
}

request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), request.type));
request.setTxn(new MultiTxn(txns));

break;

//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request,
null, true);
}
break;

//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
case OpCode.checkWatches:
case OpCode.removeWatches:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;
default:
LOG.warn("unknown type " + request.type);
break;
}
} catch (KeeperException e) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));
}

if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info("Got user-level KeeperException when processing {} Error Path:{} Error:{}",
request.toString(), e.getPath(), e.getMessage());
}
request.setException(e);
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process " + request, e);

StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
if(bb != null){
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
} else {
sb.append("request buffer is null");
}

LOG.error("Dumping request buffer: 0x" + sb.toString());
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
```

run overrides Thread.run; its core loop is simple: keep taking requests off the queue and hand each one to pRequest.

pRequest determines the request type, builds the matching request record for that type, and then calls pRequest2Txn.

pRequest2Txn performs type-specific validation. For a create, for instance, it checks the session, the ACL list, and the node path, branches on the node type (sequential, ephemeral, and so on), increments the parent's child count, and finally calls addChangeRecord to append a ChangeRecord to ZooKeeperServer's outstandingChanges and outstandingChangesForPath.

After that, the request is handed to the next processor in the chain.

5.2 SyncRequestProcessor

```java
public void processRequest(Request request) {
// request.addRQRec(">sync");
queuedRequests.add(request);
}
```

SyncRequestProcessor likewise queues requests for asynchronous processing.

```java
@Override
public void run() {
try {
int logCount = 0;

// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
// Nothing pending flush: block on take()
si = queuedRequests.take();
} else {
// Flushes are pending: use poll(), which returns null when the queue is empty.
// A null result means the queue is idle, so use the gap to flush to disk.
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// Append the request to the transaction log; only transactional requests return true
if (zks.getZKDatabase().append(si)) {
logCount++;
// Past the threshold, roll the log and take a snapshot on a background thread
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
// toFlush empty means a read-heavy stretch: respond directly, no flush needed
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
} finally{
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
```

```java
private void flush(LinkedList<Request> toFlush)
throws IOException, RequestProcessorException
{
if (toFlush.isEmpty())
return;
// Commit (flush) the transaction log to disk
zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
// Hand each flushed request to the next processor
Request i = toFlush.remove();
if (nextProcessor != null) {
nextProcessor.processRequest(i);
}
}
if (nextProcessor != null && nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
```

SyncRequestProcessor's main responsibilities are flushing transactional requests to the on-disk log and triggering snapshots.
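
The roll/snapshot trigger is deliberately randomized per server: with snapCount as the target (the zookeeper.snapCount property, default 100000, if memory serves), the log rolls once logCount exceeds snapCount/2 plus a random offset below snapCount/2, so ensemble members do not all snapshot at once. The arithmetic:

```java
import java.util.Random;

public class RollThresholdDemo {
    public static void main(String[] args) {
        int snapCount = 100_000;                  // assumed default of zookeeper.snapCount
        Random r = new Random();
        int randRoll = r.nextInt(snapCount / 2);
        int threshold = snapCount / 2 + randRoll; // roll when logCount exceeds this
        System.out.println("roll after ~" + threshold + " logged txns (range 50000..99999)");
    }
}
```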

5.3 FinalRequestProcessor

FinalRequestProcessor mainly operates on the DataTree; it is left for a later post.