站起来活动活动吧~

大数据

Zookeeper入门教程(二):Zookeeper API 编程操作教程

2021年03月26日 11:41:58 · 本文共 2,287 字阅读时间约 8分钟 · 3,096 次浏览
Zookeeper入门教程(二):Zookeeper API 编程操作教程

上一篇文章我们已经成功安装了 Zookeeper,虽然可以使用命令行操作 Zookeeper,但大部分时间还是通过编程调用操作 Zookeeper 的,所以本文就简单介绍一下入门级的基础操作。

本文全部代码公开在:https://github.com/renfei/demo/tree/master/zookeeper/zookeeper-zpi

基础条件

因为演示多个操作,Zookeeper 的客户端需要创建很多次,那为了偷懒,我们使用单元测试的 @Before 来为我们每次新建一个客户端:

public class ZookeeperApiDemo {
    private static final String CONNECT_STRING = "localhost:2181";
    private static final int SESSION_TIMEOUT = 2000;
    private ZooKeeper zkClient = null;
    @Before
    public void init() throws Exception {
        zkClient = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
            // 收到事件通知后的回调函数(用户的业务逻辑)
            System.out.println(event.getType() + "--" + event.getPath());
            // 再次启动监听
            try {
                zkClient.getChildren("/", true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

完整代码公开在:https://github.com/renfei/demo/blob/master/zookeeper/zookeeper-zpi/src/main/java/net/renfei/zookeeper/ZookeeperApiDemo.java

基础增删查改

/**
 * 创建子节点
 *
 * @throws Exception
 */
@Test
public void create() throws Exception {
    // 参数:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
    String nodeCreated = zkClient.create("/renfei", "demo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    System.out.println(nodeCreated);
}
/**
 * 放置数据
 *
 * @throws KeeperException
 * @throws InterruptedException
 */
@Test
public void set() throws KeeperException, InterruptedException {
    Stat stat = zkClient.setData("/renfei", "多大的".getBytes(), 0);
    System.out.println(stat.toString());
}
/**
 * 获取数据
 *
 * @throws KeeperException
 * @throws InterruptedException
 */
@Test
public void get() throws KeeperException, InterruptedException {
    Stat stat = new Stat();
    byte[] dataBytes = zkClient.getData("/renfei", true, stat);
    // 注意这个 Version 版本号,版本不对写入不进去
    System.out.println(stat.getVersion());
    System.out.println(new String(dataBytes));
}
/**
 * 存在检测
 *
 * @throws KeeperException
 * @throws InterruptedException
 */
@Test
public void exists() throws KeeperException, InterruptedException {
    Stat stat = zkClient.exists("/renfei", false);
    if (stat == null) {
        System.out.println("节点不存在");
    } else {
        System.out.println(stat.getDataLength());
    }
}
/**
 * 删除数据
 *
 * @throws KeeperException
 * @throws InterruptedException
 */
@Test
public void delete() throws KeeperException, InterruptedException {
    Stat stat = zkClient.exists("/renfei", false);
    if (stat != null) {
        zkClient.delete("/renfei", stat.getVersion());
    }
}

注册监听

Zookeeper 为我们提供了节点变化通知的机制,所以我们可以监听某个节点的变化,当数据发生改变时,Zookeeper 会通知我们最新的数据和状态。

因为监听是异步操作,这里需要两个线程,所以主线程需要让你睡眠等待,要不主线程退出子线程也就死了。

public void register() throws KeeperException, InterruptedException {
    byte[] data = zkClient.getData("/renfei", watchedEvent -> {
        try {
            // 此处递归循环调用,一直监听变化
            register();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, new Stat());
    System.out.println(new String(data));
}
@Test
public void registerTest() throws InterruptedException {
    try {
        register();
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // 让主线程延时阻塞,为了可以查看register()的循环递归执行
    Thread.sleep(Long.MAX_VALUE);
}


商业用途请联系作者获得授权。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.renfei.net/posts/1003480
评论与留言

以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。

微信搜一搜:任霏博客