上一篇,小编给大家介绍了ZooKeeper server端的启动。这一篇将介绍client和server端是如何建立session的,并通过官网的DataMonitor例子来说明。通过Session建立这个例子,可以大致了解client端和server端是如何处理请求、彼此之间是如何通信的。
官网DataMonitor的代码:
Executor
1. public class Executor implements
2. DataMonitor.DataMonitorListener {
3. String znode;
4.
5. DataMonitor dm;
6.
7. ZooKeeper zk;
8.
9. String filename;
10.
11. String exec[];
12.
13. Process child;
14.
15. //Executor是一个watcher,不过其处理都代理给DataMonitor了
16. public
17. throws
18. this.filename = filename;
19. this.exec = exec;
20. //初始化zookeeper的client,这一步会建立连接,创建session,启动client端的SendThread线程,当然都是异步的
21. new ZooKeeper(hostPort, 3000, this);
22. //datamonitor是真实的处理类
23. new DataMonitor(zk, znode, null, this);
24. }
DataMonitor
1. public class DataMonitor implements
2.
3. .......
4.
5. public
6. DataMonitorListener listener) {
7. ......
8. // Get things started by checking if the node exists. We are going
9. // to be completely event driven,异步exist,注册watcher,设置回调
10. true, this, null);
11. }
12.
13. ......
14. //处理watcher通知事件
15. public void
16. String path = event.getPath();
17. //如果exist操作对应的事件触发(create、delete、setData),则再次注册watcher(watcher是单次的),业务处理都在回调里完成
18. else
19. if (path != null
20. // Something has changed on the node, let's find out
21. true, this, null);
22. }
23. }
24. if (chainedWatcher != null) {
25. chainedWatcher.process(event);
26. }
27. }
28. //处理exist操作的回调结果
29. public void processResult(int
30. boolean
31. switch
32. case
33. true;
34. break;
35. case
36. false;
37. break;
38. case
39. case
40. true;
41. listener.closing(rc);
42. return;
43. default:
44. // Retry errors
45. true, this, null);
46. return;
47. }
48. //如果节点存在,则同步获取节点数据
49. byte b[] = null;
50. if
51. try
52. false, null);
53. catch
54. // We don't need to worry about recovering now. The watch
55. // callbacks will kick off any exception handling
56. e.printStackTrace();
57. catch
58. return;
59. }
60. }
61. //如果数据有变化,则处理之
62. if ((b == null
63. null
64. listener.exists(b);
65. prevData = b;
66. }
67. }
68. }
从这个例子出发,我们来分析下ZooKeeper的第一步——session是如何建立的,主要就是ZooKeeper类的构造。
Zookeeper构造
1. public ZooKeeper(String connectString, int
2. boolean
3. throws
4. {
5. "Initiating client connection, connectString="
6. " sessionTimeout=" + sessionTimeout + " watcher="
7. //设置默认watcher
8. watchManager.defaultWatcher = watcher;
9.
10. new
11. connectString);
12. //从配置的serverList,解析成serverAddresses,这里做了shuffle,server顺序被打乱了
13. new
14. connectStringParser.getServerAddresses());
15. //创建客户端连接,初始化SendThread和EventThread
16. new
17. this, watchManager,
18. getClientCnxnSocket(), canBeReadOnly);
19. //启动SendThread和EventThread
20. cnxn.start();
21. }
初始化连接
1. public ClientCnxn(String chrootPath, HostProvider hostProvider, int
2. ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
3. long sessionId, byte[] sessionPasswd, boolean
4. this.zooKeeper = zooKeeper;
5. this.watcher = watcher;
6. //客户端sessionId
7. this.sessionId = sessionId;
8. this.sessionPasswd = sessionPasswd;
9. //客户端设置的超时时间
10. this.sessionTimeout = sessionTimeout;
11. //主机列表
12. this.hostProvider = hostProvider;
13. this.chrootPath = chrootPath;
14. //连接超时
15. connectTimeout = sessionTimeout / hostProvider.size();
16. //读超时
17. 2 / 3;
18. readOnly = canBeReadOnly;
19. //初始化client2个核心线程,SendThread是client的IO核心线程,EventThread从SendThread里拿到event,调用对应watcher
20. new
21. new
22.
23. }
SendThread核心流程
1. public void
2. .....
3. while
4. try
5. //如果还没连上,则启动连接过程。这个方法名有歧义:其实现只是判断sockKey是否已注册,此时可能还没有真正连上server
6. if
7. ......
8. //异步连接
9. startConnect();
10. clientCnxnSocket.updateLastSendAndHeard();
11. }
12. //如果状态为连接上,则真的是连上server了
13. if
14. ......
15. //下一次select超时时间
16. to = readTimeout - clientCnxnSocket.getIdleRecv();
17. else
18. //如果没连上,则递减连接超时
19. to = connectTimeout - clientCnxnSocket.getIdleRecv();
20. }
21. //session超时,包括连接超时
22. if (to <= 0) {
23. throw new
24. "Client session timed out, have not heard from server in "
25. "ms"
26. " for sessionid 0x"
27. + Long.toHexString(sessionId));
28. }
29. //如果send空闲,则发送心跳包
30. if
31. int timeToNextPing = readTimeout / 2
32. - clientCnxnSocket.getIdleSend();
33. if (timeToNextPing <= 0) {
34. sendPing();
35. clientCnxnSocket.updateLastSend();
36. else
37. if
38. to = timeToNextPing;
39. }
40. }
41. }
42.
43. // If we are in read-only mode, seek for read/write server
44. //如果是只读模式,则寻找R/W server,如果找到,则清理之前的连接,并重新连接到R/W server
45. if
46. long
47. int idlePingRwServer = (int) (now - lastPingRwServer);
48. if
49. lastPingRwServer = now;
50. 0;
51. pingRwTimeout =
52. 2*pingRwTimeout, maxPingRwTimeout);
53. //同步测试下个server是否是R/W server,如果是则抛出RWServerFoundException
54. pingRwServer();
55. }
56. to = Math.min(to, pingRwTimeout - idlePingRwServer);
57. }
58. //处理IO
59. this);
60. catch
61. if
62. if
63. // closing so this is expected
64. "An exception was thrown while closing send thread for session 0x"
65. + Long.toHexString(getSessionId())
66. " : "
67. }
68. break;
69. else
70. // this is ugly, you have a better way speak up
71. if (e instanceof
72. ", closing socket connection");
73. else if (e instanceof
74. LOG.info(e.getMessage() + RETRY_CONN_MSG);
75. else if (e instanceof
76. LOG.info(e.getMessage() + RETRY_CONN_MSG);
77. else if (e instanceof
78. LOG.info(e.getMessage());
79. else
80. ......
81. }
82. //清理之前的连接,找下一台server连接
83. cleanup();
84. if
85. new
86. Event.EventType.None,
87. Event.KeeperState.Disconnected,
88. null));
89. }
90. clientCnxnSocket.updateNow();
91. clientCnxnSocket.updateLastSendAndHeard();
92. }
93. }
94. }
95. ......
96. }
具体过程
1. private void startConnect() throws
2. //状态改为CONNECTING
3. state = States.CONNECTING;
4. //拿目标地址
5. InetSocketAddress addr;
6. if (rwServerAddress != null) {
7. addr = rwServerAddress;
8. null;
9. else
10. 1000);
11. }
12.
13. "\\(.*\\)",
14. "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
15. ......
16. //异步连接
17. clientCnxnSocket.connect(addr);
18. }
具体connect
1. void connect(InetSocketAddress addr) throws
2. //创建客户端SocketChannel
3. SocketChannel sock = createSock();
4. try
5. //注册OP_CONNECT事件,尝试连接
6. registerAndConnect(sock, addr);
7. catch
8. "Unable to open socket to "
9. sock.close();
10. throw
11. }
12. //session还未初始化
13. false;
14.
15. /*
16. * Reset incomingBuffer
17. */
18. //重置2个读buffer,准备下一次读
19. lenBuffer.clear();
20. incomingBuffer = lenBuffer;
21. }
registerAndConnect过程:
1. void
2. throws
3. sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
4. //尝试连接
5. boolean
6. //如果网络情况很好,立马可以连上,则发送ConnectRequest请求,请求和server建立session
7. if
8. sendThread.primeConnection();
9. }
10. }
primeConnection代表连上之后的操作,主要是建立session:
1. void primeConnection() throws
2. ......
3. //客户端sessionId默认为0
4. long sessId = (seenRwServerBefore) ? sessionId : 0;
5. //构造连接请求
6. new ConnectRequest(0, lastZxid,
7. sessionTimeout, sessId, sessionPasswd);
8. synchronized
9. ......
10. //组合成通讯层的Packet对象,添加到发送队列,对于ConnectRequest其requestHeader为null
11. new Packet(null, null, conReq,
12. null, null, readOnly));
13. }
14. //确保读写事件都监听
15. clientCnxnSocket.enableReadWriteOnly();
16. .....
17. }
此时ConnectRequest请求已经添加到发送队列,SendThread进入doTransport处理流程:
1. void doTransport(int
2. ClientCnxn cnxn)
3. throws
4. //select
5. selector.select(waitTimeOut);
6. Set<SelectionKey> selected;
7. synchronized (this) {
8. selected = selector.selectedKeys();
9. }
10. // Everything below and until we get back to the select is
11. // non blocking, so time is effectively a constant. That is
12. // Why we just have to do this once, here
13. updateNow();
14. for
15. SocketChannel sc = ((SocketChannel) k.channel());
16. //如果之前连接没有立马连上,则在这里处理OP_CONNECT事件
17. if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
18. if
19. updateLastSendAndHeard();
20. sendThread.primeConnection();
21. }
22. }
23. //如果读写就位,则处理之
24. else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
25. doIO(pendingQueue, outgoingQueue, cnxn);
26. }
27. }
28. if
29. synchronized(outgoingQueue) {
30. if
31. null) {
32. enableWrite();
33. }
34. }
35. }
36. selected.clear();
37. }
假设我们此时连接已经好了,WRITE事件ok,则SendThread开始发送我们的ConnectRequest
1. if
2. //同步处理
3. synchronized(outgoingQueue) {
4. //从发送队列中拿请求
5. Packet p = findSendablePacket(outgoingQueue,
6. cnxn.sendThread.clientTunneledAuthenticationInProgress());
7.
8. if (p != null) {
9. //修改上次发送时间
10. updateLastSend();
11. // If we already started writing p, p.bb will already exist
12. //序列化Packet到ByteBuffer
13. if (p.bb == null) {
14. //如果是业务请求,则需要设置事务Id
15. if ((p.requestHeader != null) &&
16. (p.requestHeader.getType() != OpCode.ping) &&
17. (p.requestHeader.getType() != OpCode.auth)) {
18. p.requestHeader.setXid(cnxn.getXid());
19. }
20. //序列化
21. p.createBB();
22. }
23. //写数据
24. sock.write(p.bb);
25. //写完了,太好了,发送成功
26. if
27. //已发送的业务Packet数量
28. sentCount++;
29. //发送完了,那从发送队列删掉,方便后续发送请求处理
30. outgoingQueue.removeFirstOccurrence(p);
31. //如果是业务请求,则添加到Pending队列,方便对server端返回做相应处理,如果是其他请求,发完就扔了。。。
32. if (p.requestHeader != null
33. && p.requestHeader.getType() != OpCode.ping
34. && p.requestHeader.getType() != OpCode.auth) {
35. synchronized
36. pendingQueue.add(p);
37. }
38. }
39. }
40. }
41. //请求发完了,不需要再监听OS的写事件了,如果没发完,那还是要继续监听的,继续写嘛
42. if
43. // No more packets to send: turn off write interest flag.
44. // Will be turned on later by a later call to enableWrite(),
45. // from within ZooKeeperSaslClient (if client is configured
46. // to attempt SASL authentication), or in either doIO() or
47. // in doTransport() if not.
48. disableWrite();
49. else
50. // Just in case
51. enableWrite();
52. }
53. }
54. }
具体序列化方式如下,ConnectRequest的packet没有协议头:
1. public void
2. try
3. new
4. BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
5. //写一个int,占位用,整个packet写完后会来更新这个值,代表packet的总长度,4个字节
6. 1, "len"); // We'll fill this in later
7. //序列化协议头
8. if (requestHeader != null) {
9. "header");
10. }
11. //序列化协议体
12. if (request instanceof
13. "connect");
14. // append "am-I-allowed-to-be-readonly" flag
15. "readOnly");
16. else if (request != null) {
17. "request");
18. }
19. baos.close();
20. //生成ByteBuffer
21. this.bb = ByteBuffer.wrap(baos.toByteArray());
22. //将bytebuffer的前4个字节修改成真正的长度,总长度减去一个int的长度头
23. this.bb.putInt(this.bb.capacity() - 4);
24. //准备给后续读
25. this.bb.rewind();
26. catch
27. "Ignoring unexpected exception", e);
28. }
29. }
这里我们的第一个Packet是ConnReq,它构造的packet没有header,所以发完就直接丢掉了,但是SendThread还需要监听server端的返回,以确认连上,并进行session的初始化。那到这里client端等待server端返回了,我们看看server是怎么处理ConnReq请求的。
假设server的selector线程已经就位,则selector会拿到一个读就位的事件,也就是client的connReq请求
1. else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
2. NIOServerCnxn c = (NIOServerCnxn) k.attachment();
3. c.doIO(k);
if (k.isReadable()) {
1. //先从Channel读4个字节,代表头
2. int
3. if (rc < 0) {
4. throw new
5. "Unable to read additional data from client sessionid 0x"
6. + Long.toHexString(sessionId)
7. ", likely client has closed socket");
8. }
9. //int读好,继续往下读
10. if (incomingBuffer.remaining() == 0) {
11. boolean
12. //2个一样,就可以继续读下一个请求了
13. if (incomingBuffer == lenBuffer) { // start of next request
14. incomingBuffer.flip();
15. //给incomingBuffer分配一个length长度的内存,将后续的数据都给读进来
16. isPayload = readLength(k);
17. //clear一下,准备写
18. incomingBuffer.clear();
19. else
20. // continuation
21. true;
22. }
23. //好,读后续数据
24. if (isPayload) { // not the case for 4letterword
25. readPayload();
26. }
27. else
28. // four letter words take care
29. // need not do anything else
30. return;
31. }
32. }
33. }
具体的后续数据流程:
1. /** Read the request payload (everything following the length prefix) */
2. private void readPayload() throws
3. if (incomingBuffer.remaining() != 0) { // have we read length bytes?
4. //尝试一次读进来
5. int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
6. if (rc < 0) {
7. throw new
8. "Unable to read additional data from client sessionid 0x"
9. + Long.toHexString(sessionId)
10. ", likely client has closed socket");
11. }
12. }
13. //哈哈,一次读完
14. if (incomingBuffer.remaining() == 0) { // have we read length bytes?
15. //server的packet统计
16. packetReceived();
17. //准备使用这个buffer了
18. incomingBuffer.flip();
19. //嘿嘿,如果ConnectRequest还没来,那第一个packet肯定是它了
20. if
21. readConnectRequest();
22. }
23. //处理其他请求
24. else
25. readRequest();
26. }
27. //清理现场,为下一个packet读做准备
28. lenBuffer.clear();
29. incomingBuffer = lenBuffer;
30. }
31. }
我们现在发的ConnReq已经被server端接收了,处理如下:
1. private void readConnectRequest() throws
2. if (zkServer == null) {
3. throw new IOException("ZooKeeperServer not running");
4. }
5. //开始执行ConnectRequest的处理链
6. this, incomingBuffer);
7. //处理完了,说明业务连接已经建立好了
8. true;
9. }
具体处理:
1. public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws
2. //ConnectReq的packet是没有header的,所以直接读内容,反序列化
3. new
4. new
5. "connect");
6. ...
7. boolean readOnly = false;
8. try
9. //是否readOnly
10. "readOnly");
11. false;
12. catch
13. ....
14. }
15. ...
16. //设置客户端请求的session相关参数
17. int
18. byte
19. int
20. if
21. sessionTimeout = minSessionTimeout;
22. }
23. int
24. if
25. sessionTimeout = maxSessionTimeout;
26. }
27. cnxn.setSessionTimeout(sessionTimeout);
28. // We don't want to receive any packets until we are sure that the
29. // session is setup
30. //暂时先不读后续请求了,直到session建立
31. cnxn.disableRecv();
32. //拿客户端的sessionId
33. long
34. //重试
35. if (sessionId != 0) {
36. long
37. "Client attempting to renew session 0x"
38. + Long.toHexString(clientSessionId)
39. " at "
40. serverCnxnFactory.closeSession(sessionId);
41. cnxn.setSessionId(sessionId);
42. reopenSession(cnxn, sessionId, passwd, sessionTimeout);
43. else
44. "Client attempting to establish new session at "
45. + cnxn.getRemoteSocketAddress());
46. //创建新Session
47. createSession(cnxn, passwd, sessionTimeout);
48. }
49. }
创建新session如下:
1. long createSession(ServerCnxn cnxn, byte passwd[], int
2. //server端创建session,sessionId自增
3. long
4. //随机密码
5. new
6. r.nextBytes(passwd);
7. 4);
8. to.putInt(timeout);
9. //每个server端连接都有一个唯一的SessionId
10. cnxn.setSessionId(sessionId);
11. //提交请求给后面的执行链
12. 0, to, null);
13. return
14. }
提交过程:
1. private void submitRequest(ServerCnxn cnxn, long sessionId, int
2. int
3. new
4. submitRequest(si);
5. }
Server端开始执行链,参数是内部的Request对象,此时type是CREATE_SESSION:
1. public void
2. ......
3. try
4. touch(si.cnxn);
5. boolean
6. if
7. //提交给后续的processor执行,一般用异步以提升性能
8. firstProcessor.processRequest(si);
9. if (si.cnxn != null) {
10. incInProcess();
11. }
12. ......
13. }
第一个processor PrepRequestProcessor执行:
1. public void
2. try
3. while (true) {
4. Request request = submittedRequests.take();
5. ......
6. pRequest(request);
7. }
8. ......
9. }
对于CREATE_SESSION具体处理:
1. //create/close session don't require request record
2. case
3. case
4. //在这里,组装了Request的header和txn实现,方便后续processor处理
5. null, true);
6. break;
7. ......
8. request.zxid = zks.getZxid();
9. //让后续processor处理,这里一般是异步以提高性能
10. nextProcessor.processRequest(request);
1. case
2. //读session超时值
3. request.request.rewind();
4. int
5. //组装具体的Record实现,这里是CreateSessionTxn,方便后续processor处理
6. new
7. request.request.rewind();
8. zks.sessionTracker.addSession(request.sessionId, to);
9. zks.setOwner(request.sessionId, request.getOwner());
10. break;
从上可见,PrepRequestProcessor主要是负责组装Request的header和txn参数的,相当于是预处理
第二个Processor SyncRequestProcessor处理:
1. int randRoll = r.nextInt(snapCount/2);
2. while (true) {
3. null;
4. //flush队列如果为空,阻塞等待,代表之前的请求都被处理了
5. if
6. si = queuedRequests.take();
7. }
8. //如果不为空,就是说还有请求等待处理,先非阻塞拿一下,如果系统压力小,正好没有请求进来,则处理之前积压的请求
9. //如果系统压力大,则可能需要flush满1000个才会继续处理
10. else
11. si = queuedRequests.poll();
12. //任务queue空闲,处理积压的待flush请求
13. if (si == null) {
14. flush(toFlush);
15. continue;
16. }
17. }
18. if
19. break;
20. }
21. if (si != null) {
22. // track the number of records written to the log
23. //将Request append到log输出流,先序列化再append,注意此时request还没flush到磁盘,还在内存呢
24. if
25. //成功计数器
26. logCount++;
27. //如果成功append的request累计数量大于某个值,则执行flush log的操作
28. //并启动一个线程异步将内存里的Database和session状态写入到snapshot文件,相当于一个checkpoint
29. //snapCount默认是100000
30. if (logCount > (snapCount / 2
31. 2);
32. // roll the log
33. //将内存中的log flush到磁盘
34. zks.getZKDatabase().rollLog();
35. // take a snapshot
36. //启动线程异步将内存中的database和sessions状态写入snapshot文件中
37. if (snapInProcess != null
38. "Too busy to snap, skipping");
39. else
40. new Thread("Snapshot Thread") {
41. public void
42. try
43. zks.takeSnapshot();
44. catch(Exception e) {
45. "Unexpected exception", e);
46. }
47. }
48. };
49. snapInProcess.start();
50. }
51. 0;
52. }
53. }
54. //如果是读请求,而且flush队列为空(没有待flush的写请求),则直接交给下一个processor处理
55. else if
56. // optimization for read heavy workloads
57. // iff this is a read, and there are no pending
58. // flushes (writes), then just pass this to the next
59. // processor
60. nextProcessor.processRequest(si);
61. if (nextProcessor instanceof
62. ((Flushable)nextProcessor).flush();
63. }
64. continue;
65. }
66. //写请求前面append到log输出流后,在这里加入到flush队列,后续批量处理
67. toFlush.add(si);
68. //如果系统压力大,可能需要到1000个request才会flush,flush之后可以被后续processor处理
69. if (toFlush.size() > 1000) {
70. flush(toFlush);
71. }
72. }
具体的flush处理:
1. private void
2. throws
3. {
4. if
5. return;
6. //将之前的append log flush到磁盘,并顺便关闭旧的log文件句柄
7. zks.getZKDatabase().commit();
8. //log flush完后,开始处理flush队列里的Request
9. while
10. Request i = toFlush.remove();
11. //执行后面的processor
12. nextProcessor.processRequest(i);
13. }
14. if (nextProcessor instanceof
15. ((Flushable)nextProcessor).flush();
16. }
17. }
我们假设现在系统压力小,我们的ConnectRequest可以被立马处理了,执行FinalRequestProcessor:
1. if (request.hdr != null) {
2. TxnHeader hdr = request.hdr;
3. Record txn = request.txn;
4. //对于事务型请求,处理之
5. rc = zks.processTxn(hdr, txn);
6. }
具体处理:
1. public
2. ProcessTxnResult rc;
3. int
4. long
5. //进一步调用database来处理事务
6. rc = getZKDatabase().processTxn(hdr, txn);
7. //如果是创建session,添加session
8. if
9. if (txn instanceof
10. CreateSessionTxn cst = (CreateSessionTxn) txn;
11. sessionTracker.addSession(sessionId, cst
12. .getTimeOut());
13. ......
14. return
15. }
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
1. {
2. //在这里构造一个Result对象,返回给FinalRequestProcessor
3. new
4.
5. try
6. rc.clientId = header.getClientId();
7. rc.cxid = header.getCxid();
8. rc.zxid = header.getZxid();
9. rc.type = header.getType();
10. 0;
11. null;
12. ......
在FinalRequestProcessor拿到database的处理结果,继续处理:
1. case
2. zks.serverStats().updateLatency(request.createTime);
3.
4. "SESS";
5. cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
6. request.createTime, System.currentTimeMillis());
7. //在这里写回response
8. true);
9. return;
10. }
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
1. ......
2. //构造一个返回对象,返回协商的sessionTimeout,唯一的sessionId和client的密码
3. new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
4. 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
5. // longer valid
6. new byte[16]);
7. new
8. BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
9. //用-1占位
10. 1, "len");
11. //序列化内容
12. "connect");
13. if
14. bos.writeBool(
15. this instanceof ReadOnlyZooKeeperServer, "readOnly");
16. }
17. baos.close();
18. ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
19. //将之前的-1改成真实的长度
20. 4).rewind();
21. //通过channel写回
22. cnxn.sendBuffer(bb);
23.
24. ......
25. //打开selector的读事件
26. cnxn.enableRecv();
27. ......
28. }
具体写回,通讯层NIOServerCnxn:
1. public void
2. try
3. if
4. // We check if write interest here because if it is NOT set,
5. // nothing is queued, so we can try to send the buffer right
6. // away without waking up the selector
7. //确保可写
8. if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
9. try
10. //写回client
11. sock.write(bb);
12. catch
13. // we are just doing best effort right now
14. }
15. }
16. // if there is nothing left to send, we are done
17. //一次写完了,太好了
18. if (bb.remaining() == 0) {
19. packetSent();
20. return;
21. }
22. }
23. //如果一次没写完,添加到输出队列,后续继续写
24. synchronized(this.factory){
25. sk.selector().wakeup();
26. if
27. "Add a buffer to outgoingBuffers, sk "
28. " is valid: "
29. }
30. outgoingBuffers.add(bb);
31. if
32. sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
33. }
34. }
35.
36. .......
37. }
到这里server端已经执行完毕了,返回给client一个ConnectResponse对象,client端的SendThread收到server端的Response处理:
1. void
2. throws
3. SocketChannel sock = (SocketChannel) sockKey.channel();
4. if (sock == null) {
5. throw new IOException("Socket is null!");
6. }
7. if
8. //先读包的长度,一个int
9. int
10. if (rc < 0) {
11. throw new
12. "Unable to read additional data from server sessionid 0x"
13. + Long.toHexString(sessionId)
14. ", likely server has closed socket");
15. }
16. //如果读满,注意这里同一个包,要读2次,第一次读长度,第二次读内容,incomingBuffer重用
17. if
18. incomingBuffer.flip();
19. //如果读的是长度
20. if
21. recvCount++;
22. //给incomingBuffer分配包长度的空间
23. readLength();
24. }
25. //如果还未初始化,就是session还没建立,那server端返回的必须是ConnectResponse
26. else if
27. //读取ConnectResponse,其实就是将incomingBuffer的内容反序列化成ConnectResponse对象
28. readConnectResult();
29. //继续读后续响应
30. enableRead();
31. //如果还有写请求,确保write事件ok
32. if
33. null) {
34. // Since SASL authentication has completed (if client is configured to do so),
35. // outgoing packets waiting in the outgoingQueue can now be sent.
36. enableWrite();
37. }
38. //准备读下一个响应
39. lenBuffer.clear();
40. incomingBuffer = lenBuffer;
41. updateLastHeard();
42. //session建立完毕
43. true;
44. else
45. sendThread.readResponse(incomingBuffer);
46. lenBuffer.clear();
47. incomingBuffer = lenBuffer;
48. updateLastHeard();
49. }
50. }
51. }
具体的读取:
1. void readConnectResult() throws
2. .....
3. //将incomingBuffer反序列化成ConnectResponse
4. new
5. BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
6. new
7. "connect");
8.
9. // read "is read-only" flag
10. boolean isRO = false;
11. try
12. "readOnly");
13. catch
14. // this is ok -- just a packet from an old server which
15. // doesn't contain readOnly field
16. "Connected to an old server; r-o mode will be unavailable");
17. }
18. //server返回的sessionId
19. this.sessionId = conRsp.getSessionId();
20. //后续处理,初始化client的一些参数,最后触发WatchedEvent
21. this.sessionId,
22. conRsp.getPasswd(), isRO);
23. }
后续处理如下:
1. void onConnected(int _negotiatedSessionTimeout, long
2. byte[] _sessionPasswd, boolean isRO) throws
3. negotiatedSessionTimeout = _negotiatedSessionTimeout;
4. ......
5. //初始化client端的session相关参数
6. 2 / 3;
7. connectTimeout = negotiatedSessionTimeout / hostProvider.size();
8. hostProvider.onConnected();
9. sessionId = _sessionId;
10. sessionPasswd = _sessionPasswd;
11. //修改CONNECT状态
12. state = (isRO) ?
13. States.CONNECTEDREADONLY : States.CONNECTED;
14. seenRwServerBefore |= !isRO;
15. "Session establishment complete on server "
16. + clientCnxnSocket.getRemoteSocketAddress()
17. ", sessionid = 0x"
18. ", negotiated timeout = "
19. " (READ-ONLY mode)" : ""));
20. //触发一个SyncConnected事件,这里有专门的EventThread会异步通知注册的watcher来处理
21. KeeperState eventState = (isRO) ?
22. KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
23. new
24. Watcher.Event.EventType.None,
25. null));
26. }
EventThread处理:
1. public void
2. if
3. && sessionState == event.getState()) {
4. return;
5. }
6. //EventThread同步session状态
7. sessionState = event.getState();
8.
9. // materialize the watchers based on the event
10. //找出那些需要被通知的watcher,主线程直接调用对应watcher接口即可
11. new
12. watcher.materialize(event.getState(), event.getType(),
13. event.getPath()),
14. event);
15. // queue the pair (watch set & event) for later processing
16. //提交异步队列处理
17. waitingEvents.add(pair);
18. }
EventThread主线程
1. public void
2. try
3. true;
4. while (true) {
5. //拿事件
6. Object event = waitingEvents.take();
7. if
8. true;
9. else
10. //处理
11. processEvent(event);
12. }
13. if
14. synchronized
15. if
16. false;
17. break;
18. }
19. }
20. }
21. catch
22. "Event thread exiting due to interruption", e);
23. }
24.
25. "EventThread shut down");
26. }
具体处理:
1. if (event instanceof
2. // each watcher will process the event
3. WatcherSetEventPair pair = (WatcherSetEventPair) event;
4. for
5. try
6. watcher.process(pair.event);
7. catch
8. "Error while calling watcher ", t);
9. }
10. }
11. }
在我们的例子里,会调用Executor这个watcher的process方法,又代理给了DataMonitor,对于SyncConnected啥事不干
1. case
2. // In this particular example we don't need to do anything
3. // here - watches are automatically re-registered with
4. // server and any watches triggered while the client was
5. // disconnected will be delivered (in order of course)
6. break;
好了,到这里client和server端session已经建立,可以进行后续的业务处理了。通过这个例子,我们讲解了client和server是如何交互数据,后续的请求比如create,get,set,delete都是类似流程。
Session建立核心流程:
1.创建TCP连接
2.client发送ConnectRequest包
http://iwinit.iteye.com/blog/1754611
3.server收到ConnectRequest包,创建session,将server端的sessionId返回给client
4.client收到server的响应,触发相应SyncConnected状态的事件
5.client端watcher消费事件
标签:11,10,Zookeeper,深入浅出,server,Session,new,12,null From: https://blog.51cto.com/u_2650279/6872511