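See below for steps 1 through 10 (marked as //step N in the comments of the call chains).
Network connections involve only QuorumCnxManager; FastLeaderElection has little to do with them.
Between servers, only the one with the larger myid may act as the client (the side that initiates the connection); the one with the smaller myid acts as the server.
So a single node is both a client and a server at the same time (except the nodes with the smallest and the largest myid, which are only a server and only a client respectively).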
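A minimal sketch of the tie-break rule above, assuming only what these notes state: a connection survives only when the side with the larger myid initiated it. ConnectionRule and its methods are hypothetical names for illustration, not ZooKeeper APIs; the two checks mirror the sid > self.getId() test in startConnection and the three-way branch in handleConnection traced below.

```java
// Hypothetical model of the "larger myid dials, smaller myid accepts" rule.
final class ConnectionRule {

    enum Incoming { CLOSE_AND_CONNECT_BACK, IGNORE_SAME_ID, START_WORKERS }

    // Initiator side (like startConnection): keep the socket only if the peer's myid is smaller.
    static boolean keepOutgoing(long mySid, long peerSid) {
        return peerSid < mySid;
    }

    // Acceptor side (like handleConnection): decide what to do with an incoming socket.
    static Incoming onIncoming(long mySid, long peerSid) {
        if (peerSid < mySid) {
            return Incoming.CLOSE_AND_CONNECT_BACK; // we have the larger myid, so we must dial
        } else if (peerSid == mySid) {
            return Incoming.IGNORE_SAME_ID;         // duplicate myid: should not happen
        }
        return Incoming.START_WORKERS;              // legitimate: the larger myid dialled us
    }

    // Counts how many peers a node dials (smaller myids) and accepts (larger myids).
    static String roles(long mySid, long[] ensemble) {
        int dials = 0;
        int accepts = 0;
        for (long sid : ensemble) {
            if (sid < mySid) {
                dials++;
            } else if (sid > mySid) {
                accepts++;
            }
        }
        return "dials " + dials + ", accepts " + accepts;
    }

    public static void main(String[] args) {
        long[] ensemble = {1, 2, 3};
        for (long sid : ensemble) {
            System.out.println("myid " + sid + ": " + roles(sid, ensemble));
        }
    }
}
```

For a three-node ensemble, myid 1 only accepts (dials 0, accepts 2), myid 3 only dials (dials 2, accepts 0), and myid 2 does both, which is the "smallest and largest excepted" remark above.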
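QuorumCnxManager.java
-run()
  -new ListenerHandler
    -run()
      -acceptConnections();
        -while((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) //create the BIO server socket and bind the port, with a retry mechanism
          -serverSocket = createNewServerSocket(); //create the BIO server
            -socket = new ServerSocket();
            -socket.bind(address);
          -client = serverSocket.accept(); //server side: accept the incoming connection
          -receiveConnection(client); //server side: receive the connection sent by the client and complete it
            -din = new DataInputStream(new BufferedInputStream(sock.getInputStream())); //input stream used to read the peer's myid
            -handleConnection(sock, din); //the client writes four fields: 1. protocolVersion 2. myid (the key field) 3. address length 4. address
              -protocolVersion = din.readLong(); //read 8 bytes: the protocol version
              -InitialMessage init = InitialMessage.parse(protocolVersion, din); //wrap the rest of the input stream into an object
              -sid = init.sid; //sid is the peer's (client's) myid; we are the server here, and comparing the two myids decides who should be client and who server
              -if(sid < self.getId()) //the peer's myid is smaller than ours: close this connection and send a connection request to the peer instead
                -connectOne(sid); //whenever this runs on the current node, the node is the client, i.e. the server turns into the client
                  -connectOne(sid, electionAddr);
                    -initiateConnectionAsync(electionAddr, sid);
                      -connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid)); //submit a task to the thread pool
                        -QuorumConnectionReqThread
                          -run()
                            -initiateConnection(electionAddr, sid); //both arguments describe the peer
                              -sock = SOCKET_FACTORY.get(); //create the socket, i.e. the BIO client
                              -sock.connect(electionAddr.getReachableOrOne(), cnxTO); //client connects to the server side; this completes the physical connection, a TCP connection (matches ServerSocket.accept() in the ListenerHandler thread of Listener on the server side)
                              -startConnection(sock, sid); //start the logical connection, the abstraction and encapsulation of the basic actions on top of TCP; this is where the four fields are written to the server
                                -BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream()); //initialize the output stream
                                -dout = new DataOutputStream(buf);
                                -dout.writeLong(protocolVersion);
                                -dout.writeLong(self.getId());
                                -dout.writeInt(addr_bytes.length);
                                -dout.write(addr_bytes);
                                -if(sid > self.getId()) //the peer's myid is larger than ours: close the connection (we are the client here)
                                -else //set up a group of worker components: a send thread (which holds a receive thread as a member) and a send queue
              -else if(sid == self.getId()) //the peer's myid equals ours: should not happen
              -else //the only legitimate case; set up the same components: a send thread (holding a receive thread as a member) and a send queue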
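The four handshake fields written in startConnection and read back in handleConnection can be illustrated with plain DataOutputStream/DataInputStream over an in-memory buffer. This is a sketch, not ZooKeeper's InitialMessage code: the PROTOCOL_VERSION value is a hypothetical placeholder, and the address is assumed to be a UTF-8 "host:port" string.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class Handshake {
    // Hypothetical value for illustration only.
    static final long PROTOCOL_VERSION = -65536L;

    static final class Initial {
        final long protocolVersion;
        final long sid;
        final String address;

        Initial(long protocolVersion, long sid, String address) {
            this.protocolVersion = protocolVersion;
            this.sid = sid;
            this.address = address;
        }
    }

    // Client side: write the four fields in order.
    static byte[] encode(long mySid, String address) {
        try {
            byte[] addrBytes = address.getBytes(StandardCharsets.UTF_8);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(bytes);
            dout.writeLong(PROTOCOL_VERSION); // 1. protocol version, 8 bytes
            dout.writeLong(mySid);            // 2. myid, 8 bytes
            dout.writeInt(addrBytes.length);  // 3. address length, 4 bytes
            dout.write(addrBytes);            // 4. address bytes
            dout.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Server side: read the fields back in the same order.
    static Initial decode(byte[] data) {
        try (DataInputStream din = new DataInputStream(new ByteArrayInputStream(data))) {
            long version = din.readLong();
            long sid = din.readLong();
            int len = din.readInt();
            byte[] addr = new byte[len];
            din.readFully(addr);
            return new Initial(version, sid, new String(addr, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // e.g. EOFException on a truncated handshake
        }
    }

    public static void main(String[] args) {
        byte[] wire = encode(3, "127.0.0.1:3888");
        Initial init = decode(wire);
        // prints: 34 bytes, sid=3, addr=127.0.0.1:3888
        System.out.println(wire.length + " bytes, sid=" + init.sid + ", addr=" + init.address);
    }
}
```

The fixed-width prefix (8 + 8 + 4 bytes) is what lets the server read the myid before deciding whether to keep or drop the socket.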
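Four ZooKeeper server states (QuorumPeer.ServerState): LOOKING, LEADING, FOLLOWING, OBSERVING
Four ZAB states: ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST
Crash recovery: a server that was LEADING/FOLLOWING switches to LOOKING and starts an election; while electing it cannot serve clients.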
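A small model of the two state sets and the crash-recovery transition, assuming the mapping these notes give (LOOKING runs ELECTION; LEADING and FOLLOWING run DISCOVERY, SYNCHRONIZATION and BROADCAST). The enum constants match the names above, but PeerStates and its methods are hypothetical, and only the LEADING/FOLLOWING to LOOKING transition is modelled.

```java
import java.util.EnumSet;

final class PeerStates {
    enum ServerState { LOOKING, LEADING, FOLLOWING, OBSERVING }

    enum ZabState { ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST }

    // ZAB phases each main-loop branch goes through.
    static EnumSet<ZabState> phasesFor(ServerState s) {
        switch (s) {
            case LOOKING:
                return EnumSet.of(ZabState.ELECTION); // lookForLeader()
            case LEADING:
            case FOLLOWING:
                return EnumSet.of(ZabState.DISCOVERY, // lead() / followLeader()
                        ZabState.SYNCHRONIZATION, ZabState.BROADCAST);
            default:
                return EnumSet.noneOf(ZabState.class); // OBSERVING: left unmapped in this sketch
        }
    }

    // Crash recovery: a LEADING/FOLLOWING server falls back to LOOKING and re-elects.
    static ServerState onLeaderLost(ServerState s) {
        return (s == ServerState.LEADING || s == ServerState.FOLLOWING) ? ServerState.LOOKING : s;
    }

    public static void main(String[] args) {
        ServerState s = ServerState.FOLLOWING;
        System.out.println(s + " " + phasesFor(s));
        s = onLeaderLost(s);
        System.out.println(s + " " + phasesFor(s));
    }
}
```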
while (running) {
    switch (getPeerState()) {
    case LOOKING:   lookForLeader(); break; //ELECTION
    case LEADING:   lead();          break; //DISCOVERY SYNCHRONIZATION BROADCAST
    case FOLLOWING: followLeader();  break; //DISCOVERY SYNCHRONIZATION BROADCAST
    case OBSERVING: observeLeader(); break;
    }
}
QuorumPeer.java
-run()
  -while(running)
    -switch(getPeerState())
      -case LOOKING
        -setCurrentVote(makeLEStrategy().lookForLeader()); //store the Leader chosen by the election algorithm; lookForLeader() is the entry point of the election logic
          -lookForLeader() //continues in FastLeaderElection.java
      -case OBSERVING
        -observer.observeLeader(); //the observer follows the leader
      -case FOLLOWING
        -setFollower(makeFollower(logFactory)); //create a Follower and store it in a QuorumPeer field
        -follower.followLeader();
      -case LEADING
        -setLeader(makeLeader(logFactory));
        -leader.lead();
        -setLeader(null);
FastLeaderElection.java
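-lookForLeader()
  -self.start_fle = Time.currentElapsedTime(); //election start time
  -Map<Long, Vote> recvset = new HashMap<>(); //ballot box of valid votes, keyed by myid; if the ensemble has 7 servers, recvset holds at most 7 entries
  -logicalclock.incrementAndGet(); //bump the logical clock: it starts at 0, so incrementAndGet() yields 1 for the first election
  -updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); //update our own vote
    -proposedLeader = leader; //three member fields: the proposed leader's information, i.e. the vote
    -proposedZxid = zxid;
    -proposedEpoch = epoch;
  -sendNotifications(); //broadcast our vote to the other servers; entry point of the vote exchange. Vote, ToSend and Message can all be read as "a vote" (on the receiving side it becomes a Notification). This triggers the connections to the other servers //step 1
    -for(long sid : self.getCurrentAndNextConfigVoters()) //send one vote to each server
      -ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch, qv.toString().getBytes()); //step 2: build the ToSend object
      -sendqueue.offer(notmsg); //step 2: put it on sendqueue, which the WorkerSender thread consumes
  -Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); //fetch the votes the other servers sent us; sendNotifications and recvqueue together form one complete vote exchange, which passes through the 10 steps
  //the election logic follows
  -if(n == null) //null: we sent our vote but nobody sent one back, so resend
    -if(manager.haveDelivered()) //our messages were delivered: just send again with sendNotifications()
    -else //our messages were not delivered: manager.connectAll() reconnects to all nodes, then resend
    -int tmpTimeOut = notTimeout * 2; //the timeout doubles after each miss (capped at 60 s) instead of using a fixed interval
  -else if(validVoter(n.sid) && validVoter(n.leader)) //validity check: both the sender and the proposed leader must be voting members (in the votingMembers set)
    -return self.getCurrentAndNextConfigVoters().contains(sid);
      -Set<Long> voterIds = new HashSet<>(getQuorumVerifier().getVotingMembers().keySet());
    -switch(n.state) //the peer's state (the WorkerReceiver thread deals with our own state)
      -case LOOKING: //the peer is also electing
        -if(n.electionEpoch > logicalclock.get()) //first adopt the peer's logical clock and clear our collected votes; then compare votes: if the peer's is better, switch to it, otherwise keep ours; finally re-broadcast our vote
        -else if(n.electionEpoch < logicalclock.get()) //ignore
        -else if() //same logical clock: the vote is still compared and updated, in preparation for a possible next round
        -recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)) //having passed the validity and logical-clock checks, the vote goes into the recvset ballot box
        -voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch)); //tally the votes in recvset that agree with the current proposal (which n may just have updated). Why check only now? If the votes received before n had already formed a majority, the election would have ended earlier, so only the arrival of n can complete it
        -if(voteSet.hasAllQuorums()) //does that support exceed half? If so, the election is about to end
          -hasAllQuorums() //continues in SyncedLearnerTracker.java
          -while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) //pull extra votes as a safety check; if a better vote shows up, it is put back into recvqueue and the election continues
          -if(n == null) //no more votes arrived
            -setPeerState(proposedLeader, voteSet); //update the server's state
              -ServerState ss = (proposedLeader == self.getId()) ? ServerState.LEADING : learningState(); //if the proposed myid equals ours, this server becomes LEADING
                -learningState();
                  -return ServerState.FOLLOWING;
                  -return ServerState.OBSERVING;
            -Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); //the final election result
            -return endVote; //the result of lookForLeader. When does it return? When a new vote arrives it is validated; after validation the logical clocks are aligned; the vote then goes into the ballot box (recvset) and the votes are tallied for a majority; if there is a majority, one more poll is made as a safety check; if no more votes arrive (n == null), the result is returned
      -case OBSERVING: //ignored
      -case FOLLOWING: //return the leader information carried in n (after checking that a quorum agrees on that leader)
      -case LEADING: //return the leader information carried in n (same checks)
        -Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); //endVote is built from n
-WorkerSender
  -run()
    -ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); //step 3: take a vote (a ToSend) from sendqueue
    -process(m); //step 3: process it
      -ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData); //serialize the ToSend into a ByteBuffer
      -manager.toSend(m.sid, requestBuffer); //send; m.sid is the target server's myid, manager is the QuorumCnxManager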
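To make the LOOKING branch of lookForLeader concrete, here is a simplified, single-threaded sketch of how one notification is handled: logical-clock alignment, vote comparison, and insertion into the ballot box. The comparison order (higher epoch, then higher zxid, then higher myid) is the one ZooKeeper's totalOrderPredicate applies, with server weights left out; LookingStep and its fields are hypothetical stand-ins for FastLeaderElection's state, and networking is ignored.

```java
import java.util.HashMap;
import java.util.Map;

final class LookingStep {
    static final class Vote {
        final long leader;    // proposed leader's myid
        final long zxid;      // proposed leader's last zxid
        final long peerEpoch; // proposed leader's epoch

        Vote(long leader, long zxid, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }
    }

    final Vote initial;                              // our own initial vote (for ourselves)
    Vote proposal;                                   // current proposal
    long logicalclock = 1;                           // round number after the first increment
    final Map<Long, Vote> recvset = new HashMap<>(); // ballot box keyed by sender myid

    LookingStep(Vote self) {
        this.initial = self;
        this.proposal = self;
    }

    // True if a should win over b: higher epoch, then higher zxid, then higher myid.
    static boolean better(Vote a, Vote b) {
        if (a.peerEpoch != b.peerEpoch) return a.peerEpoch > b.peerEpoch;
        if (a.zxid != b.zxid) return a.zxid > b.zxid;
        return a.leader > b.leader;
    }

    // Handles vote n from peer sid, sent in round electionEpoch, while both sides are LOOKING.
    // Returns true when the caller should re-broadcast its vote.
    boolean onNotification(long sid, long electionEpoch, Vote n) {
        boolean rebroadcast;
        if (electionEpoch > logicalclock) {
            logicalclock = electionEpoch;            // catch up to the newer round
            recvset.clear();                         // votes from the old round are discarded
            proposal = better(n, initial) ? n : initial;
            rebroadcast = true;
        } else if (electionEpoch < logicalclock) {
            return false;                            // stale round: ignored entirely
        } else {
            rebroadcast = better(n, proposal);       // same round: adopt n only if it is better
            if (rebroadcast) {
                proposal = n;
            }
        }
        recvset.put(sid, n);                         // the vote enters the ballot box
        return rebroadcast;
    }

    public static void main(String[] args) {
        LookingStep me = new LookingStep(new Vote(1, 0x10, 1));
        me.onNotification(2, 1, new Vote(2, 0x10, 1)); // same zxid, larger myid: adopt
        me.onNotification(3, 1, new Vote(3, 0x0f, 1)); // smaller zxid: keep current proposal
        // prints: round 1, proposing myid 2
        System.out.println("round " + me.logicalclock + ", proposing myid " + me.proposal.leader);
    }
}
```

Note that in a newer round the incoming vote is compared against our initial vote rather than the current proposal, because the proposal from the old round is no longer meaningful.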
-WorkerReceiver //step 8
  -run()
    -response = manager.pollRecvQueue(1000, TimeUnit.MILLISECONDS); //consume the recvQueue queue; response is a Message
    -Notification n = new Notification(); //after validation, the fields of response are copied into n
    -if(!validVoter(response.sid)) //check whether the sender's myid has voting rights, i.e. is in the votingMembers set; if not, reply by putting a ToSend on sendqueue
      -ToSend notmsg = new ToSend(...);
      -sendqueue.offer(notmsg); //step 10
    -else //step 9
      -if(self.getPeerState() == QuorumPeer.ServerState.LOOKING) //our own state is LOOKING, i.e. we are electing
        -recvqueue.offer(n); //store the valid vote
        -if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())) //the peer is LOOKING but its logical clock is behind our round: send our vote back, again as a ToSend
      -else
        -Vote current = self.getCurrentVote(); //take the current election result and send the leader information back, again as a ToSend on the queue
QuorumCnxManager.java
-toSend()
  -if(this.mySid == sid) //sid is the target id; equal means the vote is addressed to ourselves //step 4
    -addToRecvQueue(new Message(b.duplicate(), sid)); //wrap the ByteBuffer in a Message
      -final boolean success = this.recvQueue.offer(msg);
  -else //step 4
    -BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY)); //create a CircularBlockingQueue if this sid has none yet
    -addToSendQueue(bq, b); //addressed to another server; still the unchanged ByteBuffer
    -connectOne(sid); //actually contact the peer (continues in connectOne(sid, electionAddr))
      -initiateConnectionAsync(electionAddr, sid);
        -connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
          -QuorumConnectionReqThread
            -run()
              -initiateConnection(electionAddr, sid);
                -startConnection(sock, sid);
                  -BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
                  -dout = new DataOutputStream(buf);
                  -dout.writeLong(protocolVersion);
                  -dout.writeLong(self.getId());
                  -dout.writeInt(addr_bytes.length);
                  -dout.write(addr_bytes);
                  -dout.flush();
                  -SendWorker sw = new SendWorker(sock, sid); //step 5
                    -run()
                      -BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid); //look up this myid's queue in the map
                      -ByteBuffer b = lastMessageSent.get(sid); //bq is empty (no pending vote): fetch the last vote sent
                      -send(b); //bq is empty: resend that last vote
                      -b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS); //bq is not empty: take the pending vote
                      -lastMessageSent.put(sid, b); //remember the most recent vote
                      -send(b); //bq is not empty: send it
                        -dout.writeInt(b.capacity()); //output stream: vote length
                        -dout.write(b.array()); //vote payload
                        -dout.flush(); //send
                  -RecvWorker rw = new RecvWorker(sock, din, sid, sw); //step 6
                    -run()
                      -int length = din.readInt(); //length
                      -din.readFully(msgArray, 0, length); //payload
                      -addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid)); //step 7: wrap the raw bytes in a Message and enqueue it
                        -final boolean success = this.recvQueue.offer(msg);
-pollRecvQueue() //consumes the recvQueue queue; called through manager by the WorkerReceiver thread in FastLeaderElection.java
  -return this.recvQueue.poll(timeout, unit);
SyncedLearnerTracker.java
-hasAllQuorums()
  -if(!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
    -containsQuorum() //continues in QuorumMaj.java
QuorumMaj.java
-containsQuorum(Set<Long> ackSet)
  -return (ackSet.size() > half); //ackSet is the set of myids supporting the vote; half is half the number of votingMembers (integer division)
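The majority test at the end of the chain reduces to integer arithmetic: with half = votingMembers / 2, a proposal wins once more than half of the voters back it. This sketch assumes a single majority verifier and compares votes by proposed leader only (the real vote tracker compares whole Vote objects); MajorityCheck and proposalHasQuorum are hypothetical names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class MajorityCheck {
    // Same test as containsQuorum above: strictly more than half of the voters.
    static boolean containsQuorum(Set<Long> ackSet, int votingMembers) {
        int half = votingMembers / 2; // integer division: 7 -> 3, 6 -> 3, 5 -> 2
        return ackSet.size() > half;
    }

    // Collects the myids whose vote names proposedLeader, then tests for a majority.
    // recvset maps sender myid -> the leader that sender proposes (a simplified Vote).
    static boolean proposalHasQuorum(Map<Long, Long> recvset, long proposedLeader, int votingMembers) {
        Set<Long> ackSet = new HashSet<>();
        for (Map.Entry<Long, Long> e : recvset.entrySet()) {
            if (e.getValue() == proposedLeader) {
                ackSet.add(e.getKey());
            }
        }
        return containsQuorum(ackSet, votingMembers);
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 3L);
        recvset.put(2L, 3L);
        recvset.put(3L, 3L);
        recvset.put(4L, 5L);
        System.out.println("5 voters: " + proposalHasQuorum(recvset, 3L, 5)); // true: 3 > 2
        System.out.println("7 voters: " + proposalHasQuorum(recvset, 3L, 7)); // false: 3 > 3 fails
    }
}
```

With 7 voters half is 3, so 4 matching votes are needed; with 6 voters half is also 3, which is why an even-sized ensemble tolerates no more failures than the next smaller odd one.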