除了可以使用命令行方式对ZooKeeper进行操作外,ZooKeeper还提供了Java API操作接口,本节将对ZooKeeper的常用Java API接口进行介绍。
在编写Java API之前,首先需要新建一个ZooKeeper项目。ZooKeeper项目的结构与普通的JavaSE项目一样,只是依赖的jar包不同。
在Eclipse中新建一个Maven项目zk_demo(Maven项目的搭建此处不做过多讲解),项目结构如图6-5所示。
然后在该项目的pom.xml文件中添加以下代码,以引入ZooKeeper的Java API依赖包:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.3</version> </dependency>
配置好pom.xml后,即可进行ZooKeeper Java API的编写。
若用户不想使用Maven构建项目,也可以创建普通JavaSE项目。普通JavaSE项目依赖的ZooKeeper jar包主要有:两个ZooKeeper核心包zookeeper-3.6.3.jar、zookeeper-jute-3.6.3.jar和三个日志包slf4j-log4j12-1.7.25.jar、slf4j-api-1.7.25.jar、log4j-1.2.17.jar。这5个jar包都可以在ZooKeeper的安装文件apache-zookeeper-3.6.3-bin.tar.gz的lib文件夹中找到。
一个ZooKeeper的普通JavaSE项目结构如图6-6所示。
图6-5 ZooKeeper Maven项目结构
图6-6 ZooKeeper普通JavaSE项目结构
ZooKeeper创建节点不支持递归调用,即无法在父节点不存在的情况下创建一个子节点。如在/zk01节点不存在的情况下创建/zk01/ch01节点,并且如果一个节点已经存在,那么创建同名节点时,会抛出NodeExistsException异常。
下面创建一个节点/ zk001,节点的元数据为zk001_data。
在新建的zk_demo项目中新建Java类CreatePath.java,然后在main()方法中写入创建节点的代码。完整代码如下所示:
新建一个ZooKeeper对象,传入三个参数,解析如下:
·第一个参数为以逗号分隔的服务器连接字符串,格式为“IP地址:端口”或“主机名:端口”(使用主机名,需要在系统本地配置主机名IP映射),这里需要把所有的ZooKeeper服务器的地址都写上,而不是只写其中一台。ZooKeeper客户端对象将从连接串中挑选任意一个服务器进行连接,如果连接失败,将尝试连接另外一个服务器,直到建立连接。这样的好处是能保证ZooKeeper服务的高可靠性,防止因为其中一台服务器宕机而导致连接失败。
·第二个参数为连接超时时间,这里是3秒。
·第三个参数为观察者对象,连接成功后会调用观察者对象中的回调方法,这里传入null即可。
调用ZooKeeper对象的创建节点方法create(),返回创建的节点路径,并需要传入四个参数,解析如下:
·第一个参数为节点名称。
·第二个参数为节点数据,需要转成字节数组。
·第三个参数为权限控制,这里使用ZooKeeper自带的完全开放权限Ids.OPEN_ACL_UNSAFE。
·第四个参数为创建模式CreateMode,它是一个枚举类型,共有4个取值:PERSISTENT(持久节点,这种目录节点存储的数据不会丢失,即客户端失去连接之后不会被自动删除)、PERSISTENT_SEQUENTIAL(持久顺序节点,这种节点在命名上会自动编号,根据当前已经存在的节点数自动加1)、EPHEMERAL(临时节点,客户端断开连接时,这种节点会被自动删除)、EPHEMERAL_SEQUENTIAL(临时顺序节点,客户端断开连接时,这种节点也会被自动删除)。
create()方法的定义源码如下:
/** * 创建一个节点 * @param path 节点路径 * @param data 节点元数据 * @param acl 节点的ACL * @param createMode 节点的创建模式(持久、持久顺序、临时、临时顺序) * @return 被创建的节点的路径 */ public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { }
CreateMode枚举类的部分源码如下:
/*** * 定义节点创建模式 */ public enum CreateMode { //持久节点 PERSISTENT(0, false, false), //持久顺序节点 PERSISTENT_SEQUENTIAL(2, false, true), //临时节点 EPHEMERAL(1, true, false), //临时顺序节点 EPHEMERAL_SEQUENTIAL(3, true, true); }
直接在Eclipse中右击运行该程序即可,观察控制台的输出结果。若能成功输出节点路径,说明创建成功。
使用ZooKeeper对象的setData()方法可以修改节点的元数据。例如,将节点/zk001的元数据修改为zk001_data_new,示例代码如下:
/** * 修改节点数据 */ @Test public void setNodeData() throws Exception { String connectStr = "centos01:2181,centos02:2181,centos03:2181"; ZooKeeper zk = new ZooKeeper(connectStr, 3000, null); Stat stat = zk.setData("/zk001", "zk001_data_new".getBytes(), -1); //输出节点版本号 System.out.println(stat.getVersion()); }
setData()方法的三个参数解析如下:
·第一个参数为节点路径。
·第二个参数为需要修改的元数据,并转成字节数组。
·第三个参数为版本号,-1代表所有版本。
数据添加(或修改)成功后,会返回节点的状态信息到Stat对象中,stat.getVersion()表示获取该节点的版本号,默认新节点的版本号为0,每次对节点进行修改,版本号都会增加1。
setData()方法的定义源码如下:
/** * 修改指定节点的元数据 * @param path 节点路径 * @param data 节点元数据 * @param version 期望修改的版本 * @return 节点的状态 */ public Stat setData(final String path, byte data[], int version) throws KeeperException, InterruptedException { }
使用ZooKeeper对象的getData()方法可以获得指定节点的元数据,示例代码如下:
/** * 获取节点元数据 */ @Test public void getNodeData() throws Exception { //ZooKeeper连接字符串 String connectStr = "centos01:2181,centos02:2181,centos03:2181"; ZooKeeper zk = new ZooKeeper(connectStr, 3000, null); Stat stat = new Stat(); //返回指定路径上的节点数据和节点状态,节点的状态会放入stat对象中 byte[] bytes = zk.getData("/zk002", null, stat); //输出节点元数据 System.out.println(new String(bytes)); }
上述代码获取了节点/zk002的元数据,并将该节点的状态信息放入对象Stat中,最后将元数据转成字符串输出到控制台。如需查看节点状态信息,可以从对象Stat中进行输出查看。
getData()方法的第二个参数传入的是null,也可以指定一个观察者对象Watcher,对节点数据的变化进行监听,一旦有数据改变,就会触发Watcher指定的回调方法。
对上述代码进行改进,加入观察者回调方法后,代码如下:
上述代码分析如下:
process()方法是Watcher接口中的一个抽象方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process()方法进行回调,从而实现对事件的处理。
process()方法包含WatchedEvent类型的参数,WatchedEvent则包含了每一个事件的三个基本属性:通知状态(KeeperState)、事件类型(EventType)、节点路径(Path)。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process()对服务端事件进行处理。
process()方法中通过代码System.out.println(event.getType());输出服务端的事件类型,此处控制台的输出结果为NodeDataChanged。从结果单词的含义可知,节点数据被改变了。
为了能够更好地验证是否触发了Watcher,不让程序一次执行到底,从而加入了此部分代码,让程序一直停留在此处。
上述代码实现了一次性监听,当触发Watcher后不会再次触发,若需要持续进行监听,可将上述代码进行改进:定义一个Watcher对象,在process()方法中重新设置监听,当ZooKeeper节点/zk002的状态发生改变时将会触发Watcher,输出改变的事件类型。改进后的代码如下:
/** * 获取节点数据,并加入观察者对象Watcher,实现持续监听 */ @Test public void getNodeDataWatch2() throws Exception { String connectStr = "centos01:2181,centos02:2181,centos03:2181"; final ZooKeeper zk = new ZooKeeper(connectStr, 3000, null); final Stat stat = new Stat(); //定义Watcher对象 Watcher watcher = new Watcher() { //实现process()方法 public void process(WatchedEvent event) { //输出事件类型 System.out.println(event.getType()); //重新设置监听,参数this代表当前Watcher对象 try { zk.getData("/zk002", this, stat); } catch (Exception e) { e.printStackTrace(); } } }; //返回指定路径上的节点数据和节点状态,并设置Watcher监听,节点的状态会放入stat对象中 byte[] bytes = zk.getData("/zk002", watcher, stat); System.out.println(new String(bytes)); //改变节点数据,触发Watcher zk.setData("/zk002", "zk002_data_testwatch".getBytes(), -1); //为了验证是否触发了Watcher,不让程序结束 while (true) { Thread.sleep(3000); } }
getData()方法的定义源码如下:
/** * 返回指定节点的元数据和状态 * @param path 节点路径 * @param watcher 观察者对象 * @param stat 节点的状态 * @return 节点元数据 */ public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { }
Watcher接口的源码如下:
package org.apache.zookeeper; /** * Watcher接口指定事件监听程序类必须实现的公共接口。 * 客户端通过注册回调对象来处理监听到的事件,回调对象应该是实现Watcher接口的类的实例 */ public interface Watcher { /** * 此接口定义事件可能的状态 */ public interface Event { /** * ZooKeeper事件状态 */ public enum KeeperState { //客户端断开连接状态 Disconnected(0), //客户端连接状态(连接到ZooKeeper集群中任何一台服务器) SyncConnected(3), //身份验证失败状态 AuthFailed(4), //客户端连接到只读服务器,即当前未连接到大多数的服务器 //接收到此状态后,唯一允许的操作是读取操作 //此状态仅为只读客户端生成,因为读写客户端不允许连接到只读服务器 ConnectedReadOnly(5), //SaslAuthenticated:用于通知客户端它们是sasl身份验证的, //这样客户端就可以使用sasl授权的权限执行ZooKeeper操作 SaslAuthenticated(6), //连接会话过期,ZooKeeper客户端连接(会话)不再有效 //如果要访问ZooKeeper,必须创建一个新的客户端连接(实例化一个新的ZooKeeper实例) Expired(-112); private final int intValue; KeeperState(int intValue) { this.intValue = intValue; } public int getIntValue() { return intValue; } public static KeeperState fromInt(int intValue) { switch (intValue) { case -1: return KeeperState.Unknown; case 0: return KeeperState.Disconnected; case 1: return KeeperState.NoSyncConnected; case 3: return KeeperState.SyncConnected; case 4: return KeeperState.AuthFailed; case 5: return KeeperState.ConnectedReadOnly; case 6: return KeeperState.SaslAuthenticated; case -112: return KeeperState.Expired; default: throw new RuntimeException( "Invalid integer value for conversion to KeeperState"); } } } /** * 发生在ZooKeeper中的事件类型 */ public enum EventType { None(-1), NodeCreated(1), NodeDeleted(2), NodeDataChanged(3), NodeChildrenChanged(4); private final int intValue; EventType(int intValue) { this.intValue = intValue; } public int getIntValue() { return intValue; } public static EventType fromInt(int intValue) { switch (intValue) { case -1: return EventType.None; case 1: return EventType.NodeCreated; case 2: return EventType.NodeDeleted; case 3: return EventType.NodeDataChanged; case 4: return EventType.NodeChildrenChanged; default: throw new RuntimeException( "Invalid integer value for conversion to EventType"); } } } } /** * 事件回调方法 */ abstract public void process(WatchedEvent event); }
使用ZooKeeper对象的delete()方法可以对指定路径节点进行删除。例如,删除节点/zk002,代码如下:
/** * 删除节点 */ @Test public void deletePath() throws Exception { String connectStr = "centos01:2181,centos02:2181,centos03:2181"; ZooKeeper zk = new ZooKeeper(connectStr, 3000, null); //删除节点 zk.delete("/zk002", -1); }
上述代码中,delete()方法的两个参数解析如下:
·第一个参数为需要删除的节点路径。
·第二个参数为节点版本,-1代表删除所有版本。
delete()方法的定义源码如下:
/** * 删除指定节点 * @param path 被删除的节点路径 * @param version 期望被删除的节点版本 */ public void delete(final String path, int version) throws InterruptedException, KeeperException{ }