先聊聊什么是分布式事务?


什么是Seata?
官网http://seata.io/zh-cn/

利用Seata如何理分布式事务

(1)全局事务的回滚是如何实现的呢?
(2)RM 是怎么自动和 TC 交互的?
(3)二阶段回滚失败怎么办?
事务协调器 TC
维护全局和分支事务的状态,指示全局提交或者回滚。
事务管理者 TM
开启、提交或者回滚一个全局事务。
资源管理者 RM
管理执行分支事务的那些资源,向TC注册分支事务、上报分支事务状态、控制分支事务的提交或者回滚。
Seata工作流程

TM 请求 TC,开始一个新的全局事务,TC 会为这个全局事务生成一个 XID。
XID 通过微服务的调用链传递到其他微服务。
RM 把本地事务作为这个XID的分支事务注册到TC。
TM 请求 TC 对这个 XID 进行提交或回滚。
TC 指挥这个 XID 下面的所有分支事务进行提交、回滚。
Seata环境配置
1.下载seata-server-1.4.2和seata-1.4.2源码
seate-server下载: https://seata.io/zh-cn/blog/download.html
seata-1.4.2源码下载: https://github.com/seata/seata/releases
2.创建undo_log日志表
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
在你项目参与全局事务的数据库中加入undo_log这张表。undo_log表脚本根据自身数据库类型来选择。
3.创建seata事务相关表
创建seata数据库,在seata1.4.2源码seata-1.4.2\script\server\db中运行mysql.sql脚本
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
4.修改seata-server的registry.conf和file.conf

配置注册中心为nacos,配置nacos相关参数
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = "public"
cluster = "default"
username = "nacos"
password = "nacos"
}
配置file.config的DB模式相关参数配置
## transaction log store, only used in seata-server
store {
## store mode: file、db、redis
mode = "db"
## rsa decryption public key
publicKey = ""
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true"
user = "root"
password = "root8888"
minConn = 5
maxConn = 100
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
5.修改提交nacos脚本到nacos控制台
service.vgroupMapping.my_test_tx_group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
config.txt提供的脚本是seata-server服务自身所需要的配置文件信息,我们把相关脚本提交到nacos中,然后方便在nacos控制台上进行修改变更了。
运行仓库:https://github.com/seata/seata/tree/develop/script/config-center/nacos 中提供的nacos脚本nacos-config.sh,将以上信息提交到nacos控制台,如果有需要修改参数,可直接通过登录nacos控制台修改。
6.application.yml配置
#Seata分布式事务配置(AT模式)
seata:
enabled: true
application-id: ${spring.application.name}
#客户端和服务端在同一个事务组
tx-service-group: my_test_tx_group
enable-auto-data-source-proxy: true
service:
vgroup-mapping:
my_test_tx_group: default
config:
type: nacos
nacos:
namespace: "public"
serverAddr: 127.0.0.1:8848
group: SEATA_GROUP
username: "nacos"
password: "nacos"
#服务注册到nacos
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace: "public"
username: "nacos"
password: "nacos"
cluster: default
启动运行seata-server,成功后,运行自己的服务提供者,服务调用者。在全局事务调用者(发起全局事务的服务)的接口上加入@GlobalTransactional注解。通过nacos,我们能看到seata成功注册。

到此,我们的seata服务算是配置完了,并正常启动。
Seata入门案例
1.项目结构
|-- cloud-seata (父级工程)
|-- cloud-api (基础工程)
|-- cloud-storage (库存服务)
|-- cloud-order (订单服务)
|-- cloud-account (账户服务)
|-- cloud-business(业务服务)
2.seata主要依赖
<!-- seata依赖 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>${cloud.version}</version>
</dependency>
3.cloud-api提供的对外接口
//订单服务
public interface OrderService {
/**
* 创建订单
*/
Order create(String userId, String commodityCode, int orderCount);
}
//账户服务
public interface AccountService {
/**
* 从用户账户中借出
*/
void debit(String userId, int money);
}
//库存服务
public interface StorageService {
/**
* 扣除存储数量
*/
void deduct(String commodityCode, int count);
}
4.cloud-order服务中order-serviceImpl
@DubboService
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@DubboReference
private AccountService accountService;
@Override
public Order create(String userId, String commodityCode, int orderCount) {
int orderMoney = calculate(commodityCode, orderCount);
//账户减少余额
accountService.debit(userId, orderMoney);
Order order = new Order();
order.setUserId(userId)
.setCommodityCode(commodityCode)
.setOrderCount(orderCount)
.setMoney(orderMoney);
//创建订单
orderMapper.insert(order);
return order;
}
private int calculate(String commodityCode, int orderCount) {
//getPriceByCommodityCode(commodityCode);//查询商品服务得到商品单价
int price = 10;
return price*orderCount;
}
}
5.cloud-business服务businessService
@Service
public class BusinessService {
@DubboReference
private OrderService orderService;
@DubboReference
private StorageService storageService;
/**
* 购买
* @param userId
* @param commodityCode
* @param orderCount
*/
@GlobalTransactional(rollbackFor = Exception.class)
public void purchase(String userId, String commodityCode, int orderCount){
//扣减库存
storageService.deduct(commodityCode,orderCount);
//创建订单
orderService.create(userId,commodityCode,orderCount);
}
}
5.新建三张数据库表分别为account.account,storage.storage,order.order

6.businessController
@GetMapping("/purchase")
public String purchase(){
businessService.purchase("001","001",10);
return "下单成功"+"商品【001】x 10";
}
7.postman测试,查看数据库


可以看到订单库新增一条订单,库存表001商品库存数量减10,账户表001用户余额减少10*10=100。
7.增加异常测试,查看数据库(由于 Seata 删除回滚日志的速度很快,所以要想在表中看见回滚日志,必须要在某一个服务上打断点才看的更明显。)


回滚记录
可以看到发生异常后

数据还是没有发生变化,放行断点,再去查看seata回滚日志,已经被seata删除了。
更多seata学习样例请参考https://github.com/seata/seata-samples
喜欢就加个关注吧,


往期精选




