暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

SpringBoot整合Zookeeper

原创 我为啥没洁癖 2023-05-31
448

五、SpringBoot整合

Curator客户端

Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。

5.1 配置

5.1.1 pom

低版本

ZooKeeper 3.4.x 已经停止维护。最新的Curator客户端停止了3.4的支持,最近的兼容版本是4.2.0。

  1. 无需引入curator-framework,因为curator-recipes自动关联依赖引入curator-framework
  2. curator会默认引入zookeeper的jar包,需要检查版本与服务器的版本是否一致,如果不一致则需要排除引入
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <!--自定义配置处理,spring提示--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> </dependency>
复制

高版本,这里使用5.5.0版本,服务端使用最新的3.8.1

<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.5.0</version> </dependency> <!--自定义配置处理--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> </dependency>
复制

5.1.2 yml

zookeeper: servers: 192.168.10.108:2181 # servers: 192.168.10.108:2181,192.168.10.109:2181,192.168.10.110:2181
复制

5.2 注册Bean

封装curator的API和使用springboot的自动注册机制适配zk。提供ZooKeeperProperty属性配置,ZooKeeperTemplate工具类。

5.2.1 ZooKeeperProperty

配置实体类

package com.xrj.springbootzookeeper.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import java.io.Serializable; @Data @ConfigurationProperties(prefix = "zookeeper") public class ZooKeeperProperty implements Serializable { private static final long serialVersionUID = -6578547029768400583L; /** * zk连接集群,多个用逗号隔开 */ private String servers; /** * 会话超时时间 */ private int sessionTimeout = 60000; /** * 连接超时时间 */ private int connectionTimeout = 15000; /** * 初始重试等待时间(毫秒) */ private int baseSleepTime = 1000; /** * 重试最大次数 */ private int maxRetries = 10; }
复制

5.2.2 ZooKeeperTemplate

ZooKeeperTemplate操作工具类,封装acurator的api

