Zookeeper - 会话管理和事务处理

春衣少年当酒歌,起舞四顾以笑和。这篇文章主要讲述Zookeeper - 会话管理和事务处理相关的知识,希望能为你提供帮助。

本章围绕两个主题展开,会话管理和事务的处理(数据处理)流程,目的是了解下zk的CS模式是如何运转的。
一、服务端启动详细的启动过程可参考??https://blog.51cto.com/arch/5363898?? 文章中的描述,那么服务端启动后主要要做哪些工作呢,主要有以下三个:
  1. 设置默认Watcher;
  2. 设置Zookeeper服务器地址列表;
  3. 创建ClientCnxn;
二、会话管理2.1、创建
会话创建的流程图如下所示:

2.1.1、ServerAddressList在实例化Zookeeper时,用户传入Zookeeper服务器地址列表,如192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,此时,Zookeeper客户端在连接服务器的过程中,Zookeeper收到服务器地址列表后,会解析出chrootPath和保存服务器地址列表:
  1. Chroot,每个客户端可以设置自己的命名空间,若客户端设置了Chroot,此时,该客户端对服务器的任何操作都将被限制在自己的命名空间下,如设置Choot为/app/X,那么该客户端的所有节点路径都是以/app/X为根节点;
  2. 地址列表管理,Zookeeper使用StaticHostProvider打散服务器地址(shuffle),并将服务器地址形成一个环形循环队列,然后再依次取出服务器地址;
