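Apache Curator is a high-level Java client for Apache ZooKeeper. It provides ready-made recipes for common distributed-systems problems, including Elections, Locks, Barriers, Counters, Caches, Nodes/Watches, and Queues. Many Apache projects that depend on ZooKeeper use Curator, for example Hadoop, Flink, and HBase. Earlier posts covered basic operations and transactions; this post walks through the Cache recipes in practice, which are built on ZooKeeper's node Watch mechanism. The complete code is available at my GitHub address at the end of this post.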
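Which cache classes to use depends on the ZooKeeper version. For versions before ZooKeeper 3.6.x, Curator provides three separate classes: Path Cache (PathChildrenCache), Node Cache (NodeCache), and Tree Cache (TreeCache). For ZooKeeper 3.6.x and later, a single utility class, CuratorCache, covers all three use cases. In other words, with ZooKeeper 3.6.x and later, Path Cache, Node Cache, and Tree Cache are marked as deprecated, and Curator Cache is recommended instead.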
Curator Cache - A utility that attempts to keep the data from a node locally cached. Optionally the entire tree of children below the node can also be cached. Will respond to update/create/delete events, pull down the data, etc. You can register listeners that will get notified when changes occur.
Path Cache - (For pre-ZooKeeper 3.6.x) A Path Cache is used to watch a ZNode. Whenever a child is added, updated or removed, the Path Cache will change its state to contain the current set of children, the children's data and the children's state. Path caches in the Curator Framework are provided by the PathChildrenCache class. Changes to the path are passed to registered PathChildrenCacheListener instances.
Node Cache - (For pre-ZooKeeper 3.6.x) A utility that attempts to keep the data from a node locally cached. This class will watch the node, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.
Tree Cache - (For pre-ZooKeeper 3.6.x) A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can register a listener that will get notified when changes occur.
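To make "respond to update/create/delete events" concrete, here is a minimal sketch of how comparing two snapshots of a subtree can yield such events. It is a toy model in plain Java and not Curator's implementation: the class name SnapshotDiffSketch, the Kind enum, the Event type, and the example paths are all hypothetical, and real caches are driven by ZooKeeper watches rather than by diffing maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Toy model only: derives create/change/delete events from two snapshots
// (path -> data). Names here are illustrative, not part of Curator.
class SnapshotDiffSketch {

    enum Kind { CREATED, CHANGED, DELETED }

    // oldData is null for CREATED; newData is null for DELETED.
    static final class Event {
        final Kind kind;
        final String path;
        final String oldData;
        final String newData;

        Event(Kind kind, String path, String oldData, String newData) {
            this.kind = kind;
            this.path = path;
            this.oldData = oldData;
            this.newData = newData;
        }

        @Override
        public String toString() {
            return kind + " " + path + " " + oldData + " -> " + newData;
        }
    }

    static List<Event> diff(Map<String, String> before, Map<String, String> after) {
        List<Event> events = new ArrayList<>();
        // TreeMap gives a deterministic, path-sorted iteration order.
        for (Map.Entry<String, String> e : new TreeMap<>(after).entrySet()) {
            String path = e.getKey();
            if (!before.containsKey(path)) {
                events.add(new Event(Kind.CREATED, path, null, e.getValue()));
            } else if (!Objects.equals(before.get(path), e.getValue())) {
                events.add(new Event(Kind.CHANGED, path, before.get(path), e.getValue()));
            }
        }
        for (Map.Entry<String, String> e : new TreeMap<>(before).entrySet()) {
            if (!after.containsKey(e.getKey())) {
                events.add(new Event(Kind.DELETED, e.getKey(), e.getValue(), null));
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("/app/a", "1");
        before.put("/app/b", "2");
        before.put("/app/d", "9");
        Map<String, String> after = new HashMap<>();
        after.put("/app/a", "1");
        after.put("/app/b", "3");
        after.put("/app/c", "4");
        // Print one line per detected event.
        diff(before, after).forEach(System.out::println);
    }
}
```

The old/new pair carried by each event in this sketch resembles the (oldData, data) arguments that a CuratorCacheListener receives, which is why both values are kept.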
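Using the NodeCache class:
// Needs org.apache.curator.framework.recipes.cache.* and java.nio.charset.StandardCharsets
// NodeCache: watches a single node for data changes (false = data is not compressed)
final NodeCache nodeCache = new NodeCache(cfClient, "/zookeeper", false);
// true: build the initial cache before start() returns
nodeCache.start(true);
nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        ChildData current = nodeCache.getCurrentData();
        if (current == null) {
            // getCurrentData() returns null once the node has been deleted
            System.out.println("node deleted");
        } else {
            String newData = new String(current.getData(), StandardCharsets.UTF_8);
            System.out.println("new Data: " + newData);
            // Store the new data in the local data cache
            nodeCacheData = newData;
        }
        countDownLatch.countDown();
    }
}, executorService);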
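The snippets in this post reference countDownLatch, executorService, and nodeCacheData without showing where they come from. The following is a minimal sketch of what such scaffolding might look like, assuming the latch is only there to keep the main thread alive until a given number of callbacks has run. A plain Runnable stands in for the Curator listener so the example needs no ZooKeeper server; the class name ListenerScaffoldSketch and the method runAndAwait are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the assumed scaffolding around the listeners. The simulated
// callbacks below are NOT Curator calls; they only mimic a listener body.
class ListenerScaffoldSketch {

    // Written on the executor thread, read on the caller's thread,
    // hence volatile.
    static volatile String nodeCacheData;

    // Submits `expectedEvents` simulated callbacks to a single-thread executor
    // and blocks until all have counted down the latch or the timeout expires.
    static boolean runAndAwait(int expectedEvents, long timeoutMillis) {
        CountDownLatch countDownLatch = new CountDownLatch(expectedEvents);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            for (int i = 1; i <= expectedEvents; i++) {
                final String newData = "value-" + i;
                executorService.execute(() -> {
                    nodeCacheData = newData;      // update the local copy
                    countDownLatch.countDown();   // signal one handled event
                });
            }
            return countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve interrupt status
            return false;
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean done = runAndAwait(3, 1000);
        System.out.println("all events handled: " + done + ", last data: " + nodeCacheData);
    }
}
```

A single-thread executor is used here so that callbacks run one at a time and in submission order; with a larger pool, updates to the shared field could interleave.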
Using the PathChildrenCache (Path Cache) class:
// PathChildrenCache: watches the direct children of a node (true = cache the children's data)
final PathChildrenCache pathChildrenCache = new PathChildrenCache(cfClient, "/zookeeper", true);
// Register the listener before start() so that early events are not missed
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        switch (event.getType()) {
            case INITIALIZED:
                System.out.println("==INITIALIZED: " + event.getInitialData());
                break;
            case CHILD_ADDED:
                System.out.println("==CHILD_ADDED: " + event.getData());
                break;
            case CHILD_REMOVED:
                System.out.println("==CHILD_REMOVED: " + event.getData());
                break;
            case CHILD_UPDATED:
                System.out.println("==CHILD_UPDATED: " + event.getData());
                break;
            case CONNECTION_LOST:
                System.out.println("==CONNECTION_LOST: " + event.getData());
                break;
            case CONNECTION_SUSPENDED:
                System.out.println("==CONNECTION_SUSPENDED: " + event.getData());
                break;
            case CONNECTION_RECONNECTED:
                System.out.println("==CONNECTION_RECONNECTED: " + event.getData());
                break;
            default:
                break;
        }
        System.out.println("remaining latch count: " + countDownLatch.getCount());
        countDownLatch.countDown();
    }
}, executorService);
// Mode 1, POST_INITIALIZED_EVENT (recommended): primes the cache in the background, posts
// CHILD_ADDED for existing children, then posts INITIALIZED
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// Mode 2, NORMAL: same as POST_INITIALIZED_EVENT but without the INITIALIZED event
//pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
// Mode 3, BUILD_INITIAL_CACHE: loads existing children before start() returns and posts
// no events for them; only later changes are reported
//pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
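The three start modes differ mainly in what a listener sees for children that already exist when the cache starts. The sketch below models that difference in plain Java, based on my reading of the PathChildrenCache.StartMode Javadoc. The enum and event names mirror Curator's, but everything here is a local stand-in (StartModeSketch and initialEvents are hypothetical names), and the child paths are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the events a listener would receive for pre-existing children.
// It does not talk to ZooKeeper and is not Curator code.
class StartModeSketch {

    enum StartMode { NORMAL, BUILD_INITIAL_CACHE, POST_INITIALIZED_EVENT }

    static List<String> initialEvents(StartMode mode, List<String> existingChildren) {
        List<String> events = new ArrayList<>();
        switch (mode) {
            case BUILD_INITIAL_CACHE:
                // The cache is filled before start() returns; nothing is
                // posted for children that already exist.
                break;
            case POST_INITIALIZED_EVENT:
                for (String child : existingChildren) {
                    events.add("CHILD_ADDED " + child);
                }
                // Posted once, after the initial children have been loaded.
                events.add("INITIALIZED");
                break;
            case NORMAL:
            default:
                for (String child : existingChildren) {
                    events.add("CHILD_ADDED " + child);
                }
                break;
        }
        return events;
    }

    public static void main(String[] args) {
        // Hypothetical children that exist before the cache starts.
        List<String> existing = Arrays.asList("/zookeeper/a", "/zookeeper/b");
        for (StartMode mode : StartMode.values()) {
            System.out.println(mode + " -> " + initialEvents(mode, existing));
        }
    }
}
```

In practice this means a listener that must know when the initial load has finished needs POST_INITIALIZED_EVENT, while code that reads the cache right after start() is better served by BUILD_INITIAL_CACHE.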
Using the TreeCache class:
// TreeCache: watches a node and its whole subtree
final TreeCache treeCache = TreeCache.newBuilder(cfClient, "/")
        .setMaxDepth(5)
        .setCacheData(true)
        .setCreateParentNodes(true)
        .setDataIsCompressed(false)
        .setExecutor(executorService)
        /*.setSelector(new TreeCacheSelector() {
            @Override
            public boolean traverseChildren(String fullPath) {
                return false;
            }
            @Override
            public boolean acceptChild(String fullPath) {
                return false;
            }
        })*/
        .build();
// Register the listener before start() so that early events are not missed
treeCache.getListenable().addListener(new TreeCacheListener() {
    @Override
    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
        switch (event.getType()) {
            case INITIALIZED:
                System.out.println("==INITIALIZED: " + event.getOldData());
                break;
            case NODE_ADDED:
                System.out.println("==NODE_ADDED: " + event.getData());
                break;
            case NODE_REMOVED:
                System.out.println("==NODE_REMOVED: " + event.getData());
                break;
            case NODE_UPDATED:
                System.out.println("==NODE_UPDATED: " + event.getData());
                break;
            case CONNECTION_LOST:
                System.out.println("==CONNECTION_LOST: " + event.getData());
                break;
            case CONNECTION_SUSPENDED:
                System.out.println("==CONNECTION_SUSPENDED: " + event.getData());
                break;
            case CONNECTION_RECONNECTED:
                System.out.println("==CONNECTION_RECONNECTED: " + event.getData());
                break;
            default:
                break;
        }
        System.out.println("remaining latch count: " + countDownLatch.getCount());
        countDownLatch.countDown();
    }
});
treeCache.start();
Using the CuratorCache class:
// CuratorCache: caches the node at the given path and, by default, its whole subtree
CuratorCache curatorCache = CuratorCache.builder(cfClient, "/")
        /*.withOptions(CuratorCache.Options.SINGLE_NODE_CACHE)*/
        /*.withOptions(CuratorCache.Options.COMPRESSED_DATA)*/
        /*.withOptions(CuratorCache.Options.DO_NOT_CLEAR_ON_CLOSE)*/
        .build();
curatorCache.listenable().addListener(new CuratorCacheListener() {
    @Override
    public void event(Type type, ChildData oldData, ChildData data) {
        // Switch on the enum directly instead of on type.name()
        switch (type) {
            case NODE_CREATED:
                System.out.println("NODE_CREATED: " + oldData + " : " + data);
                break;
            case NODE_CHANGED:
                System.out.println("NODE_CHANGED: " + oldData + " : " + data);
                break;
            case NODE_DELETED:
                System.out.println("NODE_DELETED: " + oldData + " : " + data);
                break;
            default:
                break;
        }
        System.out.println("remaining latch count: " + countDownLatch.getCount());
        countDownLatch.countDown();
    }
}, executorService);
// The cache delivers no events until it is started
curatorCache.start();
Complete code:
https://github.com/felixzh2020/felixzh-learning-java/tree/master/ZookeeperCase/src/main/java/c_apache_curator
This article is reposted from 大数据从业者.