Seata–微服务分布式事务组件
事务指的是一个操作单元,在这个操作单元中的所有操作最终要保持一致的行为,要么所有操作都成功,要么所有的操作都被撤销。
本地事务是指基于关系型数据库的事务,也称为传统事务。大多数场景下,我们的应用都只需要提供单一的数据库,这种情况下的事务称之为本地事务。本地事务的ACID特性是数据库直接提供。
使用@Transational声明事务
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。
比如:
下面的两种情况,一种是同一个事务用到两个数据库,一个是同一个事务中的操作在不同的微服务上,可能使用同一个数据库,也可能用两个数据库。
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。其中AT是阿里首推的模式。
官网:https://seata.io/zh-cn/
两阶段提交又称2PC(two-phase commit protocol),2pc是一个非常经典的强一致、中心化的原子提交协议。这里所说的中心化是指协议中有两类节点:一个是中心化协调者节点(coordinator)和N个参与者节点(partcipant)。
阶段1:预处理阶段/请求阶段
1.询问 协调者向所有参与者发送事务请求,询问是否可以执行事务操作,然后等待各个参与者的响应。
2 执行 各个参与者接收到协调者事务请求后,执行事务操作,并将Undo和Redo信息记录到事务日志中。
3 响应 如果参与者成功执行了事务并写入了Undo和Redo信息,则向协调者返回YES响应,否则返回NO响应。当然,参与者也可能宕机,从而不能返回响应。
阶段2:提交回滚阶段/执行阶段
1.Commit/rollback请求, 在该阶段,协调者将基于第一个阶段的投票结果进行决策:提交或取消。 当且仅当所有的参与者同意提交事务协调者才通知所有的参与者提交事务,否则协调者将通知所有的参与者取消事务。
2 事务提交 参与者收到Commit或者rollback请求,执行事务提交或回滚操作,完成后释放事务执行期占用的所有资源。
3 反馈结果 参与者执行事务提交后向协调者发送Ask响应
4 完成事务 协调者接收到所有参与者的Ack响应后,完成事务提交。
2pc会遇到的问题:
1.同步阻塞 在两阶段提交的执行过程中,所有的参与者操作的逻辑都是处于阻塞状态,各个参与者在等待其他参与者响应的过程中,将无法进行其他任何操作。coordinator如果在发起提议后宕机,那么participant将进入阻塞(block)状态、一直等待coordinator回应以完成该次决议。
2.单点问题。协调者会有单点问题
3.数据不一致。网络原因或者其他原因会导致部分commit部分没有执行commit,产生数据不一致。
4.太过保守。就是说在两阶段中,任意一个节点的失败都会导致整个事物失败。
三阶段提交协议在协调者和参与者中都引入超时机制,并且把两阶段提交协议的第一个阶段拆分成了两步:询问,然后再锁资源,最后真正提交。形成canCOmmit、PreCommit、和doCommit三个阶段组成的事物处理协议。
seata提供了4种事务模式(AT、TCC、Saga、XA )的分布式事务实现。
其中AT模式是阿里首推的模式。
AT(Auto Transaction)模式是一种对业务无侵入的分布式事务解决方案。是一种改进后的二阶段提交。
在AT模式下,用户只需关注自己的“业务SQL”,用户的“业务SQL”作为一阶段,Seata框架会自动生成事务的二阶段进行提交和回滚操作。
AT模式如何做到对业务的无侵入:
一阶段:
在第一阶段,Seata会拦截"业务SQL",首先解析SQL语义,找到”业务SQL“要更新的业务数据,在业务数据被更新前,将其保存成”before image“,然后执行”业务SQL"更新业务数据,在业务数据更新之后,再将其保存成“after image”,最后生成行所。以上操作全部在一个数据库事务中完成,这就保证了一阶段操作的原子性。
二阶段:
二阶段如果是提交的话,因为“业务SQL”在一阶段已经提交至数据库,所以Seata框架只需将一阶段保存的快照数据和行所删除,完成数据清理即可。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ENxNCHHG-1672223815680)(pic/6.png)]
二阶段如果是回滚的话,Seata就需要回滚一阶段已经执行的业务SQL,还原业务数据。回滚方式便是用"before image"还原业务数据;但在还原前要首先要校验脏写,对比"数据库当前业务数据"和"after image",如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。
TCC 与 Seata AT 事务一样都是两阶段事务,它与 AT 事务的主要区别为:
- TCC 对业务代码侵入严重
每个阶段的数据操作都要自己进行编码来实现,事务框架无法自动处理。 - TCC效率更高
不必对数据加全局锁,允许多个事务同时操作数据。
第一阶段 Try
以账户服务为例,当下订单时要扣减用户账户金额:
假如用户购买 100 元商品,要扣减 100 元。
TCC 事务首先对这100元的扣减金额进行预留,或者说是先冻结这100元:
第二阶段 Confirm
如果第一阶段能够顺利完成,那么说明“扣减金额”业务(分支事务)最终肯定是可以成功的。当全局事务提交时, TC会控制当前分支事务进行提交,如果提交失败,TC 会反复尝试,直到提交成功为止。
当全局事务提交时,就可以使用冻结的金额来最终实现业务数据操作。
第二阶段 Cancel
如果全局事务回滚,就把冻结的金额进行解冻,恢复到以前的状态,TC 会控制当前分支事务回滚,如果回滚失败,TC 会反复尝试,直到回滚完成为止。
TC 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
其中,TC为单独部署的Server服务器,TM和RM为嵌入到应用中的Client客户端。
db+nacos的方式部署高可用集群模式
Server端:存储模式(store.mode)支持三种:
file: 单机模式,全局事务会话信息内存中读写并持久化本地文件root.data,性能较高(默认)
db:高可用模式,全局事务会话信息通过db共享,相应性能差些
redis: Seata-Server 1.3及以上版本支持,性能较高,存在事务信息丢失风险,请提前配置适合当前场景的redis持久化配置。
我们这里配置db模式。
版本关系
下载地址:
https://github.com/seata/seata/releases
设置mode为db
设置数据库连接信息
找到数据库文件
https://seata.io/zh-cn/docs/ops/deploy-guide-beginner.html
复制Sql信息到数据库里执行
整合Nacos作为Seata的注册配置中心。
下载seata源码包
将里面的script文件夹复制到seata目录下
修改config-center里面的config.txt文件
先安装git工具,下载地址:https://git-scm.com/download/win
然后启动nacos服务器
再双击执行nacos-config.sh文件
执行完以后,就可以看到nacos服务器上多了很多配置文件
windows下双击执行 seata-server.bat
pom文件:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2020.0.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2021.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/db_orders?characterEncoding=utf8&serverTimezone=GMT%2B8
username: root
password: root
cloud:
nacos:
discovery:
server-addr: http://localhost:8848
loadbalancer:
ribbon:
enabled: false
application:
name: ORDER
mybatis:
type-aliases-package: com.test.pojo
server:
port: 8081
@SpringBootApplication
@MapperScan(basePackages = "com.test.mapper")
@EnableDiscoveryClient
@EnableFeignClients
public class OrderServiceApplication {
public static void main(String[] args) {
SpringApplication.run(OrderServiceApplication.class,args);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Orders {
private int id;
private int pid;
private int state;
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.test.mapper.OrdersMapper">
<insert id="addOrder" parameterType="Orders">
insert into orders values(null,#{pid},#{state})
</insert>
</mapper>
package com.test.mapper;
import com.test.pojo.Orders;
public interface OrdersMapper {
public int addOrder(Orders orders);
}
package com.test.service.impl;
import com.test.client.StockClient;
import com.test.mapper.OrdersMapper;
import com.test.pojo.Orders;
import com.test.service.IOrdersService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrdersService implements IOrdersService {
@Autowired
private StockClient stockClient;
@Autowired
private OrdersMapper ordersMapper;
@Override
public void addOrder(Orders orders)
{
ordersMapper.addOrder(orders);
stockClient.updateStock(orders.getPid());
}
}
package com.test.config;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
@Configuration
public class RemoteConfig {
@Bean
@LoadBalanced
public RestTemplate restTemplate()
{
return new RestTemplate();
}
}
package com.test.client;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient("STOCK")
public interface StockClient {
@RequestMapping(value = "/updateStock",method = RequestMethod.GET)
public int updateStock(@RequestParam("pid") int pid);
}
package com.test.controller;
import com.test.pojo.Orders;
import com.test.service.IOrdersService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrdersController {
@Autowired
private IOrdersService ordersService;
@GetMapping("/orders")
public void addOrder()
{
Orders orders=new Orders();
orders.setPid(1);
orders.setState(1);
ordersService.addOrder(orders);
}
}
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/db_stock?characterEncoding=utf8&serverTimezone=GMT%2B8
username: root
password: root
cloud:
nacos:
discovery:
server-addr: http://localhost:8848
application:
name: STOCK
mybatis:
type-aliases-package: com.test.pojo
server:
port: 8082
package com.test;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan(basePackages = "com.test.mapper")
public class StockServiceApplication {
public static void main(String[] args) {
SpringApplication.run(StockServiceApplication.class,args);
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Stock {
private int id;
private int pid;
private int num;
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.test.mapper.StockMapper">
<update id="updateStock" parameterType="int" >
update stock set num=num-1 where pid=#{pid}
</update>
</mapper>
package com.test.mapper;
public interface StockMapper {
public int updateStock(int pid);
}
package com.test.service.impl;
import com.test.mapper.StockMapper;
import com.test.service.IStockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class StockService implements IStockService {
@Autowired
private StockMapper stockMapper;
@Override
public int updateStock(int pid) {
return stockMapper.updateStock(pid);
}
}
package com.test.controller;
import com.test.service.IStockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class StockController {
@Autowired
private IStockService stockService;
@GetMapping("/updateStock")
public int updateStock(int pid)
{
return stockService.updateStock(pid);
}
}
order和stock服务中都添加
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
在order和stock数据库中都添加
CREATE TABLE IF NOT EXISTS `undo_log`
(
`id` BIGINT(20) NOT NULL PRIMARY KEY AUTO_INCREMENT,
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
在order和stock服务中都加
spring.cloud.alibaba.seata.tx-service-group: shanghai
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AEr9Aial-1672223815695)(pic/1665063201377.png)]
目的是告诉seata client 怎么去访问seata-server
在order和stock服务中都加
seata:
registry:
type: nacos
nacos:
server-addr: http://localhost:8848
application: seata-server
username: nacos
password: nacos
group: SEATA_GROUP
config:
type: nacos
nacos:
server-addr: http://localhost:8848
username: nacos
password: nacos
group: SEATA_GROUP
重启两个服务,可以发现出错后,回滚了。