Preface
There are already plenty of articles explaining the principles behind TiKV's distributed transaction interfaces (Prewrite/Commit/Rollback/CheckTxnStatus/ResolveLocks, and so on), for example "TIKV 分布式事务--乐观事务 2PC 概览" (an overview of optimistic-transaction 2PC).
If, after reading those articles, you still want to walk through the source code and learn more details, this article is for you. It is a simplified source-code walkthrough of how TiKV implements distributed transactions at the code level and handles the various corner cases.
It is also meant to help readers who want to contribute to TiKV but feel lost in its large codebase.
The table used by the examples in this article is created as follows:
CREATE TABLE `MANAGERS_UNIQUE` (
  `MANAGER_ID` int(11) NOT NULL,
  `FIRST_NAME` varchar(45) NOT NULL,
  `LAST_NAME` varchar(45) NOT NULL,
  `LEVEL` int(11) DEFAULT NULL,
  PRIMARY KEY (`MANAGER_ID`),
  UNIQUE KEY `FIRST` (`FIRST_NAME`),
  KEY `LEVEL` (`LEVEL`)
)
The PreWrite interface
Example call scenarios
PreWrite is the interface that TiKV's distributed two-phase commit always invokes. This section walks through real INSERT/DELETE/UPDATE/SELECT FOR UPDATE scenarios and inspects the parameters passed to PreWrite, to give you a general impression.
Important parameters
The important parameters of PreWrite are:
- Mutation type. Currently defined types include Put, Del, Lock, Rollback, PessimisticLock, and so on.
- Mutation KEY-VALUE. The KEY-VALUE pairs that need to be locked.
- Mutation assertion. Optional checks to be verified, for example:
  - NotExist: assert that the locked KEY has no committed write record.
  - Exist: assert that the locked KEY definitely has a write record.
  - DoPessimisticCheck: in a pessimistic transaction, assert that the KEY already holds a pessimistic lock, guarding against lost locks.
- Primary KEY. The primary KEY of the whole distributed transaction.
  - This primary KEY is crucial: once the primary has been committed in the second phase, the whole distributed transaction counts as committed. Committing a distributed transaction may involve Prewrite calls on multiple Regions.
- try_one_pc. Try to finish the two-phase commit in a single RPC.
  - If the whole distributed transaction can be completed with a single Prewrite call, this flag is set, turning the Prewrite + Commit pair of calls into one.
- for_update_ts. Needed by pessimistic transactions with the fair-lock optimization.
  - In a pessimistic transaction without fair locking, it suffices to compare lock.ts against the transaction's start_ts; if they differ, the lock belongs to another transaction and an error is returned.
  - With fair locking enabled, the following case can occur (thanks to TiKV's thorough comments):
    - Transaction t1 enables the fair-lock optimization and calls the PessimisticLockRequest interface.
    - For some reason the lock record is lost.
    - Transaction t2 runs a full two-phase commit on the KEY and successfully changes the data.
    - Transaction t1, for some reason (perhaps a network retry), calls PessimisticLockRequest again, still with the fair-lock optimization enabled.
    - PessimisticLockRequest sees the new write record; normally it should return a WriteConflict error. But because of the fair-lock optimization the pessimistic lock is still acquired, so lock.ts is still t1.start_ts. What differs is that lock.for_update_ts no longer matches the original PessimisticLockRequest.
    - If the client does not verify the for_update_ts in the PessimisticLockRequest Response and directly sends the Prewrite request, t2's committed record will be overwritten by t1.
    - Therefore, when the fair-lock optimization kicked in, the Prewrite request must re-check the consistency of for_update_ts.
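The final for_update_ts consistency check described above can be modeled with a tiny sketch. This is illustrative only; the names are simplified stand-ins, not TiKV's actual types:

```rust
// Illustrative model of the for_update_ts consistency check Prewrite
// performs when the fair-lock optimization was hit. Simplified names,
// not TiKV's real types.
#[derive(Debug, PartialEq)]
enum CheckResult {
    Ok,
    PessimisticLockNotFound,
}

fn check_for_update_ts(lock_for_update_ts: u64, expected: Option<u64>) -> CheckResult {
    match expected {
        // The client attached a constraint: the pessimistic lock must have
        // been acquired at exactly this for_update_ts; otherwise the lock
        // may have been re-acquired after a lost-lock event.
        Some(ts) if ts != lock_for_update_ts => CheckResult::PessimisticLockNotFound,
        _ => CheckResult::Ok,
    }
}

fn main() {
    // Matching timestamps pass; a mismatch is rejected.
    assert_eq!(check_for_update_ts(100, Some(100)), CheckResult::Ok);
    assert_eq!(check_for_update_ts(105, Some(100)), CheckResult::PessimisticLockNotFound);
    // No constraint attached: nothing to verify.
    assert_eq!(check_for_update_ts(105, None), CheckResult::Ok);
    println!("ok");
}
```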
INSERT
The SQL is:
mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)
mysql> INSERT INTO MANAGERS_UNIQUE(MANAGER_ID,FIRST_NAME, LAST_NAME, LEVEL) VALUES (14276,'Brad10','Craven10',10);
Query OK, 1 row affected (0.01 sec)
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
After the user calls Commit, TiDB starts the two-phase commit. The logs show that TiKV actually received two PreWrite calls:
sched_txn_command kv::command::pessimistic_prewrite
mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000
assertion:NotExist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(2))
@ start_ts:448078770886148097 lock_ttl:23155 txn_size:1 min_commit_ts:448078776168087554 max_commit_ts:448078776694210561
try_one_pc:false assertion_level:Fast
(for_update_ts constraints: []) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
sched_txn_command kv::command::pessimistic_prewrite
mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD value:007D0180000100000001010000
assertion:NotExist, SkipPessimisticCheck),
(Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000B0013001400313432373642726164313043726176656E31300A
assertion:NotExist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(0))
@ 448078770886148097 23155 2 448078776168087554 448078776694210561
try_one_pc:false assertion_level:Fast
(for_update_ts constraints: []) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
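The start_ts / min_commit_ts values in these logs are TSO timestamps: the upper bits carry a physical wall-clock time in milliseconds and the low 18 bits carry a logical counter. A quick sketch to take one apart (standard TiKV/PD TSO layout):

```rust
// Decompose a TiKV TSO timestamp into its physical (milliseconds since
// the Unix epoch) and logical parts. The low 18 bits are the logical
// counter.
const LOGICAL_BITS: u32 = 18;

fn decompose_tso(ts: u64) -> (u64, u64) {
    (ts >> LOGICAL_BITS, ts & ((1 << LOGICAL_BITS) - 1))
}

fn main() {
    // start_ts from the first Prewrite log above.
    let (physical, logical) = decompose_tso(448078770886148097);
    assert_eq!(physical, 1709284862084); // milliseconds since the Unix epoch
    assert_eq!(logical, 1);
    println!("physical={physical} logical={logical}");
}
```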
Decoding the first Prewrite request shows that its KEY is the unique-index entry Brad10, and its primary key is the key itself:
mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000
assertion:NotExist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
"7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC"
└─## decode hex key
└─"t\200\000\000\000\000\000\000\377j_i\200\000\000\000\000\377\000\000\002\001Brad\37710\000\000\375\000\000\000\374"
├─## decode mvcc key
│ └─"t\200\000\000\000\000\000\000j_i\200\000\000\000\000\000\000\002\001Brad10\000\000\375"
│ ├─## table prefix
│ │ └─table: 106
│ └─## table index key
│ ├─table: 106
│ ├─index: 2
│ └─"\001Brad10\000\000\375"
│ └─## decode index values
│ └─kind: Bytes, value: Brad10
└─## table prefix
└─table: 255
The later Prewrite request's KEYs are the ordinary-index entry LEVEL_RowID with value 10_14276 and the RowID with value 14276; the primary key is still the unique-index entry Brad10:
mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD value:007D0180000100000001010000
assertion:NotExist, SkipPessimisticCheck),
(Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000B0013001400313432373642726164313043726176656E31300A
assertion:NotExist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
"7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD"
└─## decode hex key
└─"t\200\000\000\000\000\000\000\377j_i\200\000\000\000\000\377\000\000\003\003\200\000\000\000\377\000\000\000\n\001142\37776\000\000\000\374\000\000\375"
├─## decode mvcc key
│ └─"t\200\000\000\000\000\000\000j_i\200\000\000\000\000\000\000\003\003\200\000\000\000\000\000\000\n\00114276\000\000\000\374"
│ ├─## table prefix
│ │ └─table: 106
│ └─## table index key
│ ├─table: 106
│ ├─index: 3
│ └─"\003\200\000\000\000\000\000\000\n\00114276\000\000\000\374"
│ └─## decode index values
│ ├─kind: Int64, value: 10
│ └─kind: Bytes, value: 14276
└─## table prefix
└─table: 255
"7480000000000000FF6A5F720131343237FF36000000FC000000FC"
└─## decode hex key
└─"t\200\000\000\000\000\000\000\377j_r\0011427\3776\000\000\000\374\000\000\000\374"
├─## decode mvcc key
│ └─"t\200\000\000\000\000\000\000j_r\00114276\000\000\000\374"
│ ├─## table prefix
│ │ └─table: 106
│ └─## table row key
│ ├─table: 106
│ └─"\00114276\000\000\000\374"
│ └─## decode index values
│ └─kind: Bytes, value: 14276
└─## table prefix
└─table: 255
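The `decode mvcc key` steps above follow TiDB's key layout: `t` + 8-byte table id, then `_r` + handle for row keys, or `_i` + 8-byte index id + encoded values for index keys; the integers are encoded big-endian with the sign bit flipped so keys sort correctly. A minimal sketch of the table-id step (illustrative, not TiDB's actual codec):

```rust
// Decode the table id from an (already unescaped) TiDB mvcc key.
// Layout assumed here: b't' + 8-byte big-endian signed int with the
// sign bit flipped (memcomparable integer encoding).
fn decode_table_id(key: &[u8]) -> Option<i64> {
    if key.len() < 9 || key[0] != b't' {
        return None;
    }
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&key[1..9]);
    // Flipping the sign bit back restores the original signed value.
    Some((u64::from_be_bytes(buf) ^ (1 << 63)) as i64)
}

fn main() {
    // "t\x80\x00\x00\x00\x00\x00\x00\x6a_r..." from the decoded keys above.
    let key = b"t\x80\x00\x00\x00\x00\x00\x00\x6a_r";
    assert_eq!(decode_table_id(key), Some(106)); // table: 106
    println!("table id = {:?}", decode_table_id(key));
}
```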
Because the pessimistic transaction already called the PessimisticLockRequest interface during the INSERT and locked the unique index and the RowID, the Prewrite must run DoPessimisticCheck on them to guard against lost locks. The ordinary index LEVEL was not pessimistically locked, so it uses SkipPessimisticCheck.
At the same time, the uniqueness of the unique index and the RowID must be guaranteed, hence the NotExist assertion. Since an ordinary index key has the form indexValue_RowID and carries the RowID, it is unique as well and also gets the NotExist assertion.
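How an assertion is verified against a key's newest committed record can be sketched as follows (an illustrative model with simplified stand-ins, not TiKV's real types):

```rust
// Toy model of verifying a Prewrite assertion against whether the key
// has a committed write record. Simplified; not TiKV's actual types.
#[derive(Debug, PartialEq)]
enum Assertion {
    None,
    NotExist, // the key must have no committed record (e.g. INSERT)
    Exist,    // the key must have a committed record (e.g. DELETE)
}

fn check_assertion(assertion: &Assertion, has_commit_record: bool) -> Result<(), &'static str> {
    match assertion {
        Assertion::NotExist if has_commit_record => Err("AssertionFailed: record exists"),
        Assertion::Exist if !has_commit_record => Err("AssertionFailed: record missing"),
        _ => Ok(()),
    }
}

fn main() {
    // INSERT path: the unique index entry must not already exist.
    assert!(check_assertion(&Assertion::NotExist, false).is_ok());
    assert!(check_assertion(&Assertion::NotExist, true).is_err());
    // DELETE path: the row being deleted must exist.
    assert!(check_assertion(&Assertion::Exist, true).is_ok());
    println!("ok");
}
```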
The INSERT did not hit the fair-lock path, so no for_update_ts consistency check is needed.
The parameters of the INSERT's PessimisticLockRequest are as follows, where allow_lock_with_conflict is the flag that enables fair locking:
DELETE
The SQL is:
mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)
mysql> DELETE from MANAGERS_UNIQUE where FIRST_NAME='Brad10';
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
After the user calls Commit, TiDB starts the two-phase commit. The logs show that TiKV actually received two PreWrite calls:
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Delete key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD
assertion:Exist, SkipPessimisticCheck),
(Delete key:7480000000000000FF6A5F720131343237FF36000000FC000000FC
assertion:Exist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(0))
@ 448099356132769793 40939 2 448099366081134600 448099366602539009
try_one_pc:false assertion_level:Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint index: 1 expected_for_update_ts: 448099356132769793]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Delete key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC
assertion:Exist, DoPessimisticCheck)])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(2))
@ 448099356132769793 40939 1 448099366081134600 448099366602539009
false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099356132769793]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
As you can see, DELETE is very similar to INSERT: still two Prewrite requests, still with the unique-index entry 'Brad10' as the Primary Key.
The small difference is that the assertion changes from NotExist to Exist; deleting data naturally presumes the data exists.
Also, because the DELETE's PessimisticLockRequest triggered the fair-lock feature, the for_update_ts check has to be included.
The parameters of the DELETE statement's PessimisticLockRequest are as follows, where allow_lock_with_conflict is the flag that enables fair locking:
UPDATE
The SQL is:
mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)
mysql> UPDATE MANAGERS_UNIQUE SET FIRST_NAME="Brad9" where FIRST_NAME='Brad10';
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
After the user calls Commit, TiDB starts the two-phase commit. The logs show that TiKV actually received two PreWrite calls:
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Delete key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC
assertion:Exist, DoPessimisticCheck),
(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF39000000FC000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000
assertion:NotExist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(2))
@ 448099651396042753 44613 2 448099662328233986 448099662829191169
false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099651396042753]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000A00120013003134323736427261643943726176656E31300A assertion:Exist, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(0))
@ 448099651396042753 44613 1 448099662328233986 448099662829191169
false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099651396042753]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981801 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
The UPDATE involves 3 KEYs: the RowID, the old unique-index value, and the new unique-index value.
The first request contains two KEYs, the old and new unique-index values: a Delete of the old unique-index entry, asserting that it exists, plus a Put of the new unique-index entry, asserting that it does not.
The second request covers the RowID KEY.
Since the UPDATE triggers the fair-lock optimization, both Prewrite requests carry the for_update_ts consistency check.
The parameters of the UPDATE statement's PessimisticLockRequest are as follows, where allow_lock_with_conflict is the flag that enables fair locking:
SELECT FOR UPDATE
The SQL is:
mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)
mysql> SELECT * FROM MANAGERS_UNIQUE WHERE FIRST_NAME='Brad10' FOR UPDATE;
+------------+------------+-----------+-------+
| MANAGER_ID | FIRST_NAME | LAST_NAME | LEVEL |
+------------+------------+-----------+-------+
| 14276 | Brad10 | Craven10 | 10 |
+------------+------------+-----------+-------+
1 row in set (0.02 sec)
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
After the user calls Commit, TiDB starts the two-phase commit. The logs show that TiKV actually received two PreWrite calls:
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Lock key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC
assertion:None, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(1))
@ 448099845197266945 44379 1 448099856050028546 448099856569073665
false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099845197266945]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
sched_txn_command kv::command::pessimistic_prewrite mutations([
(Lock key:7480000000000000FF6A5F720131343237FF36000000FC000000FC
assertion:None, DoPessimisticCheck)
])
primary(74800000000000006A5F698000000000000002014272616431300000FD)
secondary_len(Some(0))
@ 448099845197266945 44379 1 448099856050028546 448099856569073665
false Fast
(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099845197266945]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981801 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
This time the Prewrite mutation type is Lock; it operates on the RowID and the unique-index entry 'Brad10', and the primary key is still the unique-index value Brad10.
The rest is similar and not repeated here.
The parameters of the SELECT's PessimisticLockRequest are as follows, where allow_lock_with_conflict is the flag that enables fair locking:
A quick tour of the source
Overall flow
Prewrite
  for range Prewrite_Action:
    Prewrite_Action returned no error:
      update final_min_commit_ts
    Prewrite_Action returned an error:
      WriteConflict && commit_ts > start_ts:
        try to find this transaction's commit record: get_txn_commit_record:
        -> this transaction already committed: write.start_ts = t1.ts1
        ------------------------------------------------> return ok, Prewrite ends
        Others:
        -> write conflict: no write.start_ts = t1.ts1 found
        ------------------------------------------------> return WriteConflict, Prewrite ends
      KeyIsLocked:
        try to find this transaction's commit record: get_txn_commit_record:
        -> this transaction already committed: (lock.ts != start_ts && write.start_ts = t1.ts1)
        ------------------------------------------------> continue
        Others:
        -> lock conflict: no write.start_ts = t1.ts1 found
        ------------------------------------------------> append to the locks array
      Others:
      --------------------------------------------------------> return Err, Prewrite ends
  -> loop finished, return the result
  --------------------------------------------------------------------> return (locks, final_min_commit_ts)
Prewrite_Action:
  load_lock
  lock found:
    check_lock
    -> key already locked by another transaction: lock.ts != start_ts
    ----------------------------------------------------> return ErrorInner::KeyIsLocked
    -> duplicate Prewrite command: lock.ts == start_ts
    ----------------------------------------------------> return LockStatus::Locked
  no lock:
    check_for_newer_version:
    -> standard write conflict: commit_ts > start_ts
    --------------------------------------------> return WriteConflict::Optimistic
    -> a rollback record: commit_ts == start_ts && (write_type == WriteType::Rollback || write.has_overlapped_rollback)
    --------------------------------------------> return WriteConflict::SelfRolledBack
    -> an older record exists: commit_ts < start_ts
    -------------------------------------------->
      write_lock: write the lock, return the latest min_commit_ts
      ----------------------------------------> return (final_min_commit_ts)
    -> none: no record found
    -------------------------------------------->
      write_lock: write the lock, return the latest min_commit_ts
      ----------------------------------------> return (final_min_commit_ts)
Prewrite interface scenarios:
  expected path: no lock in lock_cf, no newer record in write_cf --------------------------------------> ok, write a lock into lock_cf to lock the key
  expected anomalies:
    lock_cf has another transaction's lock, write_cf has this transaction's commit record -------------> ok, return commit_ts
    lock_cf has another transaction's lock, write_cf has this transaction's rollback record -----------> return locks
    lock_cf has another transaction's lock, write_cf has no record ------------------------------------> return locks
    lock_cf has this transaction's lock ---------------------------------------------------------------> return LockStatus::Locked
    lock_cf has no lock, write_cf has a newer record and this transaction's commit record -------------> ok, return commit_ts
    lock_cf has no lock, write_cf has a newer record and this transaction's rollback record -----------> return the conflicting commit_ts
    lock_cf has no lock, write_cf has a newer record and no record of this transaction ----------------> return the conflicting commit_ts
    lock_cf has no lock, write_cf has a transaction rollback record -----------------------------------> return the conflicting commit_ts
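The decision table above can be condensed into a toy function (an illustrative model; the types are simplified stand-ins, not TiKV's real Lock/Write structures):

```rust
// Toy model of the Prewrite_Action decision table above.
#[derive(Debug, PartialEq)]
enum Outcome {
    KeyIsLocked,   // another transaction holds the lock
    Locked,        // duplicate Prewrite of the same transaction
    WriteConflict, // newer commit, or our own rollback record
    WriteLock,     // ok: write the lock
}

struct Lock { ts: u64 }
struct Write { commit_ts: u64, is_rollback: bool }

fn prewrite_action(lock: Option<&Lock>, newest_write: Option<&Write>, start_ts: u64) -> Outcome {
    if let Some(l) = lock {
        return if l.ts == start_ts { Outcome::Locked } else { Outcome::KeyIsLocked };
    }
    match newest_write {
        Some(w) if w.commit_ts > start_ts => Outcome::WriteConflict,
        Some(w) if w.commit_ts == start_ts && w.is_rollback => Outcome::WriteConflict,
        // An older record, or no record at all: safe to lock.
        _ => Outcome::WriteLock,
    }
}

fn main() {
    assert_eq!(prewrite_action(Some(&Lock { ts: 7 }), None, 5), Outcome::KeyIsLocked);
    assert_eq!(prewrite_action(Some(&Lock { ts: 5 }), None, 5), Outcome::Locked);
    let newer = Write { commit_ts: 9, is_rollback: false };
    assert_eq!(prewrite_action(None, Some(&newer), 5), Outcome::WriteConflict);
    let older = Write { commit_ts: 3, is_rollback: false };
    assert_eq!(prewrite_action(None, Some(&older), 5), Outcome::WriteLock);
    println!("ok");
}
```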
A brief read of the code
commands::Prewrite
Source path: src/storage/txn/commands/prewrite.rs
The simplified code of commands::Prewrite is shown below. The real logic also contains the Async Commit / 1PC decisions, plus error handling for lock conflicts, write conflicts, and so on. If these concepts are new to you, read this first: TIKV 分布式事务--Prewrite 接口乐观事务详解
- For a request that carries secondary_keys, the Async Commit optimization is tried first. Whether the 1PC optimization is enabled is decided by the client and passed to TiKV's Prewrite interface as a parameter.
- For each mutation in the Prewrite, the actions::prewrite function is called once.
  - Before the call, check whether the current mutation's KEY is the Primary KEY. If it is, Async Commit needs to attach the secondary-key information to the Lock when locking the Primary Key.
- If actions::prewrite returns OK, update final_min_commit_ts and continue the FOR loop, calling actions::prewrite on the next mutation.
- If actions::prewrite returns an error:
  - On a PessimisticLockNotFound error, the pessimistic lock taken earlier in the transaction has been lost. Use check_committed_record_on_err to check whether the write records already contain this transaction's successful commit.
    - If the transaction has indeed committed, the Prewrite flow can end early, returning OK.
    - If no commit record is found, or a rollback record is found, abort the Prewrite flow and return PessimisticLockNotFound to the client.
  - On a WriteConflict error, use check_committed_record_on_err to check whether the write records already contain this transaction's successful commit.
    - If the transaction has indeed committed, the Prewrite flow can end early, returning OK.
    - If no commit record is found, or a rollback record is found, abort the Prewrite flow and return WriteConflict to the client.
  - On a KeyIsLocked error, use check_committed_record_on_err to check whether the write records already contain this transaction's successful commit.
    - If the transaction has indeed committed, the Prewrite flow can end early, returning OK.
    - If no commit record is found, or a rollback record is found, the Prewrite flow is NOT aborted this time; instead the conflicting lock is recorded into the locks array and the loop continues, calling actions::prewrite on the next mutation.
  - If the Async Commit/1PC optimization was attempted but actions::prewrite failed to return a min_commit_ts, downgrade to the traditional 2PC Prewrite, resetting final_min_commit_ts in the Prewrite Response to 0; seeing a zero final_min_commit_ts, the client will not commit the transaction via Async Commit/1PC.
  - If the Async Commit/1PC optimization was attempted and actions::prewrite returned a CommitTsTooLarge error, the transaction has quite possibly already committed and this is a duplicate commit; again check with check_committed_record_on_err:
    - If the transaction has indeed committed, the Prewrite flow can end early, returning OK.
    - If no commit record is found, or a rollback record is found, still downgrade the Async Commit optimization and fall back to the traditional 2PC Prewrite.
fn prewrite(
&mut self,
txn: &mut MvccTxn,
reader: &mut SnapshotReader<impl Snapshot>,
extra_op: ExtraOp,
) -> Result<(Vec<std::result::Result<(), StorageError>>, TimeStamp)> {
let commit_kind = match (&self.secondary_keys, self.try_one_pc) {
(_, true) => CommitKind::OnePc(self.max_commit_ts),
(&Some(_), false) => CommitKind::Async(self.max_commit_ts),
(&None, false) => CommitKind::TwoPc,
};
...
let async_commit_pk = self
.secondary_keys
.as_ref()
.filter(|keys| !keys.is_empty())
.map(|_| Key::from_raw(&self.primary));
let mut async_commit_pk = async_commit_pk.as_ref();
let mut final_min_commit_ts = TimeStamp::zero();
let mut locks = Vec::new();
fn check_committed_record_on_err(
prewrite_result: MvccResult<(TimeStamp, OldValue)>,
txn: &mut MvccTxn,
reader: &mut SnapshotReader<impl Snapshot>,
key: &Key,
) -> Result<(Vec<std::result::Result<(), StorageError>>, TimeStamp)> {
match reader.get_txn_commit_record(key)? {
TxnCommitRecord::SingleRecord { commit_ts, write }
if write.write_type != WriteType::Rollback =>
{
txn.clear();
Ok((vec![], commit_ts))
}
_ => Err(prewrite_result.unwrap_err().into()),
}
}
for m in mem::take(&mut self.mutations) {
...
let mut secondaries = &self.secondary_keys.as_ref().map(|_| vec![]);
if Some(m.key()) == async_commit_pk {
secondaries = &self.secondary_keys;
}
let need_min_commit_ts = secondaries.is_some() || self.try_one_pc;
let prewrite_result = prewrite(
txn,
reader,
&props,
m,
secondaries,
pessimistic_action,
expected_for_update_ts,
);
match prewrite_result {
Ok((ts, old_value)) if !(need_min_commit_ts && ts.is_zero()) => {
if need_min_commit_ts && final_min_commit_ts < ts {
final_min_commit_ts = ts;
}
...
}
Ok((..)) => {
props.commit_kind = CommitKind::TwoPc;
...
fallback_1pc_locks(txn);
final_min_commit_ts = TimeStamp::zero();
}
Err(MvccError(box MvccErrorInner::WriteConflict {
start_ts,
conflict_commit_ts,
..
})) if conflict_commit_ts > start_ts => {
return check_committed_record_on_err(prewrite_result, txn, reader, &key);
}
Err(MvccError(box MvccErrorInner::PessimisticLockNotFound { .. })) => {
return check_committed_record_on_err(prewrite_result, txn, reader, &key);
}
Err(MvccError(box MvccErrorInner::CommitTsTooLarge { .. })) => {
// The prewrite might be a retry and the record may have been committed.
// So, we need to prevent the fallback to avoid duplicate commits.
if let Ok(res) =
check_committed_record_on_err(prewrite_result, txn, reader, &key)
{
return Ok(res);
}
// fallback to not using async commit or 1pc
props.commit_kind = CommitKind::TwoPc;
...
fallback_1pc_locks(txn);
final_min_commit_ts = TimeStamp::zero();
}
Err(MvccError(box MvccErrorInner::KeyIsLocked { .. })) => {
match check_committed_record_on_err(prewrite_result, txn, reader, &key) {
Ok(res) => return Ok(res),
Err(e) => locks.push(Err(e.into())),
}
}
Err(e @ MvccError(box MvccErrorInner::AssertionFailed { .. })) => {
if assertion_failure.is_none() {
assertion_failure = Some(e);
}
}
Err(e) => return Err(Error::from(e)),
}
}
if let Some(e) = assertion_failure {
return Err(Error::from(e));
}
Ok((locks, final_min_commit_ts))
}
get_txn_commit_record: scan the key's write records between max_ts and t1.start_ts to determine t1's status
  return TxnCommitRecord::SingleRecord: found a WriteRecord with write.start_ts = t1.ts1; its type may be rollback or a normal record
  return TxnCommitRecord::OverlappedRollback: found t1.start_ts == t3.commit_ts with has_overlapped_write = true; effectively a rollback record
  return TxnCommitRecord::None(Some(OverlappedWrite)): found t1.start_ts == t3.commit_ts with has_overlapped_write = false; the record overlaps a would-be rollback record, so has_overlapped_write needs to be set
  return TxnCommitRecord::None: no commit record of t1 found
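These outcomes can be mirrored in a toy classifier. Here `writes` are (commit_ts, start_ts, has_overlapped_write) tuples for one key, newest first; the types are simplified stand-ins for TiKV's real ones:

```rust
// Toy classifier mirroring get_txn_commit_record's outcomes above.
#[derive(Debug, PartialEq)]
enum TxnCommitRecord {
    SingleRecord { commit_ts: u64 },
    OverlappedRollback,
    OverlappedWrite, // overlap found, has_overlapped_write still unset
    None,
}

fn get_txn_commit_record(writes: &[(u64, u64, bool)], txn_start_ts: u64) -> TxnCommitRecord {
    for &(commit_ts, start_ts, overlapped) in writes {
        if start_ts == txn_start_ts {
            // This write was produced by our transaction (commit or rollback).
            return TxnCommitRecord::SingleRecord { commit_ts };
        }
        if commit_ts == txn_start_ts {
            // Another transaction's commit_ts collides with our start_ts.
            return if overlapped {
                TxnCommitRecord::OverlappedRollback
            } else {
                TxnCommitRecord::OverlappedWrite
            };
        }
    }
    TxnCommitRecord::None
}

fn main() {
    let writes = [(20, 15, false), (10, 5, false)];
    assert_eq!(get_txn_commit_record(&writes, 5), TxnCommitRecord::SingleRecord { commit_ts: 10 });
    assert_eq!(get_txn_commit_record(&writes, 10), TxnCommitRecord::OverlappedWrite);
    assert_eq!(get_txn_commit_record(&writes, 7), TxnCommitRecord::None);
    println!("ok");
}
```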
commands::write_result
After commands::Prewrite finishes, TiKV calls write_result:
- If the 1PC path was successfully taken, the transaction is committed directly; no LOCK needs to be written.
- If the Prewrite flow hit no lock conflict or write conflict, call txn.into_modifies() to propagate the data to the TiKV cluster through the RAFT protocol.
- If a lock conflict occurred, return the result, sending the locks array back to the client.
fn write_result(
self,
locks: Vec<std::result::Result<(), StorageError>>,
mut txn: MvccTxn,
final_min_commit_ts: TimeStamp,
rows: usize,
async_apply_prewrite: bool,
) -> WriteResult {
let async_commit_ts = if self.secondary_keys.is_some() {
final_min_commit_ts
} else {
TimeStamp::zero()
};
let mut result = if locks.is_empty() {
let (one_pc_commit_ts, released_locks) =
one_pc_commit(self.try_one_pc, &mut txn, final_min_commit_ts);
...
let mut to_be_write = WriteData::new(txn.into_modifies(), extra);
to_be_write.set_disk_full_opt(self.ctx.get_disk_full_opt());
WriteResult {...
}
} else {
...
WriteResult {...
};
// Currently if `try_one_pc` is set, it must have succeeded here.
if (!async_commit_ts.is_zero() || self.try_one_pc) && async_apply_prewrite {
result.response_policy = ResponsePolicy::OnCommitted
}
result
}
}
commands::handle_1pc_locks
If the 1PC flag is true and all of Prewrite's Lock and write checks have passed, handle_1pc_locks performs the second-phase commit directly: the lock-writing step is skipped, the KEY-VALUE records are written straight into the write CF, and the pessimistic locks written earlier in the pessimistic transaction are removed, completing the commit.
fn handle_1pc_locks(txn: &mut MvccTxn, commit_ts: TimeStamp) -> ReleasedLocks {
let mut released_locks = ReleasedLocks::new();
for (key, lock, delete_pessimistic_lock) in std::mem::take(&mut txn.locks_for_1pc) {
let write = Write::new(
WriteType::from_lock_type(lock.lock_type).unwrap(),
txn.start_ts,
lock.short_value,
)
.set_last_change(lock.last_change)
.set_txn_source(lock.txn_source);
// Transactions committed with 1PC should be impossible to overwrite rollback
// records.
txn.put_write(key.clone(), commit_ts, write.as_ref().to_bytes());
if delete_pessimistic_lock {
released_locks.push(txn.unlock_key(key, true, commit_ts));
}
}
released_locks
}
actions::prewrite
Source path: src/storage/txn/actions/prewrite.rs
actions::prewrite is another core function. Its main flow is:
- load_lock checks whether the KEY is already locked.
  - If a lock exists, check_lock inspects its state further:
    - check_lock may return LockStatus::Locked: the lock was already taken successfully, so this should be a duplicate call; actions::prewrite ends here and returns OK.
    - check_lock may return LockStatus::Pessimistic: this is a pessimistic transaction and the lock found is a pessimistic lock, as expected; actions::prewrite later continues and upgrades the pessimistic lock into a regular lock.
    - check_lock may return ERR:KeyIsLocked: the lock belongs to another transaction; actions::prewrite ends.
    - check_lock may return ERR:PessimisticLockNotFound: the pessimistic lock was not found (start_ts did not match), or for_update_ts does not match; the lock may have been lost; actions::prewrite ends.
    - check_lock may return ERR:LockTypeNotMatch: the transaction is pessimistic but an ordinary lock was found; actions::prewrite ends.
  - If no lock exists:
    - If this mutation expects a pessimistic lock, load_lock should have found one; this is unexpected and may indicate a lost lock, so amend_pessimistic_lock checks whether the situation can be salvaged:
      - If the KEY has no newer commit record so far, it can be salvaged: take an ordinary lock on the KEY directly and continue the Prewrite flow, also running check_assertion on the KEY to verify it satisfies the Prewrite parameters.
      - If a newer transaction has already updated the KEY, the only option is to return ERR:PessimisticLockNotFound and abort actions::prewrite.
    - If no pessimistic lock is expected, this is the normal case; actions::prewrite continues.
- If skip_constraint_check is true, the consistency check is skipped.
- If skip_constraint_check is false, check_for_newer_version inspects the write records further:
  - If this transaction's own rollback record is found, i.e. commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback), return a write-conflict error and abort actions::prewrite.
  - If a record with commit_ts > txn.start_ts is found:
    - In an optimistic transaction, return a write-conflict error directly and abort actions::prewrite.
    - In a pessimistic transaction, return a write-conflict error and abort actions::prewrite only if the mutation carries DoConstraintCheck.
  - If a record with commit_ts > for_update_ts is found in a pessimistic transaction, return a PessimisticLockNotFound error and abort actions::prewrite (rollback records excepted; this scenario is discussed in detail below).
  - If the write record matches expectations, the should_not_exist assertion must still be verified to guarantee transactional correctness, otherwise an AlreadyExist error aborts actions::prewrite.
  - If no record is found at all, this is the expected case; return OK.
- Next comes the check_assertion step:
  - If the pessimistic lock was salvaged by the amend flow above (amend_pessimistic_lock), check_assertion has in effect already run and can be skipped.
  - If skip_constraint_check is false, the write record is already loaded, so the assertion can be verified against it directly.
    - The write record may not be a PUT or DELETE record, though, in which case the write record has to be reloaded from the storage layer.
  - If skip_constraint_check is true, the write record is only fetched, and the assertion only verified, under assertion_level=Strict; otherwise return OK directly.
- If the current mutation is of type Mutation::CheckNotExists, then the LOCK, write-record, and assertion checks done so far are all the request asked for; no lock is needed, so return OK and end actions::prewrite.
- The write_lock function builds the LOCK record. In particular, for the Primary KEY of an Async Commit transaction, the secondary-key information must also be written into the LOCK.
pub fn prewrite<S: Snapshot>(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<S>,
txn_props: &TransactionProperties<'_>,
mutation: Mutation,
secondary_keys: &Option<Vec<Vec<u8>>>,
pessimistic_action: PrewriteRequestPessimisticAction,
expected_for_update_ts: Option<TimeStamp>,
) -> Result<(TimeStamp, OldValue)> {
let mut mutation =
PrewriteMutation::from_mutation(mutation, secondary_keys, pessimistic_action, txn_props)?;
let mut lock_amended = false;
let lock_status = match reader.load_lock(&mutation.key)? {
Some(lock) => mutation.check_lock(lock, pessimistic_action, expected_for_update_ts)?,
None if matches!(pessimistic_action, DoPessimisticCheck) => {
amend_pessimistic_lock(&mut mutation, reader)?;
lock_amended = true;
LockStatus::None
}
None => LockStatus::None,
};
if let LockStatus::Locked(ts) = lock_status {
return Ok((ts, OldValue::Unspecified));
}
// Note that the `prev_write` may have invalid GC fence.
let (mut prev_write, mut prev_write_loaded) = if !mutation.skip_constraint_check() {
(mutation.check_for_newer_version(reader)?, true)
} else {
(None, false)
};
if !lock_amended {
let (reloaded_prev_write, reloaded) =
mutation.check_assertion(reader, &prev_write, prev_write_loaded)?;
if reloaded {
prev_write = reloaded_prev_write;
prev_write_loaded = true;
}
}
if mutation.should_not_write {
...
return Ok((min_commit_ts, OldValue::Unspecified));
}
...
let is_new_lock = !matches!(pessimistic_action, DoPessimisticCheck) || lock_amended;
let final_min_commit_ts = mutation.write_lock(lock_status, txn, is_new_lock)?;
Ok((final_min_commit_ts, ...))
}
actions::check_lock
Source path: src/storage/txn/actions/prewrite.rs
- check_lock may return LockStatus::Locked: the lock was already taken successfully, so this should be a duplicate call; actions::prewrite ends here and returns OK.
- check_lock may return LockStatus::Pessimistic: this is a pessimistic transaction and the lock found is a pessimistic lock, as expected; actions::prewrite later continues and upgrades the pessimistic lock into a regular lock.
- check_lock may return ERR:KeyIsLocked: the lock belongs to another transaction; actions::prewrite ends.
- check_lock may return ERR:PessimisticLockNotFound: the pessimistic lock was not found (start_ts did not match), or for_update_ts does not match; the lock may have been lost; actions::prewrite ends.
- check_lock may return ERR:LockTypeNotMatch: the transaction is pessimistic but an ordinary lock was found; actions::prewrite ends.
The simplified code is:
fn check_lock(
&mut self,
lock: Lock,
pessimistic_action: PrewriteRequestPessimisticAction,
expected_for_update_ts: Option<TimeStamp>,
) -> Result<LockStatus> {
if lock.ts != self.txn_props.start_ts {
if matches!(pessimistic_action, DoPessimisticCheck) {
return ErrorInner::PessimisticLockNotFound...
}
return Err(ErrorInner::KeyIsLocked(self.lock_info(lock)?).into());
}
if lock.is_pessimistic_lock() {
if !self.txn_props.is_pessimistic() {
return Err(ErrorInner::LockTypeNotMatch...
}
if let Some(ts) = expected_for_update_ts
&& lock.for_update_ts != ts
{
return Err(ErrorInner::PessimisticLockNotFound...;
}
self.lock_ttl = std::cmp::max(self.lock_ttl, lock.ttl);
self.min_commit_ts = std::cmp::max(self.min_commit_ts, lock.min_commit_ts);
return Ok(LockStatus::Pessimistic(lock.for_update_ts));
}
let min_commit_ts = if lock.use_async_commit {
lock.min_commit_ts
} else {
TimeStamp::zero()
};
Ok(LockStatus::Locked(min_commit_ts))
}
actions::amend_pessimistic_lock
Source path: src/storage/txn/actions/prewrite.rs
amend_pessimistic_lock checks whether the lost-lock situation can be salvaged:
- If the KEY has no newer commit record so far, it can be salvaged: take an ordinary lock on the KEY directly and continue the Prewrite flow, also running check_assertion on the KEY to verify it satisfies the Prewrite parameters.
- If a newer transaction has already updated the KEY, the only option is to return ERR:PessimisticLockNotFound and abort actions::prewrite.
The simplified code is:
fn amend_pessimistic_lock<S: Snapshot>(
mutation: &mut PrewriteMutation<'_>,
reader: &mut SnapshotReader<S>,
) -> Result<()> {
let write = reader.seek_write(&mutation.key, TimeStamp::max())?;
if let Some((commit_ts, write)) = write.as_ref() {
if *commit_ts >= reader.start_ts {
return Err(ErrorInner::PessimisticLockNotFound...
}
}
// Check assertion after amending.
mutation.check_assertion(reader, &write.map(|(w, ts)| (ts, w)), true)?;
Ok(())
}
actions::check_for_newer_version
Source path: src/storage/txn/actions/prewrite.rs
- If this transaction's own rollback record is found, return a write-conflict error and abort actions::prewrite.
- If a record with commit_ts > txn.start_ts is found:
  - In an optimistic transaction, return a write-conflict error directly and abort actions::prewrite.
  - In a pessimistic transaction, return a write-conflict error and abort actions::prewrite only if the mutation carries DoConstraintCheck.
- If a record with commit_ts > for_update_ts is found in a pessimistic transaction, return a PessimisticLockNotFound error and abort actions::prewrite (rollback records excepted; see below).
- If the write record matches expectations, the should_not_exist assertion must still be verified to guarantee transactional correctness, otherwise an AlreadyExist error aborts actions::prewrite.
- If no record is found at all, this is the expected case; return OK.
For a pessimistic transaction, finding a rollback record with commit_ts > t.for_update_ts is usually expected. According to the description in PR pingcap/tidb#35525, it arises in the following sequence:
The sequence of events can be like:
- Pessimistic txn-A locks row key using for_update_ts = 1.
- Optimistic txn-B with start_ts = 2 prewrites the index key.
- txn-A finds the locks on index keys and rolls back txn-B, leaving rollback records of ts=2 on index keys.
- txn-A prewrite index keys, and find a write conflict.
My understanding is that, because pessimistic locks are taken during the transaction, this situation arises fairly easily when pessimistic and optimistic transactions are mixed; deliberately ignoring other transactions' rollback records therefore cuts the retry cost of write conflicts.
For optimistic transactions, locking happens only during prewrite, which is short, so a write conflict caused by such a rollback record is unlikely.
关于其正确性,可参考 https://github.com/tikv/tikv/pull/13426:
The only risk of newer rollback records is that we may succeed in prewriting even if the lock was rolled back and collapsed.
If it's not an async-commit transaction, rollback records are written in two cases:
- The transaction aborts itself during 2PC. The primary lock must not be committed. In this case, the wrongly prewritten lock will be finally rolled back by others.
- A lock is rolled back by ResolveLock. Before that, the primary lock must have been rolled back. So, the new lock will also be rolled back by others.
If it's an async-commit transaction:
- The transaction aborts itself during 2PC. This means some other key must fail to prewrite. In this case, the wrongly prewritten lock will be finally rolled back by others.
- A lock is rolled back by ResolveLock. Before that, some other key must have been rolled back. So, the new lock will also be rolled back by other readers.
- The rollback is written by CheckSecondaryLocks. But CheckSecondaryLocks always writes a protected rollback. The rollback must be detected by our retried prewrite.
下面是我个人的理解:
其实回滚记录的存在,就是为了防止由于网络原因,本来已经执行回滚的 `key` 又被调用了 `Prewrite`:
- `t1` 事务二阶段提交开始,唯一索引 `UK`、普通索引 `iKey` 被执行 `Prewrite`
- 唯一索引 `UK`、普通索引 `iKey` 由于写冲突,被 `cleanup`、`rollback`、`resolveLock`、`CheckSecondaryLocks` 清除了 `LOCK`,并且写入了回滚记录 `rollback_record_uk`、`rollback_record_iKey`
- `rollback_record_uk` 的回滚记录是 `Protected` 模式的
- 由于 `TIKV` 的优化策略,普通索引的回滚记录 `rollback_record_iKey` 可能是 `Non-Protected` 模式(一般对于非唯一索引都是这个模式):
  - 作为 `overlap` 记录,可能根本不会真正写入到 `TIKV` 存储层(为了减少写入压力,如果恰好有其他事务的提交记录 `commitTS` 和事务 `t1` 的 `startTS` 相同,非保护模式下并不会修改其 `overlap` 属性,所以实际上根本没有写 `rollback` 记录;保护模式下,会修改其 `overlap` 属性为 `true`)
  - 也可能被更大 `commitTS` 的回滚记录 `collapsed` 掉,也就是被删除掉(为了让 `mvcc` 更快地找到 `KEY` 的最新记录,`TIKV` 会删除中间大量的回滚记录,只保留最新的 `rollback` 记录)
- 那么如果由于网络原因,这个 `t1` 事务对 `UK`、`iKey` 的 `prewrite` 请求又被重复发送到了 `TIKV` 上
- `UK` 的回滚记录是被保护的,因此会被这个判断语句拦截:`commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)`
- 由于 `iKey` 的 `t1` 回滚记录已被删除,即使仍存在其他回滚记录,其 `commitTS` 也不是 `t1` 的 `startTS`,因此能够通过上面的判定
- 悲观事务中,唯一索引会被 `DoConstraintCheck` 检验,非唯一索引一般不会做 `DoConstraintCheck` 检验
- 对于 `commit_ts > t.for_update_ts` 的记录,如果忽略 `commitTS` 更高的回滚记录,实际上非唯一索引是可以 `prewrite` 成功的
- 但这仅仅是二阶段提交的第一阶段成功了。等到真正 `commit` 的时候,由于唯一索引、`rowID` 实际上是 `Protected` 模式的回滚,`commit` 时肯定会找到回滚记录,从而阻止已经回滚的事务被再次提交成功
因此,这个优化可以在避免悲观事务大量写冲突的同时保障正确性。
代价则是:普通索引可能会 `prewrite` 成功然后再次被回滚;同时非保护模式回滚、`prewrite` 忽略回滚这些优化也增加了逻辑的复杂度。
简化代码如下:
fn check_for_newer_version<S: Snapshot>(
&mut self,
reader: &mut SnapshotReader<S>,
) -> Result<Option<(Write, TimeStamp)>> {
let mut seek_ts = TimeStamp::max();
while let Some((commit_ts, write)) = reader.seek_write(&self.key, seek_ts)? {
if commit_ts == self.txn_props.start_ts
&& (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)
{
self.write_conflict_error(&write, commit_ts, WriteConflictReason::SelfRolledBack)?;
}
match self.txn_props.kind {
TransactionKind::Optimistic(_) => {
if commit_ts > self.txn_props.start_ts {
self.write_conflict_error(...)?;
}
}
// Note: PessimisticLockNotFound can happen on a non-pessimistically locked key,
// if it is a retrying prewrite request.
TransactionKind::Pessimistic(for_update_ts) => {
if let DoConstraintCheck = self.pessimistic_action {
// Do the same as optimistic transactions if constraint checks are needed.
if commit_ts > self.txn_props.start_ts {
self.write_conflict_error(...)?;
}
}
if commit_ts > for_update_ts {
// Don't treat newer Rollback records as write conflicts. They can cause
// false positive errors because they can be written even if the pessimistic
// lock of the corresponding row key exists.
// Rollback records are only used to prevent retried prewrite from
// succeeding. Even if the Rollback record of the current transaction is
// collapsed by a newer record, it is safe to prewrite this non-pessimistic
// key because either the primary key is rolled back or it's protected
// because it's written by CheckSecondaryLocks.
if write.write_type == WriteType::Rollback {
seek_ts = commit_ts.prev();
continue;
}
return Err(ErrorInner::PessimisticLockNotFound...
}
}
}
check_data_constraint(reader, self.should_not_exist, &write, commit_ts, &self.key)?;
return Ok(Some((write, commit_ts)));
}
Ok(None)
}
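为了更直观地理解"悲观事务会跳过更新的回滚记录"这一点,下面用一个自包含的 Rust 小例子单独模拟这段循环(只演示回滚跳过的部分,省略了自身回滚、DoConstraintCheck 等分支;类型与函数名均为演示用的假设,并非 TiKV 真实 API):

```rust
// 示意:悲观事务下 check_for_newer_version 对回滚记录的跳过逻辑(演示用)
#[derive(Clone, Copy, PartialEq, Debug)]
enum WriteType {
    Put,
    Rollback,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    // 对应 PessimisticLockNotFound 错误
    Conflict,
    // 检查通过,附带找到的 write 记录的 commit_ts(None 表示没有记录)
    Pass(Option<u64>),
}

// writes 按 commit_ts 从大到小排序,模拟 seek_write 的向前迭代
fn pessimistic_check(writes: &[(u64, WriteType)], for_update_ts: u64) -> Outcome {
    for &(commit_ts, wt) in writes {
        if commit_ts > for_update_ts {
            if wt == WriteType::Rollback {
                // 忽略更新的回滚记录,继续向更小的 commit_ts seek
                continue;
            }
            return Outcome::Conflict;
        }
        return Outcome::Pass(Some(commit_ts));
    }
    Outcome::Pass(None)
}

fn main() {
    // commit_ts=5 的回滚记录被跳过,找到 commit_ts=3 的正常记录
    assert_eq!(
        pessimistic_check(&[(5, WriteType::Rollback), (3, WriteType::Put)], 4),
        Outcome::Pass(Some(3))
    );
    // commit_ts=5 的 Put 记录则直接报 PessimisticLockNotFound
    assert_eq!(pessimistic_check(&[(5, WriteType::Put)], 4), Outcome::Conflict);
    // 没有任何记录,符合预期
    assert_eq!(pessimistic_check(&[], 4), Outcome::Pass(None));
}
```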
actions::check_assertion
源码路径:src/storage/txn/actions/prewrite.rs
- 如果 `skip_constraint_check` 为 `false`,实际上 `write` 记录已经加载完毕,那么直接对 `write` 记录进行 `Assertion` 验证即可
  - 当然也有可能 `write` 记录并不是 `PUT` 或者 `DELETE` 记录,这个时候需要从底层重新加载 `write` 记录
- 如果 `skip_constraint_check` 为 `true`,那么需要开启 `assertion_level=Strict` 模式才会查询 `write` 记录并进行 `Assertion` 验证;否则直接返回 `OK`
- 分别对 `Assertion::Exist` 和 `Assertion::NotExist` 进行验证
- 如果验证失败,且 `skip_constraint_check` 为 `true`(即之前跳过了约束检查),那么会补做一次 `check_for_newer_version`,由它报错的话信息更准确
  - 否则返回 `AssertionError` 错误
- 验证成功,返回 `OK`
简化代码如下:
fn check_assertion<S: Snapshot>(
&mut self,
reader: &mut SnapshotReader<S>,
write: &Option<(Write, TimeStamp)>,
write_loaded: bool,
) -> Result<(Option<(Write, TimeStamp)>, bool)> {
if self.assertion == Assertion::None
|| self.txn_props.assertion_level == AssertionLevel::Off
{
return Ok((None, false));
}
if self.txn_props.assertion_level != AssertionLevel::Strict && !write_loaded {
return Ok((None, false));
}
let mut reloaded_write = None;
let mut reloaded = false;
// To pass the compiler's lifetime check.
let mut write = write;
if write_loaded
&& write.as_ref().map_or(
false,
|(w, _)| matches!(w.gc_fence, Some(gc_fence_ts) if !gc_fence_ts.is_zero()),
)
{
// The previously-loaded write record has an invalid gc_fence. Regard it as
// none.
write = &None;
}
// Load the most recent version if prev write is not loaded yet, or the prev
// write is not a data version (`Put` or `Delete`)
let need_reload = !write_loaded
|| write.as_ref().map_or(false, |(w, _)| {
w.write_type != WriteType::Put && w.write_type != WriteType::Delete
});
if need_reload {
let reload_ts = write.as_ref().map_or(TimeStamp::max(), |(_, ts)| *ts);
reloaded_write = reader.get_write_with_commit_ts(&self.key, reload_ts)?;
write = &reloaded_write;
reloaded = true;
}
let assertion_err = match (self.assertion, write) {
(Assertion::Exist, None) => {
self.assertion_failed_error(TimeStamp::zero(), TimeStamp::zero())
}
(Assertion::Exist, Some((w, commit_ts))) if w.write_type == WriteType::Delete => {
self.assertion_failed_error(w.start_ts, *commit_ts)
}
(Assertion::NotExist, Some((w, commit_ts))) if w.write_type == WriteType::Put => {
self.assertion_failed_error(w.start_ts, *commit_ts)
}
_ => Ok(()),
};
// Assertion error can be caused by a rollback. So make up a constraint check if
// the check was skipped before.
if assertion_err.is_err() {
if self.skip_constraint_check() {
self.check_for_newer_version(reader)?;
}
let (write, commit_ts) = write
.as_ref()
.map(|(w, ts)| (Some(w), Some(ts)))
.unwrap_or((None, None));
error!("assertion failure"; "assertion" => ?self.assertion, "write" => ?write,
"commit_ts" => commit_ts, "mutation" => ?self);
assertion_err?;
}
Ok((reloaded_write, reloaded))
}
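其中 `Assertion` 与 `write` 记录的匹配规则本身很简单,可以抽出来单独示意(演示用的 sketch,类型为假设,并非 TiKV 真实 API):

```rust
// 示意:Assertion 与最新数据版本的匹配规则(演示用)
#[derive(Clone, Copy)]
enum WriteType {
    Put,
    Delete,
}

#[derive(Clone, Copy)]
enum Assertion {
    Exist,
    NotExist,
    None,
}

// write:KEY 最新的数据版本(Put/Delete),None 表示不存在任何数据版本
fn assertion_ok(assertion: Assertion, write: Option<WriteType>) -> bool {
    match (assertion, write) {
        // Exist 要求 KEY 存在:没有记录、或最新记录是 Delete,都算失败
        (Assertion::Exist, None) => false,
        (Assertion::Exist, Some(WriteType::Delete)) => false,
        // NotExist 要求 KEY 不存在:最新记录是 Put 则失败
        (Assertion::NotExist, Some(WriteType::Put)) => false,
        _ => true,
    }
}

fn main() {
    // 例如 INSERT 唯一索引:要求 KEY 不存在,但找到了 Put 记录,验证失败
    assert!(!assertion_ok(Assertion::NotExist, Some(WriteType::Put)));
    // KEY 最新记录是 Delete,等价于不存在,NotExist 验证通过
    assert!(assertion_ok(Assertion::NotExist, Some(WriteType::Delete)));
    // Exist 要求 KEY 存在,但没有任何记录,验证失败
    assert!(!assertion_ok(Assertion::Exist, None));
    // 不带 Assertion 时总是通过
    assert!(assertion_ok(Assertion::None, Some(WriteType::Put)));
}
```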
actions::write_lock
源码路径:src/storage/txn/actions/prewrite.rs
`write_lock` 函数将会构建 `LOCK` 信息。
特别地,如果是 `Async Commit` 的 `Primary KEY`,还需要把 `secondary key` 的信息写入到 `LOCK` 中。
同时 `Async Commit` 下,还需要调用 `async_commit_timestamps` 计算 `final_min_commit_ts`,其值是:
`max(max(concurrency_manager.max_ts(), start_ts, for_update_ts).next(), lock.min_commit_ts)`
但是如果这个值大于 `Prewrite` 请求的 `max_commit_ts`,会返回 `CommitTsTooLarge`,回退为普通 `2PC`。
简化代码如下:
简化代码如下:
fn write_lock(
self,
lock_status: LockStatus,
txn: &mut MvccTxn,
is_new_lock: bool,
) -> Result<TimeStamp> {
let mut try_one_pc = self.try_one_pc();
let for_update_ts_to_write = match (self.txn_props.for_update_ts(), lock_status) {
(from_prewrite_req, LockStatus::Pessimistic(from_pessimistic_lock)) => {
std::cmp::max(from_prewrite_req, from_pessimistic_lock)
}
(for_update_ts_from_req, _) => for_update_ts_from_req,
};
let mut lock = Lock::new(
self.lock_type.unwrap(),
self.txn_props.primary.to_vec(),
self.txn_props.start_ts,
self.lock_ttl,
None,
for_update_ts_to_write,
self.txn_props.txn_size,
self.min_commit_ts,
false,
)
.set_txn_source(self.txn_props.txn_source);
...
if let Some(value) = self.value {
if is_short_value(&value) {
// If the value is short, embed it in Lock.
lock.short_value = Some(value);
} else {
// value is long
txn.put_value(self.key.clone(), self.txn_props.start_ts, value);
}
}
if let Some(secondary_keys) = self.secondary_keys {
lock.use_async_commit = true;
lock.secondaries = secondary_keys.to_owned();
}
let final_min_commit_ts = if lock.use_async_commit || try_one_pc {
let res = async_commit_timestamps(
&self.key,
&mut lock,
self.txn_props.start_ts,
self.txn_props.for_update_ts(),
self.txn_props.max_commit_ts(),
txn,
);
if let Err(Error(box ErrorInner::CommitTsTooLarge { .. })) = &res {
try_one_pc = false;
lock.use_async_commit = false;
lock.secondaries = Vec::new();
}
res
} else {
Ok(TimeStamp::zero())
};
if try_one_pc {
txn.put_locks_for_1pc(self.key, lock, lock_status.has_pessimistic_lock());
} else {
txn.put_lock(self.key, &lock, is_new_lock);
}
final_min_commit_ts
}
fn async_commit_timestamps(
key: &Key,
lock: &mut Lock,
start_ts: TimeStamp,
for_update_ts: TimeStamp,
max_commit_ts: TimeStamp,
txn: &mut MvccTxn,
) -> Result<TimeStamp> {
// This operation should not block because the latch makes sure only one thread
// is operating on this key.
let key_guard = ::futures_executor::block_on(txn.concurrency_manager.lock_key(key));
let final_min_commit_ts = key_guard.with_lock(|l| {
let max_ts = txn.concurrency_manager.max_ts();
fail_point!("before-set-lock-in-memory");
let min_commit_ts = cmp::max(cmp::max(max_ts, start_ts), for_update_ts).next();
let min_commit_ts = cmp::max(lock.min_commit_ts, min_commit_ts);
if !max_commit_ts.is_zero() && min_commit_ts > max_commit_ts {
return Err(ErrorInner::CommitTsTooLarge...
}
lock.min_commit_ts = min_commit_ts;
*l = Some(lock.clone());
Ok(min_commit_ts)
})?;
txn.guards.push(key_guard);
Ok(final_min_commit_ts)
}
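`final_min_commit_ts` 的取值与回退判断可以用一个自包含的小例子示意(用 `u64` 模拟 `TimeStamp`,`.next()` 对应 `+1`;`max_commit_ts` 为 0 表示不限制;函数名为演示用,并非 TiKV 真实 API):

```rust
// 示意:async commit / 1PC 下 final_min_commit_ts 的计算与 CommitTsTooLarge 回退(演示用)
fn final_min_commit_ts(
    max_ts: u64,        // concurrency_manager.max_ts()
    start_ts: u64,      // 事务的 start_ts
    for_update_ts: u64, // 悲观事务的 for_update_ts(乐观事务为 0)
    lock_min_commit_ts: u64,
    max_commit_ts: u64, // 0 表示不限制
) -> Result<u64, &'static str> {
    // 取三者最大值再 +1(对应 TimeStamp::next)
    let min_commit_ts = max_ts.max(start_ts).max(for_update_ts) + 1;
    // 再与 lock 中已有的 min_commit_ts 取最大值
    let min_commit_ts = min_commit_ts.max(lock_min_commit_ts);
    if max_commit_ts != 0 && min_commit_ts > max_commit_ts {
        // 超过请求允许的上限,回退为普通 2PC
        return Err("CommitTsTooLarge");
    }
    Ok(min_commit_ts)
}

fn main() {
    assert_eq!(final_min_commit_ts(10, 5, 0, 0, 0), Ok(11));
    // lock 已有更大的 min_commit_ts 时,取更大者
    assert_eq!(final_min_commit_ts(10, 5, 0, 20, 0), Ok(20));
    // 超过 max_commit_ts 则回退
    assert_eq!(final_min_commit_ts(10, 5, 0, 0, 8), Err("CommitTsTooLarge"));
}
```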