购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

6.4 ZooKeeper Java API操作

除了可以使用命令行方式对ZooKeeper进行操作外,ZooKeeper还提供了Java API操作接口,本节将对ZooKeeper的常用Java API接口进行介绍。

6.4.1 创建Java工程

在编写Java API之前,首先需要新建一个ZooKeeper项目。ZooKeeper项目的结构与普通的JavaSE项目一样,只是依赖的jar包不同。

1.Maven项目

在Eclipse中新建一个Maven项目zk_demo(Maven项目的搭建此处不做过多讲解),项目结构如图6-5所示。

然后在该项目的pom.xml文件中添加以下代码,以引入ZooKeeper的Java API依赖包:

     <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.6.3</version>
     </dependency>

配置好pom.xml后,即可进行ZooKeeper Java API的编写。

2.普通JavaSE项目

若用户不想使用Maven构建项目,也可以创建普通JavaSE项目。普通JavaSE项目依赖的ZooKeeper jar包主要有:两个ZooKeeper核心包zookeeper-3.6.3.jar、zookeeper-jute-3.6.3.jar和三个日志包slf4j-log4j12-1.7.25.jar、slf4j-api-1.7.25.jar、log4j-1.2.17.jar。这5个jar包都可以在ZooKeeper的安装文件apache-zookeeper-3.6.3-bin.tar.gz的lib文件夹中找到。

一个ZooKeeper的普通JavaSE项目结构如图6-6所示。

图6-5 ZooKeeper Maven项目结构

图6-6 ZooKeeper普通JavaSE项目结构

6.4.2 创建节点

ZooKeeper创建节点不支持递归调用,即无法在父节点不存在的情况下创建一个子节点。如在/zk01节点不存在的情况下创建/zk01/ch01节点,并且如果一个节点已经存在,那么创建同名节点时,会抛出NodeExistsException异常。

下面创建一个节点/ zk001,节点的元数据为zk001_data。

1.编写代码

在新建的zk_demo项目中新建Java类CreatePath.java,然后在main()方法中写入创建节点的代码。完整代码如下所示:

2.程序解读

新建一个ZooKeeper对象,传入三个参数,解析如下:

·第一个参数为以逗号分隔的服务器连接字符串,格式为“IP地址:端口”或“主机名:端口”(使用主机名,需要在系统本地配置主机名IP映射),这里需要把所有的ZooKeeper服务器的地址都写上,而不是只写其中一台。ZooKeeper客户端对象将从连接串中挑选任意一个服务器进行连接,如果连接失败,将尝试连接另外一个服务器,直到建立连接。这样的好处是能保证ZooKeeper服务的高可靠性,防止因为其中一台服务器宕机而导致连接失败。

·第二个参数为连接超时时间,这里是3秒。

·第三个参数为观察者对象,连接成功后会调用观察者对象中的回调方法,这里传入null即可。

调用ZooKeeper对象的创建节点方法create(),返回创建的节点路径,并需要传入四个参数,解析如下:

·第一个参数为节点名称。

·第二个参数为节点数据,需要转成字节数组。

·第三个参数为权限控制,这里使用ZooKeeper自带的完全开放权限Ids.OPEN_ACL_UNSAFE。

·第四个参数为创建模式CreateMode,它是一个枚举类型,共有4个取值:PERSISTENT(持久节点,这种目录节点存储的数据不会丢失,即客户端失去连接之后不会被自动删除)、PERSISTENT_SEQUENTIAL(持久顺序节点,这种节点在命名上会自动编号,根据当前已经存在的节点数自动加1)、EPHEMERAL(临时节点,客户端断开连接时,这种节点会被自动删除)、EPHEMERAL_SEQUENTIAL(临时顺序节点,客户端断开连接时,这种节点也会被自动删除)。

create()方法的定义源码如下:

     /**
      * 创建一个节点
      * @param path 节点路径
      * @param data 节点元数据
      * @param acl 节点的ACL
      * @param createMode 节点的创建模式(持久、持久顺序、临时、临时顺序)
      * @return 被创建的节点的路径
      */
     public String create(final String path, byte data[], List<ACL> acl,
             CreateMode createMode)
         throws KeeperException, InterruptedException {
     }

CreateMode枚举类的部分源码如下:

     /***
      * 定义节点创建模式
      */
     public enum CreateMode {
       //持久节点
       PERSISTENT(0, false, false),
       //持久顺序节点
       PERSISTENT_SEQUENTIAL(2, false, true),
       //临时节点
       EPHEMERAL(1, true, false),
       //临时顺序节点
       EPHEMERAL_SEQUENTIAL(3, true, true);
     }

3.运行程序

