从Paxos到Zookeeper分布式一致性原理与实践

第1章 分布式结构

从集中式到分布式

集中式的特点

由一台或多台主计算机组成的中心节点,数据集中存储于这个中心节点中,并且整个系统的所有业务单元都集中部署在这个中心节点上,系统的所有功能均由其集中处理。也就是说,在集中式系统中,每个终端或客户端机器仅仅负责数据的录入和输出,而数据的存储与控制处理完全交由主机完成。

特点:部署结构简单,不用考虑多节点部署带来的分布式协作问题

分布式的特点

《分布式系统概念与设计》中对分布式系统的定义:

分布式系统是一个硬件或软件组件分布在不同的网络计算机上,彼此之间仅仅通过消息传递进行通信和协调的系统。

分布式系统在没有任何特定业务逻辑约束的情况下,都会有如下几个特征:

  1. 分布性

    分布式系统中的多台计算机都会在空间上随意分布,同时,机器的分布情况也会随时变动

    节点上下线

  2. 对等性

    分布式系统中的计算机没有主/从之分的,所有计算机节点是对等的。

    副本(Replica)是分布式系统最常见的概念之一,指的是分布式系统对数据和服务提供的一种冗余方式。

    在分布式系统中为了对外提供高可用的服务,往往会对数据和服务进行副本处理。

    副本分两类

    1. 数据副本
    2. 服务副本

    数据副本是指在不同的节点上持久化同一份数据,当某一个节点上存储的数据丢失时,可以从副本上读取到该数据,这是解决分布式系统数据丢失问题最为有效的手段

    涉及到副本就肯定会出现数据不一致的问题

    另一类副本是服务副本,指多个节点提供同样的服务,每个节点都有能力接受来自外部的请求并进行相应的处理。

  3. 并发性

    同一个分布式系统中的多个节点可能会并发地操作一些共享的资源,如数据库或分布式存储等

  4. 缺乏全局时钟

    分布式系统中很难定义两个事件的先后,因为分布式系统缺乏一个全局的时钟序列控制

  5. 故障总是会发生

    组成分布式系统的所有计算机,都有可能发生任何形式的故障。

    任何在设计阶段考虑到的异常情况,一定会在系统实际运行中发生,并且在系统实际运行过程中还会遇到很多在设计时未能考虑到的异常故障。所以,除非需求指标允许,在系统设计时不能放过任何异常情况。

分布式环境的各种问题
通信异常

分布式系统需要在各个节点之间进行网络通信,由于网络本身的不可靠性,会导致最终分布式系统无法顺利完成一次网络通信。

即使能够正常进行通信,延时也会远大于单机操作。通常单机内存访问的延时在纳秒级别(通常是10ns左右),而正常的一次网络通信的延时在0.1~1ms左右(相当于内存访问延时的105~106倍),延时大也会影响消息的收发过程,因此消息丢失和消息延迟变得非常普遍。

网络分区

当网络由于发生异常情况,导致分布式系统中部分节点之间的网络延时不断增大,最终导致组成分布式系统的所有节点中,只有部分节点之间能够正常通信,而另一些节点不能——这个现象称为网络分区,也就是俗称的脑裂。

网络分区出现的时候,分布式系统会出现局部小集群,在极端情况下,这些局部小集群会独立完成原本需要整个分布式系统才能完成的功能,包括对数据的事务处理。

三态

传统的单机系统中,应用程序在调用一个函数之后,能够得到一个非常明确的响应:成功或失败。而在分布式系统中,由于网络是不可靠的,虽然在绝大部分情况下,网络通信也能够接受到成功或失败的响应,但是当网络出现异常的情况下,就可能会出现超时现象:

  1. 由于网络原因,该请求并没有被成功地发送到接收方,而是在发送过程中就发生了消息丢失现象。
  2. 该请求成功地被接收方接收后,并进行了处理,但是将响应反馈给发送方的过程中,发生了消息丢失现象。
  1. 接受方甚至都没有收到请求
  2. 接收方处理完并反馈,但是发送方没接收到

当出现这样的超时现象时,网络通信的发起方是无法确定当前请求是否被成功处理的。

节点故障

指的是组成分布式系统的服务器节点出现宕机或“僵死”现象

从ACID到CAP/BASE

ACID
分布式事务
CAP和BASE理论
CAP定理

CAP理论告诉我们,一个分布式系统不可能同时满足一致性(C:Consistency)、可用性(A:Availability)和分区容错性(P:Partition tolerance)这三个基本需求,最多只能同时满足其中的两项。

  • 一致性

    指数据在多个副本是否能够保持一致的特性。在一致性需求下,当一个系统在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致状态。

    对于一个将数据副本分布在不同分布式节点上的系统来说,如果对第一个节点的数据进行了更新操作并且更新成功后,却没有使得第二节点上的数据得到相应的更新,于是在对第二个节点的数据进行读取操作时,获取的依然是老数据(或称为脏数据),这就是典型的分布式数据不一致的情况。

    在分布式系统中,如果能够做到针对一个数据项的更新操作执行成功后,所有用户都可以读取到其最新的值,那么这样的系统被认为具有强一致性(或严格一致性)。

  • 可用性

    指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果。

    “返回结果”是可用性的另一个非常重要的指标,他要求系统在完成对用户请求的处理后,返回一个正常的响应结果(就是能够反映出对请求的处理结果,即成功或失败,而不是一个让用户感到困惑的返回结果)

  • 分区容错性

    分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非整个网络都发生故障。

    网络分区:在分布式系统中,不同的节点分布在不同的子网络(机房或异地网络等)中,由于一些特殊的原因导致这些子网络之间出现网络不连通的情况,但是子网络内部通信正常,从而导致整个系统的网络环境被切分成若干个孤立的区域。

    需要注意的是,组成分布式系统的每个节点的加入与退出都可以看作是一个特殊的网络分区。

