Zookeeper C语言API封装和注意事项

背景说明

Zookeeper在分布式系统中扮演一个协调和少量数据存储的服务,具体可见维基百科的介绍,它提供了Java和C的简单接口供开发者使用。简单来说,可以将Zookeeper看做一个分布式的文件系统,它提供一个树形结构存储,每个节点可以存储信息,同时提供Watcher机制可以像Linux文件系统inotify那样监控文件系统的变化。
最近有机会使用Zookeeper作为项目的配置管理存储服务,项目使用C++作为主要开发语言,由于原生C接口提供的功能非常简单和底层,比较难用,我决定封装Zookeeper提供的C接口,将使用原API过程中遇到的一些问题记录总结下来,可以帮助到需要使用Zookeeper C语言API的读者。

封装API地址

封装好的API的地址:
https://github.com/godmoon/CppZooKeeperApi/blob/master/CppZookeeper.h
https://github.com/godmoon/CppZooKeeperApi/blob/master/CppZookeeper.cpp

单元测试用例地址:
https://github.com/godmoon/MoonLib/blob/master/gtest/src/CppZooKeeperTest.cpp

ZOOKEEPER接口介绍

Zookeeper提供的的接口主要集中在头文件/src/c/include/zookeeper.h中,其中主要包括以下方面的内容:

  • 常量类型
    • 全局错误码:ZOO_ERRORS
    • ACL(Access Control List,权限控制列表)控制参数
    • 日志级别:ZooLogLevel
    • 事件类型:ZOOKEEPER_WRITE,ZOOKEEPER_READ
    • 节点创建类型:ZOO_EPHEMERAL,ZOO_SEQUENCE
    • 连接状态:ZOO_*_STATE
    • 事件类型:ZOO_*_EVENT
  • 所需的数据结构和类型
    • 句柄:_zhandle
    • 会话ID:clientid_t
    • 批量操作相关:zoo_op,zoo_op_result
    • Watcher的定义:watcher_fn
    • 回调函数的定义:*_completion_t
  • API接口
    • Zookeeper初始化和连接:
      • zookeeper_init
      • zookeeper_close
      • zookeeper_interest
      • zookeeper_process
      • is_unrecoverable
      • zoo_add_auth
    • 批量操作类型生成
      • zoo_*_op_init
    • 内部数据操作接口
      • zoo_client_id
      • zoo_get_context
      • zoo_set_context
      • zoo_recv_timeout
      • zoo_set_watcher
      • zookeeper_get_connected_host
      • zoo_state
      • zerror
      • zoo_set_debug_level
      • zoo_set_log_stream
      • zoo_deterministic_conn_order
    • 节点操作接口
      • create系列:zoo_acreate,zoo_create
      • delete系列:zoo_adelete,zoo_delete
      • exists系列:zoo_aexists,zoo_awexists,zoo_exists,zoo_wexists
      • get系列:zoo_aget,zoo_awget,zoo_get,zoo_wget
      • set系列:zoo_aset,zoo_set,zoo_set2
      • get_children系列:zoo_aget_children,zoo_awget_children,zoo_aget_children2,zoo_awget_children2,zoo_get_children,zoo_wget_children,zoo_get_children2,zoo_wget_children2
      • 强制节点同步Leader:zoo_async,在切换到一个Observer节点时可能会出现读到老数据,调用此接口能强制让连接的节点同步Leader。
      • acl控制接口:zoo_aget_acl,zoo_aset_acl,zoo_get_acl,zoo_set_acl
      • 批量操作接口:zoo_amulti,zoo_multi
Zookeeper库

Zookeeper提供了多线程和单线程库。
多线程库会在内部创建2个线程,一个用于IO进行事件驱动,一个用于回调和Watcher的调用。
单线程需要用户调用zookeeper_interest来进行事件驱动,回调和Watcher均在zookeeper_interest中完成。
二者的接口一致,内部有适配器进行匹配。

由于多线程库操作起来比较简单,选择使用多线程库进行封装。需要注意的是所有的回调和Watcher内部对于线程间可能会同时操作的数据需要加锁保护。

需要的功能

