
R2DBC
基于
Reactive Streams
反应流规范,它是一个开放的规范,为驱动程序供应商和使用方提供接口(
r2dbc-spi
),与
JDBC
的阻塞特性不同,它提供了完全反应式的非阻塞
API
与关系型数据库交互。
R2DBC
项目是支持使用反应式编程
API
访问关系型数据库的桥梁,定义统一接口规范,不同数据库厂家通过实现该规范提供驱动程序包。
R2DBC
定义了所有数据存储驱动程序必须实现的
SPI
,目前实现
R2DBC SPI
的驱动程序包括:
r2dbc-h2
:为H2
实现的驱动程序;r2dbc mariadb
:为Mariadb
实现的驱动程序;r2dbc mssql
:为Microsoft SQL Server
实现的本机驱动程序;r2dbc mysql
:为Mysql
实现的驱动程序;r2dbc postgres
:为PostgreSQL
实现的驱动程序;
r2dbc
还提供反应式连接池r2dbc-pool(https://github.com/r2dbc/r2dbc-pool)。
使用r2dbc-mysql驱动程序包与mysql数据库建立连接
使用r2dbc-pool获取数据库连接
Spring-Data-R2DBC增删改查API
事务的使用
R2DBC Repository
使用r2dbc-mysql驱动程序包与mysql数据库建立连接
r2dbc-mysql
依赖:
<!-- r2dbc mysql-->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>复制
r2dbc-mysql
实现了
r2dbc
的
ConnectionFactory
SPI
接口。
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
.option(DRIVER, "mysql")
.option(HOST, "127.0.0.1")
.option(USER, "root")
.option(PORT, 3306)
.option(PASSWORD, "")
.option(DATABASE, "r2dbc_stu")
.option(CONNECT_TIMEOUT, Duration.ofSeconds(3))
.build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);复制
Publisher<? extends Connection> connectionPublisher = connectionFactory.create(); 复制
sql
:
Mono.from(connectionPublisher)
.flatMapMany(conn -> conn.createStatement(
"insert into person (id,name,age) values ('1111','wujiuye',25)")
.execute())
.flatMap(Result::getRowsUpdated)
.switchIfEmpty(Mono.just(0))
.onErrorResume(throwable -> {
throwable.printStackTrace();
return Mono.empty();
})
.subscribe(System.out::println);复制
flatMapMany(conn -> conn.createStatement("sql").execute())
:创建Statement
执行sql
;flatMap(Result::getRowsUpdated)
:获取sql
执行影响的行数(select
语句没有该结果);switchIfEmpty
:如果insert
更新的行数为0
则会被执行;onErrorResume
:处理connection
连接异常、sql
语句执行异常;
使用r2dbc-pool获取数据库连接
r2dbc-pool
依赖
<dependencies>
<!-- r2dbc mysql -->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
<!-- r2dbc-pool -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
</dependencies>复制
ConnectionFactory
创建连接池(
ConnectionPool
):
ConnectionFactory connectionFactory = ....
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(Duration.ofMillis(1000))
.maxSize(20)
.build();
ConnectionPool pool = new ConnectionPool(configuration);复制
ConnectionPoolConfiguration
:连接池配置对象,指定连接池的大小、连接的最大空闲时间等;ConnectionPool
:连接池,也是ConnectionFactory
接口的实现类;
Mono<Connection> connectionMono = pool.create();
// 将连接释放回连接池
connectionMono.flatMapMany(Connection::close).subscribe();
// 销毁连接池
pool.dispose();复制
r2dbc
并没有定义连接池的接口,而
r2dbc-pool
通过实现
ConnectionFactory
接口巧妙的接管连接的创建,管理连接的生命周期。
spring-data-r2dbc
时,我们只需要将注册到
bean
工厂的
ConnectionFactory
替换为
ConnectionPool
即可使用连接池,例如:
@Bean
public ConnectionFactory connectionFactory(){
ConnectionFactory connectionFactory = ....
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(Duration.ofMillis(1000))
.maxSize(20)
.build();
// ConnectionPool实现了ConnectionFactory接口,使用ConnectionFactory替换ConnectionFactory
return new ConnectionPool(configuration);
}复制
ConnectionFactory
,
spring-data-r2dbc
自动帮我们配置好了。
spring-data-r2dbc
时,像下面这样使用
DatabaseClient
执行
sql
,最终也是从连接池获取连接执行:
public class xxx{
@Resource
private ConnectionFactory connectionFactory;
@Test
public void test(){
DatabaseClient client = DatabaseClient.create(connectionFactory);
// .......
}
}复制