20220516094438424

  1. CA(放弃P)

    如果希望能够避免系统出现分区,一种简单的做法是将所有的数据(或者仅仅将那些与事务相关的数据)都放在一个分布式节点上,这样虽然无法100%保证系统不会出错,但至少不会碰到由于网络分区带来的负面影响。

    注意:放弃P也就意味着放弃了系统的可扩展性,可以说就不是分布式系统了。

  2. CP(放弃A)

    一旦发生网络分区或其他故障,受到影响的服务需要等待一定的时间,因为在这段期间系统无法对外提供正常的服务,即不可用

  3. AP(放弃C)

    注意:这里说的放弃一致性并不是完全放弃一致性(没有一致性的系统没意义)

    放弃一致性指的是放弃强一致性,但是保留数据的最终一致性。

    即系统无法保证数据一定是实时同步的,但是能够保证数据最终会达到一个一致的状态。

    这就引入了一个时间窗口的概念,具体多久取决于系统设计,主要包括数据副本在不同节点之间的复制时间长短

对于一个分布式系统而言,分区容错性是最基本的要求。

因为既然是一个分布式系统,那么分布式系统中的组件必然需要被部署到不同的节点,就必然出现子网络。

BASE理论
  1. Basically Available(基本可用)
  2. Soft state(软状态)
  3. Eventually consistent(最终一致性)

核心思想:即使无法做到强一致性(Strong consistency),但是每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventually consistency)。

  • 基本可用

    指分布式系统在出现不可预知故障的时候,允许损失部分可用性(不等价于系统不可用!)

    以下两个就是“基本可用”的典型例子:

    • 响应时间上的损失:正常情况下,一个搜索引擎需要在0.5秒之内返回给用户结果,但是由于故障,查询时间可能增加到1~2秒。
    • 功能上的损失:正常情况下,消费者几乎能在购物网站顺利完成每一笔订单,但是节日大促活动时,消费者购物行为激增,为了保护系统的稳定性,部分消费者可能会被引导到一个降级页面。
  • 软状态(弱状态)

    和硬状态相比,指允许系统中的数据存在中间状态,并认为该中间状态的存在不影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时

  • 最终一致性

    最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。

    保证最终数据能够达到一致,不需要实时保证系统数据的强一致性

    实际工程实践中,最终一致性存在五类主要变种

    1. 因果一致性(Causal consistency)

      如果进程A在更新完某个数据项后通知了进程B,那么进程B之后对该数据项的访问都应该能够获取到进程A更新后的最新值,并且如果进程B要对该数据项进行更新操作的话,务必基于进程A更新后的最新值,即不能发生丢失更新的情况。与此同时,与进程A无因果关系的进程C的数据访问则没有这样的限制。

    2. 读己之所写(Read your writes)

      进程A更新一个数据项之后,它自己总是能够访问到更新过的最新值,而不会看到旧值。也就是说,对于单个数据获取者来说,其读取到的数据,一定不会比自己上次写入的值旧。因此可以看作是一种特殊的因果一致性。

    3. 会话一致性(Session consistency)

      对系统数据的访问过程框定在一个会话当中:系统能够保证在同一个有效的会话中实现“读己之所写”的一致性,也就是说,执行更新操作之后,客户端能够在同一个会话中始终读取到该数据项的最新值。

    4. 单调读一致性(Monotonic read consistency)

      如果一个进程从系统中读取出一个数据项的一个值后,那么系统对于该进程后续的任何数据访问都不应该返回更旧的值。

    5. 单调写一致性(Monotonic write consistency)

      一个系统需要能够保证来自同一个进程的写操作被顺序执行。

    实际系统实践中,可以将若干种一致性变种互相结合

    最终一致性并不是只有大型分布式系统才涉及的特性,许多现代的关系型数据库都采用了最终一致性模型。采用同步和异步方式来实现主备数据复制。在同步方式中,数据的复制过程通常是更新事务的一部分,因此事务完成后,主备数据库的数据就会达到一致。而在异步方式中,备库的更新往往会存在延时,这取决于事务日志在主备数据库之间传输的时间长短,如果传输时间过长或者甚至是日志传输过程中出现异常导致无法及时将事务应用到备库上,此时就会出现数据不一致的情况。无论是采用重试还是人为订正,关系型数据库还是能够保证最终数据一致性。

第2章 一致性协议

2PC与3PC

在分布式系统中,每一个机器节点虽然都能明确知道自己在进行事务操作过程中的结果是成功还是失败,但是却无法直接获取到其他分布式节点的操作结果。因此当一个事务操作需要跨越多个分布式节点的时候,为了保持事务处理的ACID特性,就需要引入一个称为“协调者(Coordinator)”的组件来统一调度所有分布式节点的执行逻辑,这些被调度的分布式节点则被称为“参与者(Participant)”。

协调者负责调度参与者的行为,并最终决定这些参与者是否要把事务真正进行提交。

基于这个思想衍生出二阶段提交和三阶段提交两种协议。

2PC

Two-Phase Commit(二阶段提交),是为了使基于分布式系统架构下的所有节点在进行事务处理过程中能够保持原子性和一致性而设计的一种算法。也被认为是一种一致性协议,用来保证分布式系统数据的一致性。

协议说明

二阶段提交协议就是将事务的提交过程分成了两个阶段来进行处理,其执行流程如下

阶段一:提交事务请求
  1. 事务询问

    协调者向所有的参与者发送事务内容,询问是否可以执行事务提交操作,并开始等待各参与者的响应。

  2. 执行事务

    各参与者节点执行事务操作,并将Undo和Redo信息记入事务日志中。

  3. 各参与者向协调者反馈事务询问的响应

    如果参与者成功执行了事务操作,那么就反馈给协调者Yes响应,表示事务可以执行;

    如果参与者没有成功执行事务,那么就反馈给协调者No响应,表示事务不可以执行。

