ZooKeeper Source Code (08): Request Handling and Data Read/Write Flow
ServerCnxnFactory
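Accepts client connections, manages client sessions, and handles client requests.
ServerCnxn abstract class
Represents a single client connection:
- Reads data from and writes data to the network
- Encodes and decodes data
- Forwards requests to upper-layer components and receives responses from them
- Manages connection state, such as enableRecv, sessionTimeout, stale, and invalid
- Tracks the current packetsReceived, packetsSent, lastCxid, lastZxid, and so on
- Implements the Watcher interface, so it can also act as a watcher
Two implementations:
- NIOServerCnxn - based on NIO
- NettyServerCnxn - based on Netty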
NIOServerCnxnFactory
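A non-blocking, multi-threaded ServerCnxnFactory implementation based on NIO. The threads communicate through queues:
- 1 accept thread, which accepts client connections and hands them to the selector threads
- 1-N selector threads, each of which selects on 1/N of the connections. Multiple selector threads are used because, with a large number of connections, select() itself can become a performance bottleneck
- 0-M socket IO worker threads, which perform socket reads and writes. If configured as 0, the selector threads do the IO themselves
- 1 cleanup thread, which closes idle connections
Example thread allocation: on a 32-core machine, 1 accept thread, 1 cleanup thread, 4 selector threads, and 64 worker threads.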
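To make the allocation example concrete, here is a minimal sketch that recomputes the default selector and worker thread counts from the core count. It assumes the default formulas used by configure with no system-property overrides; the class and method names are illustrative and are not ZooKeeper APIs.

```java
// Sketch only: NioThreadSizing is a hypothetical helper, not part of ZooKeeper.
final class NioThreadSizing {
    // Default selector thread count: max(sqrt(cores / 2), 1), truncated to an int.
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((float) numCores / 2), 1);
    }

    // Default worker thread count: twice the number of cores.
    static int workerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        for (int cores : new int[] {1, 4, 8, 32}) {
            System.out.printf("cores=%d selectors=%d workers=%d%n",
                cores, selectorThreads(cores), workerThreads(cores));
        }
    }
}
```

For 32 cores this yields 4 selector threads and 64 worker threads, matching the allocation described above.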
configure method
- Does not support SSL
- Creates the ConnectionExpirerThread thread
- Determines the number of each kind of thread from the CPU core count
int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
numSelectorThreads = Integer.getInteger(
    ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
    Math.max((int) Math.sqrt((float) numCores / 2), 1));
// 64 on a 32-core machine
numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
- Creates the SelectorThread threads
- Creates the ServerSocketChannel, starts listening, and sets it to non-blocking
- Creates the AcceptThread thread
start method
Starts the acceptThread, selectorThreads, workerPool, and expirerThread threads.
acceptThread thread
A single accept thread accepts client connections and hands them to the selector threads:
- select() looks for acceptable keys
- doAccept accepts the connection
if (key.isAcceptable()) {
    if (!doAccept()) {
        pauseAccept(10);
    }
}
- Sets sc (the SocketChannel) to non-blocking, verifies that the number of connections from the remote IP does not exceed maxClientCnxns (default 60), then picks a SelectorThread to start selecting read/write events
// Round-robin assign this connection to a selector thread
if (!selectorIterator.hasNext()) {
    selectorIterator = selectorThreads.iterator();
}
SelectorThread selectorThread = selectorIterator.next();
// buffer the SocketChannel in the selector thread's queue
if (!selectorThread.addAcceptedConnection(sc)) {
    throw new IOException("Unable to add connection to selector queue");
}
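The round-robin assignment in the accept thread can be isolated into a small, self-contained sketch. This is only an illustration of the iterator-reset technique; RoundRobinAssigner is a hypothetical class, and plain strings stand in for SelectorThread instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch only: cycles through a fixed list of targets, restarting the iterator when it is exhausted.
final class RoundRobinAssigner<T> {
    private final List<T> targets;
    private Iterator<T> iterator;

    RoundRobinAssigner(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("at least one target is required");
        }
        this.targets = new ArrayList<>(targets);
        this.iterator = this.targets.iterator();
    }

    // Returns the next target, wrapping around to the first one after the last.
    T next() {
        if (!iterator.hasNext()) {
            iterator = targets.iterator();
        }
        return iterator.next();
    }

    public static void main(String[] args) {
        RoundRobinAssigner<String> assigner =
            new RoundRobinAssigner<>(Arrays.asList("selector-0", "selector-1", "selector-2"));
        for (int i = 0; i < 5; i++) {
            System.out.println("connection " + i + " goes to " + assigner.next());
        }
    }
}
```

The sketch is not thread-safe, which is acceptable here because the article describes a single accept thread doing the assignment.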
selectorThread thread
The run method selects read/write events, takes over accepted client connections, and registers the "interested" events for each key:
- run method
public void run() {
    try {
        while (!stopped) {
            try {
                select(); // select read/write events
                processAcceptedConnections(); // take over accepted client connections
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                // omitted
            } catch (Exception e) {
                // omitted
            }
        }
    }
    // ...
}
- Taking over an accepted connection registers OP_READ, creates an NIOServerCnxn, and attaches it to the key
private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            key = accepted.register(selector, SelectionKey.OP_READ);
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn); // attach to the key
            addCnxn(cnxn); // track the connection-level session
        } catch (IOException e) {
            // omitted
        }
    }
}
- Selected read/write events are passed to the handleIO method
private void handleIO(SelectionKey key) {
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
    // Stop selecting this key while processing on its connection
    cnxn.disableSelectable();
    key.interestOps(0); // clear the interest ops; read/write events are re-registered once the IO is done
    touchCnxn(cnxn); // track the connection-level session and refresh its expiry
    workerPool.schedule(workRequest); // workRequest.doWork performs the read/write asynchronously
}
- Registering the "interested" events for a key
private void processInterestOpsUpdateRequests() {
    SelectionKey key;
    while (!stopped && (key = updateQueue.poll()) != null) {
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
        if (cnxn.isSelectable()) {
            key.interestOps(cnxn.getInterestOps());
        }
    }
}
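workRequest.doWork method
workRequest is an IOWorkRequest object. doWork reads the data and passes it to the upper-layer components:
public void doWork() throws InterruptedException {
    // omitted
    if (key.isReadable() || key.isWritable()) {
        cnxn.doIO(key); // runs on a workerPool thread
        // omitted
        touchCnxn(cnxn); // track the connection-level session and refresh its expiry
    }
    // omitted
}
Packets are transmitted in a "len body" format (a length prefix followed by the body). The read process itself is not covered here. Once cnxn has read a complete packet, it calls readConnectRequest or readRequest to pass the data to the upper-layer components: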
// establish the application-layer connection
private void readConnectRequest() throws IOException, ClientCnxnLimitException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    // ConnectRequest carries:
    // protocolVersion(0), lastZxidSeen(0), timeOut(30000 ms), sessionId(0), passwd(16 bytes), readOnly(false)
    ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
    zkServer.processConnectRequest(this, request);
    initialized = true;
}
protected void readRequest() throws IOException {
    RequestHeader h = new RequestHeader();
    // request header: carries the xid and type sent by the client
    ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h);
    // wrap the request bytes in a ByteBufferRequestRecord
    // its readRecord method deserializes the bytes into the given Record implementation
    RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice());
    zkServer.processPacket(this, h, request);
}
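The "len body" framing mentioned above can be illustrated with a standalone sketch. It assumes a 4-byte big-endian length prefix followed by exactly that many body bytes; LengthPrefixedFrames is a hypothetical helper and does not reproduce the actual NIOServerCnxn read path, which the article skips.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch only: encodes and decodes [int length][body] frames.
final class LengthPrefixedFrames {
    // Prepends a 4-byte length to the body.
    static byte[] encode(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length);
        buf.putInt(body.length).put(body);
        return buf.array();
    }

    // Extracts every complete frame from buf (already flipped for reading).
    // On return, buf is positioned at the start of the first incomplete frame.
    static List<byte[]> decode(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int len = buf.getInt();
            if (len < 0) {
                throw new IllegalArgumentException("negative frame length: " + len);
            }
            if (buf.remaining() < len) {
                buf.reset(); // body not fully received yet, wait for more bytes
                break;
            }
            byte[] body = new byte[len];
            buf.get(body);
            frames.add(body);
        }
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(encode("connect".getBytes(StandardCharsets.UTF_8)));
        buf.put(encode("getData".getBytes(StandardCharsets.UTF_8)));
        buf.flip();
        for (byte[] frame : decode(buf)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
    }
}
```

A real server would also cap the maximum frame length before allocating the body buffer; that check is left out to keep the sketch short.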
NettyServerCnxnFactory
A Netty-based ServerCnxnFactory implementation.
CnxnChannelHandler class
The core network-layer handler. The important code is shown here:
class CnxnChannelHandler extends ChannelDuplexHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final Channel channel = ctx.channel();
// omitted
// create a NettyServerCnxn and bind it to the channel
NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
// omitted
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
try {
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
if (cnxn == null) {
LOG.error("channelRead() on a closed or closing NettyServerCnxn");
} else {
// read the request data
cnxn.processMessage((ByteBuf) msg);
}
} catch (Exception ex) {
throw ex;
}
} finally {
ReferenceCountUtil.release(msg);
}
}
}
How cnxn reads request data
void processMessage(ByteBuf buf) {
if (throttled.get()) {
// omitted
} else {
if (queuedBuffer != null) {
appendToQueuedBuffer(buf.retainedDuplicate());
processQueuedBuffer();
} else {
receiveMessage(buf); // the decoding logic lives in this method
// Have to check !closingChannel, because an error in
// receiveMessage() could have led to close() being called.
if (!closingChannel && buf.isReadable()) {
if (queuedBuffer == null) {
queuedBuffer = channel.alloc().compositeBuffer();
}
appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
}
}
}
}
Once a complete packet has been read, the data is passed to the upper-layer components:
if (initialized) {
RequestHeader h = new RequestHeader();
ByteBufferInputStream.byteBuffer2Record(bb, h);
RequestRecord request = RequestRecord.fromBytes(bb.slice());
zks.processPacket(this, h, request);
} else {
// establish the application-layer connection
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
zks.processConnectRequest(this, request);
initialized = true;
}
ZooKeeperServer handler methods
processConnectRequest method: handling connection requests
ZooKeeperServer's processConnectRequest method handles connection requests:
public void processConnectRequest(
ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
long sessionId = request.getSessionId(); // defaults to 0
int tokensNeeded = 1;
// omitted
// read-only check
if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client";
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}
// client zxid check
if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(request.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}
int sessionTimeout = request.getTimeOut(); // client default is 30000 ms
byte[] passwd = request.getPasswd();
int minSessionTimeout = getMinSessionTimeout(); // defaults to tickTime * 2
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout(); // defaults to tickTime * 20
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout); // set the session timeout
// We don't want to receive any packets until we are sure that the session is setup
cnxn.disableRecv();
if (sessionId == 0) {
// create a new session
long id = createSession(cnxn, passwd, sessionTimeout);
} else {
validateSession(cnxn, sessionId);
// close the old session and its connection
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
// attach the existing session to this connection
cnxn.setSessionId(sessionId);
// send the connect response
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
}
}
// a closer look at creating a new session
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
if (passwd == null) {
passwd = new byte[0];
}
long sessionId = sessionTracker.createSession(timeout); // creates the session and returns its sessionId
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd); // fills passwd in place, i.e. request.passwd
CreateSessionTxn txn = new CreateSessionTxn(timeout);
cnxn.setSessionId(sessionId);
// submit a createSession request to the business-layer processors
// RequestRecord.fromRecord(txn) returns a SimpleRequestRecord wrapping the Record
Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
submitRequest(si);
return sessionId;
}
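The timeout negotiation in processConnectRequest amounts to clamping the client's requested value into a server-side range. Here is a minimal sketch, assuming the default bounds noted in the code comments (tickTime * 2 and tickTime * 20) and a tickTime of 2000 ms chosen purely for illustration. SessionTimeouts is a hypothetical helper, not a ZooKeeper class.

```java
// Sketch only: clamps a requested session timeout into [tickTime * 2, tickTime * 20].
final class SessionTimeouts {
    static int negotiate(int requestedMs, int tickTimeMs) {
        int min = tickTimeMs * 2;   // assumed default lower bound
        int max = tickTimeMs * 20;  // assumed default upper bound
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        int tickTime = 2000; // illustrative value
        for (int requested : new int[] {1000, 30000, 60000}) {
            System.out.println(requested + " ms -> " + negotiate(requested, tickTime) + " ms");
        }
    }
}
```

With these assumptions a 30000 ms request is accepted unchanged, while requests below 4000 ms or above 40000 ms are pulled to the nearest bound.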
processPacket method: handling business requests
ZooKeeperServer's processPacket method handles business requests:
public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
cnxn.incrOutstandingAndCheckThrottle(h);
if (h.getType() == OpCode.auth) {
// omitted
return;
} else if (h.getType() == OpCode.sasl) {
// omitted
} else {
if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
return;
} else {
Request si = new Request(
cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
int length = request.limit();
// large-request check
if (isLargeRequest(length)) { // returns false by default
checkRequestSizeWhenMessageReceived(length);
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
// submit to the business-layer processors
submitRequest(si);
}
}
}
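submitRequest flow
- The request is first submitted to the requestThrottler component
- requestThrottler is a throttling component (disabled by default). It buffers requests in an internal queue, and an asynchronous thread consumes the queue and submits each request to the business processors
- Processing stays on the workerPool thread up to the submitRequest call and only leaves that thread there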
if (request != null) {
if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
}
final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
// throttling is off by default
if (shouldThrottleOp(request, elapsedTime)) {
request.setIsThrottled(true);
ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
}
// submit
zks.submitRequestNow(request);
}
The submitRequestNow method submits the request to the business-layer processors:
public void submitRequestNow(Request si) {
// omitted
try {
touch(si.cnxn); // refresh the session expiry
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
setLocalSessionFlag(si);
firstProcessor.processRequest(si); // submit to the business-layer processors
if (si.cnxn != null) {
incInProcess();
}
} else {
// Update request accounting/throttling limits
requestFinished(si);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
requestFinished(si);
} catch (RequestProcessorException e) {
requestFinished(si);
}
}
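Leader business-layer processor chain for client requests
As covered in an earlier article, the leader uses LeaderZooKeeperServer as its server implementation class.
This section walks through how the leader handles client requests.
Processor chain
// build the processor chain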
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor =
new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(
toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
setupContainerManager();
}
- FinalRequestProcessor - applies the transactions associated with a request, serves queries, and sends the response to the client. It sits at the end of the RequestProcessor chain
- ToBeAppliedRequestProcessor - maintains the toBeApplied list. It must be followed by a FinalRequestProcessor, and processRequest must be handled synchronously
- CommitProcessor - waits for the commit to complete before invoking the downstream RequestProcessor
- ProposalRequestProcessor - issues proposals and forwards the Request to its internal SyncRequestProcessor and AckRequestProcessor
public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
    this.zks = zks;
    this.nextProcessor = nextProcessor;
    // internally maintains a SyncRequestProcessor and an AckRequestProcessor
    AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
    syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
        FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
}
- PrepRequestProcessor - usually sits at the start of the RequestProcessor chain and sets up the transaction associated with an update request
- LeaderRequestProcessor - performs local session upgrades. Only Requests submitted directly to the leader pass through this processor
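The way setupRequestProcessors wires the chain, constructing the last processor first and passing each one to its predecessor, can be shown with a stripped-down sketch. Everything here is hypothetical: SketchProcessor stands in for RequestProcessor, each stage only records its name, and the real processors run on their own threads and queues rather than calling each other synchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the RequestProcessor interface.
interface SketchProcessor {
    void processRequest(String request);
}

// A stage that records its name and then hands the request to the next stage.
final class TracingStage implements SketchProcessor {
    private final String name;
    private final SketchProcessor next; // null marks the end of the chain
    private final List<String> trace;

    TracingStage(String name, SketchProcessor next, List<String> trace) {
        this.name = name;
        this.next = next;
        this.trace = trace;
    }

    @Override
    public void processRequest(String request) {
        trace.add(name);
        if (next != null) {
            next.processRequest(request);
        }
    }
}

final class LeaderChainSketch {
    // Builds the chain back to front, mirroring the construction order in the article.
    static SketchProcessor build(List<String> trace) {
        SketchProcessor fin = new TracingStage("Final", null, trace);
        SketchProcessor toBeApplied = new TracingStage("ToBeApplied", fin, trace);
        SketchProcessor commit = new TracingStage("Commit", toBeApplied, trace);
        SketchProcessor proposal = new TracingStage("Proposal", commit, trace);
        SketchProcessor prep = new TracingStage("Prep", proposal, trace);
        return new TracingStage("Leader", prep, trace);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        build(trace).processRequest("create /demo");
        System.out.println(String.join(" -> ", trace));
    }
}
```

Building from the tail forward means every stage receives its successor through the constructor, so no stage needs a setter for the next processor.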
LeaderRequestProcessor
public void processRequest(Request request) throws RequestProcessorException {
    // omitted
    // local sessions are not enabled by default
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
        // omitted
    } catch (IOException ie) {
        // omitted
    }
    // upgradeRequest == null here
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
    }
    // invoke the downstream processor
    nextProcessor.processRequest(request);
}
PrepRequestProcessor
Transaction setup:
- Requests are buffered in a queue
- A consumer thread pulls requests from the queue and sets up their transactions
run method
public void run() {
try {
while (true) {
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
Request request = submittedRequests.take();
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
.add(Time.currentElapsedTime() - request.prepQueueStartTime);
// omitted
if (Request.requestOfDeath == request) {
break;
}
request.prepStartTime = Time.currentElapsedTime();
pRequest(request);
}
} catch (Exception e) {
handleException(this.getName(), e);
}
}
protected void pRequest(Request request) throws RequestProcessorException {
request.setHdr(null);
request.setTxn(null);
if (!request.isThrottled()) {
pRequestHelper(request);
}
request.zxid = zks.getZxid(); // zxid
long timeFinishedPrepare = Time.currentElapsedTime();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
nextProcessor.processRequest(request); // invoke the downstream processor
ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
}
pRequestHelper method
private void pRequestHelper(Request request) {
    try {
        switch (request.type) {
        case OpCode.createContainer:
        case OpCode.create:
        case OpCode.create2:
            // create request: carries path, data, acl, flags
            CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
            // zks.getNextZxid() returns the next, monotonically increasing zxid
            pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
            break;
        case OpCode.createTTL:
            // createTTL request: carries path, data, acl, flags, ttl
            CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
            break;
        case OpCode.deleteContainer:
            // carries path
            DeleteContainerRequest deleteContainerRequest =
                request.readRequestRecord(DeleteContainerRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
            break;
        case OpCode.delete:
            // delete request: carries path, version
            DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
            break;
        case OpCode.setData:
            // setData request: carries path, data, version
            SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
            break;
        case OpCode.reconfig:
            // reconfig request: carries joiningServers, leavingServers, newMembers, curConfigId
            ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
            break;
        case OpCode.setACL:
            // setACL request: carries path, acl, version
            SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
            break;
        case OpCode.check:
            // check request: carries path, version
            CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
            break;
        case OpCode.multi:
            // iterates over the sub-operations and calls pRequest2Txn for each:
            // pRequest2Txn(op.getType(), zxid, request, subrequest)
            // then wraps the results in a MultiTxn
            break;
        // create/close session don't require request record
        case OpCode.createSession:
        case OpCode.closeSession:
            if (!request.isLocalSession()) { // not a local session
                pRequest2Txn(request.type, zks.getNextZxid(), request, null);
            }
            break;
        // All the rest don't need to create a Txn - just verify session
        case OpCode.sync:
        // sync,exists,getData,getACL,getChildren,getAllChildrenNumber,getChildren2,ping
        // setWatches,setWatches2,checkWatches,removeWatches,getEphemerals,multiRead,addWatch
        case OpCode.whoAmI:
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            break;
        default:
            break;
        }
    } catch (KeeperException e) {
        // omitted
    } catch (Exception e) {
        // omitted
    }
}
pRequest2Txn method flow
The method is long, so only the most important operation types are analyzed briefly.
It first sets the TxnHeader on the request:
if (request.getHdr() == null) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
}
TxnHeader wraps the transaction header:
public class TxnHeader implements Record {
    private long clientId; // session ID
    private int cxid; // client-side xid
    private long zxid; // server-side transaction ID
    private long time;
    private int type; // operation type
}
pRequest2Txn - create operations
The create/create2/createTTL/createContainer operations:
- Builds the createMode from flags and validates ttl and ephemeral
- Checks the ACL
zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
- Generates the path for sequential nodes
int parentCVersion = parentRecord.stat.getCversion();
if (createMode.isSequential()) {
    // e.g. /users/admin0000000001
    path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
validatePath(path, request.sessionId);
- request.setTxn
int newCversion = parentRecord.stat.getCversion() + 1;
if (type == OpCode.createContainer) {
    request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
} else if (type == OpCode.createTTL) {
    request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
} else {
    request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
}
- Determines the ephemeralOwner
TxnHeader hdr = request.getHdr();
long ephemeralOwner = 0;
if (createMode.isContainer()) {
    ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
} else if (createMode.isTTL()) {
    ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
} else if (createMode.isEphemeral()) {
    ephemeralOwner = request.sessionId;
}
- addChangeRecord
StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
    DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord); // tracked in the outstandingChanges collection
ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
nodeRecord.data = data;
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord); // tracked in the outstandingChanges collection
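The sequential-path step of the create flow is easy to try in isolation. This sketch applies the same "%010d" formatting to the parent's cversion; SequentialPath is a hypothetical helper and the paths are made-up examples.

```java
import java.util.Locale;

// Sketch only: appends a zero-padded, 10-digit sequence number to a path prefix.
final class SequentialPath {
    static String withSequence(String pathPrefix, int parentCversion) {
        return pathPrefix + String.format(Locale.ENGLISH, "%010d", parentCversion);
    }

    public static void main(String[] args) {
        System.out.println(withSequence("/users/admin", 1));
        System.out.println(withSequence("/locks/lock-", 42));
    }
}
```

Because the suffix comes from the parent's cversion, which the create flow increments on every child creation, consecutive sequential children of the same parent receive increasing suffixes.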
pRequest2Txn - delete operation
- Checks the ACL and version
checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path); // stat.version must match delete.version
- Verifies that the node has no children; a node with children cannot be deleted
- Creates the DeleteTxn
request.setTxn(new DeleteTxn(path));
- addChangeRecord
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount--;
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
    DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord); // tracked in the outstandingChanges collection
nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord); // tracked in the outstandingChanges collection
pRequest2Txn - setData operation
- Checks the ACL and gets newVersion
- Creates the SetDataTxn
request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
- addChangeRecord
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
nodeRecord.stat.setVersion(newVersion);
nodeRecord.stat.setMtime(request.getHdr().getTime());
nodeRecord.stat.setMzxid(zxid);
nodeRecord.data = setDataRequest.getData();
nodeRecord.precalculatedDigest = precalculateDigest(
    DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
pRequest2Txn - setACL operation
- Checks the ACL and gets newVersion
- Creates the SetACLTxn
request.setTxn(new SetACLTxn(path, listACL, newVersion));
- addChangeRecord
pRequest2Txn - createSession operation
CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
request.setTxn(createSessionTxn);
// only add the global session tracker but not to ZKDb
zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
zks.setOwner(request.sessionId, request.getOwner());
pRequest2Txn - closeSession operation
long startTime = Time.currentElapsedTime();
synchronized (zks.outstandingChanges) {
// collect all ephemeral nodes of this session
Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
for (ChangeRecord c : zks.outstandingChanges) {
if (c.stat == null) {
// Doing a delete
es.remove(c.path);
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
es.add(c.path);
}
}
for (String path2Delete : es) {
if (digestEnabled) {
parentPath = getParentPathAndValidate(path2Delete);
parentRecord = getRecordForPath(parentPath);
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest = precalculateDigest(
DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
}
nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null);
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path2Delete);
addChangeRecord(nodeRecord);
}
if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
}
zks.sessionTracker.setSessionClosing(request.sessionId);
}
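The first loop of the closeSession code reconciles the ephemeral nodes already in the database with changes that are still pending. A reduced sketch of that reconciliation follows. PendingChange is a hypothetical stand-in for ChangeRecord, where a null ephemeralOwner models stat == null (a pending delete).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: computes which ephemeral paths must be deleted when a session closes.
final class EphemeralCleanupSketch {
    static final class PendingChange {
        final String path;
        final Long ephemeralOwner; // null models a pending delete

        PendingChange(String path, Long ephemeralOwner) {
            this.path = path;
            this.ephemeralOwner = ephemeralOwner;
        }
    }

    static Set<String> pathsToDelete(
            Set<String> committedEphemerals, List<PendingChange> outstanding, long sessionId) {
        Set<String> result = new HashSet<>(committedEphemerals);
        for (PendingChange c : outstanding) {
            if (c.ephemeralOwner == null) {
                result.remove(c.path); // already being deleted by an earlier request
            } else if (c.ephemeralOwner == sessionId) {
                result.add(c.path); // created by this session but not yet applied
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> committed = new HashSet<>(Arrays.asList("/a", "/b"));
        List<PendingChange> pending = new ArrayList<>();
        pending.add(new PendingChange("/a", null));
        pending.add(new PendingChange("/c", 1L));
        pending.add(new PendingChange("/d", 2L));
        System.out.println(pathsToDelete(committed, pending, 1L));
    }
}
```

Taking pending changes into account matters because requests ahead of the closeSession in the pipeline may have created or deleted ephemeral nodes that the database does not reflect yet.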
ProposalRequestProcessor
processRequest method
public void processRequest(Request request) throws RequestProcessorException {
    if (request instanceof LearnerSyncRequest) { // handles the sync command (analysis to be added later)
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        if (shouldForwardToNextProcessor(request)) {
            nextProcessor.processRequest(request); // invoke the downstream processor (CommitProcessor)
        }
        if (request.getHdr() != null) { // transactional requests need a proposal and a disk write
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request); // send the proposal to the followers
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // the nextProcessor of this syncProcessor is an AckRequestProcessor
            syncProcessor.processRequest(request);
        }
    }
}
Sending the proposal
Creates a proposal and sends it to all members:
public Proposal propose(Request request) throws XidRolloverException {
// the lower 32 bits of the zxid are exhausted: force a re-election to start a new epoch and a fresh zxid counter
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg =
"zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
// serialize
byte[] data = request.getSerializeData();
proposalStats.setLastBufferSize(data.length);
// build the packet
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
// build the Proposal object
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this) {
p.addQuorumVerifier(self.getQuorumVerifier());
if (request.getHdr().getType() == OpCode.reconfig) {
// this writes lastSeenQuorumVerifier to the zoo.cfg.dynamic.next file
self.setLastSeenQuorumVerifier(request.qv, true);
}
if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
lastProposed = p.packet.getZxid();
// cache in outstandingProposals; processAck later decides whether to commit based on the quorum state
outstandingProposals.put(lastProposed, p);
// send the packet to the followers
sendPacket(pp);
}
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
return p;
}
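The rollover check at the top of propose relies on the zxid layout: the high 32 bits hold the epoch and the low 32 bits hold a per-epoch counter. A small sketch of that layout, using a hypothetical Zxid helper class (not the ZooKeeper utility class), is shown here.

```java
// Sketch only: splits and combines a 64-bit zxid as (epoch << 32) | counter.
final class Zxid {
    static long make(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epoch(long zxid) {
        return zxid >> 32;
    }

    static long counter(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Same condition as the check in propose: the counter has no room left.
    static boolean counterExhausted(long zxid) {
        return (zxid & 0xffffffffL) == 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = make(5, 7);
        System.out.println("zxid=0x" + Long.toHexString(zxid)
            + " epoch=" + epoch(zxid) + " counter=" + counter(zxid));
        System.out.println("exhausted=" + counterExhausted(make(5, 0xffffffffL)));
    }
}
```

Once the counter is exhausted, the only way to obtain a larger zxid is to move to a new epoch, which is why the code above shuts the leader down to force a re-election.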
syncProcessor.processRequest method
The processRequest method puts the request on the queuedRequests queue, and an asynchronous thread consumes it:
- Pulls a request from queuedRequests
- Writes the txnlog
zks.getZKDatabase().append(si);
- Rolls the txnlog file and generates a snapshot file
// by default, triggered when logCount exceeds 50000 or logSize exceeds 2GB
if (shouldSnapshot()) {
    resetSnapshotStats();
    // roll the txnlog file
    zks.getZKDatabase().rollLog();
    // generate the snapshot file
    if (!snapThreadMutex.tryAcquire()) {
        LOG.warn("Too busy to snap, skipping");
    } else {
        // generate the snapshot on a separate thread
        new ZooKeeperThread("Snapshot Thread") {
            public void run() {
                try {
                    zks.takeSnapshot();
                } catch (Exception e) {
                } finally {
                    snapThreadMutex.release();
                }
            }
        }.start();
    }
}
- Afterwards the request is passed to nextProcessor (the AckRequestProcessor object)
AckRequestProcessor
public void processRequest(Request request) {
QuorumPeer self = leader.self;
if (self != null) {
request.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
leader.processAck(self.getMyId(), request.zxid, null);
}
}
processAck
Keep a count of acks that are received by the leader for a particular proposal.
public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {
// omitted
if ((zxid & 0xffffffffL) == 0) {
// We no longer process NEWLEADER ack with this method. However,
// the learner sends an ack back to the leader after it gets
// UPTODATE, so we just ignore the message.
return;
}
if (outstandingProposals.size() == 0) {
return;
}
// the data for this zxid has already been committed
if (lastCommitted >= zxid) {
// The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
return;
}
// omitted
p.addAck(sid); // record the ack
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
// special handling for reconfig commands, omitted
}
The tryToCommit method checks the quorum state, i.e. whether more than half of the voters have acked. Once quorum is reached:
- The proposal is removed from outstandingProposals
- It is added to toBeApplied
- COMMIT is sent to the followers
public void commit(long zxid) {
    synchronized (this) {
        lastCommitted = zxid;
    }
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp); // send to the followers
}
- The request is committed
zk.commitProcessor.commit(p.request); // enters commitProcessor's committedRequests queue
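The "more than half" rule behind tryToCommit can be sketched as a tiny ack tracker. This assumes a plain majority quorum over a fixed voter set; AckTracker is a hypothetical class, and the real code delegates the decision to the configured QuorumVerifier, which may use a different rule.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch only: tracks acks for one proposal under a simple-majority rule.
final class AckTracker {
    private final Set<Long> voters;
    private final Set<Long> acks = new HashSet<>();

    AckTracker(Set<Long> voters) {
        this.voters = new HashSet<>(voters);
    }

    // Records an ack from a voter (non-voters are ignored) and reports whether quorum is reached.
    boolean addAck(long sid) {
        if (voters.contains(sid)) {
            acks.add(sid);
        }
        return hasQuorum();
    }

    boolean hasQuorum() {
        return acks.size() > voters.size() / 2;
    }

    public static void main(String[] args) {
        AckTracker tracker = new AckTracker(new HashSet<>(Arrays.asList(1L, 2L, 3L, 4L, 5L)));
        for (long sid : new long[] {1L, 1L, 2L, 3L}) {
            System.out.println("ack from " + sid + " -> quorum=" + tracker.addAck(sid));
        }
    }
}
```

Storing acks in a set means a repeated ack from the same server does not count twice, so a 5-voter ensemble needs acks from 3 distinct servers, and the leader's own ack (sent through AckRequestProcessor) counts as one of them.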
CommitProcessor
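processRequest method
ProposalRequestProcessor calls this method before it issues the proposal and hands the request to syncProcessor for the local disk write:
- Every request is added to the queuedRequests queue
- Write requests are also added to the queuedWriteRequests queue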
public void processRequest(Request request) {
request.commitProcQueueStartTime = Time.currentElapsedTime();
queuedRequests.add(request); // all requests
// If the request will block, add it to the queue of blocking requests
if (needCommit(request)) { // write request
queuedWriteRequests.add(request);
numWriteQueuedRequests.incrementAndGet();
} else {
numReadQueuedRequests.incrementAndGet();
}
wakeup();
}
commit method
Once the followers' acks bring a proposal to quorum, this method is used to commit the transaction, which is then applied to ZKDatabase.
public void commit(Request request) {
request.commitRecvTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
committedRequests.add(request); // enters the committedRequests queue
wakeup();
}
run method
Compares the queuedRequests, queuedWriteRequests, and committedRequests queues, and forwards committed requests and read requests to the downstream ToBeAppliedRequestProcessor.
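The queue matching done by run can be approximated with a heavily simplified, single-threaded model: reads pass straight through, while a write at the head of the queue blocks everything behind it until its commit arrives. CommitGate is a hypothetical class written for this article. The real CommitProcessor is multi-threaded and keeps per-session state, none of which is modeled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch only: a FIFO gate that holds writes until they are committed.
final class CommitGate {
    private static final class Req {
        final String name;
        final boolean write;

        Req(String name, boolean write) {
            this.name = name;
            this.write = write;
        }
    }

    private final Queue<Req> queued = new ArrayDeque<>();
    private final Queue<String> committed = new ArrayDeque<>();
    private final List<String> forwarded = new ArrayList<>();

    // Corresponds loosely to processRequest: enqueue, then try to make progress.
    void submit(String name, boolean write) {
        queued.add(new Req(name, write));
        drain();
    }

    // Corresponds loosely to commit: record the commit, then try to make progress.
    void commit(String name) {
        committed.add(name);
        drain();
    }

    private void drain() {
        while (!queued.isEmpty()) {
            Req head = queued.peek();
            if (head.write) {
                if (!head.name.equals(committed.peek())) {
                    return; // the head write is not committed yet, so everything waits
                }
                committed.poll();
            }
            forwarded.add(queued.poll().name);
        }
    }

    List<String> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        CommitGate gate = new CommitGate();
        gate.submit("read-1", false);
        gate.submit("write-1", true);
        gate.submit("read-2", false);
        System.out.println("before commit: " + gate.forwarded());
        gate.commit("write-1");
        System.out.println("after commit:  " + gate.forwarded());
    }
}
```

Holding later requests behind an uncommitted write keeps the order in which requests reach the downstream processor the same as the order in which they were submitted.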
ToBeAppliedRequestProcessor
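Maintains the toBeApplied list: removes the entries for requests that have been committed.
FinalRequestProcessor
Applies the transactions associated with a request, serves queries, and sends the response to the client. It sits at the end of the RequestProcessor chain.
- Applies the transaction
- Runs the logic for the specific OpCode and builds the response
- Uses cnxn to return the response to the client
Applying the transaction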
if (!request.isThrottled()) {
rc = applyRequest(request);
}
// ProcessTxnResult rc = zks.processTxn(request);
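createSession operation
zks.finishSessionInit(request.cnxn, true);
closeSession operation
Sends a closeConn packet to the client:
cnxn.sendCloseSession();
create operations
The create, create2, createTTL, and createContainer operations build the corresponding Response object.
delete operations
Omitted.
setData operation
Returns a SetDataResponse.
setACL operation
Returns a SetACLResponse.
getData operation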
private Record handleGetDataRequest(
Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
GetDataRequest getDataRequest = (GetDataRequest) request;
String path = getDataRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
// check permissions
zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
Stat stat = new Stat();
// read the data and register the watcher
byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
return new GetDataResponse(b, stat);
}
setWatches operations
The setWatches and setWatches2 operations:
SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(),
setWatches.getPersistentWatches(),
setWatches.getPersistentRecursiveWatches(),
cnxn);
addWatch operation
AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
removeWatches operation
RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
getChildren operations
The getChildren and getChildren2 operations:
GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ, request.authInfo, path, null);
List<String> children = zks.getZKDatabase()
.getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
How a Follower handles data from the Leader
Processor chain
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(
finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
syncProcessor.start();
}
Processor chain:
FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
commitProcessor and syncProcessor handle the leader's proposal and commit requests.
processPacket method
As covered in "ZooKeeper Source Code (07): Leader, Follower, and Observer", the Follower uses the processPacket method to handle packets from the leader:
protected void processPacket(QuorumPacket qp) throws Exception {
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL: // proposal
ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
TxnHeader hdr = logEntry.getHeader();
Record txn = logEntry.getTxn();
TxnDigest digest = logEntry.getDigest();
// omitted
lastQueued = hdr.getZxid();
// omitted
// log the transaction
// syncProcessor persists the log entry, then an ack is sent to the leader
fzk.logRequest(hdr, txn, digest);
// omitted
if (om != null) {
// omitted
}
break;
case Leader.COMMIT: // commit
ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
fzk.commit(qp.getZxid()); // commit the logged transaction through commitProcessor
if (om != null) {
// omitted
}
break;
case Leader.COMMITANDACTIVATE: // Similar to COMMIT, only for a reconfig operation
// get the new configuration from the request
Request request = fzk.pendingTxns.element();
SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
// get new designated leader from (current) leader's message
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
final long zxid = qp.getZxid();
boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
// commit (writes the new config to ZK tree (/zookeeper/config)
fzk.commit(zxid);
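// omitted
break;
case Leader.UPTODATE:
// the leader tells the follower that it is up to date and may start serving clients
// this packet type should not appear again during normal operation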
break;
case Leader.REVALIDATE:
if (om == null || !om.revalidateLearnerSession(qp)) {
revalidate(qp);
}
break;
case Leader.SYNC:
fzk.sync(); // sync command
break;
default:
LOG.warn("Unknown packet type");
break;
}
}
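Handling PROPOSAL
syncProcessor.processRequest method
The processRequest method puts the request on the queuedRequests queue, and an asynchronous thread consumes it.
After the request is persisted locally, the downstream processor (the SendAckRequestProcessor object) is invoked.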
SendAckRequestProcessor
public void processRequest(Request si) {
if (si.type != OpCode.sync) {
// acknowledge that this zxid has been persisted
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
try {
si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
learner.writePacket(qp, false);
} catch (IOException e) {
// learner.sock.close();
}
}
}
public void flush() throws IOException {
try {
learner.writePacket(null, true);
} catch (IOException e) {
// learner.sock.close();
}
}
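Handling COMMIT
The request is submitted to commitProcessor, which passes it on to the downstream FinalRequestProcessor.
FinalRequestProcessor
Already covered above, so omitted here.
How an Observer handles data from the Leader
Processor chain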
protected void setupRequestProcessors() {
// We might consider changing the processor behaviour of
// Observers to, for example, remove the disk sync requirements.
// Currently, they behave almost exactly the same as followers.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(
finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
}
processPacket method
protected void processPacket(QuorumPacket qp) throws Exception {
TxnLogEntry logEntry;
TxnHeader hdr;
TxnDigest digest;
Record txn;
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
LOG.warn("Ignoring proposal");
break;
case Leader.COMMIT:
LOG.warn("Ignoring commit");
break;
case Leader.UPTODATE:
LOG.error("Received an UPTODATE message after Observer started");
break;
case Leader.REVALIDATE:
revalidate(qp);
break;
case Leader.SYNC:
((ObserverZooKeeperServer) zk).sync();
break;
case Leader.INFORM:
ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
logEntry = SerializeUtils.deserializeTxn(qp.getData());
hdr = logEntry.getHeader();
txn = logEntry.getTxn();
digest = logEntry.getDigest();
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
request.setTxnDigest(digest);
ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
obs.commitRequest(request);
break;
case Leader.INFORMANDACTIVATE: // handles reconfig requests
// get new designated leader from (current) leader's message
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
byte[] remainingdata = new byte[buffer.remaining()];
buffer.get(remainingdata);
logEntry = SerializeUtils.deserializeTxn(remainingdata);
hdr = logEntry.getHeader();
txn = logEntry.getTxn();
digest = logEntry.getDigest();
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8));
request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
request.setTxnDigest(digest);
obs = (ObserverZooKeeperServer) zk;
boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
obs.commitRequest(request);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
break;
default:
LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
break;
}
}