五、SpringBoot整合
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。
5.1 配置
5.1.1 pom
低版本
ZooKeeper 3.4.x 已经停止维护。最新的Curator客户端停止了3.4的支持,最近的兼容版本是4.2.0。
- 无需引入curator-framework,因为curator-recipes自动关联依赖引入curator-framework
- curator会默认引入zookeeper的jar包,需要检查版本与服务器的版本是否一致,如果不一致则需要排除引入
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--自定义配置处理,spring提示-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
复制
高版本,这里使用5.5.0版本,服务端使用最新的3.8.1
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.5.0</version>
</dependency>
<!--自定义配置处理-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
复制
5.1.2 yml
zookeeper:
servers: 192.168.10.108:2181
# servers: 192.168.10.108:2181,192.168.10.109:2181,192.168.10.110:2181
复制
5.2 注册Bean
封装curator的API和使用springboot的自动注册机制适配zk。提供ZooKeeperProperty属性配置,ZooKeeperTemplate工具类。
5.2.1 ZooKeeperProperty
配置实体类
package com.xrj.springbootzookeeper.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.io.Serializable;
@Data
@ConfigurationProperties(prefix = "zookeeper")
public class ZooKeeperProperty implements Serializable {
private static final long serialVersionUID = -6578547029768400583L;
/**
* zk连接集群,多个用逗号隔开
*/
private String servers;
/**
* 会话超时时间
*/
private int sessionTimeout = 60000;
/**
* 连接超时时间
*/
private int connectionTimeout = 15000;
/**
* 初始重试等待时间(毫秒)
*/
private int baseSleepTime = 1000;
/**
* 重试最大次数
*/
private int maxRetries = 10;
}
复制
5.2.2 ZooKeeperTemplate
ZooKeeperTemplate操作工具类,封装acurator的api
package com.xrj.springbootzookeeper.config;
import com.xrj.springbootzookeeper.exception.ZooKeeperException;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.zookeeper.CreateMode;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ZooKeeperTemplate {
/**
* 路径分隔符
*/
private static final String PATH_SEPARATOR = "/";
/**
* zk连接
*/
private final CuratorFramework client;
public ZooKeeperTemplate(CuratorFramework client) {
this.client = client;
}
/**
* 创建空节点,默认持久节点
*
* @param path 节点路径
* @param node 节点名称
* @return 完整路径
*/
public String createNode(String path, String node) {
return createNode(path, node, CreateMode.PERSISTENT);
}
/**
* 创建带类型的空节点
*
* @param path 节点路径
* @param node 节点名称
* @param createMode 类型
* @return 路径
*/
public String createNode(String path, String node, CreateMode createMode) {
path = buildPath(path, node);
log.info("create node for path: {} with createMode: {}", path, createMode.name());
try {
client.create()
.orSetData()
.creatingParentsIfNeeded()
.withMode(createMode)
.forPath(path);
return path;
} catch (Exception e) {
log.error("create node for path: {} with createMode: {} failed!", path, createMode.name(), e);
return null;
}
}
/**
* 创建节点,默认持久节点
*
* @param path 节点路径
* @param node 节点名称
* @param value 节点值
* @return 完整路径
*/
public String createNode(String path, String node, String value) {
return createNode(path, node, value, CreateMode.PERSISTENT);
}
/**
* 创建节点,默认持久节点
*
* @param path 节点路径
* @param node 节点名称
* @param value 节点值
* @param createMode 节点类型
* @return 完整路径
*/
public String createNode(String path, String node, String value, CreateMode createMode) {
if (Objects.isNull(value)) {
throw new ZooKeeperException("ZooKeeper节点值不能为空!");
}
path = buildPath(path, node);
log.info("create node for path: {}, value: {}, with createMode: {}", path, value, createMode.name());
try {
client.create()
.orSetData()
.creatingParentsIfNeeded()
.withMode(createMode)
.forPath(path, value.getBytes());
return path;
} catch (Exception e) {
log.error("create node for path: {}, value: {}, with createMode: {} failed!", path, value, createMode.name(), e);
}
return null;
}
/**
* 获取节点数据
*
* @param path 路径
* @param node 节点名称
* @return 完整路径
*/
public String get(String path, String node) {
path = buildPath(path, node);
try {
byte[] bytes = client.getData().forPath(path);
if (bytes.length > 0) {
return new String(bytes);
}
} catch (Exception e) {
log.error("get value for path: {}, node: {} failed!", path, node, e);
}
return null;
}
/**
* 更新节点数据
*
* @param path 节点路径
* @param node 节点名称
* @param value 更新值
* @return 完整路径
*/
public String update(String path, String node, String value) {
if (Objects.isNull(value)) {
throw new ZooKeeperException("ZooKeeper节点值不能为空!");
}
path = buildPath(path, node);
log.info("update path: {} to value: {}", path, value);
try {
client.setData().forPath(path, value.getBytes());
return path;
} catch (Exception e) {
log.error("update path: {} to value: {} failed!", path, value);
}
return null;
}
/**
* 删除节点,并且递归删除子节点
*
* @param path 路径
* @param node 节点名称
* @return 路径
*/
public boolean delete(String path, String node) {
path = buildPath(path, node);
log.info("delete node for path: {}", path);
try {
client.delete().quietly().deletingChildrenIfNeeded().forPath(path);
return true;
} catch (Exception e) {
log.error("delete node for path: {} failed!", path);
}
return false;
}
/**
* 获取子节点
*
* @param path 节点路径
* @return
*/
public List<String> getChildren(String path) {
if (StringUtils.hasText(path)) {
return null;
}
if (!path.startsWith(PATH_SEPARATOR)) {
path = PATH_SEPARATOR + path;
}
try {
return client.getChildren().forPath(path);
} catch (Exception e) {
log.info("获取{}子节点失败!", path, e);
}
return null;
}
/**
* 判断节点是否存在
*
* @param path 路径
* @param node 节点名称
* @return 结果
*/
public boolean exists(String path, String node) {
try {
List<String> list = getChildren(path);
return !CollectionUtils.isEmpty(list) && list.contains(node);
} catch (Exception e) {
return false;
}
}
/**
* 申请锁阻塞执行
*
* @param path 加锁zk节点
* @param runnable 执行方法
*/
public void lock(String path, Runnable runnable) {
InterProcessMutex lock = new InterProcessMutex(client, path);
try {
lock.acquire();
try {
//可以使用线程池处理提升性能
runnable.run();
} finally {
lock.release();
}
} catch (Exception e) {
log.error("获取锁失败: {}!", path);
}
}
/**
* 申请锁,指定请求等待时间,无返回值
*
* @param path 加锁zk节点
* @param time 时间
* @param unit 时间单位
* @param runnable 执行方法
*/
public void lock(String path, long time, TimeUnit unit, Runnable runnable) {
try {
InterProcessMutex lock = new InterProcessMutex(client, path);
if (lock.acquire(time, unit)) {
try {
runnable.run();
} finally {
lock.release();
}
} else {
log.error("获取锁超时:{}!", path);
}
} catch (Exception e) {
log.error("获取锁失败: {}!", path);
}
}
/**
* 申请锁,指定请求等待时间,有返回值
*
* @param path 加锁zk节点
* @param time 时间
* @param unit 时间单位
* @param callable 执行方法
* @return 方法返回结果
*/
public <V> V lock(String path, long time, TimeUnit unit, Callable<V> callable) {
try {
InterProcessMutex lock = new InterProcessMutex(client, path);
if (lock.acquire(time, unit)) {
try {
return callable.call();
} finally {
lock.release();
}
} else {
log.error("获取锁超时:{}!", path);
}
} catch (Exception e) {
log.error("获取锁失败: {}!", path);
}
return null;
}
/**
* 对一个节点进行监听,监听事件包括指定的路径节点的增、删、改的操作
*
* @param path 节点路径
* @param listener 回调方法
*/
public void watchNode(String path, NodeCacheListener listener) {
try (CuratorCache curatorCache = CuratorCache.builder(client, path).build()) {
curatorCache.listenable().addListener(CuratorCacheListener.builder().forNodeCache(listener).build());
curatorCache.start();
}
}
/**
* 对指定的路径节点的一级子目录进行监听,不对该节点的操作进行监听,对其子目录的节点进行增、删、改的操作监听
*
* @param path 节点路径
* @param listener 回调方法
*/
public void watchChildren(String path, PathChildrenCacheListener listener) {
try {
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
pathChildrenCache.getListenable().addListener(listener);
} catch (Exception e) {
log.error("watch children failed for path: {}", path, e);
}
}
/**
* 将指定的路径节点作为根节点(祖先节点),对其所有的子节点操作进行监听,呈现树形目录的监听,可以设置监听深度,最大监听深度为2147483647(int类型的最大值)
*
* @param path 节点路径
* @param maxDepth 回调方法
* @param listener 监听
*/
public void watchTree(String path, int maxDepth, TreeCacheListener listener) {
try {
TreeCache treeCache = TreeCache.newBuilder(client, path).setMaxDepth(maxDepth).build();
treeCache.start();
treeCache.getListenable().addListener(listener);
} catch (Exception e) {
log.error("watch tree failed for path: {}", path, e);
}
}
public String buildPath(String path, String node) {
if (StringUtils.hasText(path) || StringUtils.hasText(node)) {
throw new ZooKeeperException("ZooKeeper路径或者节点名称不能为空!");
}
if (!path.startsWith(PATH_SEPARATOR)) {
path = PATH_SEPARATOR + path;
}
if (PATH_SEPARATOR.equals(path)) {
return path + node;
} else {
return path + PATH_SEPARATOR + node;
}
}
}
复制
5.2.3 ZooKeeperAutoConfiguration
SpringBoot自动注册类
package com.xrj.springbootzookeeper.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnProperty(prefix = "zookeeper", name = "servers")
@EnableConfigurationProperties(value = ZooKeeperProperty.class)
@Slf4j
public class ZooKeeperAutoConfiguration {
/**
* 初始化连接以及重试
*
* @param zooKeeperProperty 配置属性
* @return 连接
*/
@Bean
@ConditionalOnMissingBean
public CuratorFramework curatorFramework(ZooKeeperProperty zooKeeperProperty) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(zooKeeperProperty.getBaseSleepTime(), zooKeeperProperty.getMaxRetries());
return CuratorFrameworkFactory.builder()
.connectString(zooKeeperProperty.getServers())
.connectionTimeoutMs(zooKeeperProperty.getConnectionTimeout())
.sessionTimeoutMs(zooKeeperProperty.getSessionTimeout())
.retryPolicy(retryPolicy)
.build();
}
@Bean
@ConditionalOnMissingBean
public ZooKeeperTemplate zooKeeperTemplate(CuratorFramework client) {
return new ZooKeeperTemplate(client);
}
}
复制
5.2.4 spring.factories
注册META-INF/spring.factories
文件,让springboot自动注册注入配置。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.xrj.springbootzookeeper.config.ZooKeeperAutoConfiguration
复制
5.3 Lock测试
5.3.1 可重入锁测试
模拟50个线程竞争将num值进行递增。
@RestController
@Slf4j
public class TestController {
@Autowired
private ZooKeeperTemplate zooKeeperTemplate;
private final AtomicInteger num = new AtomicInteger(0);
@GetMapping("/testLock")
public void test() {
log.info("进入testLock");
for (int i = 0; i < 50; i++) {
new Thread(() -> {
zooKeeperTemplate.lock("/lock", () -> {
ThreadUtil.sleep(1000);
int addThen = num.addAndGet(1);
log.info("num+1,结果:{}", addThen);
});
}).start();
}
}
}
复制
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。