2.1.2、ClientCnxnClientCnxn是Zookeeper客户端中负责维护客户端与服务端之间的网络连接并进行一系列网络通信的核心工作类,Packet是ClientCnxn内部定义的一个堆协议层的封装,用作Zookeeper中请求和响应的载体。Packet包含了请求头(requestHeader)、响应头(replyHeader)、请求体(request)、响应体(response)、节点路径(clientPath/serverPath)、注册的Watcher(watchRegistration)等信息,然而,并非Packet中所有的属性都在客户端与服务端之间进行网络传输,只会将requestHeader、request、readOnly三个属性序列化,并生成可用于底层网络传输的ByteBuffer,其他属性都保存在客户端的上下文中,不会进行与服务端之间的网络传输。
ClientCnxn维护着 outgoingQueue(客户端的请求发送队列 和 pendingQueue(服务端响应的等待队列),outgoingQueue专门用于存储那些需要发送到服务端的Packet集合,pendingQueue用于存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。在正常情况下,会从outgoingQueue中取出一个可发送的Packet对象,同时生成一个客户端请求序号XID并将其设置到Packet请求头中去,然后序列化后再发送,请求发送完毕后,会立即将该Packet保存到pendingQueue中,以便等待服务端响应返回后进行相应的处理。

客户端获取到来自服务端的完整响应数据后,根据不同的客户端请求类型,会进行不同的处理:
  1. 若检测到此时客户端尚未进行初始化,那么说明当前客户端与服务端之间正在进行会话创建,直接将接收的ByteBuffer序列化成ConnectResponse对象;
  2. 若当前客户端已经处于正常会话周期,并且接收到服务端响应是一个事件,那么将接收的ByteBuffer序列化成WatcherEvent对象,并将该事件放入待处理队列中;
  3. 3. 若是一个常规请求(Create、GetData、Exist等),那么从pendingQueue队列中取出一个Packet来进行相应处理。首先会检验响应中的XID来确保请求处理的顺序性,然后再将接收到的ByteBuffer序列化成Response对象。
2.1.3、SendThread它是客户端ClientCnxn内部的一个核心I/O调度线程,用于管理客户端与服务端之间的所有网络I/O操作,在Zookeeper客户端实际运行中,SendThread的作用如下:
  1. 维护了客户端与服务端之间的会话生命周期(通过一定周期频率内向服务端发送PING包检测心跳),如果会话周期内客户端与服务端出现TCP连接断开,那么就会自动且透明地完成重连操作;
  2. 管理了客户端所有的请求发送和响应接收操作,其将上层客户端API操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调;
  3. 将来自服务端的事件传递给EventThread去处理。
2.1.4、EventThread它客户端ClientCnxn内部的一个事件处理线程,负责客户端的事件处理,并触发客户端注册的Watcher监听。EventThread中的watingEvents队列用于临时存放那些需要被触发的Object,包括客户端注册的Watcher和异步接口中注册的回调器AsyncCallback。同时,EventThread会不断地从watingEvents中取出Object,识别具体类型(Watcher或AsyncCallback),并分别调用process和processResult接口方法来实现对事件的触发和回调。
2.2、管理
Zookeeper的会话管理主要是通过SessionTracker来负责,其采用了分桶策略(将类似的会话放在同一区块中进行管理)进行管理,以便Zookeeper对会话进行不同区块的隔离处理以及同一区块的统一处理。

Zookeeper将所有的会话都分配在不同的区块一种,分配的原则是每个会话的下次超时时间点(ExpirationTime)。ExpirationTime指该会话最近一次可能超时的时间点。同时,Zookeeper Leader服务器在运行过程中会定时地进行会话超时检查,时间间隔是ExpirationInterval,默认为tickTime的值,ExpirationTime的计算时间如下ExpirationTime = ((CurrentTime + SessionTimeOut) / ExpirationInterval + 1) * ExpirationInterval为了保持客户端会话的有效性,客户端会在会话超时时间过期范围内向服务端发送PING请求来保持会话的有效性(心跳检测)。同时,服务端需要不断地接收来自客户端的心跳检测,并且需要重新激活对应的客户端会话,这个重新激活过程称为TouchSession。会话激活不仅能够使服务端检测到对应客户端的存活性,同时也能让客户端自己保持连接状态,其流程如下:

如上图所示,整个流程分为四步:
  1. 检查该会话是否已经被关闭。若已经被关闭,则直接返回即可;
  2. 计算该会话新的超时时间ExpirationTime_New。使用上面提到的公式计算下一次超时时间点;
  3. 获取该会话上次超时时间ExpirationTime_Old。计算该值是为了定位其所在的区块;
  4. 迁移会话。将该会话从老的区块中取出,放入ExpirationTime_New对应的新区块中。
在上面会话激活过程中,只要客户端发送心跳检测,服务端就会进行一次会话激活,心跳检测由客户端主动发起,以PING请求形式向服务端发送,在Zookeeper的实际设计中,只要客户端有请求发送到服务端,那么就会触发一次会话激活,以下两种情况都会触发会话激活:
  1. 客户端向服务端发送请求,包括读写请求,就会触发会话激活;
  2. 客户端发现在sessionTimeout/3时间内尚未和服务端进行任何通信,那么就会主动发起PING请求,服务端收到该请求后,就会触发会话激活;
对于会话的超时检查而言,Zookeeper使用SessionTracker来负责,SessionTracker使用单独的线程(超时检查线程)专门进行会话超时检查,即逐个一次地对会话桶中剩下的会话进行清理。如果一个会话被激活,那么Zookeeper就会将其从上一个会话桶迁移到下一个会话桶中,如ExpirationTime 1 的session n 迁移到ExpirationTime n 中,此时ExpirationTime 1中留下的所有会话都是尚未被激活的,超时检查线程就定时检查这个会话桶中所有剩下的未被迁移的会话,超时检查线程只需要在这些指定时间点(ExpirationTime 1、ExpirationTime 2…)上进行检查即可,这样提高了检查的效率,性能也非常好。
2.3、重连
重连状态(CONNECTED & EXPIRED)当客户端与服务端之间的网络连接断开时,Zookeeper客户端会自动进行反复的重连,直到最终成功连接上Zookeeper集群中的一台机器。此时,再次连接上服务端的客户端有可能处于以下两种状态之一:
  1. CONNECTED:如果在会话超时时间内重新连接上集群中一台服务器;
  2. EXPIRED:如果在会话超时时间以外重新连接上,那么服务端其实已经对该会话进行了会话清理操作,此时会话被视为非法会话。
在客户端与服务端之间维持的是一个长连接,在sessionTimeout时间内,服务端会不断地检测该客户端是否还处于正常连接,服务端会将客户端的每次操作视为一次有效的心跳检测来反复地进行会话激活。因此,在正常情况下,客户端会话时一直有效的。然而,当客户端与服务端之间的连接断开后,用户在客户端可能主要看到两类异常:CONNECTION_LOSS(连接断开)和SESSION_EXPIRED(会话过期)。
2.3.1、重连异常: CONNECTION_LOSS(连接断开)和SESSION_EXPIRED(会话过期)连接断开connection_loss
有时因为网络闪断导致客户端与服务器断开连接,或是因为客户端当前连接的服务器出现问题导致连接断开,我么称“客户端与服务器断开连接”现象,即connection_loss。在这种情况下,zookeeper客户端会自动从地址列表中重新获取新的地址并尝试进行重新连接,直到最终成功连接上服务器。
举个例子:某应用在使用zookeeper客户端进行setData操作时,正好出现了connection_loss现象,那么客户端会记录接收到事件:none-disconnected通知同时会抛出异常ConnectionLossException。这时,我们的应用需要做的事情是捕获异常,然后等待zookeeper的客户端自动完成重连,一旦客户端成功连上一台zookeeper机器后,那么客户端就会收到事件none-syncconnnected通知,之后就可以重试刚才的setData操作。
会话失效session_expired
通常发生在connection_loss期间,客户端和服务器连接断开后,由于重连期间耗时过长,超过了会话超时时间限制后还没有成功连接上服务器,那么服务器认为这个会话已经结束了,就会开始进行会话清理,但是另一方面,该客户端本身不知道会话已经失效了,并且其客户端状态还是disconnected。之后,如果客户端重新连接上了服务器,服务器会告知客户端会话已经失效,在这时,用户就需要重新实例化一个zookeeper对象,并看应用的复杂程度,重新恢复临时数据。
会话转移session_moved
是指客户端会话从一台服务器转移到另一台服务器上,假设客户端和服务器s1之间的连接断开后,如果通过尝试重连后,成功连接上了新的服务器s2并且延续了有效会话,那么就可以说会话从s1转移到了s2上。
会话管理+事务管理
2.4、清理
当SessionTracker的会话超时线程检查出已经过期的会话后,就开始进行会话清理工作,大致可以分为如下七步:
  1. 标记会话状态为已关闭。由于会话清理过程需要一段时间,为了保证在此期间不再处理来自该客户端的请求,SessionTracker会首先将该会话的isClosing标记为true,这样在会话清理期间接收到该客户端的心情求也无法继续处理了;
  2. 发起会话关闭请求。为了使对该会话的关闭操作在整个服务端集群都生效,Zookeeper使用了提交会话关闭请求的方式,并立即交付给PreRequestProcessor进行处理;
  3. 收集需要清理的临时节点。一旦某个会话失效后,那么和该会话相关的临时节点都需要被清理,因此,在清理之前,首先需要将服务器上所有和该会话相关的临时节点都整理出来。Zookeeper在内存数据库中会为每个会话都单独保存了一份由该会话维护的所有临时节点集合,在Zookeeper处理会话关闭请求之前,若正好有以下两类请求到达了服务端并正在处理中。节点删除请求,删除的目标节点正好是上述临时节点中的一个,需要将所有请求对应的数据节点路径从当前临时节点列表中移出,以避免重复删除。临时节点创建请求,创建的目标节点正好是上述临时节点中的一个,需要将所有这些请求对应的数据节点路径添加到当前临时节点列表中,以删除这些即将被创建但是尚未保存到内存数据库中的临时节点;
  4. 添加节点删除事务变更。完成该会话相关的临时节点收集后,Zookeeper会逐个将这些临时节点转换成"节点删除"请求,并放入事务变更队列outstandingChanges中;
  5. 删除临时节点。FinalRequestProcessor会触发内存数据库,删除该会话对应的所有临时节点。6. 移除会话。完成节点删除后,需要将会话从SessionTracker中删除。7. 关闭NioserverCnxn。最后,从NIOServerCnxnFactory找到该会话对应的NIOServerCnxn,将其关闭。
三、事务处理3.1、处理流程
大体可以分为请求接收、会话创建、预处理、事务处理、事务应用和会话响应六大环节,其大体流程如下图所示:

3.1.1、请求接收
  1. I/O层接收来自客户端的请求。NIOServerCnxn维护每一个客户端连接,客户端与服务器端的所有通信都是由NIOServerCnxn负责,其负责统一接收来自客户端的所有请求,并将请求内容从底层网络I/O中完整地读取出来;
  2. 判断是否是客户端会话创建请求。每个会话对应一个NIOServerCnxn实体,对于每个请求,Zookeeper都会检查当前NIOServerCnxn实体是否已经被初始化,如果尚未被初始化,那么就可以确定该客户端一定是会话创建请求;
  3. 反序列化ConnectRequest请求。一旦确定客户端请求是否是会话创建请求,那么服务端就可以对其进行反序列化,并生成一个ConnectRequest载体;
  4. 判断是否是ReadOnly客户端。如果当前Zookeeper服务器是以ReadOnly模式启动,那么所有来自非ReadOnly型客户端的请求将无法被处理。因此,服务端需要先检查是否是ReadOnly客户端,并以此来决定是否接受该会话创建请求;
  5. 检查客户端ZXID。正常情况下,在一个Zookeeper集群中,服务端的ZXID必定大于客户端的ZXID,因此若发现客户端的ZXID大于服务端ZXID,那么服务端不接受该客户端的会话创建请求;
  6. 协商sessionTimeout。在客户端向服务器发送超时时间后,服务器会根据自己的超时时间限制最终确定该会话超时时间,这个过程就是sessionTimeout协商过程;
  7. 判断是否需要重新激活创建会话。服务端根据客户端请求中是否包含sessionID来判断该客户端是否需要重新创建会话,若客户单请求中包含sessionID,那么就认为该客户端正在进行会话重连,这种情况下,服务端只需要重新打开这个会话,否则需要重新创建。
3.1.2、会话创建
  1. 为客户端生成sessionID。在为客户端创建会话之前,服务端首先会为每个客户端分配一个sessionID,服务端为客户端分配的sessionID是全局唯一的;
  2. 注册会话。向SessionTracker中注册会话,SessionTracker中维护了sessionsWithTimeout和sessionsById,在会话创建初期,会将客户端会话的相关信息保存到这两个数据结构中;
  3. 激活会话。激活会话涉及Zookeeper会话管理的分桶策略,其核心是为会话安排一个区块,以便会话清理程序能够快速高效地进行会话清理;
  4. 生成会话密码。服务端在创建一个客户端会话时,会同时为客户端生成一个会话密码,连同sessionID一同发给客户端,作为会话在集群中不同机器间转移的凭证。
3.1.3、事务预处理
  1. 将请求交给PrepRequestProcessor处理器处理。在提交给第一个请求处理器之前,Zookeeper会根据该请求所属的会话,进行一次激活会话操作,以确保当前会话处于激活状态,完成会话激活后,则提交请求至处理器;
  2. 创建请求事务头。对于事务请求,Zookeeper会为其创建请求事务头,服务端后续的请求处理器都是基于该请求头来识别当前请求是否是事务请求,请求事务头包含了一个事务请求最基本的一些信息,包括sessionID、ZXID(事务请求对应的事务ZXID)、CXID(客户端的操作序列)和请求类型(如create、delete、setData、createSession等)等;
  3. 创建请求事务体。由于此时是会话创建请求,其事务体是CreateSessionTxn;
  4. 注册于激活会话。处理由非Leader服务器转发过来的会话创建请求。
3.1.4、事务处理
  1. 将请求交给ProposalRequestProcessor处理器,与提议相关的处理器,从ProposalRequestProcessor开始,请求的处理将会进入三个子处理流程,分别是Sync流程、Proposal流程、Commit流程。

  • Sync流程:使用SyncRequestProcessor处理器记录事务日志,针对每个事务请求,都会通过事务日志的形式将其记录,完成日志记录后,每个Follower都会向Leader发送ACK消息,表明自身完成了事务日志的记录,以便Leader统计每个事务请求的投票情况;
  • Proposal流程:每个事务请求都需要集群中过半机器投票认可才能被真正应用到内存数据库中,这个投票与统计过程就是Proposal流程,详细过程如下:
  • 发起投票。若当前请求是事务请求,Leader会发起一轮事务投票,在发起事务投票之前,会检查当前服务端的ZXID是否可用;
  • 生成提议Proposal。若ZXID可用,Zookeeper会将已创建的请求头和事务体以及ZXID和请求本身序列化到Proposal对象中,此Proposal对象就是一个提议;
  • 广播提议。Leader以ZXID作为标识,将该提议放入投票箱outstandingProposals中,同时将该提议广播给所有Follower;
  • 收集投票。Follower接收到Leader提议后,进入Sync流程进行日志记录,记录完成后,发送ACK消息至Leader服务器,Leader根据这些ACK消息来统计每个提议的投票情况,当一个提议获得半数以上投票时,就认为该提议通过,进入Commit阶段;
  • 将请求放入toBeApplied队列中;
  • 广播Commit消息。Leader向Follower和Observer发送COMMIT消息。向Observer发送INFORM消息,向Leader发送ZXID。
  • Commit流程,详细如下:
  • 将请求交付CommitProcessor。CommitProcessor收到请求后,将其放入queuedRequests队列中;
  • 处理queuedRequest队列请求。CommitProcessor中单独的线程处理queuedRequests队列中的请求;
  • 标记nextPending。若从queuedRequests中取出的是事务请求,则需要在集群中进行投票处理,同时将nextPending标记位当前请求;
  • 等待Proposal投票。在进行Commit流程的同时,Leader会生成Proposal并广播给所有Follower服务器,此时,Commit流程等待,直到投票结束;
  • 投票通过。若提议获得过半机器认可,则进入请求提交阶段,该请求会被放入commitedRequests队列中,同时唤醒Commit流程;
  • 提交请求。若commitedRequests队列中存在可以提交的请求,那么Commit流程则开始提交请求,将请求放入toProcess队列中,然后交付下一个请求处理器:FinalRequestProcessor。
3.1.5、事务应用
  1. 交付给FinalRequestProcessor处理器。FinalRequestProcessor处理器检查outstandingChanges队列中请求的有效性,若发现这些请求已经落后于当前正在处理的请求,那么直接从outstandingChanges队列中移除;
  2. 事务应用。之前的请求处理仅仅将事务请求记录到了事务日志中,而内存数据库中的状态尚未改变,因此,需要将事务变更应用到内存数据库;
  3. 将事务请求放入队列commitProposal。完成事务应用后,则将该请求放入commitProposal队列中,commitProposal用来保存最近被提交的事务请求,以便集群间机器进行数据的快速同步。
3.1.6、会话响应
  1. 统计处理。Zookeeper计算请求在服务端处理所花费的时间,统计客户端连接的基本信息,如lastZxid(最新的ZXID)、lastOp(最后一次和服务端的操作)、lastLatency(最后一次请求处理所花费的时间)等;
  2. 创建响应ConnectResponse。会话创建成功后的响应,包含了当前客户端和服务端之间的通信协议版本号、会话超时时间、sessionID和会话密码;
  3. 序列化ConnectResponse;
  4. I/O层发送响应给客户端。
3.2、请求处理链
【Zookeeper - 会话管理和事务处理】
使用责任链来处理每个客户端的请求是Zookeeper的特色,Leader服务器的请求处理链如下:
  1. PrepRequestProcessor。请求预处理器。在Zookeeper中,那些会改变服务器状态的请求称为事务请求(创建节点、更新数据、删除节点、创建会话等),PrepRequestProcessor能够识别出当前客户端请求是否是事务请求。对于事务请求,PrepRequestProcessor处理器会对其进行一系列预处理,如创建请求事务头、事务体、会话检查、ACL检查和版本检查等;
  2. ProposalRequestProcessor。事务投票处理器。Leader服务器事务处理流程的发起者,对于非事务性请求,ProposalRequestProcessor会直接将请求转发到CommitProcessor处理器,不再做任何处理,而对于事务性请求,处理将请求转发到CommitProcessor外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follower服务器来发起一次集群内的事务投票。同时,ProposalRequestProcessor还会将事务请求交付给SyncRequestProcessor进行事务日志的记录;
  3. SyncRequestProcessor。事务日志记录处理器。用来将事务请求记录到事务日志文件中,同时会触发Zookeeper进行数据快照;
  4. AckRequestProcessor。负责在SyncRequestProcessor完成事务日志记录后,向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录;
  5. CommitProcessor。事务提交处理器。对于非事务请求,该处理器会直接将其交付给下一级处理器处理;对于事务请求,其会等待集群内针对Proposal的投票直到该Proposal可被提交,利用CommitProcessor,每个服务器都可以很好地控制对事务请求的顺序处理;
  6. ToBeCommitProcessor。该处理器有一个toBeApplied队列,用来存储那些已经被CommitProcessor处理过的可被提交的Proposal。其会将这些请求交付给FinalRequestProcessor处理器处理,待其处理完后,再将其从toBeApplied队列中移除;
  7. FinalRequestProcessor。用来进行客户端请求返回之前的操作,包括创建客户端请求的响应,针对事务请求,该处理还会负责将事务应用到内存数据库中去。

    推荐阅读