首页 > 其他分享 >深入浅出Zookeeper之二Session建立

深入浅出Zookeeper之二Session建立

时间:2023-07-27 20:34:44浏览次数:47  
标签:11 10 Zookeeper 深入浅出 server Session new 12 null


上一篇,小编给大家介绍了zookeeper server端的启动。这一篇将来说一下client和server端是如何建立session的。通过官网的DataMonitor例子来说明。通过Session建立这个例子,可以大概知道client端和server端是如何处理请求的,之间是如何通信的。

官网Datamonitor的代码:

Executor

 



1. public class Executor implements
2.         DataMonitor.DataMonitorListener {  
3.     String znode;  
4.   
5.     DataMonitor dm;  
6.   
7.     ZooKeeper zk;  
8.   
9.     String filename;  
10.   
11.     String exec[];  
12.   
13.     Process child;  
14.   
15. //Executor是一个watcher,不过其处理都代理给DataMonitor了
16. public
17. throws
18. this.filename = filename;  
19. this.exec = exec;  
20. //初始化zookeeper的client,这一步会建立连接,创建session,启动client端的SendThread线程,当然都是异步的
21. new ZooKeeper(hostPort, 3000, this);  
22. //datamonitor是真实的处理类
23. new DataMonitor(zk, znode, null, this);  
24.     }



 

 DataMonitor

 



1. public class DataMonitor implements
2.   
3. .......  
4.   
5. public
6.             DataMonitorListener listener) {  
7. ......  
8. // Get things started by checking if the node exists. We are going
9. // to be completely event driven,异步exist,注册watcher,设置回调
10. true, this, null);  
11.     }  
12.   
13. ......  
14. //处理watcher通知事件
15. public void
16.         String path = event.getPath();  
17. //如果exist操作的对应的事件触发(create.delete,setdata),则再次注册watcher(watcher是单次的),业务处理都在回调里处理
18. else
19. if (path != null
20. // Something has changed on the node, let's find out
21. true, this, null);  
22.             }  
23.         }  
24. if (chainedWatcher != null) {  
25.             chainedWatcher.process(event);  
26.         }  
27.     }  
28. //处理exist操作的回掉结果
29. public void processResult(int
30. boolean
31. switch
32. case
33. true;  
34. break;  
35. case
36. false;  
37. break;  
38. case
39. case
40. true;  
41.             listener.closing(rc);  
42. return;  
43. default:  
44. // Retry errors
45. true, this, null);  
46. return;  
47.         }  
48. //如果节点存在,则同步获取节点数据
49. byte b[] = null;  
50. if
51. try
52. false, null);  
53. catch
54. // We don't need to worry about recovering now. The watch
55. // callbacks will kick off any exception handling
56.                 e.printStackTrace();  
57. catch
58. return;  
59.             }  
60.         }  
61. //如果数据有变化,则处理之
62. if ((b == null
63. null
64.             listener.exists(b);  
65.             prevData = b;  
66.         }  
67.     }  
68. }



  从这个例子出发,我们来分析下zookeeper的第一步session是如何建立的,主要就是Zookeeper类的构造。