了解了接口之后,对于C++ API的封装可以从以下几个方面着手:

  • 封装句柄的连接和释放。
  • 封装一系列内部数据操作接口。
  • 封装节点操作接口,节点操作接口不仅提供基础操作,还提供了异步、注册自定义Watcher、带额外节点信息等衍生接口,异步的在操作名称前面带’a’,注册自定义Watcher在名称前面加’w’,部分带Stat信息的接口在名称后面带2。在封装API接口的设计中可以通过默认参数简化接口,提高易用度。
  • 断线重连的状态恢复,比如临时节点和Watcher的恢复。
  • 由于原始API只支持函数指针方式回调,这样限制了接口的参数,因此封装的API要能够支持函数对象回调。
接口使用注意事项

接口封装的整体结构图如下:


在接口实现的过程中,有一些需要特别注意的地方:

  • 异步请求
    异步请求使用函数对象进行回调,封装API内部实现了所有的原始API的回调函数,用户在调用封装API接口时,会生成一个context上下文信息,用于保存本请求的一些数据,比如用户的回调函数对象。
    请求流程序列图如下:

  • Watcher注册机制
    Watcher的管理机制是这个API封装最复杂的地方,为了制定一个合理的封装方案,必须要先了解清楚Watcher的管理机制:

    • 对于同一个节点,Watcher是可以存在多个的,但是这个机制比较复杂,它存在三个数据,组合起来作为一个Key:
      • Watcher类型(GetExists,GetChildren)
        • Get:触发ZOO_DELETED_EVENT和ZOO_CHANGED_EVENT事件。
          当节点不存在时,无法注册Watcher,这点要和Exists区分。
        • Exists:触发ZOO_CREATED_EVENT、ZOO_DELETED_EVENT和ZOO_CHANGED_EVENT事件。
          这里还要考虑到节点不存在时,Watcher也是可以通过Exists接口注册成功的,在处理ZOO_DELETED_EVENT事件时,如果这个节点注册了Exists事件,那么这个事件需要重注册。
        • GetChildren:触发ZOO_CHILD_EVENT事件和ZOO_DELETED_EVENT事件。
          对于ZOO_DELETED_EVENT事件,Exists和GetChildren使用同样的Watcher和参数注册后,如果节点被删除,这个事件只会触发一次。
      • Watcher回调函数,自定义的和全局的不同,不同的自定义回调函数地址也算不同。
      • 传入的自定义数据指针地址。

    同样的参数,使用同样的方法注册2次,只会触发一次,因此封装的API在管理Watcher时,也需要排除重复。

    内部实现,可以将Get和Exists看做是对节点的事件注册,GetChildren就是对节点的子节点的事件注册。区别在于Exists允许节点不存在的时候注册事件,当节点创建时就会触发ZOO_CREATED_EVENT事件,而Get只允许在节点已经存在的时候注册事件。这里就会有一个问题,既然Get和Exists在内部都是针对节点进行事件3注册,如果Get和Exists使用同样的回调函数和参数注册,那么ZOO_DELETED_EVENT或ZOO_CHANGED_EVENT事件仅会触发一次。因此,如果需要自动注册Watcher,需要把Get和Exists合并管理,而且全局回调和自定义回调的处理方式也有所不同。

    在Exists、Get和GetChildren接口以及对应的异步接口中,需要对Watcher数据进行管理,以便可以进行关键功能重注册操作。将Watcher管理起来的一个必要条件,是Watcher生效,只需要请求成功了,那么Watcher就会生效。一个例外是,对于Exists接口,返回码为ZNONODE时,Watcher也会生效。
    实现上,同步接口可以在调用原始API后判断返回码直接添加Watcher。而异步接口需要在异步回调中判断返回码再添加Watcher,相关的信息可以在异步请求时加入到Context中,在回调中读取出来使用。

    这里可以稍微改进一下,将Watcher分为节点和子节点,所有节点的重注册,均使用Exists,而不再使用Get了,这样做能够将问题简化不少,API使用者也无需关注Exists和Get的区别了,只需要知道,节点Watcher在节点删除后,Watcher还是可以重注册,而子节点Watcher在节点删除后,不会再重注册了。

  • 断线重连
    Zookeeper API内部实现了断线重连功能,默认情况下,断线后,每隔10秒会尝试重连一次,重连后相关的Watcher仍然会生效,但是前提是服务端保存的Session并未失效,如果服务端Session已经失效,那么在重连成功后会收到ZOO_SESSION_EVENT事件,此时连接状态为ZOO_EXPIRED_SESSION_STATE,后续的操作都会失败,返回错误码ZSESSIONEXPIRED。
    此时需要手工重连直到成功,重连成功后,在全局Watcher中会收到ZOO_SESSION_EVENT并且状态为ZOO_CONNECTED_STATE的事件,这里要重新注册所有的Watcher并且恢复临时节点,因此如果使用到了Watcher和临时节点功能,这部分工作在封装的API中也是必不可少的,可以使用内部的全局Watcher来完成这一系列的操作。
    而实现此功能,就需要记住下面所有的信息:

    • 用户所有的临时节点信息:路径、数据、ACL和Flag,关于临时节点的操作,后面有单独的小节来说明。
    • 全局Watcher函数对象以及上下文数据。
    • 注册到全局Watcher上的所有路径以及对应的注册方式(Get、Exists和GetChildren)。
    • 用户自定义Watcher函数对象以及上下文数据。

    以上数据均可以使用Map存储于封装后的API中,当操作检测到Session失效(收到Session状态变为ZSESSIONEXPIRED),进行重连并且重新注册所有的Watcher,创建所有的临时节点。注意这些数据的操作如果在全局Watcher中进行,意味着它与用户调用的接口处于不同的线程,需要进行加锁保护。

  • Watcher自动重注册
    每个Watcher触发一次之后,就不会再触发第二次,如果仍需要触发,需要再次调用Watcher注册函数进行再次注册,如果需要API自动重注册,需要考虑到以下几个问题:

    • 回调是全局回调还是自定义回调?
      这个可以通过传入的参数data来区分,在data中保存相应的函数对象即可。
    • 前面说过,对于一个路径,Get和Exists注册的回调只会触发一次,内部实际上都是当做节点的事件注册,那么用户是希望再次注册是使用Exists还是Get呢?
      Get和Exists触发的事件,只有ZOO_CREATED_EVENT是有区别的,而这个仅在节点不存在时调用Exists才能够注册成功。按照原始API的实现方式,调用了Exists,如果节点存在,相当于调用了Get进行注册;如果节点不存在,那么仅注册ZOO_CREATED_EVEN。
      对于全局Watcher的记录,不同的注册类型使用位记录,设计时可以把Exists类型值包含Get类型:
    • 而对于自定义Watcher,只需要将路径对应的上下文保存下来即可,因为每次注册自定义Watcher,都会创建一个完全不同的上下文实例(指针值不同),因此不会出现Exists和Get冲突的问题。
    • 如何停止重注册?何时进行重注册?
      如果要实现重注册功能,就必须要提供相应的方式让用户决定停止重新注册,这个可以通过回调函数的返回值来实现,如果在回调后根据返回值来决定是否重注册,就会涉及到一个问题,注册Watcher必须调用Get、Exists或者GetChildren函数,这些函数会对本地缓存产生影响,如果调用用户回调和重注册之间,数据变了,那么用户可能就失去感知数据变化的能力,导致Watcher丢通知的情况发生,所以只能选择在回调函数前重注册,如果用户不需要重注册了,需要将Watcher从Zookeeper API和封装API中删除。
      然而原生API并没有提供这样的功能,通过原生API的代码,可以知道它把Watcher存储在zhandle_t中的3个zk_hashtable中:

      hashtable中存储的是watcher_object_list,这是一个watcher_object_t的单向链表,而watcher_object_t中就存储的是最终的数据:Watcher回到函数指针和context指针,还有链表下一个节点的地址。相关代码在zk_hashtable.c中:

      通过函数collectWatchers可以了解到客户端触发Watcher的方式:

      上面的函数根据不同的Watcher类型,从3个hashtable中取出对应路径的watcher,加入到list中,并且删除原hashtable中的记录。刚开始我是用这个函数去获得需要删除的Watcher,但是这个函数只能把对应路径中所有的Watcher一次性全部删除,这个并不符合我们的要求,一个路径可能对应多个Watcher,不能全部删除,否则会出Bug。
      了解了数据结构和数据对象,就可以写出删除指定context的Watcher的方法了:

      对于内部的数据结构,因为原生API并没有暴露出来,这里我们只能很恶心地把它重定义一份,其中需要注意的是,zk_adaptor.h函数中的_zhandle定义,是区分多线程和单线程的,多线程的库在编译前定义了宏THREADED,里面多了一些线程相关的变量,导致这个宏会影响到最终_zhandle的结构,所以在包含zk_adaptor.h前,一定要定义THREADED宏,否则变量内容都是错误的。

      另一个需要注意的情况是节点删除时的处理,因为节点被删除,Get和GetChildren类型节点无法重注册
      自动重注册在提供一定的便利性的同时,也会导致多调用一次API接口用于注册Watcher,这个在不同的项目场景下有不同的取舍。后续可以将这个接口获得的数据传递给用户的Watcher,从而减少用户可能进行的重复数据获取操作。
      Watcher的管理流程时序图如下:

  • Multi操作的机制
    Multi操作会按照用户传入的顺序进行操作,当遇到第一个失败时,操作即停止,后续的操作不再进行。返回的zoo_op_result_t中,前面成功的操作返回码为0,失败的操作返回码为该操作的返回码,后面所有的操作均返回ZRUNTIMEINCONSISTENCY(-2),表示并未执行。

  • 临时节点的维护
    临时节点的维护也并不简单,并不是用户创建了临时节点,在封装API中记录下来就完事了,后面如果用户对这个节点进行写操作(Set、Delete、改变ACL)时,都需要对记录的信息做修改,在Multi操作的封装中,也要对每个操作进行同样的判断,否则会出现Session失效重连恢复的状态和断开连接之前不一致的情况。

    • 在同步操作中,如果请求成功了,则更新临时节点数据。
    • 对于异步操作稍微麻烦一些,需要在异步回调中判断操作是否成功,如果成功了,才更新临时节点数据。
    • 对于Multi操作,需要对结果进行遍历,只更新成功的操作部分。
  • 原始API数据结构的内存管理
    Zookeeper C语言API对于内存管理比较简单粗暴,对于用惯了C++的同学来说会感觉有点麻烦,甚至会遇到一些坑,表现在部分同步操作返回的结果需要用户在使用完毕后手工释放资源,而异步操作返回的结果由Zookeeper API调用回调之后自动释放,具体问题和解决方法如下:

    • 在封装API析构函数中调用zookeeper_close来关闭连接。
    • get_children系列函数获得的String_vector对象在每次调用完成后需要调用方释放。这里有2个问题,一个是需要手工释放,第二个是不可重复使用
      对于第一个问题,可以封装一个自动释放的类型ScopedStringVector,为了兼容之前的接口以及使用方便,采用继承而不是组合,因为有自动释放,所以禁用赋值、拷贝、转移构造函数,当然你也可以自己重新申请内存进行深拷贝。
      对于第二个问题,封装接口zoo_get_children,在使用前对传入的ScopedStringVector调用Clear()函数释放内存。
      ScopedStringVector不带深拷贝的代码如下,其中deallocate_String_vector位于zookeeper.jute.h中,包含zookeeper.h时会自动包含:
    • 另一个需要调用后手工释放的类型是ACL_vector,和String_vector一样,封装一下,让它带析构自动释放功能,并且封装zoo_get_acl函数,在每次使用前调用Clear()释放内存。
  • 封装API的内存管理
    封装的API会显式申请堆内存保存以下数据:

    • 全局Context
    • 所有异步调用的Context
    • 所有Watcher的Context
    • 异步操作临时节点时,保存临时节点的信息用于成功后修改临时节点数据
    • 批量操作的路径和数据信息