package com.xrj.springbootzookeeper.config; import com.xrj.springbootzookeeper.exception.ZooKeeperException; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.zookeeper.CreateMode; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @Slf4j public class ZooKeeperTemplate { /** * 路径分隔符 */ private static final String PATH_SEPARATOR = "/"; /** * zk连接 */ private final CuratorFramework client; public ZooKeeperTemplate(CuratorFramework client) { this.client = client; } /** * 创建空节点,默认持久节点 * * @param path 节点路径 * @param node 节点名称 * @return 完整路径 */ public String createNode(String path, String node) { return createNode(path, node, CreateMode.PERSISTENT); } /** * 创建带类型的空节点 * * @param path 节点路径 * @param node 节点名称 * @param createMode 类型 * @return 路径 */ public String createNode(String path, String node, CreateMode createMode) { path = buildPath(path, node); log.info("create node for path: {} with createMode: {}", path, createMode.name()); try { client.create() .orSetData() .creatingParentsIfNeeded() .withMode(createMode) .forPath(path); return path; } catch (Exception e) { log.error("create node for path: {} with createMode: {} failed!", path, createMode.name(), e); return null; } } /** * 创建节点,默认持久节点 * * @param path 节点路径 * @param node 节点名称 * @param value 节点值 * @return 完整路径 */ public String createNode(String path, String node, String value) { return createNode(path, node, value, CreateMode.PERSISTENT); } /** * 创建节点,默认持久节点 * * @param path 节点路径 * @param node 节点名称 * @param value 节点值 * @param createMode 节点类型 * @return 完整路径 */ public String createNode(String path, String node, String value, CreateMode createMode) { if (Objects.isNull(value)) { throw new ZooKeeperException("ZooKeeper节点值不能为空!"); } path = buildPath(path, node); log.info("create node for path: {}, value: {}, with createMode: {}", path, value, createMode.name()); try { client.create() .orSetData() .creatingParentsIfNeeded() .withMode(createMode) .forPath(path, value.getBytes()); return path; } catch (Exception e) { log.error("create node for path: {}, value: {}, with createMode: {} failed!", path, value, createMode.name(), e); } return null; } /** * 获取节点数据 * * @param path 路径 * @param node 节点名称 * @return 完整路径 */ public String get(String path, String node) { path = buildPath(path, node); try { byte[] bytes = client.getData().forPath(path); if (bytes.length > 0) { return new String(bytes); } } catch (Exception e) { log.error("get value for path: {}, node: {} failed!", path, node, e); } return null; } /** * 更新节点数据 * * @param path 节点路径 * @param node 节点名称 * @param value 更新值 * @return 完整路径 */ public String update(String path, String node, String value) { if (Objects.isNull(value)) { throw new ZooKeeperException("ZooKeeper节点值不能为空!"); } path = buildPath(path, node); log.info("update path: {} to value: {}", path, value); try { client.setData().forPath(path, value.getBytes()); return path; } catch (Exception e) { log.error("update path: {} to value: {} failed!", path, value); } return null; } /** * 删除节点,并且递归删除子节点 * * @param path 路径 * @param node 节点名称 * @return 路径 */ public boolean delete(String path, String node) { path = buildPath(path, node); log.info("delete node for path: {}", path); try { client.delete().quietly().deletingChildrenIfNeeded().forPath(path); return true; } catch (Exception e) { log.error("delete node for path: {} failed!", path); } return false; } /** * 获取子节点 * * @param path 节点路径 * @return */ public List<String> getChildren(String path) { if (StringUtils.hasText(path)) { return null; } if (!path.startsWith(PATH_SEPARATOR)) { path = PATH_SEPARATOR + path; } try { return client.getChildren().forPath(path); } catch (Exception e) { log.info("获取{}子节点失败!", path, e); } return null; } /** * 判断节点是否存在 * * @param path 路径 * @param node 节点名称 * @return 结果 */ public boolean exists(String path, String node) { try { List<String> list = getChildren(path); return !CollectionUtils.isEmpty(list) && list.contains(node); } catch (Exception e) { return false; } } /** * 申请锁阻塞执行 * * @param path 加锁zk节点 * @param runnable 执行方法 */ public void lock(String path, Runnable runnable) { InterProcessMutex lock = new InterProcessMutex(client, path); try { lock.acquire(); try { //可以使用线程池处理提升性能 runnable.run(); } finally { lock.release(); } } catch (Exception e) { log.error("获取锁失败: {}!", path); } } /** * 申请锁,指定请求等待时间,无返回值 * * @param path 加锁zk节点 * @param time 时间 * @param unit 时间单位 * @param runnable 执行方法 */ public void lock(String path, long time, TimeUnit unit, Runnable runnable) { try { InterProcessMutex lock = new InterProcessMutex(client, path); if (lock.acquire(time, unit)) { try { runnable.run(); } finally { lock.release(); } } else { log.error("获取锁超时:{}!", path); } } catch (Exception e) { log.error("获取锁失败: {}!", path); } } /** * 申请锁,指定请求等待时间,有返回值 * * @param path 加锁zk节点 * @param time 时间 * @param unit 时间单位 * @param callable 执行方法 * @return 方法返回结果 */ public <V> V lock(String path, long time, TimeUnit unit, Callable<V> callable) { try { InterProcessMutex lock = new InterProcessMutex(client, path); if (lock.acquire(time, unit)) { try { return callable.call(); } finally { lock.release(); } } else { log.error("获取锁超时:{}!", path); } } catch (Exception e) { log.error("获取锁失败: {}!", path); } return null; } /** * 对一个节点进行监听,监听事件包括指定的路径节点的增、删、改的操作 * * @param path 节点路径 * @param listener 回调方法 */ public void watchNode(String path, NodeCacheListener listener) { try (CuratorCache curatorCache = CuratorCache.builder(client, path).build()) { curatorCache.listenable().addListener(CuratorCacheListener.builder().forNodeCache(listener).build()); curatorCache.start(); } } /** * 对指定的路径节点的一级子目录进行监听,不对该节点的操作进行监听,对其子目录的节点进行增、删、改的操作监听 * * @param path 节点路径 * @param listener 回调方法 */ public void watchChildren(String path, PathChildrenCacheListener listener) { try { PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true); pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL); pathChildrenCache.getListenable().addListener(listener); } catch (Exception e) { log.error("watch children failed for path: {}", path, e); } } /** * 将指定的路径节点作为根节点(祖先节点),对其所有的子节点操作进行监听,呈现树形目录的监听,可以设置监听深度,最大监听深度为2147483647(int类型的最大值) * * @param path 节点路径 * @param maxDepth 回调方法 * @param listener 监听 */ public void watchTree(String path, int maxDepth, TreeCacheListener listener) { try { TreeCache treeCache = TreeCache.newBuilder(client, path).setMaxDepth(maxDepth).build(); treeCache.start(); treeCache.getListenable().addListener(listener); } catch (Exception e) { log.error("watch tree failed for path: {}", path, e); } } public String buildPath(String path, String node) { if (StringUtils.hasText(path) || StringUtils.hasText(node)) { throw new ZooKeeperException("ZooKeeper路径或者节点名称不能为空!"); } if (!path.startsWith(PATH_SEPARATOR)) { path = PATH_SEPARATOR + path; } if (PATH_SEPARATOR.equals(path)) { return path + node; } else { return path + PATH_SEPARATOR + node; } } }
复制

5.2.3 ZooKeeperAutoConfiguration

SpringBoot自动注册类

package com.xrj.springbootzookeeper.config; import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @ConditionalOnProperty(prefix = "zookeeper", name = "servers") @EnableConfigurationProperties(value = ZooKeeperProperty.class) @Slf4j public class ZooKeeperAutoConfiguration { /** * 初始化连接以及重试 * * @param zooKeeperProperty 配置属性 * @return 连接 */ @Bean @ConditionalOnMissingBean public CuratorFramework curatorFramework(ZooKeeperProperty zooKeeperProperty) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(zooKeeperProperty.getBaseSleepTime(), zooKeeperProperty.getMaxRetries()); return CuratorFrameworkFactory.builder() .connectString(zooKeeperProperty.getServers()) .connectionTimeoutMs(zooKeeperProperty.getConnectionTimeout()) .sessionTimeoutMs(zooKeeperProperty.getSessionTimeout()) .retryPolicy(retryPolicy) .build(); } @Bean @ConditionalOnMissingBean public ZooKeeperTemplate zooKeeperTemplate(CuratorFramework client) { return new ZooKeeperTemplate(client); } }
复制

5.2.4 spring.factories

注册META-INF/spring.factories文件,让springboot自动注册注入配置。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.xrj.springbootzookeeper.config.ZooKeeperAutoConfiguration
复制

5.3 Lock测试

5.3.1 可重入锁测试

模拟50个线程竞争将num值进行递增。

@RestController @Slf4j public class TestController { @Autowired private ZooKeeperTemplate zooKeeperTemplate; private final AtomicInteger num = new AtomicInteger(0); @GetMapping("/testLock") public void test() { log.info("进入testLock"); for (int i = 0; i < 50; i++) { new Thread(() -> { zooKeeperTemplate.lock("/lock", () -> { ThreadUtil.sleep(1000); int addThen = num.addAndGet(1); log.info("num+1,结果:{}", addThen); }); }).start(); } } }
复制
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论