由于上述内容在形式上近似协调者组织各参与者对一次事务操作的投票表态过程,因此二阶段提交协议的阶段一也被称为“投票阶段”,即各参与者投票表明是否要继续执行接下来的事务提交操作。

阶段二:执行事务提交

协调者根据各参与者的反馈情况来决定最终是否可以进行事务提交操作,正常情况下包含以下两种可能:

  • 执行事务提交

    加入协调者从所有的参与者获得的反馈都是Yes响应,那么就会执行事务提交。

    1. 发送提交请求

      协调者向所有参与者节点发出Commit请求。

    2. 事务提交

      参与者接收到Commit请求后,会正式执行事务提交操作,并在完成提交之后释放在整个事务执行期间占用的事务资源。

    3. 反馈事务提交结果

      参与者在完成事务提交之后,向协调者发送Ack消息。

    4. 完成事务

      协调者接收到所有参与者反馈的Ack消息后,完成事务。

  • 中断事务

    假如出现了一个参与者向协调者反馈了No响应,或者在等待超时之后,协调者无法接收到所有参与者的反馈响应,那么就会中断事务。

    1. 发送回滚请求

      协调者向所有参与者节点发送Rollback请求。

    2. 事务回滚

      参与者接收到Rollback请求后,会利用在阶段一中记录的Undo信息来执行事务回滚操作,并在完成回滚之后释放在整个事务执行期间占用的资源。

    3. 反馈事务回滚结果

      参与者在完成事务回滚之后,向协调者发送Ack消息。

    4. 中断事务

      协调者接收到所有参与者反馈的Ack消息后,完成事务中断。

二阶段提交将事务处理过程分为投票执行阶段,其核心是对每个事务都采用先尝试后提交的处理方式,因此也可以将二阶段提交看作是一个强一致性的算法。

事务提交:

20220516120609153

事务中断:

20220516120734800

优缺点

优点:原理简单,实现方便

缺点:同步阻塞,单点问题,脑裂,太过保守

同步阻塞

各个参与者在等待其他参与者响应的过程中将无法进行其他操作

单点问题

协调者的角色之重要,一旦出现问题,其他参与者将会一直处于锁定事务资源的状态,无法继续完成事务操作

数据不一致

在阶段二,当协调者向所有参与者发送Commit请求之后,发生了局部网络异常或者是协调者在尚未发送完Commit请求之前自身发生了崩溃,导致最终只有部分参与者收到了Commit请求。于是,这部分收到了Commit请求的参与者就会进行事务的提交,而其他没有收到Commit请求的参与者则无法进行事务提交,于是整个分布式系统出现数据不一致性现象

太过保守

协调者指示参与者进行事务提交询问的过程中,如果参与者出现故障的话,协调者依靠的自身的超时机制来判断是否需要中断事务,这样的策略比较保守,没有较为完善的重试机制,任意一个节点的失败都会导致事务失败

3PC
协议说明

2PC的改进版,其将二阶段提交协议的“提交事务请求”过程一分为二,形成由CanCommitPreCommitdoCommit三个阶段组成的事务处理协议

20220601184902017

阶段一:CanCommit
  1. 事务询问

    协调者向所有的参与者发送一个包含事务内容的canCommit请求,询问是否可以执行事务提交操作,并开始等待各参与者的响应。

  2. 各参与者向协调者反馈事务询问的响应

    参与者在接收到来自协调者的canCommit请求后,正常情况下,如果其自身认为可以顺利执行事务,那么就会反馈Yes响应,并进入预备状态,否则反馈No响应。

阶段二:PreCommit

协调者会根据反馈情况来决定是否可以进行事务的PreCommit操作,正常情况下,包含两种可能。

  • 执行事务预提交

    所有的参与者反馈都是Yes,那么执行事务预提交

    1. 发送预提交请求

      协调者向所有参与者发送preCommit请求,并进入Prepared阶段

    2. 事务预提交

      参与者接收到后会执行事务操作,并将Undo和Redo信息记录到事务日志中

    3. 各参与者向协调者反馈事务执行响应

      如果参与者成功执行事务,那么就会反馈给协调者Ack响应,同时等待最终的指令:提交(commit)或中止(abort)

  • 中断事务

    有任何一个参与者反馈No,或者在等待超时之后,那么就会中断事务

    1. 发送中断请求

      协调者发送abort请求

    2. 中断事务

      无论是收到协调者的abort请求,还是等待协调者请求超时,参与者都会中断事务

阶段三:doCommit

真正进行事务提交,存在两种情况

  • 执行提交

    1. 发送提交请求

      假设协调者处于正常状态,并且收到了所有参与者的Ack响应,那么它将从“预提交”转换到“提交”状态,并向所有参与者发送doCommit请求

    2. 事务提交

      参与者接收到doCommit请求后,会正式执行事务提交操作,并在完成提交之后释放资源

    3. 反馈事务提交结果

      向协调者发送Ack

    4. 完成事务

      协调者接收到所有参与者反馈的Ack后完成事务

  • 中断事务

    假设协调者处于正常状态,并且有任意一个参与者向协调者发送了No响应,或者等待超时,那么就会中断事务

    1. 发送中断请求

      协调者向所有参与者节点发送abort请求

    2. 事务回滚

      参与者接收到abort请求后,会利用阶段二中记录的Undo信息进行事务回滚,之后释放资源

    3. 反馈事务回滚结果

      向协调者发送Ack

    4. 中断事务

      协调者接收所有参与者反馈的Ack后中断事务

注意:一旦进入阶段三,可能会存在以下故障

  • 协调者出现问题
  • 协调者和参与者之间的网络出现故障

无论哪种,最终都会导致参与者无法接收到协调者的doCommitabort请求,参与者会在等待超时之后继续进行事务提交

优缺点