所有的堆内存几乎都使用std::shared_ptr进行管理,在结构失效时,自动释放内存,大大降低了内存泄露的可能性。
API也经过了全面的测试用例的测试,使用valgrind并未检测出内存泄露。


  • 由于我们封装的是多线程的API,如果在Watcher或者回调函数和调用线程中同时对内部成员对象进行操作,那么就需要使用线程锁对成员对象加以保护,因为在Watcher或者回调函数是由Zookeeper API创建出来的线程调用。
    为了避免Zookeeper线程调用接口时对同一个锁多次加锁导致死锁,建议使用可重入锁(递归锁,C++中为std::recursive_mutex)进行保护,同时设计上避免死锁的情况发生。
  • 二进制值
    Zookeeper节点的值是二进制格式,因此如果存储的是字符串的话,如果末尾不包含’\0’,通过Get获得的值是不包含结束符的,如果直接使用字符串操作函数,很有可能会存在溢出问题,因此API提供了获得C语言风格字符串值的接口GetCString方便用户调用。
  • 递归接口
    原始API是不包含递归接口的,意味着递归创建或者删除路径需要比较复杂的操作。而递归操作在很多操作中都有比较重要的作用,比如Session过期后重连恢复临时节点时,需要创建临时节点的父路径。
    封装的API内部使用Multi接口实现了递归创建和删除节点的功能。
    对于递归创建接口,先使用Multi接口的Exists操作逐个判断节点是否存在,从第一个不存在的节点处,将后面的节点全部使用Multi接口的Create操作创建。
    对于递归删除接口,使用深度遍历,将要删除的路径的所有子节点遍历出来,然后逆序添加到Multi中进行批量删除。
    这里的实现过程中也需要考虑到临时节点信息的维护,如果删除接口中某个临时节点被其他连接删除了,那么在Multi中不会对这个临时节点数据做删除,所以连接重连后会重新创建临时节点,这是我们不希望看到的。因此需要在删除的递归接口中对所有临时节点信息进行遍历,如果临时节点的前缀与删除的路径一致,说明用户希望删除这个临时节点信息,此时可以将这个临时节点信息删除。
  • 相对路径
    原始API中已经支持相对路径,但是相对路径对于内部维护Watcher和临时节点并不方便,使用绝对路径不容易出错,因此在封装的API中,自己维护了相对路径的处理,所有操作都会在内部通过ChangeToAbsPath接口转换为绝对路径后再传给原始API处理,即没有用到原始API的相对路径功能。
  • 使用Client ID恢复Session
    ZooKeeper每个连接在Server端都会使用一个Client ID进行标识,客户端在连接Server成功后可以获得这个Client ID和对应随机生成的密码。原始API也支持在连接的时候传入Client ID和密码。这个Client ID和密码是用来重连恢复Session的。
    当程序意外退出,但是连接没关闭,Server端并不会释放Session,Session管理的临时节点也不会被删除,而是等到Session超时时间达到后再进行释放。
    此时如果服务重启,可以使用之前的Cliend ID和密码进行连接,恢复Session。如果Session已经超时,全局Watcher会收到Session超时的事件。如果Session恢复成功,那么用户端需要重建所有Watcher和临时节点信息。

    • 对于Watcher,重新注册即可。
    • 对于临时节点,不能删除原有的临时节点,而是要进行“托管”,即,如果临时节点存在,并且节点的OWNER(可通过Get节点的Stat数据获得)是自己的Client ID,则不作处理,将其加入到内部临时节点数据即可。
测试

在封装API的测试上,使用了gtest测试框架,辅以覆盖率数据作为参考,尽可能提高覆盖率,目前测试用例的覆盖率在80%以上。
对于ZooKeeper的超时重连的测试,在测试用例中使用iptabls可以模拟ZooKeeper与客户端之间断线:

具体代码可以参考测试用例代码:
https://github.com/godmoon/MoonLib/blob/master/gtest/src/CppZooKeeperTest.cpp

后续工作

上面对于Zookeeper的特性都只是建立在白盒测试观察的基础上,还有很多问题待解决,比如对于异步回调函数,如果请求发出后,没有回包,回调函数是否还会调用?如果不调用,参数的内存是否能够释放?如果要确定内部工作方式,还是需要花多点时间研究一下源代码才能确定,这部分后面再补充。

参考文档

zookeeper使用笔记
Zookeeper C API 指南二(监视(Wathes), 基本常量和结构体介绍)
zookeeper c 客户端源码分析以及使用注意点