MySQL的XA事务
介绍
在开发中,为了降低单点压力,通常会根据业务情况进行分表分库,将表分布在不同的库中(库可能分布在不同的机器上),但是一个业务场景可能会同时处理两个表的操作。在这种场景下,事务的提交会变得相对复杂,因为多个节点(库)的存在,可能存在部分节点提交失败的情况,即事务的ACID特性需要在各个不同的数据库实例中保证。比如更新db1库的A表时,必须同步更新db2库的B表,两个更新形成一个事务,要么都成功,要么都失败。
那么我们如何利用mysql实现分布式数据库的事务呢?
mysql是从5.0开始支持分布式事务
这里先声明两个概念:
- 资源管理器(resource manager):用来管理系统资源,是通向事务资源的途径。数据库就是一种资源管理器。资源管理还应该具有管理事务提交或回滚的能力。
- 事务管理器(transaction manager):事务管理器是分布式事务的核心管理者。事务管理器与每个资源管理器(resource
manager)进行通信,协调并完成事务的处理。事务的各个分支由唯一命名进行标识。
mysql在执行分布式事务(外部XA)的时候,mysql服务器相当于xa事务资源管理器,与mysql链接的客户端相当于事务管理器。
Mysql的XA事务分为外部XA和内部XA
- 外部XA用于跨多MySQL实例的分布式事务,需要应用层作为协调者,通俗的说就是比如我们在PHP中写代码,那么PHP书写的逻辑就是协调者。应用层负责决定提交还是回滚,崩溃时的悬挂事务。MySQL数据库外部XA可以用在分布式数据库代理层,实现对MySQL数据库的分布式事务支持,例如开源的代理工具:网易的DDB,淘宝的TDDL等等。
- 内部XA事务用于同一实例下跨多引擎事务,由Binlog作为协调者,比如在一个存储引擎提交时,需要将提交信息写入二进制日志,这就是一个分布式内部XA事务,只不过二进制日志的参与者是MySQL本身。Binlog作为内部XA的协调者,在binlog中出现的内部xid,在crash recover时,由binlog负责提交。(这是因为,binlog不进行prepare,只进行commit,因此在binlog中出现的内部xid,一定能够保证其在底层各存储引擎中已经完成prepare)。
XA 事务支持限于 InnoDB 存储引擎。
MySQL XA 实施是针对外部 XA 的,其中,MySQL 服务器作为资源管理器,而客户端程序作为事务管理器。未实施“内部 XA”。这样,就允许 MySQL 服务器内的单独存储引擎作为 RM(资源管理器),而服务器本身作为 TM(事务管理器)。处理包含 1 个以上存储引擎的 XA 事务时,需要内部 XA。内部 XA 的实施是不完整的,这是因为,它要求存储引擎在表处理程序层面上支持两阶段提交,目前仅对 InnoDB 实现了该特性。
- 对于 XA START,不支持 JOIN 和 RESUME 子句。
- 对于 XA END,不支持 SUSPEND [FOR MIGRATE]子句。
在全局事务内,对于每个 XA 事务,xid 值的 bqual 部分应是不同的,该要求是对当前 MySQL XA 实施的限制。它不是 XA 规范的组成部分。
语法
mysql xa事务的语法
1、首先要确保mysql开启XA事务支持
1
|
SHOW VARIABLES LIKE '%xa%'
|
如果innodb_support_xa的值是ON就说明mysql已经开启对XA事务的支持了。 如果不是就执行:
1
|
SET innodb_support_xa = ON
|
主要有:
1
2
3
4
5
6
|
XA START 'any_unique_id'; // 'any_unique_id' 是用户给的,全局唯一在一台mysql中开启一个XA事务
XA END 'any_unique_id '; //标识XA事务的操作结束
XA PREPARE 'any_unique_id'; //告知mysql 准备提交这个xa事务
XA COMMIT 'any_unique_id'; //告知mysql提交这个xa事务
XA ROLLBACK 'any_unique_id'; //告知mysql回滚这个xa事务
XA RECOVER;//查看本机mysql目前有哪些xa事务处于prepare状态
|
XA事务恢复
如果执行分布式事务的mysql crash了,mysql 按照如下逻辑进行恢复:
- 如果这个xa事务commit了,那么什么也不用做
- 如果这个xa事务还没有prepare,那么直接回滚它
- 如果这个xa事务prepare了,还没commit, 那么把它恢复到prepare的状态,由用户去决定commit或rollback
当mysql crash后重新启动之后,执行“XA RECOVER;”查看当前处于prepare状态的xa事务,然后commit或rollback它们。
如果客户端连接中止而服务器继续运行,服务器将回滚任何未完成的 XA 事务,即使该事务已达到 PREPARED 状态也同样。它应能提交或回滚 PREPARED XA 事务,但在不更改二进制日志机制的情况下不能这样。
使用限制
XA事务和本地事务以及锁表操作是互斥的
开启了xa事务就无法使用本地事务和锁表操作
1
2
3
4
5
6
|
mysql> xa start 't1xa';
Query OK, 0 rows affected (0.04 sec)
mysql> begin;
ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state
mysql> lock table t1 read;
ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state
|
开启了本地事务就无法使用xa事务
1
2
3
4
|
mysql> begin;
Query OK, 0 rows affected (0.00 sec)
mysql> xa start 'rrrr';
ERROR 1400 (XAE09): XAER_OUTSIDE: Some work is done outside global transaction
|
xa start 之后必须xa end, 否则不能执行xa commit 和xa rollback
所以如果在执行xa事务过程中有语句出错了,你也需要先xa end一下,然后才能xarollback。
注意事项
mysql只是提供了xa事务的接口,分布式事务中的mysql实例之间是互相独立的不感知的。 所以用户必须自己实现分布式事务的调度器
场景重现
比如我们现在有两个数据库,mysql3306和mysql3307。这里我们使用docker来创建这两个实例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# mysql3306创建命令
docker run -d -p 3306:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3306.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3306:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7
# msyql3306的配置
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30
# mysql3307创建命令
docker run -d -p 3307:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3307.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3307:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7
# msyql3307的配置
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
server-id = 2
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30
在mysql3306中
我们有一个user表
create table user (
id int,
name varchar(10),
score int
);
insert into user values(1, "foo", 10)
|
在mysql3307中,我们有一个wallet表。
1
2
3
4
5
6
|
create table wallet (
id int,
money float
);
insert into wallet values(1, 10.1)
|
我们可以看到,id为1的用户初始分数(score)为10,而它的钱,在wallet中初始钱(money)为10.1。
现在假设我们有一个操作,需要对这个用户进行操作:每次操作增加分数2,并且增加钱数1.2。
这个操作需要很强的一致性。
XA命令
这里是一个分布式事务的概念,我们可以使用2PC的方法进行保证事务,mysql为2PC的实现增加了xa命令.
同样,我用golang写了一个使用xa命令的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
package main
import (
"database/sql"
"fmt"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的连接
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的连接
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 开始前显示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
// 生成xid
xid := strconv.FormatInt(time.Now().Unix(), 10)
fmt.Println("=== xid:" + xid + " ====")
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// XA 启动
fmt.Println("=== call start ====")
if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// DML操作
if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// XA end
fmt.Println("=== call end ====")
if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// prepare
fmt.Println("=== call prepare ====")
if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 prepare error"))
if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// commit
fmt.Println("=== call commit ====")
if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 commit error"))
if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
|
首先看成功的情况:

一切完美。
如果我们在prepare阶段抛出panic,那么结果如下:

证明在第一阶段出现异常是可以回滚的。
但是如果我们在commit阶段抛出panic:

我们发现,这里的分数增加了,但是money却没有增加。
那么这个xa和单个事务有什么区别呢?我又陷入了深深的沉思…
XA的正确用法
2pc之所以分为两个阶段,是强调的是每个阶段都会持久化,就是第一个阶段完成了之后,每个mysql实例就把第一个阶段的请求实例化了,这个时候如果mysql实例停止了,每次重启的时候都会重新回复这个commit。
我们把这个代码的rollback去掉,假设commit必须成功。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
package main
import (
"database/sql"
"fmt"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的连接
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的连接
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 开始前显示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
// 生成xid
xid := strconv.FormatInt(time.Now().Unix(), 10)
fmt.Println("=== xid:" + xid + " ====")
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
// db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
// db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// XA 启动
fmt.Println("=== call start ====")
if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// DML操作
if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// XA end
fmt.Println("=== call end ====")
if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// prepare
fmt.Println("=== call prepare ====")
if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 prepare error"))
if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// commit
fmt.Println("=== call commit ====")
if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
panic(errors.New("db2 commit error"))
if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
|

这个时候,我们停掉程序(停掉mysql的链接),使用xa recover可以发现,db2的xa事务还留在db2中了。

我们在控制台直接调用xa commit ‘1585644880’ 还能继续把这个xa事务进行提交。

这下money就进行了提交,又恢复了一致性。
所以呢,我琢磨了一下,我们写xa的代码应该如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
package main
import (
"database/sql"
"fmt"
"log"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的连接
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的连接
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 开始前显示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
// 生成xid
xid := strconv.FormatInt(time.Now().Unix(), 10)
fmt.Println("=== xid:" + xid + " ====")
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// XA 启动
fmt.Println("=== call start ====")
if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// DML操作
if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// XA end
fmt.Println("=== call end ====")
if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// prepare
fmt.Println("=== call prepare ====")
if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 prepare error"))
if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// commit
fmt.Println("=== call commit ====")
if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
// TODO: 尝试重新提交COMMIT
// TODO: 如果还失败,记录xid,进入数据恢复逻辑,等待数据库恢复重新提交
log.Println("xid:" + xid)
}
// panic(errors.New("db2 commit error"))
if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
log.Println("xid:" + xid)
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
|
就是第二阶段的commit,我们必须设定它一定会“成功”,如果有不成功的情况,那么就需要记录下不成功的xid,有一个数据恢复逻辑,重新commit这个xid。来保证最终一致性。
binlog
其实我们使用binlog也能看出一些端倪
1
2
3
|
# 这里的mysql-bin.0003替换成为你当前的log
SHOW BINLOG EVENTS in 'mysql-bin.000003';
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
## XA的binlog
| mysql-bin.000003 | 1967 | Anonymous_Gtid | 1 | 2032 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000003 | 2032 | Query | 1 | 2138 | XA START X'31353835363338363233',X'',1 |
| mysql-bin.000003 | 2138 | Table_map | 1 | 2190 | table_id: 108 (hade1.user) |
| mysql-bin.000003 | 2190 | Update_rows | 1 | 2252 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000003 | 2252 | Query | 1 | 2356 | XA END X'31353835363338363233',X'',1 |
| mysql-bin.000003 | 2356 | XA_prepare | 1 | 2402 | XA PREPARE X'31353835363338363233',X'',1 |
| mysql-bin.000003 | 2402 | Anonymous_Gtid | 1 | 2467 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000003 | 2467 | Query | 1 | 2574 | XA COMMIT X'31353835363338363233',X'',1
## 非xa的事务
| mysql-bin.000003 | 2574 | Anonymous_Gtid | 1 | 2639 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000003 | 2639 | Query | 1 | 2712 | BEGIN |
| mysql-bin.000003 | 2712 | Table_map | 1 | 2764 | table_id: 108 (hade1.user) |
| mysql-bin.000003 | 2764 | Update_rows | 1 | 2826 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000003 | 2826 | Xid | 1 | 2857 | COMMIT /*xid=67*/
|
我们很明显可以看到两阶段提交中是有两个GTID的,生成一个GTID就代表内部生成一个事务,所以第一个阶段prepare结束之后,第二个阶段commit的时候就持久化了第一个阶段的内容,并且生成了第二个事务。当commit失败的时候,最多就是第二个事务丢失,第一个事务实际上已经保存起来了了(只是还没commit)。
而非xa的事务,只有一个GTID,在commit之前任意一个阶段出现问题,整个事务就全部丢失,无法找回了。所以这就是mysql xa命令的机制。
参考
使用golang理解mysql的两阶段提交
Mysql分布式事务及优缺点详解
【分布式事务】面试官问我:MySQL 中的 XA 事务崩溃了如何恢复??