Spring-Data-R2DBC增删改查API
spring-data-r2dbc
可直接通过依赖它的
starter
,依赖
starter
会将所需的
jar
包也都导入到项目中:
<dependencies>
<!-- r2dbc mysql 库-->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
<!-- 同时也会将r2dbc-pool导入 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
</dependencies>复制
application.yml
配置文件中添加配置:
### r2dbc
spring:
r2dbc:
url: r2dbc:mysql://localhost:3306/r2dbc_stu?useUnicode=true&characterEncoding=UTF-8
username: root
password:
pool:
enabled: true
max-size: 10
initial-size: 10
validation-query: select 1复制
DatabaseClient
DatabaseClient
是
Spring Data R2DBC
提供的具有功能性的反应式非阻塞
API
,用于与数据库交互。
DatabaseClient
封装了资源的处理,例如打开和关闭连接,让我们可以更方便的执行增删改查
SQL
,而不必关心要释放连接。
DatabaseClient
由
spirng-data-r2dbc
的
org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration
类完成,我们也可以继承
AbstractR2dbcConfiguration
类,替换一些默认配置。
@Configuration
public class R2dbcConfiguration extends AbstractR2dbcConfiguration {
@Override
@Bean // 这个注解不能少
public ConnectionFactory connectionFactory() {
// ....
}
@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
}复制
insert api
person
表插入一条记录
client.insert()
.into("person")
.value("id", "123")
.value("name", "wujiuye")
.nullValue("age", Integer.class)
.fetch()
.rowsUpdated()
.subscribe(System.out::println);复制
into
:在xx
表中插入记录;value
:为新记录的某个字段(表的字段名)赋值;nullValue
:为字段赋空值;fetch
:执行sql
并取得响应;rowsUpdated
:获取更新的记录数,在此为插入记录数;subscribe
:订阅,触发流执行并消费最终结果。
sql
组合执行:
Mono<Void> insert1 = client.insert().into("person")
.value("id", "12345")
.value("name", "wujiuye")
.nullValue("age", Integer.class)
.then();
Mono<Void> insert2 = client.insert().into("person")
.value("id","123445555555")
.then();
insert1.then(insert2).subscribe();复制
then
:不消费任何结果,该方法返回一个Mono<Void>
,用于衔接下一步,但它不会将上一步的结果传递给下一步;subscribe()
:订阅,只触发流执行,不关心结果;
update api
person
表的
id
为
12345
的记录:
client.update().table("person")
.using(Update.update("name", "1111").set("age", 18))
.matching(Criteria.where("id").is("12345"))
.fetch()
.rowsUpdated()
.subscribe(rows -> System.out.println("更新记录数:" + rows));复制
using
:接收一个Update
对象,Update
决定更新哪些字段;matching
:接收一个Criteria
对象,设置匹配条件,即sql
的where
部分;
delete api
person
表的
name
为
1111
且
age
为
18
的记录:
client.delete().from("person")
.matching(Criteria.where("name").is("1111").and("age").is(18))
.fetch()
.rowsUpdated()
.subscribe(rows -> System.out.println("删除的记录总数为:" + rows));复制
select api
person
表
name
为
null
的记录:
Flux<Person> list = client.select().from("person")
.matching(Criteria.where("name").isNull())
.as(Person.class)
.fetch()
.all();
list.subscribe(System.out::println);复制
all
:获取所有结果;as
:将结果映射为Person
实例;
事务的使用
spring-data-r2dbc
在
DatabaseClient
中包含事务感知,允许使用
Spring
的事务管理在同一事务中对多个语句进行分组。
then
连接多条
sql
,只要有一条
sql
执行失败就回滚事务:
ReactiveTransactionManager tm = new R2dbcTransactionManager(connectionFactory);
TransactionalOperator operator = TransactionalOperator.create(tm);
Mono<Void> atomicOperation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe") //绑定参数
.bind("name", "Joe")
.bind("age", 34)
.fetch().rowsUpdated()
.then(client.execute("INSERT INTO contacts (id, name) VALUES(:id, :name)")
.bind("id", "joe")
.bind("name", "Joe")
.fetch().rowsUpdated())
.then();
// 将atomicOperation放到事务中执行
operator.transactional(atomicOperation).subscribe();复制
ReactiveTransactionManager
:响应式编程事务管理器,R2dbcTransactionManager
是spring-data-r2dbc
提供的ReactiveTransactionManager
的一个实现;
spring-data-r2dbc
也支持基于注解的声明式事务。
spring-data-r2dbc
自动配置了
ReactiveTransactionManager
与
@EnableTransactionManagement
启动声明式事务管理,因此可直接使用,而不要添加额外配置。例如:
@Component
public class TxService {
@Resource
private DatabaseClient client;
@Transactional(rollbackFor = Throwable.class)
public Mono<Void> txInsert() {
Person person = new Person();
person.setId('12123');
return client.insert().into(Person.class)
.using(person)
.fetch().rowsUpdated()
.then(client.insert().into(Person.class)
.using(person)
.fetch().rowsUpdated()
.then());
}
}
// 测试
public class R2dbcTxTest extends SupporSpringBoot {
@Resource
private TxService txService;
@Test
public void testTx() {
txService.txInsert().doOnError(throwable -> {
System.out.println("执行失败");
throwable.printStackTrace();
}).subscribe();
}
}复制
R2DBC Repository
Spring-data-r2dbc
也实现了
spring data repository
的反应式
API
。
Repository
方法的操作。
PO
:
@Table("person")
public static class Person {
@Id
private String id;
private String name;
private int age;
}复制
Dao
(为了与领域驱动设计中的
Repository
做区分),继承
R2dbcRepository
接口:
public interface PersonDao extends R2dbcRepository<R2dbcStuMain.Person, String> {
@Modifying
@Query("insert into person (id,name,age) values(:id,:name,:age)")
Mono<Integer> insertPerson(String id, String name, Integer age);
}复制
Service
,在
Service
中创建一个事务方法,链接多次调用
PersonDao
的
insertPerson
方法。
@Service
public class PersonService {
@Resource
private PersonDao personDao;
@Transactional(rollbackFor = Throwable.class)
public Mono<Integer> addPerson(Person... persons) {
Mono<Integer> txOp = null;
for (Person person : persons) {
if (txOp == null) {
txOp = personDao.insertPerson(person.getId(), person.getName(), person.getAge());
} else {
txOp = txOp.then(personDao.insertPerson(person.getId(), person.getName(), person.getAge()));
}
}
return txOp;
}
}复制
@SpringBootApplication
@EnableR2dbcRepositories
public class R2dbcApplication {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(R2dbcApplication.class);
PersonService personService = context.getBean(PersonService.class);
Person person = new Person();
person.setId("12347");
person.setName("wjy");
person.setAge(25);
// 测试事务方法,验证主键重复时是否还有数据插入成功
personService.addPerson(person, person)
.doOnError(Throwable::printStackTrace)
.subscribe(System.out::println);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}复制
参考文献
Spring Data R2DBC
官方文档:https://docs.spring.io/spring-data/r2dbc/docs/1.1.0.RELEASE/reference/htmlr2dbc.io
:http://r2dbc.io/