直接在Eclipse中右击运行该程序即可,观察控制台的输出结果。若能成功输出节点路径,说明创建成功。

6.4.3 修改数据

使用ZooKeeper对象的setData()方法可以修改节点的元数据。例如,将节点/zk001的元数据修改为zk001_data_new,示例代码如下:

     /**
      * 修改节点数据
      */
     @Test
     public void setNodeData() throws Exception {
       String connectStr = "centos01:2181,centos02:2181,centos03:2181";
       ZooKeeper zk = new ZooKeeper(connectStr, 3000, null);
       Stat stat = zk.setData("/zk001", "zk001_data_new".getBytes(), -1);
       //输出节点版本号
       System.out.println(stat.getVersion());
     }

setData()方法的三个参数解析如下:

·第一个参数为节点路径。

·第二个参数为需要修改的元数据,并转成字节数组。

·第三个参数为版本号,-1代表所有版本。

数据添加(或修改)成功后,会返回节点的状态信息到Stat对象中,stat.getVersion()表示获取该节点的版本号,默认新节点的版本号为0,每次对节点进行修改,版本号都会增加1。

setData()方法的定义源码如下:

     /**
      * 修改指定节点的元数据
      * @param path 节点路径
      * @param data 节点元数据
      * @param version 期望修改的版本
      * @return 节点的状态
      */
     public Stat setData(final String path, byte data[], int version)
         throws KeeperException, InterruptedException {
     }

6.4.4 获取数据

使用ZooKeeper对象的getData()方法可以获得指定节点的元数据,示例代码如下:

     /**
      * 获取节点元数据
      */
     @Test
     public void getNodeData() throws Exception {
       //ZooKeeper连接字符串
       String connectStr = "centos01:2181,centos02:2181,centos03:2181";
       ZooKeeper zk = new ZooKeeper(connectStr, 3000, null);
       Stat stat = new Stat();
       //返回指定路径上的节点数据和节点状态,节点的状态会放入stat对象中
       byte[] bytes = zk.getData("/zk002", null, stat);
       //输出节点元数据
       System.out.println(new String(bytes));
     }

上述代码获取了节点/zk002的元数据,并将该节点的状态信息放入对象Stat中,最后将元数据转成字符串输出到控制台。如需查看节点状态信息,可以从对象Stat中进行输出查看。

getData()方法的第二个参数传入的是null,也可以指定一个观察者对象Watcher,对节点数据的变化进行监听,一旦有数据改变,就会触发Watcher指定的回调方法。

对上述代码进行改进,加入观察者回调方法后,代码如下:

上述代码分析如下:

Œ process()方法是Watcher接口中的一个抽象方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process()方法进行回调,从而实现对事件的处理。

process()方法包含WatchedEvent类型的参数,WatchedEvent则包含了每一个事件的三个基本属性:通知状态(KeeperState)、事件类型(EventType)、节点路径(Path)。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process()对服务端事件进行处理。

process()方法中通过代码System.out.println(event.getType());输出服务端的事件类型,此处控制台的输出结果为NodeDataChanged。从结果单词的含义可知,节点数据被改变了。

为了能够更好地验证是否触发了Watcher,不让程序一次执行到底,从而加入了此部分代码,让程序一直停留在此处。

上述代码实现了一次性监听,当触发Watcher后不会再次触发,若需要持续进行监听,可将上述代码进行改进:定义一个Watcher对象,在process()方法中重新设置监听,当ZooKeeper节点/zk002的状态发生改变时将会触发Watcher,输出改变的事件类型。改进后的代码如下:

     /**
      * 获取节点数据,并加入观察者对象Watcher,实现持续监听
      */
     @Test
     public void getNodeDataWatch2() throws Exception {
       String connectStr = "centos01:2181,centos02:2181,centos03:2181";
       final ZooKeeper zk = new ZooKeeper(connectStr, 3000, null);
       final Stat stat = new Stat();
       //定义Watcher对象
       Watcher watcher = new Watcher() {
         //实现process()方法
         public void process(WatchedEvent event) {
           //输出事件类型
           System.out.println(event.getType());
           //重新设置监听,参数this代表当前Watcher对象
           try {
             zk.getData("/zk002", this, stat);
           } catch (Exception e) {
             e.printStackTrace();
           }
         }
       };
  
       //返回指定路径上的节点数据和节点状态,并设置Watcher监听,节点的状态会放入stat对象中
       byte[] bytes = zk.getData("/zk002", watcher, stat);
       System.out.println(new String(bytes));
       //改变节点数据,触发Watcher
       zk.setData("/zk002", "zk002_data_testwatch".getBytes(), -1);
       //为了验证是否触发了Watcher,不让程序结束
       while (true) {
         Thread.sleep(3000);
       }
     }

