public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    // Configure SASL (security related); skip it for now.
    configureSaslLogin();

    // Judging by the name, the maximum number of client connections; default 60.
    maxClientCnxns = maxcc;
    // Timeout for connections that have no session yet: read from a system property,
    // default 10000. (First time I've seen Integer used this way.)
    sessionlessCnxnTimeout = Integer.getInteger(
        ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    // Initialize the expiry queue and the expirer thread; both deal with connection expiry.
    cnxnExpiryQueue =
        new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    // Number of available cores.
    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    // Initialize the selector threads: with 32 cores this works out to 4 threads.
    // Not obvious yet why multiple selector threads are needed; revisit later.
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    // Worker thread count = 2 * number of cores.
    numWorkerThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(
        ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
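As an aside, Integer.getInteger(name, default) reads a numeric JVM system property (set with -Dname=value), not an environment variable, and falls back to the default when the property is missing or unparsable. A minimal standalone sketch of the pattern (the property name demo.numSelectorThreads is made up for illustration, not a real ZooKeeper key):

// Minimal sketch of the Integer.getInteger(...) pattern used above.
// "demo.numSelectorThreads" is a made-up property name for illustration.
public class SystemPropertyDemo {
    public static void main(String[] args) {
        int numCores = Runtime.getRuntime().availableProcessors();
        // Reads the *system property* (-Ddemo.numSelectorThreads=N), not an env var;
        // falls back to the computed default when the property is absent or malformed.
        int numSelectorThreads = Integer.getInteger(
            "demo.numSelectorThreads",
            Math.max((int) Math.sqrt((float) numCores / 2), 1));
        System.out.println(numCores + " cores -> " + numSelectorThreads + " selector threads");
    }
}

With no -D override and 32 cores this prints "32 cores -> 4 selector threads", which matches the sweet-spot comment in the source.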
// Map from element (NIOServerCnxn) to its expiry time.
private final ConcurrentHashMap<E, Long> elemMap =
    new ConcurrentHashMap<E, Long>();
/**
 * The maximum number of buckets is equal to max timeout/expirationInterval,
 * so the expirationInterval should not be too small compared to the
 * max timeout that this expiry queue needs to maintain.
 */
// Map from expiry time to the Set<NIOServerCnxn> expiring at that time.
private final ConcurrentHashMap<Long, Set<E>> expiryMap =
    new ConcurrentHashMap<Long, Set<E>>();

// Next expiration time to be polled.
private final AtomicLong nextExpirationTime = new AtomicLong();
// Expiration interval (bucket width).
private final int expirationInterval;
private long roundToNextInterval(long time) {
    return (time / expirationInterval + 1) * expirationInterval;
}

// Update an element's expiry time.
public Long update(E elem, int timeout) {
    // Current expiry time recorded for this element, if any.
    Long prevExpiryTime = elemMap.get(elem);
    long now = Time.currentElapsedTime();
    // Compute the new expiry time by rounding (now + timeout) up to the next interval.
    Long newExpiryTime = roundToNextInterval(now + timeout);

    if (newExpiryTime.equals(prevExpiryTime)) {
        // No change, so nothing to update
        return null;
    }
    // Below, the element and its expiry time are stored into expiryMap and elemMap.
    // First add the elem to the new expiry time bucket in expiryMap.
    Set<E> set = expiryMap.get(newExpiryTime);
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(
            new ConcurrentHashMap<E, Boolean>());
        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    set.add(elem);
    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket.
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem);
        }
    }
    return newExpiryTime;
}

public Set<E> poll() {
    // Remove and return the elements that have expired as of now.
    long now = Time.currentElapsedTime();
    long expirationTime = nextExpirationTime.get();
    if (now < expirationTime) {
        return Collections.emptySet();
    }
    Set<E> set = null;
    long newExpirationTime = expirationTime + expirationInterval;
    if (nextExpirationTime.compareAndSet(
            expirationTime, newExpirationTime)) {
        set = expiryMap.remove(expirationTime);
    }
    if (set == null) {
        return Collections.emptySet();
    }
    return set;
}
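So ExpiryQueue is essentially a coarse-grained timer wheel: every deadline is rounded up to the next multiple of expirationInterval, connections due around the same time share one bucket, and poll() retires at most one bucket per call by CAS-advancing nextExpirationTime. A simplified sketch of the same bucketing idea, assuming a single polling thread (this is an illustration, not ZooKeeper's ExpiryQueue):

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// A simplified, single-poller sketch of the bucketing idea above (not
// ZooKeeper's ExpiryQueue): deadlines are rounded up to the next multiple of
// the interval, so entries expiring around the same time share one bucket,
// and poll() retires at most one bucket per interval.
class SimpleExpiryQueue<E> {
    private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<>();
    private final long interval;
    private long nextExpiration;

    SimpleExpiryQueue(long intervalMs) {
        this.interval = intervalMs;
        this.nextExpiration = roundUp(System.currentTimeMillis());
    }

    private long roundUp(long t) {
        return (t / interval + 1) * interval;
    }

    /** Move elem into the bucket for (now + timeout), dropping it from its old bucket. */
    void update(E elem, long timeoutMs) {
        long newExpiry = roundUp(System.currentTimeMillis() + timeoutMs);
        Long oldExpiry = elemMap.put(elem, newExpiry);
        expiryMap.computeIfAbsent(newExpiry, k -> ConcurrentHashMap.<E>newKeySet()).add(elem);
        if (oldExpiry != null && oldExpiry != newExpiry) {
            Set<E> old = expiryMap.get(oldExpiry);
            if (old != null) {
                old.remove(elem);
            }
        }
    }

    /** Return (and forget) the bucket whose deadline has passed, if any. */
    Set<E> poll() {
        if (System.currentTimeMillis() < nextExpiration) {
            return Collections.emptySet();
        }
        Set<E> expired = expiryMap.remove(nextExpiration);
        nextExpiration += interval;
        if (expired == null) {
            return Collections.emptySet();
        }
        expired.forEach(elemMap::remove);   // expired entries are also dropped from elemMap
        return expired;
    }
}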
/**
 * This will be used by the uncaught exception handler and just log a
 * warning message and return.
 *
 * @param thName
 *            - thread name
 * @param e
 *            - exception object
 */
protected void handleException(String thName, Throwable e) {
    LOG.warn("Exception occurred from thread {}", thName, e);
}
}
public void run() {
    try {
        // While running, loop over these three steps.
        while (!stopped) {
            try {
                select();
                processAcceptedConnections();
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        // Once the loop exits, close all remaining connections.
        // Close connections still pending on the selector. Any others
        // with in-flight work, let drain out of the work queue.
        for (SelectionKey key : selector.keys()) {
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                cnxn.close();
            }
            cleanupSelectionKey(key);
        }
        SocketChannel accepted;
        while ((accepted = acceptedQueue.poll()) != null) {
            fastCloseSock(accepted);
        }
        updateQueue.clear();
    } finally {
        closeSelector();
        // This will wake up the accept thread and the other selector
        // threads, and tell the worker thread pool to begin shutdown.
        NIOServerCnxnFactory.this.stop();
        LOG.info("selector thread exitted run method");
    }
}
            if (!key.isValid()) {
                cleanupSelectionKey(key);
                continue;
            }
            // Handle connections that are readable or writable.
            if (key.isReadable() || key.isWritable()) {
                handleIO(key);
            } else {
                LOG.warn("Unexpected ops in select " + key.readyOps());
            }
        }
    } catch (IOException e) {
        LOG.warn("Ignoring IOException while selecting", e);
    }
}
/**
 * Schedule I/O for processing on the connection associated with
 * the given SelectionKey. If a worker thread pool is not being used,
 * I/O is run directly by this thread.
 */
private void handleIO(SelectionKey key) {
    // Wrap the key in an IOWorkRequest and hand it to the workerPool.
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

    // Stop selecting this key while processing on its
    // connection, i.e. clear its interest ops while the worker handles it.
    cnxn.disableSelectable();
    key.interestOps(0);
    touchCnxn(cnxn);
    workerPool.schedule(workRequest);
}
/**
 * Iterate over the queue of accepted connections that have been
 * assigned to this thread but not yet placed on the selector.
 */
private void processAcceptedConnections() {
    // Take SocketChannels off acceptedQueue and register them for READ.
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            key = accepted.register(selector, SelectionKey.OP_READ);
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn);
            addCnxn(cnxn);
        } catch (IOException e) {
            // register, createConnection
            cleanupSelectionKey(key);
            fastCloseSock(accepted);
        }
    }
}

/**
 * Iterate over the queue of connections ready to resume selection,
 * and restore their interest ops selection mask.
 */
private void processInterestOpsUpdateRequests() {
    // Take keys off updateQueue and restore their READ/WRITE interest ops.
    SelectionKey key;
    while (!stopped && (key = updateQueue.poll()) != null) {
        if (!key.isValid()) {
            cleanupSelectionKey(key);
        }
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
        if (cnxn.isSelectable()) {
            key.interestOps(cnxn.getInterestOps());
        }
    }
}
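One design note: SelectableChannel.register() can block while another thread is inside select() on the same selector, which is why the accept thread never touches the selector directly. It only enqueues the new SocketChannel and wakes the selector, and the selector thread does the registration itself between select() calls. A minimal sketch of that handoff pattern, with illustrative names rather than the ZooKeeper classes:

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal sketch of the accept-thread -> selector-thread handoff (illustrative
// names, not the ZooKeeper classes): the acceptor never registers channels itself.
public class HandoffSketch {
    private final Selector selector;
    private final Queue<SocketChannel> acceptedQueue = new ConcurrentLinkedQueue<>();

    HandoffSketch() throws IOException {
        this.selector = Selector.open();
    }

    // Called from the accept thread: enqueue and wake the selector so the
    // selecting thread can register the channel between select() calls.
    void handoff(SocketChannel accepted) {
        acceptedQueue.offer(accepted);
        selector.wakeup();
    }

    // Selector-thread loop: select, then drain the queue and register OP_READ.
    void selectLoop() throws IOException {
        while (!Thread.currentThread().isInterrupted()) {
            selector.select();
            // ... dispatch ready keys here ...
            SocketChannel accepted;
            while ((accepted = acceptedQueue.poll()) != null) {
                accepted.configureBlocking(false);
                accepted.register(selector, SelectionKey.OP_READ);
            }
        }
    }
}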
public void start() {
    stopped = false;
    if (workerPool == null) {
        workerPool = new WorkerService(
            "NIOWorker", numWorkerThreads, false);
    }
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}
public void start() {
    if (numWorkerThreads > 0) {
        if (threadsAreAssignable) {
            for (int i = 1; i <= numWorkerThreads; ++i) {
                workers.add(Executors.newFixedThreadPool(
                    1, new DaemonThreadFactory(threadNamePrefix, i)));
            }
        } else {
            workers.add(Executors.newFixedThreadPool(
                numWorkerThreads, new DaemonThreadFactory(threadNamePrefix)));
        }
    }
    stopped = false;
}
WorkerService initialization mainly sets up the workers thread pools: when threadsAreAssignable is true it creates one single-thread executor per worker, otherwise one shared fixed-size pool.
public static abstract class WorkRequest {
    /**
     * Must be implemented. Is called when the work request is run.
     */
    public abstract void doWork() throws Exception;

    /**
     * (Optional) If implemented, is called if the service is stopped
     * or unable to schedule the request.
     */
    public void cleanup() {
    }
}
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest =
        new ScheduledWorkRequest(workRequest);

    // If we have a worker thread pool, use that; otherwise, do the work
    // directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.run();
    }
}
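Because workers is a list of single-thread executors when threads are assignable, mapping an id to workers.get(id % size) pins all work sharing that id to one thread, which gives per-id ordering without explicit locking. A minimal sketch of that scheduling idea (illustrative names, not WorkerService itself):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal sketch of "one single-thread executor per worker" scheduling
// (illustrative, not ZooKeeper's WorkerService): tasks that share an id
// always land on the same thread and therefore run in submission order.
public class OrderedPoolSketch {
    private final List<ExecutorService> workers = new ArrayList<>();

    public OrderedPoolSketch(int numWorkers) {
        for (int i = 0; i < numWorkers; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }
    }

    public void schedule(long id, Runnable task) {
        int size = workers.size();
        // Map negative ids into [0, size-1] as well, like the code above.
        int workerNum = ((int) (id % size) + size) % size;
        workers.get(workerNum).execute(task);
    }

    public void shutdown() {
        workers.forEach(ExecutorService::shutdown);
    }
}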
public void doWork() throws InterruptedException {
    if (!key.isValid()) {
        selectorThread.cleanupSelectionKey(key);
        return;
    }

    if (key.isReadable() || key.isWritable()) {
        cnxn.doIO(key);

        // Check if we shutdown or doIO() closed this connection
        if (stopped) {
            cnxn.close();
            return;
        }
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }
        // Refresh the connection's expiry time.
        touchCnxn(cnxn);
    }

    // Mark this connection as once again ready for selection
    cnxn.enableSelectable();
    // Push an update request on the queue to resume selecting
    // on the current set of interest ops, which may have changed
    // as a result of the I/O operations we just performed.
    if (!selectorThread.addInterestOpsUpdateRequest(key)) {
        cnxn.close();
    }
}
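Putting the pieces together: the selector thread cleared the key's interest ops before dispatch (handleIO above), and the worker re-enables selection here and pushes the key onto updateQueue so the selector thread can restore the ops. Clearing the ops is what keeps the selector from handing the same ready channel to a second worker while the first is still doing I/O on it. A minimal sketch of that pause/resume handshake, with illustrative names:

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal sketch of the "pause while a worker owns the key, resume afterwards"
// handshake (illustrative names, not the ZooKeeper classes).
public class PauseResumeSketch {
    private final Selector selector;
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private final Queue<SelectionKey> updateQueue = new ConcurrentLinkedQueue<>();

    public PauseResumeSketch(Selector selector) {
        this.selector = selector;
    }

    // Selector thread: pause the key, then hand the I/O to a worker.
    void dispatch(SelectionKey key) {
        key.interestOps(0);                 // pause: no further selects for this key
        workers.execute(() -> {
            doIO(key);                      // read/write on the worker thread
            updateQueue.offer(key);         // ask the selector thread to resume it
            selector.wakeup();
        });
    }

    // Selector thread, between select() calls: restore interest ops.
    void processUpdateRequests() {
        SelectionKey key;
        while ((key = updateQueue.poll()) != null) {
            if (key.isValid()) {
                // In a real server this would be the connection's current
                // interest set; here we simply resume reads.
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }

    private void doIO(SelectionKey key) {
        // application-specific read/write handling would go here
    }
}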
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (isSocketOpen() == false) {
            LOG.warn("trying to do i/o on a null socket for session:0x"
                    + Long.toHexString(sessionId));
            return;
        }
        if (k.isReadable()) {
            // Initially incomingBuffer == lenBuffer (the 4-byte length buffer).
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely client has closed socket");
            }
            // incomingBuffer is full, i.e. the 4 length bytes have been read.
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) { // start of next request
                    incomingBuffer.flip();
                    // Read the length and reallocate incomingBuffer for the payload.
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                if (isPayload) { // not the case for 4letterword
                    // Process the payload.
                    readPayload();
                } else {
                    // four letter words take care
                    // need not do anything else
                    return;
                }
            }
        }
        if (k.isWritable()) {
            // Write path; skipped for now.
            handleWrite(k);

            if (!initialized && !getReadInterest() && !getWriteInterest()) {
                throw new CloseRequestException("responded to info probe");
            }
        }
    } catch (CancelledKeyException e) {
        LOG.warn("CancelledKeyException causing close of session 0x"
                + Long.toHexString(sessionId));
        if (LOG.isDebugEnabled()) {
            LOG.debug("CancelledKeyException stack trace", e);
        }
        close();
    } catch (CloseRequestException e) {
        // expecting close to log session closure
        close();
    } catch (EndOfStreamException e) {
        LOG.warn(e.getMessage());
        // expecting close to log session closure
        close();
    } catch (IOException e) {
        LOG.warn("Exception causing close of session 0x"
                + Long.toHexString(sessionId) + ": " + e.getMessage());
        if (LOG.isDebugEnabled()) {
            LOG.debug("IOException stack trace", e);
        }
        close();
    }
}
private boolean readLength(SelectionKey k) throws IOException {
    // Read the 4-byte length and allocate a new incomingBuffer of that size.
    int len = lenBuffer.getInt();
    if (!initialized && checkFourLetterWord(sk, len)) {
        return false;
    }
    if (len < 0 || len > BinaryInputArchive.maxBuffer) {
        throw new IOException("Len error " + len);
    }
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    incomingBuffer = ByteBuffer.allocate(len);
    return true;
}
private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) {
        // Read the payload bytes.
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from client sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely client has closed socket");
        }
    }
    // The payload has been fully read.
    if (incomingBuffer.remaining() == 0) {
        packetReceived();
        incomingBuffer.flip();
        if (!initialized) {
            readConnectRequest();
        } else {
            // Process the request.
            readRequest();
        }
        lenBuffer.clear();
        // Reset incomingBuffer back to lenBuffer for the next frame.
        incomingBuffer = lenBuffer;
    }
}
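So the read path is a small framing state machine: read 4 bytes into lenBuffer, interpret them as a big-endian length, allocate a payload buffer of that size, keep reading until it fills (possibly across several readable events), then hand the packet off and point incomingBuffer back at lenBuffer. A minimal standalone sketch of the same state machine over a ReadableByteChannel (an illustration, not NIOServerCnxn):

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Minimal sketch of the length-prefixed framing used by the read path above
// (illustrative, not NIOServerCnxn): 4-byte big-endian length, then payload.
public class FrameReaderSketch {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incoming = lenBuffer;   // start by reading the length

    // Feed one readable event; returns a complete frame, or null if more
    // bytes are still needed (the non-blocking partial-read case).
    public ByteBuffer onReadable(ReadableByteChannel ch) throws IOException {
        if (ch.read(incoming) < 0) {
            throw new EOFException("peer closed the connection");
        }
        if (incoming.hasRemaining()) {
            return null;                       // partial read; wait for the next event
        }
        if (incoming == lenBuffer) {           // length fully read: switch to payload
            lenBuffer.flip();
            int len = lenBuffer.getInt();
            lenBuffer.clear();
            incoming = ByteBuffer.allocate(len);
            return null;
        }
        ByteBuffer frame = incoming;           // payload fully read: hand it out
        frame.flip();
        incoming = lenBuffer;                  // reset for the next frame
        return frame;
    }
}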
public void submitRequest(Request si) {
    // If firstProcessor is null, wait for the processor chain to be set up.
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}
The submitRequest method hands the request to the request processor chain; setupRequestProcessors() below shows how that chain is assembled for a standalone server.
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}
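So for a standalone server the chain is PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor, built back to front so that each stage can hold a reference to the next one. A minimal sketch of that chaining pattern with made-up processor stages (not the real ZooKeeper classes):

// Minimal sketch of the processor-chain pattern used above (hypothetical
// classes; the real chain is PrepRequestProcessor -> SyncRequestProcessor
// -> FinalRequestProcessor).
interface Processor {
    void process(String request) throws Exception;
}

class LoggingProcessor implements Processor {
    private final Processor next;

    LoggingProcessor(Processor next) {
        this.next = next;
    }

    @Override
    public void process(String request) throws Exception {
        System.out.println("stage 1 (log): " + request);
        next.process(request);               // hand off to the next stage
    }
}

class FinalStageProcessor implements Processor {
    @Override
    public void process(String request) {
        System.out.println("final stage (reply): " + request);
    }
}

public class ChainSketch {
    public static void main(String[] args) throws Exception {
        // Build the chain back to front, like setupRequestProcessors() does.
        Processor first = new LoggingProcessor(new FinalStageProcessor());
        first.process("create /demo");
    }
}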
try {
    switch (request.type) {
    case OpCode.createContainer:
    case OpCode.create:
    case OpCode.create2:
        CreateRequest create2Request = new CreateRequest();
        pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
        break;
    case OpCode.createTTL:
        CreateTTLRequest createTtlRequest = new CreateTTLRequest();
        pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
        break;
    case OpCode.deleteContainer:
    case OpCode.delete:
        DeleteRequest deleteRequest = new DeleteRequest();
        pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
        break;
    case OpCode.setData:
        SetDataRequest setDataRequest = new SetDataRequest();
        pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
        break;
    case OpCode.reconfig:
        ReconfigRequest reconfigRequest = new ReconfigRequest();
        ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
        pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
        break;
    case OpCode.setACL:
        SetACLRequest setAclRequest = new SetACLRequest();
        pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
        break;
    case OpCode.check:
        CheckVersionRequest checkRequest = new CheckVersionRequest();
        pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
        break;
    case OpCode.multi:
        MultiTransactionRecord multiRequest = new MultiTransactionRecord();
        try {
            ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
        } catch (IOException e) {
            request.setHdr(new TxnHeader(request.sessionId, request.cxid,
                    zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
            throw e;
        }
        List<Txn> txns = new ArrayList<Txn>();
        // Each op in a multi-op must have the same zxid!
        long zxid = zks.getNextZxid();
        KeeperException ke = null;
        // Store off current pending change records in case we need to rollback
        Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

        for (Op op : multiRequest) {
            Record subrequest = op.toRequestRecord();
            int type;
            Record txn;

            /* If we've already failed one of the ops, don't bother
             * trying the rest as we know it's going to fail and it
             * would be confusing in the logfiles.
             */
            if (ke != null) {
                type = OpCode.error;
                txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
            }

            /* Prep the request and convert to a Txn */
            else {
                try {
                    pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                    type = request.getHdr().getType();
                    txn = request.getTxn();
                } catch (KeeperException e) {
                    ke = e;
                    type = OpCode.error;
                    txn = new ErrorTxn(e.code().intValue());

                    if (e.code().intValue() > Code.APIERROR.intValue()) {
                        LOG.info("Got user-level KeeperException when processing {} aborting"
                                + " remaining multi ops. Error Path:{} Error:{}",
                                request.toString(), e.getPath(), e.getMessage());
                    }

                    request.setException(e);

                    /* Rollback change records from failed multi-op */
                    rollbackPendingChanges(zxid, pendingChanges);
                }
            }

            // FIXME: I don't want to have to serialize it here and then
            //        immediately deserialize in next processor. But I'm
            //        not sure how else to get the txn stored into our list.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            txn.serialize(boa, "request");
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
    // create/close session don't require request record
    case OpCode.createSession:
    case OpCode.closeSession:
        if (!request.isLocalSession()) {
            pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
        }
        break;

    // All the rest don't need to create a Txn - just verify session
    case OpCode.sync:
    case OpCode.exists:
    case OpCode.getData:
    case OpCode.getACL:
    case OpCode.getChildren:
    case OpCode.getChildren2:
    case OpCode.ping:
    case OpCode.setWatches:
    case OpCode.checkWatches:
    case OpCode.removeWatches:
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        break;
    default:
        LOG.warn("unknown type " + request.type);
        break;
    }
} catch (KeeperException e) {
    if (request.getHdr() != null) {
        request.getHdr().setType(OpCode.error);
        request.setTxn(new ErrorTxn(e.code().intValue()));
    }

    if (e.code().intValue() > Code.APIERROR.intValue()) {
        LOG.info("Got user-level KeeperException when processing {} Error Path:{} Error:{}",
                request.toString(), e.getPath(), e.getMessage());
    }
    request.setException(e);
} catch (Exception e) {
    // log at error level as we are returning a marshalling
    // error to the user
    LOG.error("Failed to process " + request, e);

    StringBuilder sb = new StringBuilder();
    ByteBuffer bb = request.request;
    if (bb != null) {
        bb.rewind();
        while (bb.hasRemaining()) {
            sb.append(Integer.toHexString(bb.get() & 0xff));
        }
    } else {
        sb.append("request buffer is null");
    }
@Override
public void run() {
    try {
        int logCount = 0;

        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount / 2);
        while (true) {
            Request si = null;
            if (toFlush.isEmpty()) {
                // Nothing is waiting to be flushed, so block with take().
                si = queuedRequests.take();
            } else {
                // There are requests waiting to be flushed, so use poll(),
                // which returns null when the queue is empty. A null means
                // the processor is idle, so flush to disk now; otherwise
                // keep processing requests. In short: flush when idle.
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // Append the request to the transaction log; only
                // transactional requests return true.
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    // If the log has grown large enough, roll it and take a
                    // snapshot on a separate thread.
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount / 2);
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    }
                                }
                            };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    // An empty toFlush here means recent traffic has been
                    // read-heavy, so respond directly.
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally {
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}
private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException {
    if (toFlush.isEmpty())
        return;

    // Sync the transaction log to disk.
    zks.getZKDatabase().commit();
    while (!toFlush.isEmpty()) {
        // Hand each flushed request to the next processor.
        Request i = toFlush.remove();
        if (nextProcessor != null) {
            nextProcessor.processRequest(i);
        }
    }
    if (nextProcessor != null && nextProcessor instanceof Flushable) {
        ((Flushable) nextProcessor).flush();
    }
}
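Taken together, run() and flush() implement group commit: transactional requests are appended to the log and parked in toFlush, and a single commit() (one disk sync) covers the whole batch, triggered either when the queue goes idle or when the batch exceeds 1000 entries. A minimal sketch of that batching loop, with made-up names rather than SyncRequestProcessor:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of the "flush when idle or when the batch is big" group-commit
// loop used above (hypothetical names, not SyncRequestProcessor).
public class GroupCommitSketch {
    private final LinkedBlockingQueue<String> queued = new LinkedBlockingQueue<>();
    private final List<String> toFlush = new ArrayList<>();

    public void submit(String request) {
        queued.add(request);
    }

    public void run() throws InterruptedException {
        while (true) {
            // Block only when there is nothing waiting to be flushed;
            // otherwise poll() so that an empty queue triggers a flush.
            String req = toFlush.isEmpty() ? queued.take() : queued.poll();
            if (req == null) {          // idle: sync everything batched so far
                flush();
                continue;
            }
            toFlush.add(req);           // stands in for the transaction-log append
            if (toFlush.size() > 1000) {
                flush();                // cap batch size so latency stays bounded
            }
        }
    }

    private void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        // One durable sync covers the whole batch (e.g. FileChannel.force(false)).
        System.out.println("fsync covering " + toFlush.size() + " request(s)");
        toFlush.clear();
    }
}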