在上一篇中我们知道了几种常用的redis client,分别分析了lettuce原生的pipeline处理方式和在使用spring data redis包装后的lettuce处理pipeline时源码细节,并知道了后者直接使用时并不是真正的pipeline操作。那么如果我既想要使用spring-data-redis来操作lettuce的pipeline,又想要真正做到pipeline该怎么处理呢?本节我们就来聊一聊这个问题。与此同时,我们会来进一步地分析下redis connection和pool的内容。
前言
我们先来了解下在spring-data-redis中是如何包装lettuce的连接的,然后会根据这些信息得到上一篇文章中留下的那个问题的解。
连接处理
会先后对r连接池、redisTemplate模式下的连接和shareNativeConnection模式下的连接处理方式进行分析。
连接池
如果想了解连接池的内容,就需要了解下LettuceConnectionFactory。我们来看一下它的属性:
private final LettuceClientConfiguration clientConfiguration;
private @Nullable AbstractRedisClient client;
private @Nullable LettuceConnectionProvider connectionProvider;
private @Nullable LettuceConnectionProvider reactiveConnectionProvider;
private boolean validateConnection = false;
private boolean shareNativeConnection = true;
private @Nullable SharedConnection<byte[]> connection;
private @Nullable SharedConnection<ByteBuffer> reactiveConnection;
private @Nullable LettucePool pool;
/** Synchronization monitor for the shared Connection */
private final Object connectionMonitor = new Object();
private boolean convertPipelineAndTxResults = true;
private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379);
private @Nullable RedisConfiguration configuration;
private @Nullable ClusterCommandExecutor clusterCommandExecutor;
复制
主要属性:
•LettuceClientConfiguration:client的配置,基于commons pool的连接池目前也是基于它;•AbstractRedisClient client:内部维持的redis client对象;•LettuceConnectionProvider connectionProvider: 连接提供者,连接池就是由它来提供•LettuceConnectionProvider reactiveConnectionProvider: reactive模式下的连接提供者•validateConnection:是否校验连接•shareNativeConnection:是否共享本地连接•SharedConnection<byte[]> connection:用于共享的连接,如果shareNativeConnection为false则此处为null•SharedConnection
主要方法:
1. 连接池
/**
* @param pool
* @deprecated since 2.0, use pooling via {@link LettucePoolingClientConfiguration}.
*/
@Deprecated
public LettuceConnectionFactory(LettucePool pool) {
this(new MutableLettuceClientConfiguration());
this.pool = pool;
}
复制
旧的连接工厂,目前不再使用了,目前使用的是根据LettucePoolingClientConfiguration的配置初始化的连接池。
LettuceConnectionFactory的创建部分见org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration#redisConnectionFactory方法:
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
public LettuceConnectionFactory redisConnectionFactory(ClientResources clientResources)
throws UnknownHostException {
LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(clientResources,
this.properties.getLettuce().getPool());
return createLettuceConnectionFactory(clientConfig);
}
复制
通过clientResources和配置信息来初始化LettuceClientConfiguration。
2. LettuceConnectionConfiguration#getLettuceClientConfiguration方法:
private LettuceClientConfiguration getLettuceClientConfiguration(ClientResources clientResources, Pool pool) {
LettuceClientConfigurationBuilder builder = createBuilder(pool);
applyProperties(builder);
if (StringUtils.hasText(this.properties.getUrl())) {
customizeConfigurationFromUrl(builder);
}
builder.clientResources(clientResources);
customize(builder);
return builder.build();
}
复制
这里通过LettuceClientConfigurationBuilder来构建LettuceClientConfiguration对象的,来看下它的实现:
那么这里会创建LettuceClientConfigurationBuilder对象还是LettucePoolingClientConfigurationBuilder对象呢?需要来看下createBuilder方法:
private LettuceClientConfigurationBuilder createBuilder(Pool pool) {
if (pool == null) {
return LettuceClientConfiguration.builder();
}
return new PoolBuilderFactory().createBuilder(pool);
}
复制
可以看出如果配置了spring.redis.lettuce.pool的相关信息,这里就会生成LettucePoolingClientConfigurationBuilder对象。我们来看下LettucePoolingClientConfigurationBuilder的build方法:
@Override
public LettucePoolingClientConfiguration build() {
return new DefaultLettucePoolingClientConfiguration(super.build(), poolConfig);
}
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig()
复制
这里最后生成的是DefaultLettucePoolingClientConfiguration对象,内部使用的连接池配置为commons-pool提供的GenericObjectPoolConfig。
3. 我们来看下LettuceConnectionFactory的初始化部分,.LettuceConnectionFactory#afterPropertiesSet:
public void afterPropertiesSet() {
this.client = createClient();
this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC);
--------省略集群模式下的---------
}
}
复制
这里省略掉集群模式下的初始化方式的分析,因为单机和集群版的流程大致相同,只是一些配置不太一样。
4. createConnectionProvider方法:
private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);
if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
return new LettucePoolingConnectionProvider(connectionProvider,
(LettucePoolingClientConfiguration) this.clientConfiguration);
}
return connectionProvider;
}
复制
由于上面生成的是DefaultLettucePoolingClientConfiguration,这里最后生成的就是LettucePoolingConnectionProvider对象。也就是一个池化的对象。
接下来我们来看一看从它里面获取连接的方法org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider#getConnection:
private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap<>(32);
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
poolConfig, false);
});
try {
StatefulConnection<?, ?> connection = pool.borrowObject();
poolRef.put(connection, pool);
return connectionType.cast(connection);
} catch (Exception e) {
throw new PoolException("Could not get a resource from the pool", e);
}
}
复制
pools是用来维护Connection类型与GenericObjectPool连接池之间关系的一个map,每次获取连接时会根据连接类型获取到对应的连接池,然后从连接池中获取连接。在LettucePoolingConnectionProvider内部包装着一个StandaloneConnectionProvider类型的provider,它才是最终提供connection对象的provider,代码部分为:
ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
poolConfig, false)
复制
org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#getConnection
该方法的代码如下:
private @Nullable LettucePool pool;
public RedisConnection getConnection() {
if (isClusterAware()) {
return getClusterConnection();
}
LettuceConnection connection;
if (pool != null) {
connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase());
} else {
connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
}
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
return connection;
}
复制
这里需要注意一点,这个pool是LettucePool对象,在上文中已经分析过,它是比较老的api中的使用的pool,在新的中使用commons-pool代替了。所以这里在新的配置环境中会走pool==null这个分支。LettuceConnection的第一个入参是StatefulConnection<byte[], byte[]> asyncSharedConn对象,也就是说getSharedConnection方法返回的结果是asyncSharedConn。这个会对LettuceConnection的getAsyncConnection方法产生影响,该方法代码如下:
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
if (isQueueing()) {
return getAsyncDedicatedConnection();
}
if (asyncSharedConn != null) {
if (asyncSharedConn instanceof StatefulRedisConnection) {
// 如果asyncSharedConn不为空,则会通过它的async方法来创建RedisClusterAsyncCommands对象
return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();
}
}
// 如果asyncSharedConn为空,则走getAsyncDedicatedConnection方法来创建RedisClusterAsyncCommands
return getAsyncDedicatedConnection();
}
复制
如果asyncSharedConn不为空,这里会通过asyncSharedConn的async()方法来生成RedisClusterAsyncCommands对象。
如果asyncSharedConn为空,则会调用getAsyncDedicatedConnection方法来生成RedisClusterAsyncCommands对象:
protected RedisClusterAsyncCommands<byte[], byte[]> getAsyncDedicatedConnection() {
if (asyncDedicatedConn == null) {
asyncDedicatedConn = doGetAsyncDedicatedConnection();
if (asyncDedicatedConn instanceof StatefulRedisConnection) {
((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync().select(dbIndex);
}
}
if (asyncDedicatedConn instanceof StatefulRedisConnection) {
return ((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).async();
}
if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {
return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncDedicatedConn).async();
}
------------省略部分代码----------------
}
org.springframework.data.redis.connection.lettuce.LettuceConnection#doGetAsyncDedicatedConnection:
protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() {
return connectionProvider.getConnection(StatefulConnection.class);
}
复制
可以看出最终doGetAsyncDedicatedConnection方法也是通过connectionProvider对象来获取StatefulConnection类型的连接对象的。
那么,调用哪个方法会返回RedisClusterAsyncCommands对象呢?
在LettuceConnection中只有一个public的方法返回RedisClusterAsyncCommands对象的:
@Override
public RedisClusterAsyncCommands<byte[], byte[]> getNativeConnection() {
LettuceSubscription subscription = this.subscription;
return (subscription != null ? subscription.getNativeConnection().async() : getAsyncConnection());
}
复制
可以看出,它调用的实际上也是getAsyncConnection()方法。
shareNativeConnection 参数
来看一下org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#getSharedConnection方法:
@Nullable
protected StatefulRedisConnection<byte[], byte[]> getSharedConnection() {
return shareNativeConnection ? (StatefulRedisConnection) getOrCreateSharedConnection().getConnection() : null;
}
复制
这里有一个很重要的参数——shareNativeConnection,如果shareNativeConnection为true,会使用getOrCreateSharedConnection().getConnection()来操作,它的第一步返回的是SharedConnection对象,然后通过getConnection()来获取native连接。我们来看下它们的方法:
org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#getOrCreateSharedConnection:
private SharedConnection<byte[]> getOrCreateSharedConnection() {
synchronized (this.connectionMonitor) {
if (this.connection == null) {
// 如果为空,则创建 SharedConnection
this.connection = new SharedConnection<>(connectionProvider);
}
// 如果不为空,则使用相同连接
return this.connection;
}
}
org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.SharedConnection#getConnection:
@Nullable
StatefulConnection<E, E> getConnection() {
synchronized (this.connectionMonitor) {
if (this.connection == null) {
// 如果connectin为空则调用getNativeConnection方法获取连接
this.connection = getNativeConnection();
}
if (getValidateConnection()) {
// 校验连接
validateConnection();
}
// 如果内部连接已经存在,则返回相同的连接
return this.connection;
}
}
org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.SharedConnection#getNativeConnection:
/**
* Obtain a connection from the associated {@link LettuceConnectionProvider}.
*
* @return the connection.
*/
private StatefulConnection<E, E> getNativeConnection() {
try {
// 从provider中获取连接,这里也是从连接池中去获取连接的,返回的也是StatefulConnection类型的连接对象
return connectionProvider.getConnection(StatefulConnection.class);
} catch (RedisException e) {
throw new RedisConnectionFailureException("Unable to connect to Redis", e);
}
}
复制
这里主要是获取SharedConnection的步骤,最终会从connectionProvider中获取shared连接,连接为StatefulConnection类型。可以看出通过LettuceConnectionFactory#getSharedConnection方法最终获取到的连接为StatefulRedisConnection对象。
shareNativeConnection为true和false的区别
上面我们知道,当shareNativeConnection为true时会通过getOrCreateSharedConnection().getConnection()来初始化LettuceConnection的asyncSharedConn属性。它生成的是SharedConnection对象,然后通过它的getConnection方法获取具体连接的。在它们里面都有一个共同点,会先判断当连接对象为空时会创建新的连接,如果已经初始化过了,则使用已有的连接,即共享连接。
redisTemplate模式下的连接
咱们以this.redisTemplate.opsForValue().get(cacheKey)方法为切入点往下来看。
首先是org.springframework.data.redis.core.DefaultValueOperations#get(java.lang.Object)方法:
@Override
public V get(Object key) {
return execute(new ValueDeserializingRedisCallback(key) {
@Override
protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
return connection.get(rawKey);
}
}, true);
}
复制
接着往下走,继续往下看org.springframework.data.redis.core.AbstractOperations#execute方法:
@Nullable
<T> T execute(RedisCallback<T> callback, boolean b) {
return template.execute(callback, b);
}
org.springframework.data.redis.core.RedisTemplate#execute(org.springframework.data.redis.core.RedisCallback<T>, boolean):
@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection) {
return execute(action, exposeConnection, false);
}
org.springframework.data.redis.core.RedisTemplate#execute(org.springframework.data.redis.core.RedisCallback<T>, boolean, boolean):
@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(action, "Callback object must not be null");
RedisConnectionFactory factory = getRequiredConnectionFactory();
RedisConnection conn = null;
try {
if (enableTransactionSupport) {
// 开启事务时获取连接的方法
// only bind resources in case of potential transaction synchronization
conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
} else {
// 获取连接的方法
conn = RedisConnectionUtils.getConnection(factory);
}
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
RedisConnection connToUse = preProcessConnection(conn, existingConnection);
boolean pipelineStatus = connToUse.isPipelined();
if (pipeline && !pipelineStatus) {
connToUse.openPipeline();
}
RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
// 执行的部分
T result = action.doInRedis(connToExpose);
// close pipeline
if (pipeline && !pipelineStatus) {
connToUse.closePipeline();
}
// TODO: any other connection processing?
return postProcessResult(result, connToUse, existingConnection);
} finally {
// 释放连接
RedisConnectionUtils.releaseConnection(conn, factory);
}
}
复制
主要有三步操作:
1.通过RedisConnectionUtils.getConnection(factory)来获取连接,底层是通过connectionFactory.getConnection()来获取连接的;2.action.doInRedis:执行操作;3.使用RedisConnectionUtils.releaseConnection方法释放连接
spring-data-redis使用lettuce中假的pipeline的方法
看完了上面的内容,就能知道其实解决办法很简单:获取原生的lettuce连接、获取RedisClusterAsyncCommands对象,然后用原生的操作pipeline的方法来处理,继而释放连接即可。
RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
LettuceConnection connection = null;
try {
connection = (LettuceConnection) RedisConnectionUtils.getConnection(connectionFactory);
//LettuceConnection connection = (LettuceConnection)redisTemplate.getConnectionFactory().getConnection();
RedisClusterAsyncCommands<byte[], byte[]> commands = connection.getNativeConnection();
commands.setAutoFlushCommands(false);
List<RedisFuture<?>> futures = Lists.newArrayList();
for (int i = 0; i < 50; i++) {
futures.add(commands.set(("aaa-" + i).getBytes(), ("value-" + i).getBytes()));
futures.add(commands.expire(("key-" + i).getBytes(), 3600));
}
// write all commands to the transport layer
commands.flushCommands();
// synchronization example: Wait until all futures complete
boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS,
futures.toArray(new RedisFuture[futures.size()]));
}finally {
if (connection != null){
RedisConnectionUtils.releaseConnection(connection,connectionFactory);
}
}
复制
上面的代码中主要包括四步:
•获取LettuceConnection对象,不管是否共享连接,底层实际上也都是从连接池中获取连接的,只是连接之间是否可以共享而已•获取RedisClusterAsyncCommands,通过connection.getNativeConnection()方法获取•操作部分:先关掉autoflush,然后将所有的操作加到commands列表中,最后直接flush出去•释放连接
后记
这里只是略显仓促地对上一篇的文章进行一个补充,如果能给你带来一些帮助,不甚荣幸!