getData()方法的定义源码如下:

     /**
      * 返回指定节点的元数据和状态
      * @param path 节点路径
      * @param watcher 观察者对象
      * @param stat 节点的状态
      * @return 节点元数据
      */
     public byte[] getData(final String path, Watcher watcher, Stat stat)
         throws KeeperException, InterruptedException {
     }

Watcher接口的源码如下:

     package org.apache.zookeeper;
  
     /**
      * Watcher接口指定事件监听程序类必须实现的公共接口。
      * 客户端通过注册回调对象来处理监听到的事件,回调对象应该是实现Watcher接口的类的实例
      */
     public interface Watcher {
  
       /**
        * 此接口定义事件可能的状态
        */
       public interface Event {
         /**
          * ZooKeeper事件状态
          */
         public enum KeeperState {
           //客户端断开连接状态
           Disconnected(0),
           //客户端连接状态(连接到ZooKeeper集群中任何一台服务器)
           SyncConnected(3),
           //身份验证失败状态
           AuthFailed(4),
           //客户端连接到只读服务器,即当前未连接到大多数的服务器
           //接收到此状态后,唯一允许的操作是读取操作
           //此状态仅为只读客户端生成,因为读写客户端不允许连接到只读服务器
           ConnectedReadOnly(5),
           //SaslAuthenticated:用于通知客户端它们是sasl身份验证的,
           //这样客户端就可以使用sasl授权的权限执行ZooKeeper操作
           SaslAuthenticated(6),
           //连接会话过期,ZooKeeper客户端连接(会话)不再有效
           //如果要访问ZooKeeper,必须创建一个新的客户端连接(实例化一个新的ZooKeeper实例)
           Expired(-112);
  
           private final int intValue;
           KeeperState(int intValue) {
             this.intValue = intValue;
           }
           public int getIntValue() {
             return intValue;
           }
  
           public static KeeperState fromInt(int intValue) {
             switch (intValue) {
             case -1:
               return KeeperState.Unknown;
             case 0:
               return KeeperState.Disconnected;
             case 1:
               return KeeperState.NoSyncConnected;
             case 3:
               return KeeperState.SyncConnected;
             case 4:
               return KeeperState.AuthFailed;
             case 5:
               return KeeperState.ConnectedReadOnly;
             case 6:
               return KeeperState.SaslAuthenticated;
             case -112:
               return KeeperState.Expired;
             default:
               throw new RuntimeException(
                   "Invalid integer value for conversion to KeeperState");
             }
           }
         }
  
         /**
          * 发生在ZooKeeper中的事件类型
          */
         public enum EventType {
           None(-1),
           NodeCreated(1),
           NodeDeleted(2),
           NodeDataChanged(3),
           NodeChildrenChanged(4);
  
           private final int intValue;
           EventType(int intValue) {
                 this.intValue = intValue;
               }
               public int getIntValue() {
                 return intValue;
               }
  
               public static EventType fromInt(int intValue) {
                 switch (intValue) {
                 case -1:
                   return EventType.None;
                 case 1:
                   return EventType.NodeCreated;
                 case 2:
                   return EventType.NodeDeleted;
                 case 3:
                   return EventType.NodeDataChanged;
                 case 4:
                   return EventType.NodeChildrenChanged;
  
                 default:
                   throw new RuntimeException(
                       "Invalid integer value for conversion to EventType");
                 }
               }
             }
           }
          /**
           * 事件回调方法
           */
           abstract public void process(WatchedEvent event);
         }

6.4.5 删除节点

使用ZooKeeper对象的delete()方法可以对指定路径节点进行删除。例如,删除节点/zk002,代码如下:

     /**
      * 删除节点
      */
     @Test
     public void deletePath() throws Exception {
       String connectStr = "centos01:2181,centos02:2181,centos03:2181";
       ZooKeeper zk = new ZooKeeper(connectStr, 3000, null);
       //删除节点
       zk.delete("/zk002", -1);
     }

上述代码中,delete()方法的两个参数解析如下:

·第一个参数为需要删除的节点路径。

·第二个参数为节点版本,-1代表删除所有版本。

delete()方法的定义源码如下: 4jB4Sh+UPvwelNTZaRj1wu38sjIvEV7QjBQQmPgzBcpM87m3VIz83NK1uf7gekX+

     /**
      * 删除指定节点
      * @param path 被删除的节点路径
      * @param version 期望被删除的节点版本
      */
     public void delete(final String path, int version)
       throws InterruptedException, KeeperException{
     }
点击中间区域
呼出菜单
上一章
目录
下一章
×