相较于二阶段,最大的优点就是降低参与者的阻塞范围,并且能够在出现单点故障后继续达成一致

???

缺点:参与者在接收到PreCommit消息后,如果出现网络分区,此时协调者所在的节点和参与者无法进行正常的网络通信,这种情况下参与者仍然会进行事务提交,这会造成数据不一致性

Paxos算法

Paxos[ˈpæksoʊs]

基于消息传递且具有高度容错特性的一致性算法

拜占庭将军问题

20220607100552312

理论上来说试图在异步系统和不可靠的通道上来达到一致性状态是不可能的,因此对于一致性的研究都是假设信道是可靠的。

  • 大多数系统都是部署在局域网内的,因此消息被篡改的情况非常罕见
  • 由于硬件和网络原因而造成的消息不完整问题只需要一套简单的校验算法即可避免

所以在实际工程实践中,可以假设不存在拜占庭将军问题

20220607101012024

Paxos算法详解
问题描述

假设有一组可以提出提案的进程集合,那么对于一个一致性算法来说需要保证以下几点:

  • 在这些被提出的提案中,只有一个会被选定
  • 如果没有提案被提出,那么就不会有被选定的提案
  • 当一个提案被选定后,进程应该可以获取被选定的提案信息

对于一致性来说,安全性(Safety)需求如下:

  • 只有被提出的提案才能被选定
  • 只能有一个值被选定
  • 如果某个进程认为某个提案被选定了,那么这个提案必须是真的被选定的那个

Paxos算法中,有三种参与角色,用ProposerAcceptorLearner表示

在具体实现中,一个进程可能充当不止一种角色,我们不关心进程如何映射到各种角色。

假设不同参与者之间可以通过收发消息来进行通信,那么:

  • 每个参与者以任意的速度执行,可能会因为出错而停止,也可能会重启。同时,即使一个提案被选定后,所有的参与者也都有可能失败或重启,因此除非那些失败或重启的参与者可以记录某些信息,否则将无法确定最终的值。
  • 消息在传输过程中可能会出现不可预知的延迟,也可能会重复或丢失,但是消息不会被损坏,即消息内容不会被篡改(拜占庭式问题)
提案的选定

单一Acceptor

要选定唯一提案的最简单的方式就是只允许一个Acceptor存在,这样Proposer只能发送提案给这个AcceptorAcceptor会选择它接收到的第一个提案作为被选定提案。

这种方式很简单,但是一旦这个Acceptor出现问题,整个系统就无法工作

多个Acceptor

使用多个Acceptor避免单点问题

Proposer向一个Acceptor集合发送提案,集合中的每个Acceptor都可能会批准该提案,当有足够多的Acceptor批准这个提案的时候(比如说半数以上?)我们就认为该提案被选定了。

什么是足够多?

假定足够多的Acceptor是整个Acceptor集合的子集,并且让这个子集大得可以包含Acceptor集合中的大多数成员,因为任意两个包含大多数Acceptor的子集至少有一个公共成员。另外再规定每个Acceptor最多只能批准一个提案,那么就能保证只有一个提案被选定了。

推导过程

在没有失败和消息丢失的情况下,如果我们希望即使在只有一个提案被提出的情况下,仍然可以选出一个提案,这就暗示了一个需求:

P1:一个Acceptor必须批准它收到的第一个提案

但是这个需求会引出另一个问题:如果多个提案被不同的Proposer同时提出,这可能会导致虽然每个Acceptor都批准了它收到的第一个提案,但是没有一个提案是由多数人都批准的

20220607205041955

另外,即使只有两个提案被提出,如果每个提案都被差不多一半的Acceptor批准,此时即使只有一个Acceptor出错,都有可能导致无法确定该选定哪个提案

20220607205238893

P1的基础上,再加上一个提案被选定需要由半数以上的Acceptor批准的需求,暗示着一个Acceptor必须能够批准不止一个提案。在这里用全局的编号来唯一标识每一个被Acceptor批准的提案,当一个具有某Value值的提案被半数以上的Acceptor批准后,我们就认为该Value被选定了,此时我们也认为该提案被选定了。

此处讲的提案已经和Value不是同一个概念了,提案变成了由编号和Value组成的组合体,即[编号, Value]来表示一个提案

虽然允许多个提案被选定,但同时必须要保证所有被选定的提案都具有相同的Value值,结合提案的编号,该约定可以定义如下:

P2:如果编号为M0、Value值为V0的提案(即[M0, V0])被选定了,那么所有比编号M0更高的,且被选定的提案,其Value值必须也是V0

第4章 ZooKeeper与Paxos

初识ZooKeeper

ZooKeeper中的基本概念
集群角色

通常在分布式系统中,构成一个集群的每一台机器都有自己的角色,最典型的就是主从模式(Maste/Slave)。

  • Master机器:能够处理所有写操作
  • Slave机器:通过异步复制方式获取最新数据,并提供读服务的机器

但是在ZooKeeper中,它没有沿用传统的Master/Slave概念,而是引入Leader、Follower和Observer三种角色。

  • 所有机器通过Leader选举过程来选定一台Leader机器,Leader服务器为客户端提供读和写服务
  • Follower和Observer都能提供读服务,区别在于Observer不参与Leader选举过程,也不参与写操作的“过半写成功”策略,因此Observer可以在不影响写性能的情况下提升集群的读性能
会话(Session)

指客户端会话,ZooKeeper中一个客户端连接是与服务器之间的TCP长连接。客户端启动的时候与服务器建立TCP连接,从第一次连接建立开始,客户端会话的生命周期就开始了,客户端通过心跳检测与服务器保持有效的会话。

Session的sessionTimeout用来设置客户端会话超时时间,当由于服务器压力太大、网络故障或是客户端主动断开连接等原因导致客户端连接断开,只要在sessionTimeout规定时间内重连,那么之前创建的会话仍然有效。