Zookeeper构造

 




    1. public ZooKeeper(String connectString, int
    2. boolean
    3. throws
    4.    {  
    5. "Initiating client connection, connectString="
    6. " sessionTimeout=" + sessionTimeout + " watcher="
    7. //设置默认watcher
    8.        watchManager.defaultWatcher = watcher;  
    9.   
    10. new
    11.                connectString);  
    12. //从配置的serverList,解析成serverAddresses,这里做了shuffle,server顺序被打乱了
    13. new
    14.                connectStringParser.getServerAddresses());  
    15. //创建客户端连接,初始化SendThread和EventThread
    16. new
    17. this, watchManager,  
    18.                getClientCnxnSocket(), canBeReadOnly);  
    19. //启动SendThread和EventThread
    20.        cnxn.start();  
    21.    }


     初始化连接

     




      1. public ClientCnxn(String chrootPath, HostProvider hostProvider, int
      2.             ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,  
      3. long sessionId, byte[] sessionPasswd, boolean
      4. this.zooKeeper = zooKeeper;  
      5. this.watcher = watcher;  
      6. //客户端sessionId
      7. this.sessionId = sessionId;  
      8. this.sessionPasswd = sessionPasswd;  
      9. //客户端设置的超时时间
      10. this.sessionTimeout = sessionTimeout;  
      11. //主机列表
      12. this.hostProvider = hostProvider;  
      13. this.chrootPath = chrootPath;  
      14. //连接超时
      15.         connectTimeout = sessionTimeout / hostProvider.size();  
      16. //读超时
      17. 2 / 3;  
      18.         readOnly = canBeReadOnly;  
      19. //初始化client2个核心线程,SendThread是client的IO核心线程,EventThread从SendThread里拿到event,调用对应watcher
      20. new
      21. new
      22.   
      23.     }


       SendThread核心流程

       



      1. public void
      2.             .....  
      3. while
      4. try
      5. //如果还没连上,则启动连接过程,这个方法有歧义,其实现是判断sockkey是否已注册,可能此时连接上server
      6. if
      7.                         ......  
      8. //异步连接
      9.                         startConnect();  
      10.                         clientCnxnSocket.updateLastSendAndHeard();  
      11.                     }  
      12. //如果状态为连接上,则真的是连上server了
      13. if
      14.                         ......  
      15. //下一次select超时时间
      16.                         to = readTimeout - clientCnxnSocket.getIdleRecv();  
      17. else
      18. //如果没连上,则递减连接超时
      19.                         to = connectTimeout - clientCnxnSocket.getIdleRecv();  
      20.                     }  
      21. //session超时,包括连接超时
      22. if (to <= 0) {  
      23. throw new
      24. "Client session timed out, have not heard from server in "
      25. "ms"
      26. " for sessionid 0x"
      27.                                         + Long.toHexString(sessionId));  
      28.                     }  
      29. //如果send空闲,则发送心跳包
      30. if
      31. int timeToNextPing = readTimeout / 2
      32.                                 - clientCnxnSocket.getIdleSend();  
      33. if (timeToNextPing <= 0) {  
      34.                             sendPing();  
      35.                             clientCnxnSocket.updateLastSend();  
      36. else
      37. if
      38.                                 to = timeToNextPing;  
      39.                             }  
      40.                         }  
      41.                     }  
      42.   
      43. // If we are in read-only mode, seek for read/write server
      44. //如果是只读模式,则寻找R/W server,如果找到,则清理之前的连接,并重新连接到R/W server
      45. if
      46. long
      47. int idlePingRwServer = (int) (now - lastPingRwServer);  
      48. if
      49.                             lastPingRwServer = now;  
      50. 0;  
      51.                             pingRwTimeout =  
      52. 2*pingRwTimeout, maxPingRwTimeout);  
      53. //同步测试下个server是否是R/W server,如果是则抛出RWServerFoundException
      54.                             pingRwServer();  
      55.                         }  
      56.                         to = Math.min(to, pingRwTimeout - idlePingRwServer);  
      57.                     }  
      58. //处理IO
      59. this);  
      60. catch
      61. if
      62. if
      63. // closing so this is expected
      64. "An exception was thrown while closing send thread for session 0x"
      65.                                     + Long.toHexString(getSessionId())  
      66. " : "
      67.                         }  
      68. break;  
      69. else
      70. // this is ugly, you have a better way speak up
      71. if (e instanceof
      72. ", closing socket connection");  
      73. else if (e instanceof
      74.                             LOG.info(e.getMessage() + RETRY_CONN_MSG);  
      75. else if (e instanceof
      76.                             LOG.info(e.getMessage() + RETRY_CONN_MSG);  
      77. else if (e instanceof
      78.                             LOG.info(e.getMessage());  
      79. else
      80.                 ......  
      81.                         }  
      82. //清理之前的连接,找下一台server连接
      83.                         cleanup();  
      84. if
      85. new
      86.                                     Event.EventType.None,  
      87.                                     Event.KeeperState.Disconnected,  
      88. null));  
      89.                         }  
      90.                         clientCnxnSocket.updateNow();  
      91.                         clientCnxnSocket.updateLastSendAndHeard();  
      92.                     }  
      93.                 }  
      94.             }  
      95.      ......  
      96.         }



       具体过程

       



      1. private void startConnect() throws
      2. //状态改为CONNETING
      3.             state = States.CONNECTING;  
      4. //拿目标地址
      5.             InetSocketAddress addr;  
      6. if (rwServerAddress != null) {  
      7.                 addr = rwServerAddress;  
      8. null;  
      9. else
      10. 1000);  
      11.             }  
      12.   
      13. "\\(.*\\)",  
      14. "(" + addr.getHostName() + ":" + addr.getPort() + ")"));  
      15.         ......  
      16. //异步连接
      17.             clientCnxnSocket.connect(addr);  
      18.         }

       具体connect

       




        1. void connect(InetSocketAddress addr) throws
        2. //创建客户端SocketChannel
        3.        SocketChannel sock = createSock();  
        4. try
        5. //注册OP_CONNECT事件,尝试连接
        6.           registerAndConnect(sock, addr);  
        7. catch
        8. "Unable to open socket to "
        9.            sock.close();  
        10. throw
        11.        }  
        12. //session还未初始化
        13. false;  
        14.   
        15. /*
        16.         * Reset incomingBuffer
        17.         */
        18. //重置2个读buffer,准备下一次读
        19.        lenBuffer.clear();  
        20.        incomingBuffer = lenBuffer;  
        21.    }



         registerAndConnect过程:

         



        1. void
        2. throws
        3.     sockKey = sock.register(selector, SelectionKey.OP_CONNECT);  
        4. 试连接  
        5. boolean
        6. 果网络情况很好,立马可以连上,则发送ConnectRequest请求,请求和server建立session  
        7. if
        8.         sendThread.primeConnection();  
        9.     }  
        10. }


         primeConnection代表连上之后的操作,主要是建立session:

         


        1. void primeConnection() throws
        2.             ......  
        3. //客户端sessionId默认为0
        4. long sessId = (seenRwServerBefore) ? sessionId : 0;  
        5. //构造连接请求
        6. new ConnectRequest(0, lastZxid,  
        7.                     sessionTimeout, sessId, sessionPasswd);  
        8. synchronized
        9.                 ......  
        10. //组合成通讯层的Packet对象,添加到发送队列,对于ConnectRequest其requestHeader为null
        11. new Packet(null, null, conReq,  
        12. null, null, readOnly));  
        13.             }  
        14. //确保读写事件都监听
        15.             clientCnxnSocket.enableReadWriteOnly();  
        16.             .....  
        17.         }



         此时ConnectRequest请求已经添加到发送队列,SendThread进入doTransport处理流程:

         


        1. void doTransport(int
        2.                     ClientCnxn cnxn)  
        3. throws
        4. //select
        5.        selector.select(waitTimeOut);  
        6.        Set<SelectionKey> selected;  
        7. synchronized (this) {  
        8.            selected = selector.selectedKeys();  
        9.        }  
        10. // Everything below and until we get back to the select is
        11. // non blocking, so time is effectively a constant. That is
        12. // Why we just have to do this once, here
        13.        updateNow();  
        14. for
        15.            SocketChannel sc = ((SocketChannel) k.channel());  
        16. //如果之前连接没有立马连上,则在这里处理OP_CONNECT事件
        17. if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {  
        18. if
        19.                    updateLastSendAndHeard();  
        20.                    sendThread.primeConnection();  
        21.                }  
        22.            }   
        23. //如果读写就位,则处理之
        24. else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {  
        25.                doIO(pendingQueue, outgoingQueue, cnxn);  
        26.            }  
        27.        }  
        28. if
        29. synchronized(outgoingQueue) {  
        30. if
        31. null) {  
        32.                    enableWrite();  
        33.                }  
        34.            }  
        35.        }  
        36.        selected.clear();  
        37.    }


         假设我们此时连接已经好了,WRITE事件ok,则SendThread开始发送我们的ConnectRequest

         


        1. if
        2. //同步处理
        3. synchronized(outgoingQueue) {  
        4. //从发送队列中拿请求
        5.                 Packet p = findSendablePacket(outgoingQueue,  
        6.                         cnxn.sendThread.clientTunneledAuthenticationInProgress());  
        7.   
        8. if (p != null) {  
        9. //修改上次发送时间
        10.                     updateLastSend();  
        11. // If we already started writing p, p.bb will already exist
        12. //序列化Packet到ByteBuffer
        13. if (p.bb == null) {  
        14. //如果是业务请求,则需要设置事务Id
        15. if ((p.requestHeader != null) &&  
        16.                                 (p.requestHeader.getType() != OpCode.ping) &&  
        17.                                 (p.requestHeader.getType() != OpCode.auth)) {  
        18.                             p.requestHeader.setXid(cnxn.getXid());  
        19.                         }  
        20. //序列化
        21.                         p.createBB();  
        22.                     }  
        23. //写数据
        24.                     sock.write(p.bb);  
        25. //写完了,太好了,发送成功
        26. if
        27. //已发送的业务Packet数量
        28.                         sentCount++;  
        29. //发送完了,那从发送队列删掉,方便后续发送请求处理
        30.                         outgoingQueue.removeFirstOccurrence(p);  
        31. //如果是业务请求,则添加到Pending队列,方便对server端返回做相应处理,如果是其他请求,发完就扔了。。。
        32. if (p.requestHeader != null
        33.                                 && p.requestHeader.getType() != OpCode.ping  
        34.                                 && p.requestHeader.getType() != OpCode.auth) {  
        35. synchronized
        36.                                 pendingQueue.add(p);  
        37.                             }  
        38.                         }  
        39.                     }  
        40.                 }  
        41. //请求发完了,不需要再监听OS的写事件了,如果没发完,那还是要继续监听的,继续写嘛
        42. if
        43. // No more packets to send: turn off write interest flag.
        44. // Will be turned on later by a later call to enableWrite(),
        45. // from within ZooKeeperSaslClient (if client is configured
        46. // to attempt SASL authentication), or in either doIO() or
        47. // in doTransport() if not.
        48.                     disableWrite();  
        49. else
        50. // Just in case
        51.                     enableWrite();  
        52.                 }  
        53.             }  
        54.         }



         具体序列化方式,ConnRequest的packet没有协议头

         



        1. public void
        2. try
        3. new
        4.                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);  
        5. //写一个int,站位用,整个packet写完会来更新这个值,代表packet的从长度,4个字节
        6. 1, "len"); // We'll fill this in later
        7. //序列化协议头
        8. if (requestHeader != null) {  
        9. "header");  
        10.                }  
        11. //序列化协议体
        12. if (request instanceof
        13. "connect");  
        14. // append "am-I-allowed-to-be-readonly" flag
        15. "readOnly");  
        16. else if (request != null) {  
        17. "request");  
        18.                }  
        19.                baos.close();  
        20. //生成ByteBuffer
        21. this.bb = ByteBuffer.wrap(baos.toByteArray());  
        22. //将bytebuffer的前4个字节修改成真正的长度,总长度减去一个int的长度头
        23. this.bb.putInt(this.bb.capacity() - 4);  
        24. //准备给后续读
        25. this.bb.rewind();  
        26. catch
        27. "Ignoring unexpected exception", e);  
        28.            }  
        29.        }


        这里我们的第一个Packet是ConnReq,它构造的packet没有header,所以发完就直接丢掉了,但是SendThread还需要监听server端的返回,以确认连上,并进行session的初始化。那到这里client端等待server端返回了,我们看看server是怎么处理ConnReq请求的。

        假设server的selector线程已经就位,则selector会拿到一个读就位的事件,也就是client的connReq请求




          1. else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {  
          2.                         NIOServerCnxn c = (NIOServerCnxn) k.attachment();  
          3.                         c.doIO(k);



             if (k.isReadable()) {




            1. //先从Channel读4个字节,代表头
            2. int
            3. if (rc < 0) {  
            4. throw new
            5. "Unable to read additional data from client sessionid 0x"
            6.                           + Long.toHexString(sessionId)  
            7. ", likely client has closed socket");  
            8.               }  
            9. //int读好,继续往下读
            10. if (incomingBuffer.remaining() == 0) {  
            11. boolean
            12. //2个一样,就可以继续读下一个请求了
            13. if (incomingBuffer == lenBuffer) { // start of next request
            14.                       incomingBuffer.flip();  
            15. //给incomingBuffer分配一个length长度的内存,将后续的数据都给读进来
            16.                       isPayload = readLength(k);  
            17. //clear一下,准备写
            18.                       incomingBuffer.clear();  
            19. else
            20. // continuation
            21. true;  
            22.                   }  
            23. //好,读后续数据
            24. if (isPayload) { // not the case for 4letterword
            25.                       readPayload();  
            26.                   }  
            27. else
            28. // four letter words take care
            29. // need not do anything else
            30. return;  
            31.                   }  
            32.               }  
            33.           }



             具体的后续数据流程:

             




            1. /** Read the request payload (everything following the length prefix) */
            2. private void readPayload() throws
            3. if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            4. //尝试一次读进来
            5. int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            6. if (rc < 0) {  
            7. throw new
            8. "Unable to read additional data from client sessionid 0x"
            9.                         + Long.toHexString(sessionId)  
            10. ", likely client has closed socket");  
            11.             }  
            12.         }  
            13. //哈哈,一次读完
            14. if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            15. //server的packet统计
            16.             packetReceived();  
            17. //准备使用这个buffer了
            18.             incomingBuffer.flip();  
            19. //嘿嘿,如果CoonectRequst还没来,那第一个packet肯定是他了
            20. if
            21.                 readConnectRequest();  
            22.             }   
            23. //处理请他请求
            24. else
            25.                 readRequest();  
            26.             }  
            27. //清理现场,为下一个packet读做准备
            28.             lenBuffer.clear();  
            29.             incomingBuffer = lenBuffer;  
            30.         }  
            31.     }



             我们现在发的ConnReq已经被server端接受了,处理如下

             



            1. private void readConnectRequest() throws
            2. if (zkServer == null) {  
            3. throw new IOException("ZooKeeperServer not running");  
            4.        }  
            5. //开始执行ConnectRequest的处理链
            6. this, incomingBuffer);  
            7. //处理完了,说明业务连接已经建立好了
            8. true;  
            9.    }



             具体处理:

             



              1. public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws
              2. //ConnectReq的packet是没有header的,所以直接读内容,反序列化
              3. new
              4. new
              5. "connect");  
              6.         ...  
              7. boolean readOnly = false;  
              8. try
              9. //是否readOnly
              10. "readOnly");  
              11. false;  
              12. catch
              13.           ....  
              14.         }  
              15.         ...  
              16. //设置客户端请求的session相关参数
              17. int
              18. byte
              19. int
              20. if
              21.             sessionTimeout = minSessionTimeout;  
              22.         }  
              23. int
              24. if
              25.             sessionTimeout = maxSessionTimeout;  
              26.         }  
              27.         cnxn.setSessionTimeout(sessionTimeout);  
              28. // We don't want to receive any packets until we are sure that the
              29. // session is setup
              30. //暂时先不读后续请求了,直到session建立
              31.         cnxn.disableRecv();  
              32. //拿客户端的sessionId
              33. long
              34. //重试
              35. if (sessionId != 0) {  
              36. long
              37. "Client attempting to renew session 0x"
              38.                     + Long.toHexString(clientSessionId)  
              39. " at "
              40.             serverCnxnFactory.closeSession(sessionId);  
              41.             cnxn.setSessionId(sessionId);  
              42.             reopenSession(cnxn, sessionId, passwd, sessionTimeout);  
              43. else
              44. "Client attempting to establish new session at "
              45.                     + cnxn.getRemoteSocketAddress());  
              46. //创建新Session
              47.             createSession(cnxn, passwd, sessionTimeout);  
              48.         }  
              49.     }



               创建新session如下:

               



              1. long createSession(ServerCnxn cnxn, byte passwd[], int
              2. //server端创建session,sessionId自增
              3. long
              4. //随机密码
              5. new
              6.        r.nextBytes(passwd);  
              7. 4);  
              8.        to.putInt(timeout);  
              9. //每个server端连接都有一个唯一的SessionId
              10.        cnxn.setSessionId(sessionId);  
              11. //提交请求给后面的执行链
              12. 0, to, null);  
              13. return
              14.    }


              提交过程:



              1. private void submitRequest(ServerCnxn cnxn, long sessionId, int
              2. int
              3. new
              4.         submitRequest(si);  
              5.     }


                Server端开始执行链,参数是内部的Request对象,此时type是CREATE_SESSION:

               



              1. public void
              2.        ......  
              3. try
              4.             touch(si.cnxn);  
              5. boolean
              6. if
              7. //提交给后续的processor执行,一般用异步以提升性能
              8.                 firstProcessor.processRequest(si);  
              9. if (si.cnxn != null) {  
              10.                     incInProcess();  
              11.                 }  
              12.        ......  
              13.     }


               第一个processor PrepRequestProcessor执行:

               



              1. public void
              2. try
              3. while (true) {  
              4.                 Request request = submittedRequests.take();  
              5.                 ......  
              6.                 pRequest(request);  
              7.             }  
              8.       ......  
              9.     }



               对于CREATE_SESSION具体处理:

               


              1. //create/close session don't require request record
              2. case
              3. case
              4. //在这里,组装了Request的header和txh实现,方便后续processor处理
              5. null, true);  
              6. break;  
              7.         ......  
              8.     request.zxid = zks.getZxid();  
              9. //让后续processor处理,这里一般是异步以提高性能
              10.         nextProcessor.processRequest(request);



              1. case
              2. //读session超时值
              3.                request.request.rewind();  
              4. int
              5. //组装具体的Record实现,这里是CreateSessionTxn,方便后续processor处理
              6. new
              7.                request.request.rewind();  
              8.                zks.sessionTracker.addSession(request.sessionId, to);  
              9.                zks.setOwner(request.sessionId, request.getOwner());  
              10. break;


                从上可见,PrepRequestProcessor主要是负责组装Request的header和txn参数的,相当于是预处理

              第二个Processor SyncRequestProcessor处理:

               


              1. int randRoll = r.nextInt(snapCount/2);  
              2. while (true) {  
              3. null;  
              4. //flush队列如果为空,阻塞等待,代表之前的请求都被处理了
              5. if
              6.                     si = queuedRequests.take();  
              7.                 }   
              8. //如果不为空,就是说还有请求等待处理,先非阻塞拿一下,如果系统压力小,正好没有请求进来,则处理之前积压的请求
              9. //如果系统压力大,则可能需要flush满1000个才会继续处理
              10. else
              11.                     si = queuedRequests.poll();  
              12. //任务queue空闲,处理积压的待flush请求
              13. if (si == null) {  
              14.                         flush(toFlush);  
              15. continue;  
              16.                     }  
              17.                 }  
              18. if
              19. break;  
              20.                 }  
              21. if (si != null) {  
              22. // track the number of records written to the log
              23. //将Request append到log输出流,先序列化再append,注意此时request还没flush到磁盘,还在内存呢
              24. if
              25. //成功计数器
              26.                         logCount++;  
              27. //如果成功append的request累计数量大于某个值,则执行flush log的操作
              28. //并启动一个线程异步将内存里的Database和session状态写入到snapshot文件,相当于一个checkpoint
              29. //snapCount默认是100000
              30. if (logCount > (snapCount / 2
              31. 2);  
              32. // roll the log
              33. //将内存中的log flush到磁盘
              34.                             zks.getZKDatabase().rollLog();  
              35. // take a snapshot
              36. //启动线程异步将内存中的database和sessions状态写入snapshot文件中
              37. if (snapInProcess != null
              38. "Too busy to snap, skipping");  
              39. else
              40. new Thread("Snapshot Thread") {  
              41. public void
              42. try
              43.                                                 zks.takeSnapshot();  
              44. catch(Exception e) {  
              45. "Unexpected exception", e);  
              46.                                             }  
              47.                                         }  
              48.                                     };  
              49.                                 snapInProcess.start();  
              50.                             }  
              51. 0;  
              52.                         }  
              53.                     }  
              54. //如果是写请求,而且flush队列为空,执行往下执行 
              55. else if
              56. // optimization for read heavy workloads
              57. // iff this is a read, and there are no pending
              58. // flushes (writes), then just pass this to the next
              59. // processor
              60.                         nextProcessor.processRequest(si);  
              61. if (nextProcessor instanceof
              62.                             ((Flushable)nextProcessor).flush();  
              63.                         }  
              64. continue;  
              65.                     }  
              66. //写请求前面append到log输出流后,在这里加入到flush队列,后续批量处理
              67.                     toFlush.add(si);  
              68. //如果系统压力大,可能需要到1000个request才会flush,flush之后可以被后续processor处理
              69. if (toFlush.size() > 1000) {  
              70.                         flush(toFlush);  
              71.                     }  
              72.                 }



               具体的flush处理:

               



              1. private void
              2. throws
              3.     {  
              4. if
              5. return;  
              6. //将之前的append log flush到磁盘,并顺便关闭旧的log文件句柄
              7.         zks.getZKDatabase().commit();  
              8. //log flush完后,开始处理flush队列里的Request
              9. while
              10.             Request i = toFlush.remove();  
              11. //执行后面的processor
              12.             nextProcessor.processRequest(i);  
              13.         }  
              14. if (nextProcessor instanceof
              15.             ((Flushable)nextProcessor).flush();  
              16.         }  
              17.     }



               我们假设现在系统压力小,我们的ConnectionRequest可以被立马处理了,执行FinalRequestProcessor:

               





              1. if (request.hdr != null) {  
              2.                TxnHeader hdr = request.hdr;  
              3.                Record txn = request.txn;  
              4. //对于事务型请求,处理之
              5.                rc = zks.processTxn(hdr, txn);  
              6.             }


               具体处理:

               


              1. public
              2.         ProcessTxnResult rc;  
              3. int
              4. long
              5. //进一步调用database来处理事务
              6.         rc = getZKDatabase().processTxn(hdr, txn);  
              7. //如果是创建session,添加session
              8. if
              9. if (txn instanceof
              10.                 CreateSessionTxn cst = (CreateSessionTxn) txn;  
              11.                 sessionTracker.addSession(sessionId, cst  
              12.                         .getTimeOut());  
              13.       ......  
              14. return
              15.     }


               public ProcessTxnResult processTxn(TxnHeader header, Record txn)




              1.    {  
              2. //在这里构造一个Result对象,返回给FinalRequestProcessor
              3. new
              4.   
              5. try
              6.            rc.clientId = header.getClientId();  
              7.            rc.cxid = header.getCxid();  
              8.            rc.zxid = header.getZxid();  
              9.            rc.type = header.getType();  
              10. 0;  
              11. null;  
              12. ......


               在FinalRequestProcessor拿到database的处理结果,继续处理:

               



              1. case
              2.                 zks.serverStats().updateLatency(request.createTime);  
              3.   
              4. "SESS";  
              5.                 cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,  
              6.                         request.createTime, System.currentTimeMillis());  
              7. //在这里写回response
              8. true);  
              9. return;  
              10.             }


               public void finishSessionInit(ServerCnxn cnxn, boolean valid) {



              1.       ......  
              2. //构造一个返回对象,返回协商的sessionTimeout,唯一的sessionId和client的密码
              3. new ConnectResponse(0, valid ? cnxn.getSessionTimeout()  
              4. 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
              5. // longer valid
              6. new byte[16]);  
              7. new
              8.           BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);  
              9. //用-1占位
              10. 1, "len");  
              11. //序列化内容
              12. "connect");  
              13. if
              14.               bos.writeBool(  
              15. this instanceof ReadOnlyZooKeeperServer, "readOnly");  
              16.           }  
              17.           baos.close();  
              18.           ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());  
              19. //将之前的-1改成真实的长度
              20. 4).rewind();  
              21. //通过channel写回
              22.           cnxn.sendBuffer(bb);      
              23.   
              24.           ......  
              25. //打开selector的读事件
              26.           cnxn.enableRecv();  
              27.       ......  
              28.   }



               具体写回,通讯层NIOServerCnxn:

               



              1. public void
              2. try
              3. if
              4. // We check if write interest here because if it is NOT set,
              5. // nothing is queued, so we can try to send the buffer right
              6. // away without waking up the selector
              7. //确保可写
              8. if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {  
              9. try
              10. //写回client
              11.                         sock.write(bb);  
              12. catch
              13. // we are just doing best effort right now
              14.                     }  
              15.                 }  
              16. // if there is nothing left to send, we are done
              17. //一次写完了,太好了
              18. if (bb.remaining() == 0) {  
              19.                     packetSent();  
              20. return;  
              21.                 }  
              22.             }  
              23. //如果一次没写完,添加到输出队列,后续继续写
              24. synchronized(this.factory){  
              25.                 sk.selector().wakeup();  
              26. if
              27. "Add a buffer to outgoingBuffers, sk "
              28. " is valid: "
              29.                 }  
              30.                 outgoingBuffers.add(bb);  
              31. if
              32.                     sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);  
              33.                 }  
              34.             }  
              35.               
              36.         .......  
              37.     }


               到这里server端已经执行完毕了,返回给client一个ConnectResponse对象,client端的SendThread收到server端的Response处理:

               




              1. void
              2. throws
              3.         SocketChannel sock = (SocketChannel) sockKey.channel();  
              4. if (sock == null) {  
              5. throw new IOException("Socket is null!");  
              6.         }  
              7. if
              8. //先读包的长度,一个int
              9. int
              10. if (rc < 0) {  
              11. throw new
              12. "Unable to read additional data from server sessionid 0x"
              13.                                 + Long.toHexString(sessionId)  
              14. ", likely server has closed socket");  
              15.             }  
              16. //如果读满,注意这里同一个包,要读2次,第一次读长度,第二次读内容,incomingBuffer重用
              17. if
              18.                 incomingBuffer.flip();  
              19. //如果读的是长度
              20. if
              21.                     recvCount++;  
              22. //给incomingBuffer分配包长度的空间
              23.                     readLength();  
              24.                 }   
              25. //如果还未初始化,就是session还没建立,那server端返回的必须是ConnectResponse       
              26. else if
              27. //读取ConnectRequest,其实就是将incomingBuffer的内容反序列化成ConnectResponse对象
              28.                     readConnectResult();  
              29. //继续读后续响应
              30.                     enableRead();  
              31. //如果还有写请求,确保write事件ok
              32. if
              33. null) {  
              34. // Since SASL authentication has completed (if client is configured to do so),
              35. // outgoing packets waiting in the outgoingQueue can now be sent.
              36.                         enableWrite();  
              37.                     }  
              38. //准备读下一个响应
              39.                     lenBuffer.clear();  
              40.                     incomingBuffer = lenBuffer;  
              41.                     updateLastHeard();  
              42. //session建立完毕
              43. true;  
              44. else
              45.                     sendThread.readResponse(incomingBuffer);  
              46.                     lenBuffer.clear();  
              47.                     incomingBuffer = lenBuffer;  
              48.                     updateLastHeard();  
              49.                 }  
              50.             }  
              51.         }



               具体的读取:

               



              1. void readConnectResult() throws
              2.         .....  
              3. //将incomingBuffer反序列化成CoonectResponse
              4. new
              5.         BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);  
              6. new
              7. "connect");  
              8.   
              9. // read "is read-only" flag
              10. boolean isRO = false;  
              11. try
              12. "readOnly");  
              13. catch
              14. // this is ok -- just a packet from an old server which
              15. // doesn't contain readOnly field
              16. "Connected to an old server; r-o mode will be unavailable");  
              17.         }  
              18. //server返回的sessionId
              19. this.sessionId = conRsp.getSessionId();  
              20. //后续处理,初始化client的一些参数,最后触发WatchedEvent
              21. this.sessionId,  
              22.                 conRsp.getPasswd(), isRO);  
              23.     }


               后续处理如下:

               




                1. void onConnected(int _negotiatedSessionTimeout, long
                2. byte[] _sessionPasswd, boolean isRO) throws
                3.             negotiatedSessionTimeout = _negotiatedSessionTimeout;  
                4.             ......  
                5. //初始化client端的session相关参数
                6. 2 / 3;  
                7.             connectTimeout = negotiatedSessionTimeout / hostProvider.size();  
                8.             hostProvider.onConnected();  
                9.             sessionId = _sessionId;  
                10.             sessionPasswd = _sessionPasswd;  
                11. //修改CONNECT状态
                12.             state = (isRO) ?  
                13.                     States.CONNECTEDREADONLY : States.CONNECTED;  
                14.             seenRwServerBefore |= !isRO;  
                15. "Session establishment complete on server "
                16.                     + clientCnxnSocket.getRemoteSocketAddress()  
                17. ", sessionid = 0x"
                18. ", negotiated timeout = "
                19. " (READ-ONLY mode)" : ""));  
                20. //触发一个SyncConnected事件,这里有专门的EventThread会异步通知注册的watcher来处理
                21.             KeeperState eventState = (isRO) ?  
                22.                     KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;  
                23. new
                24.                     Watcher.Event.EventType.None,  
                25. null));  
                26.         }


                 EventThread处理:

                 




                  1. public void
                  2. if
                  3.                   && sessionState == event.getState()) {  
                  4. return;  
                  5.           }  
                  6. //EventThread同步session状态
                  7.           sessionState = event.getState();  
                  8.   
                  9. // materialize the watchers based on the event
                  10. //找出那些需要被通知的watcher,主线程直接调用对应watcher接口即可
                  11. new
                  12.                   watcher.materialize(event.getState(), event.getType(),  
                  13.                           event.getPath()),  
                  14.                           event);  
                  15. // queue the pair (watch set & event) for later processing
                  16. //提交异步队列处理
                  17.           waitingEvents.add(pair);  
                  18.       }


                   EventThread主线程

                   




                    1. public void
                    2. try
                    3. true;  
                    4. while (true) {  
                    5. //拿事件
                    6.                  Object event = waitingEvents.take();  
                    7. if
                    8. true;  
                    9. else
                    10. //处理
                    11.                     processEvent(event);  
                    12.                  }  
                    13. if
                    14. synchronized
                    15. if
                    16. false;  
                    17. break;  
                    18.                        }  
                    19.                     }  
                    20.               }  
                    21. catch
                    22. "Event thread exiting due to interruption", e);  
                    23.            }  
                    24.   
                    25. "EventThread shut down");  
                    26.         }



                     具体处理:

                     



                    1. if (event instanceof
                    2. // each watcher will process the event
                    3.                   WatcherSetEventPair pair = (WatcherSetEventPair) event;  
                    4. for
                    5. try
                    6.                           watcher.process(pair.event);  
                    7. catch
                    8. "Error while calling watcher ", t);  
                    9.                       }  
                    10.                   }  
                    11.               }


                     在我们的例子里,会调用Executor这个watcher的process方法,又代理给了DataMonitor,对于SyncConnected啥事不干

                     



                    1. case
                    2. // In this particular example we don't need to do anything
                    3. // here - watches are automatically re-registered with
                    4. // server and any watches triggered while the client was
                    5. // disconnected will be delivered (in order of course)
                    6. break;


                     

                    好了,到这里client和server端session已经建立,可以进行后续的业务处理了。通过这个例子,我们讲解了client和server是如何交互数据,后续的请求比如create,get,set,delete都是类似流程。

                    Session建立核心流程:

                    1.创建TCP连接

                    2.client发送ConnectRequest包

                    http://iwinit.iteye.com/blog/1754611

                    3.server收到ConnectRequest包,创建session,将server端的sessionId返回给client

                    4.client收到server的响应,触发相应SyncConnected状态的事件

                    5.client端watcher消费事件

                     

                    标签:11,10,Zookeeper,深入浅出,server,Session,new,12,null
                    From: https://blog.51cto.com/u_2650279/6872511

                    相关文章

                    • ZooKeeper
                      1Zookeeper介绍Zookeeper是一个分布式的协调服务,为分布式应用程序提供synchronization、configurationmaintenance、groups和nameing服务。Zookeeper是一个有众多服务器节点组成的集群,这些节点中有一个主节点(leader),leader是通过leaderselection自动地从服务器节点中选举出来。Zo......
                    • Cookie和Session详解
                      二者的作用Cookie是存在于客户端的“客户通行证”。Session是存在于服务端的“客户档案表”。二者的作用都是跟踪用户的整个会话。Cookie:产生的缘由:一个用户的所有请求操作对应一个会话,另一个用户则对应另一个会话,但是由于HTTP协议的无状态特性,服务器无法单从连接上跟踪到......
                    • java session 关闭页面 失效
                      JavaSession关闭页面失效1.流程图以下是实现JavaSession关闭页面失效的流程图:![流程图](2.详细步骤按照上面的流程图,我们需要完成以下步骤来实现JavaSession关闭页面失效。步骤动作1.设置Session失效时间2.在页面中获取Session对象3.判断S......
                    • 【深入浅出】你必须知道的 InnoDB 锁(二)
                      ......
                    • cookie+session(这里使用redistemplate代替)实现单点登录流程
                       user发起资源请求(带上回调的路径方便回调),通过判断是否浏览器的cookie中是否存在登录过的痕迹,比如有人登了,然后存了一个cookie到浏览器如果拿到了cookie是有东西的,则带上这个cookie的内容返回给client,如果没有东西,则继续登录,向session中存入userInfo,并给浏览器设置cookie......
                    • APP - Appium-Inspector连接报错Failed to create session, The requested resource c
                      APP-Appium-Inspector连接报错Failedtocreatesession,Therequestedresourcecouldnotbefoundappium版本:Appium-Server-GUI-windows-1.22.3-4Appium-Inspector版本:Appium-Inspector-windows-2022.5.4填写好参数连接时报错: 错误信息:错误Failedtocreatesess......
                    • springsession 配置redis集群
                      SpringSession配置Redis集群教程1.流程概述在本教程中,我们将详细介绍如何使用SpringSession来配置Redis集群。整个流程可以总结为以下几个步骤:添加SpringSession和Redis依赖配置Redis集群连接信息配置SpringSession使用Redis集群测试SpringSession与Redis集群的连接......
                    • springcloud- 分布式session,全局session共享的解决方案
                       1.导入依赖     <dependency>       <groupId>org.springframework.session</groupId>       <artifactId>spring-session-data-redis</artifactId> <!--     <version>2.3.0.RELEASE</version>-->......
                    • session有效期内登录
                      fromdjango.utils.deprecationimportMiddlewareMixinfromdjango.shortcutsimportrender,redirectclassMyMiddleware(MiddlewareMixin):defprocess_request(self,request):path=["/Login","/register"]print(request.path_inf......
                    • php cookie与session
                      1、cookie1.1创建cookiesetcookie(name,value,expire,path,domain);注释:在发送cookie时,cookie的值会自动进行URL编码,在取回时进行自动解码。(为防止URL编码,请使用setrawcookie()取而代之。)setcookie("user","runoob",time()+3600);1.2获取cookie//输出coo......