数据节点(ZNode)

ZooKeeper中的节点分为两类

  • 构成集群的机器,称为机器节点
  • 数据模型中的数据单元,称为数据节点——ZNode

ZooKeeper将所有数据存储在内存中,数据模型是一颗树(ZNode Tree),由斜杠(/)进行分割的路径,就是一个ZNode。每个ZNode上会保存自己的数据内容,同时还会保存一系列属性信息。

数据节点分类

  • 持久节点

    一旦创建,除非主动移除,否则会一直保存在ZooKeeper中

  • 临时节点

    生命周期与客户端会话绑定,一旦客户端会话失效,这个客户端创建的临时节点就会被移除

    ZooKeeper还允许用户为每个节点添加一个特殊的属性:SEQUENTIAL。一旦节点被标记这个属性,那么这个节点创建的时候会自动在节点名后面追加上一个整型数字,这个整型数字是由父节点维护的自增数字

版本

ZooKeeper的每个ZNode上都会存储数据,ZooKeeper会维护一个叫作Stat的数据结构,Stat中记录了这个ZNode的三个数据版本,分别是version(当前ZNode的版本)、cversion(当前ZNode子节点版本)和aversion(当前ZNode的ACL版本)

Watcher

事件监听器,ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发时,ZooKeeper会将事件通知到感兴趣的客户端上

ACL

Access Control Lists,一种权限控制策略,类似于UNIX文件系统的权限控制。ZooKeeper定义了5种权限

  • CREATE:创建子节点权限
  • READ:获取节点数据和子节点列表的权限
  • WRITE:更新节点数据的权限
  • DELETE:删除子节点的权限
  • ADMIN:设置节点ACL的权限

注意:CREATE和DELETE都是针对子节点的权限控制

ZooKeeper的ZAB协议

第5章 使用ZooKeeper

安装部署

  1. 安装JRE

    • yum安装

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      yum search java
      ...
      java-1.8.0-openjdk.x86_64 : OpenJDK 8 Runtime Environment
      java-1.8.0-openjdk-accessibility.i686 : OpenJDK accessibility connector
      java-1.8.0-openjdk-accessibility.x86_64 : OpenJDK accessibility connector
      java-1.8.0-openjdk-demo.i686 : OpenJDK Demos 8
      java-1.8.0-openjdk-demo.x86_64 : OpenJDK 8 Demos
      java-1.8.0-openjdk-devel.i686 : OpenJDK Development Environment 8
      java-1.8.0-openjdk-devel.x86_64 : OpenJDK 8 Development Environment
      java-1.8.0-openjdk-headless.i686 : OpenJDK Headless Runtime Environment 8
      java-1.8.0-openjdk-headless.x86_64 : OpenJDK 8 Headless Runtime Environment
      java-1.8.0-openjdk-javadoc.noarch : OpenJDK 8 API documentation
      java-1.8.0-openjdk-javadoc-zip.noarch : OpenJDK 8 API documentation compressed in a single archive
      java-1.8.0-openjdk-src.i686 : OpenJDK Source Bundle 8
      java-1.8.0-openjdk-src.x86_64 : OpenJDK 8 Source Bundle
      ...

      从描述可以看出各个包的区别,选择jre安装就行了

      yum install java-1.8.0-openjdk.x86_64

    • apt安装

    • 源码安装

  2. 安装ZooKeeper

    1. 官网下载地址:https://zookeeper.apache.org/releases.html

      清华镜像地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
      ###可能出现###
      Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)... 101.6.15.130, 2402:f000:1:400::2
      Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.15.130|:443... connected.
      ERROR: cannot verify mirrors.tuna.tsinghua.edu.cn's certificate, issued by '/C=US/O=Let\'s Encrypt/CN=R3':
      Issued certificate has expired.
      To connect to mirrors.tuna.tsinghua.edu.cn insecurely, use `--no-check-certificate'.


      加上 --no-check-certificate 选项即可
      # wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
    2. 解包

      1
      2
      3
      4
      5
      6
      tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz
      # 参数解释
      # -z: 通过gzip指令压缩/解压缩文件,文件名最好为*.tar.gz
      # -x: 从归档文件中提取文件
      # -v: 显示指令执行过程
      # -f: 指定备份(压缩)文件
    3. 拷贝配置文件

      conf目录下有个样例配置,拷贝一份重命名为zoo.cfg

      1
      2
      3
      cd apache-zookeeper-3.7.1-bin
      cd conf
      cp zoo_sample.cfg zoo.cfg
    4. 启动服务端

      1
      2
      3
      # 回退到bin目录
      cd ../bin
      ./zkServer.sh start

    可能出现的问题:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /usr/bin/java
    ZooKeeper JMX enabled by default
    Using config: /opt/apache-zookeeper-3.7.1/bin/../conf/zoo.cfg
    Starting zookeeper ... FAILED TO START

    # 使用前台运行查看日志信息
    ./zkServer.sh start-foreground
    /usr/bin/java
    ZooKeeper JMX enabled by default
    Using config: /opt/apache-zookeeper-3.7.1/bin/../conf/zoo.cfg
    Error: Could not find or load main class org.apache.zookeeper.server.quorum.QuorumPeerMain
    # 可以看到原因在于找不到主类

    解决方案:要下载名字带bin的压缩包,不带bin的属于源码包,需要自行进行编译,缺少了jar包当然就找不到主类了

    左图为源码包,右图为bin

    20220606202341162

    源码编译参照README_packaging.md文档操作就可以了

    • 源码安装(TODO)

    查看服务端状态

    1
    2
    3
    4
    5
    6
    ./zkServer.sh status
    /usr/bin/java
    ZooKeeper JMX enabled by default
    Using config: /opt/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost. Client SSL: false.
    Mode: standalone

参数配置

  • 集群模式下,集群中的每台机器都需要感知到整个集群是由哪几台机器组成的

    server.id=host:port:port

    id为Server ID,用来标识机器在集群中的机器序号。同时在每台机器上,需要在数据目录创建一个myid文件,里面写入机器的id,需要确保每个机器的myid文件中的数字不同,数字范围在1~255。

    如:

    1
    2
    3
    server.1=IP1:2888:3888
    server.2=IP2:2888:3888
    server.3=IP3:2888:3888
    1
    2
    3
    4
    5
    6
    7
    # 查看状态
    ./zkServer.sh status
    /usr/bin/java
    ZooKeeper JMX enabled by default
    Using config: /opt/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
    Client port found: 2181. Client address: localhost. Client SSL: false.
    Mode: follower # 可以看到该机器属于follower

客户端脚本

bin目录下的zkCli.shzkCli.cmd就是客户端脚本

执行脚本即可连接至ZooKeeper服务端

1
bin/zkCli.sh

成功连接时会有以下输出

1
2
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]

如果没有显示指定服务器地址,默认连接的是本地ZooKeeper服务器

1
2
# 该命令可以连接指定ZooKeeper服务器
bin/zkCli.sh -server ip:port
创建

create命令用于创建一个ZooKeeper节点

1
create [-s] [-e] path data acl

说明:

-s:顺序节点

-e:临时节点

path:节点路径,必须以/开头

data:节点的值

acl:节点权限信息,不是必须的,缺省情况下不做任何权限控制

1
2
[zk: localhost:2181(CONNECTED) 0] create /crayon 0
Created /crayon
读取

与读取相关的命令有三个,分别是lsgetstat

ls

可以列出ZooKeeper指定节点下的所有子节点(第一级子节点)

1
ls path [watch]

说明:

path:节点路径

watch:监听节点

1
2
[zk: localhost:2181(CONNECTED) 1] ls /
[crayon, zookeeper]

get

可以获取ZooKeeper指定节点的数据内容

1
get path [watch]
1
2
[zk: localhost:2181(CONNECTED) 2] get /crayon
0

stat

获取节点的属性信息

1
stat path [watch]
1
2
3
4
5
6
7
8
9
10
11
12
[zk: localhost:2181(CONNECTED) 9] stat /crayon
cZxid = 0x4
ctime = Thu Jun 09 15:09:52 CST 2022
mZxid = 0x4
mtime = Thu Jun 09 15:09:52 CST 2022
pZxid = 0x4
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0
更新

使用set命令,可以更新指定节点的数据内容

1
set path data [version]

说明:

path:节点路径

data:要更新的数据内容

version:指定本次更新操作是基于ZNode的哪一个数据版本进行的

1
2
3
4
5
6
7
8
9
10
11
12
[zk: localhost:2181(CONNECTED) 11] stat /crayon
cZxid = 0x4
ctime = Thu Jun 09 15:09:52 CST 2022
mZxid = 0x5
mtime = Thu Jun 09 15:32:42 CST 2022
pZxid = 0x4
cversion = 0
dataVersion = 1 # 数据版本从0变为1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0
删除

使用delete命令可以删除ZooKeeper上的指定节点

1
delete path [version]

version参数与set命令中的version参数作用一致

1
[zk: localhost:2181(CONNECTED) 14] delete /crayon

如果节点存在子节点则会删除失败

1
2
[zk: localhost:2181(CONNECTED) 25] delete /crayon
Node not empty: /crayon

Java客户端API使用

创建会话

通过new ZooKeeper()创建一个连接

该过程是异步的,构造方法在处理完成客户端初始化工作后立即返回,生命周期进入CONNECTING状态

在真正建立连接之后,服务端会发送给客户端一个事件通知,此时才算真正建立了会话

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* @author Crayon
* @date 2022/6/9 16:18
* 客户端连接
*/
public class ZooKeeperConnectTest {
@Test
public void test() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 4000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
// 如果收到了服务端的响应事件:连接成功
latch.countDown();
System.out.println("connected!");
}
}
});
System.out.println("waiting...");
latch.await();
System.out.println("State: " + zooKeeper.getState());
}
}

携带Session进行连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* @author Crayon
* @date 2022/6/10 10:46
* 携带Session信息进行连接
*/
public class ZooKeeperConnectWithSessionIdTest {
private static class DefaultWatch implements Watcher {
private final String name;
private final CountDownLatch latch;

public DefaultWatch(String name, CountDownLatch latch) {
this.name = name;
this.latch = latch;
}

public void process(WatchedEvent event) {
System.out.printf("name: %s, event: %s\n", name, event);
latch.countDown();
}
}

@Test
public void test() throws IOException, InterruptedException {
String connectString = "localhost:2181";
int sessionTimeout = 4000;
int count = 2;
CountDownLatch latch0 = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(count);
ZooKeeper zooKeeper0 = new ZooKeeper(connectString, sessionTimeout, new DefaultWatch("zookeeper0", latch0));
latch0.await();
long sessionId = zooKeeper0.getSessionId();
byte[] sessionPasswd = zooKeeper0.getSessionPasswd();
ZooKeeper zooKeeper1 = new ZooKeeper(connectString, sessionTimeout, new DefaultWatch("zookeeper1", latch), 1, "error-password".getBytes());
ZooKeeper zooKeeper2 = new ZooKeeper(connectString, sessionTimeout, new DefaultWatch("zookeeper2", latch), sessionId, sessionPasswd);
System.out.println("waiting...");
latch.await();
System.out.printf("zookeeper0 => sessionId: %d\n", sessionId);
System.out.printf("zookeeper2 => sessionId: %d\n", zooKeeper2.getSessionId());
}
}
创建节点

通过create方法可以进行节点的创建,分别同步异步两种方式

  • 同步方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    /**
    * @author Crayon
    * @date 2022/6/10 11:11
    * 同步节点创建
    */
    public class SyncCreateNodeTest {
    public ZooKeeper getConnect() throws InterruptedException, IOException {
    final CountDownLatch latch = new CountDownLatch(1);
    ZooKeeper zooKeeper = ZooKeeperConnectTest.getConnect(new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    if (event.getState() == Event.KeeperState.SyncConnected) {
    System.out.println("connected!");
    latch.countDown();
    }
    }
    });
    latch.await();
    return zooKeeper;
    }

    public ZooKeeper getConnect(long sessionId, byte[] sessionPasswd) throws InterruptedException, IOException {
    final CountDownLatch latch = new CountDownLatch(1);
    ZooKeeper zooKeeper = ZooKeeperConnectTest.getConnect(sessionId, sessionPasswd, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    if (event.getState() == Event.KeeperState.SyncConnected) {
    System.out.println("connected!");
    latch.countDown();
    }
    }
    });
    latch.await();
    return zooKeeper;
    }

    @Test
    public void test() throws InterruptedException, IOException, KeeperException {
    ZooKeeper zooKeeper = getConnect();
    byte[] data = "crayon-data".getBytes("utf-8");
    String path = zooKeeper.create("/crayon", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    System.out.println("path: " + path);
    ZooKeeper zooKeeper1 = getConnect(zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
    Stat stat = zooKeeper1.exists("/crayon", false);
    byte[] nodeData = zooKeeper1.getData("/crayon", false, stat);
    System.out.println("data: " + new String(nodeData, "utf-8"));
    }
    }

    临时节点属于会话级别,当连接断开后,在会话过期时间内携相同的会话信息重连的话,临时节点依然存在

  • 异步方式,通过异步回调的方式进行通知

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    /**
    * @author Crayon
    * @date 2022/6/10 11:11
    * 异步节点创建
    */
    public class AsyncCreateNodeTest {
    @Test
    public void test() throws IOException, InterruptedException, KeeperException {
    final CountDownLatch latch = new CountDownLatch(1);
    ZooKeeper zooKeeper = SyncCreateNodeTest.getConnect();
    byte[] data = "crayon-data".getBytes("utf-8");
    zooKeeper.create("/crayon", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
    System.out.println("rc: " + rc);
    System.out.println("path: " + path);
    System.out.println("ctx: " + ctx);
    System.out.println("name: " + name);
    latch.countDown();
    }
    }, "ctx");
    latch.await();
    }
    }

    和同步接口不同的是,异步接口本身不会抛出异常,而是以回调函数中的响应码(rc字段)体现,而同步接口对于失败的情况会抛出异常

    回调的参数说明:

    20220610115820746

删除节点

同样有同步与异步两种方式

ZooKeeper只允许删除叶子节点

  • 同步方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    /**
    * @author Crayon
    * @date 2022/6/14 16:36
    * 同步删除节点
    */
    public class SyncDeleteNodeTest {
    @Test
    public void test() throws IOException, InterruptedException, KeeperException {
    ZooKeeper zooKeeper = SyncCreateNodeTest.getConnect();
    // 删除不存在的节点,version为-1表示匹配任意version
    zooKeeper.delete("/inexist-node", -1);
    // 删除存在的节点,但是version不对应
    zooKeeper.delete("/crayon", 1);
    // 删除存在的节点,version对应
    zooKeeper.delete("/crayon", 0);
    }
    }

    操作不成功则会抛出异常

    节点不存在:org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /inexist-node

    节点版本不对应:org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /crayon

  • 异步方式

    异步方式通过状态码来判断结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    /**
    * @author Crayon
    * @date 2022/6/14 16:36
    * 同步删除节点
    */
    public class AsyncDeleteNodeTest {
    @Test
    public void test() throws IOException, InterruptedException {
    ZooKeeper zooKeeper = SyncCreateNodeTest.getConnect();
    final CountDownLatch latch = new CountDownLatch(3);
    AsyncCallback.VoidCallback cb = new AsyncCallback.VoidCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx) {
    latch.countDown();
    switch (KeeperException.Code.get(rc)) {
    case OK:
    System.out.println("请求成功");
    break;
    case NONODE:
    System.out.println("节点不存在");
    break;
    case BADVERSION:
    System.out.println("节点版本不对应");
    break;
    default:
    }
    System.out.println("result code: " + rc);
    System.out.println("path: " + path);
    System.out.println("context: " + ctx);
    }
    };
    // 删除不存在的节点,version为-1表示匹配任意version
    zooKeeper.delete("/inexist-node", -1, cb, "operation-0");
    // 删除存在的节点,但是version不对应
    zooKeeper.delete("/crayon", 1, cb, "operation-1");
    // 删除存在的节点,version对应
    zooKeeper.delete("/crayon", 0, cb, "operation-2");

    latch.await();
    }
    }

读取数据
getChildren
  • 同步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /**
    * @author Crayon
    * @date 2022/6/14 16:33
    * 同步获取节点数据
    */
    public class SyncGeChildrenTest {
    @Test
    public void test() throws IOException, InterruptedException, KeeperException, BrokenBarrierException {
    final CountDownLatch latch = new CountDownLatch(2);
    final ZooKeeper zooKeeper = SyncCreateNodeTest.getConnect();
    System.out.println(zooKeeper.getChildren("/", false));
    Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    if (event.getState() == Event.KeeperState.SyncConnected) {
    if (event.getType() == Event.EventType.NodeChildrenChanged) {
    try {
    System.out.println("re-get children: " + zooKeeper.getChildren(event.getPath(), true));
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    latch.countDown();
    }
    }
    }
    }
    };
    zooKeeper.getChildren("/", watcher);
    zooKeeper.delete("/crayon0", -1);
    zooKeeper.delete("/crayon1", -1);
    latch.await();
    }
    }

    Watcher是一次性的,在监听到一次通知并执行之后就没用了,监听下次通知需要重新注册Watcher

  • 异步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    /**
    * @author Crayon
    * @date 2022/6/14 16:33
    * 异步获取节点数据
    */
    public class AsyncGeChildrenTest {
    @Test
    public void test() throws IOException, InterruptedException, KeeperException, BrokenBarrierException {
    final CountDownLatch latch = new CountDownLatch(2);
    final ZooKeeper zooKeeper = SyncCreateNodeTest.getConnect();
    System.out.println(zooKeeper.getChildren("/", false));
    Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    if (event.getState() == Event.KeeperState.SyncConnected) {
    if (event.getType() == Event.EventType.NodeChildrenChanged) {
    try {
    System.out.println("re-get children: " + zooKeeper.getChildren(event.getPath(), true));
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    latch.countDown();
    }
    }
    }
    }
    };
    zooKeeper.getChildren("/", watcher, new AsyncCallback.ChildrenCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
    System.out.println("result code: " + rc);
    System.out.println("path: " + path);
    System.out.println("ctx: " + ctx);
    System.out.println("children: " + children);
    latch.countDown();
    }
    }, null);
    zooKeeper.delete("/crayon0", -1);
    latch.await();
    }
    }
getData
  • 同步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /**
    * @author Crayon
    * @date 2022/6/15 11:04
    * 同步获取节点数据
    */
    public class SyncGetDataTest {
    @Test
    public void test() throws IOException, InterruptedException, KeeperException {
    ZooKeeper zooKeeper = SyncCreateNodeTest.getConnect();
    final CountDownLatch latch = new CountDownLatch(1);
    zooKeeper.create("/crayon", "crayon".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    System.out.println(new String(zooKeeper.getData("/crayon", new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    if (event.getState() == Event.KeeperState.SyncConnected) {
    if (event.getType() == Event.EventType.NodeDataChanged) {
    System.out.println("node data changed!");
    latch.countDown();
    }
    }
    }
    }, new Stat())));
    zooKeeper.setData("/crayon", "crayon".getBytes(), -1);
    latch.await();
    }
    }

    尽管内容没有改变,但是数据版本号变了,所以ZooKeeper也认为节点数据变化了

  • 异步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    /**
    * @author Crayon
    * @date 2022/6/15 11:04
    * 异步获取节点数据
    */
    public class AsyncGetDataTest {
    @Test
    public void test() throws IOException, InterruptedException, KeeperException {
    ZooKeeper zooKeeper = SyncCreateNodeTest.getConnect();
    final CountDownLatch latch = new CountDownLatch(2);
    zooKeeper.create("/crayon", "crayon".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    zooKeeper.getData("/crayon", new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    if (event.getState() == Event.KeeperState.SyncConnected) {
    if (event.getType() == Event.EventType.NodeDataChanged) {
    System.out.println("node data changed!");
    latch.countDown();
    }
    }
    }
    }, new AsyncCallback.DataCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
    System.out.println("result code: " + rc);
    System.out.println("path: " + path);
    System.out.println("context: " + ctx);
    System.out.println("data: " + new String(data));
    System.out.println("stat: " + stat);
    latch.countDown();
    }
    }, null);
    zooKeeper.setData("/crayon", "crayon".getBytes(), -1);
    latch.await();
    }
    }
更新数据

更新节点数据是可以携带数据版本的,指定数据版本进行更新,但是读取数据的接口并没有提供指定数据版本的功能?

CAS:通过携带一个预期值进行比较,满足的情况下就可以进行修改

如:使用CAS将1修改为2,那么就需要携带预期值1,通过与现值比较看是否数据被并发修改过,相同则可以判定没有被并发修改,那么修改操作才可成功

实际上这种CAS策略有个ABA问题,也就是如果多个并发修改后结果不变,那么这种方式是无法感知到的

如:线程1:1 –> 2

线程2:2 –> 1

此时线程3无法感知到并发修改的存在,因为数据值与预期值(1)相同

解决这个问题的版本就是通过版本号来区分,每次修改都会递增版本号

这个数据版本就是根据CAS衍化而来的,携带的version参数作为CAS的预期值,可以通过这个version判断数据是否被并发修改了

使用这个功能可以实现分布式锁服务

  • 同步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    /**
    * @author Crayon
    * @date 2022/6/15 11:36
    * 同步更新数据
    */
    public class SyncSetDataTest {
    @Test
    public void test() throws IOException, InterruptedException, KeeperException {
    ZooKeeper zooKeeper = SyncCreateNodeTest.getConnect();
    zooKeeper.create("/crayon", "crayon".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    zooKeeper.getData("/crayon", false, null);
    // -1不是一个特定的版本号,而是表示基于最新版本进行更新,对数据版本没有要求的情况可以使用
    Stat stat = zooKeeper.setData("/crayon", "crayon-modifyed".getBytes(), -1);
    System.out.println(stat.getCzxid());
    System.out.println(stat.getMzxid());
    System.out.println(stat.getVersion());
    }
    }

  • 异步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /**
    * @author Crayon
    * @date 2022/6/15 11:36
    * 异步更新数据
    */
    public class AsyncSetDataTest {
    @Test
    public void test() throws IOException, InterruptedException, KeeperException {
    ZooKeeper zooKeeper = SyncCreateNodeTest.getConnect();
    final CountDownLatch latch = new CountDownLatch(1);
    zooKeeper.create("/crayon", "crayon".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    zooKeeper.getData("/crayon", false, null);
    zooKeeper.setData("/crayon", "crayon-modifyed".getBytes(), -1, new AsyncCallback.StatCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
    System.out.println("result code: " + rc);
    System.out.println("path: " + path);
    System.out.println("context: " + ctx);
    System.out.println(stat.getCzxid());
    System.out.println(stat.getMzxid());
    System.out.println(stat.getVersion());
    latch.countDown();
    }
    }, null);
    latch.await();
    }
    }
检测节点是否存在
权限控制