
TiKV Source Code Study Notes: The Distributed Transaction Prewrite Interface

ylldty · Posted on 2024-03-03

Preface

There are already plenty of articles explaining the principles behind TiKV's distributed transaction interfaces (Prewrite/Commit/Rollback/CheckTxnStatus/ResolveLocks, and so on), for example TiKV Distributed Transactions: An Overview of Optimistic 2PC.

If, after reading those articles, you still want to go through the source code for more detail, this article is for you. It is a simplified source-code walkthrough, showing how TiKV implements distributed transactions at the code level and how it handles the various corner cases.

It is also meant to help readers who want to contribute to TiKV but feel lost when first facing its large codebase.

The table used by the examples in this article is created as follows:

CREATE TABLE `MANAGERS_UNIQUE` (
  `MANAGER_ID` int(11) NOT NULL,
  `FIRST_NAME` varchar(45) NOT NULL,
  `LAST_NAME` varchar(45) NOT NULL,
  `LEVEL` int(11) DEFAULT NULL,
  PRIMARY KEY (`MANAGER_ID`),
  UNIQUE KEY `FIRST` (`FIRST_NAME`),
  KEY `LEVEL` (`LEVEL`)
)

The Prewrite Interface

Example Call Scenarios

Prewrite is the interface that TiKV's two-phase commit always calls. This section walks through real INSERT/DELETE/UPDATE/SELECT FOR UPDATE scenarios and looks at the parameters of the resulting Prewrite calls, to give you a general impression.

Key Parameters

The important parameters of Prewrite are:

  • Mutation type.

    • Currently defined types include Put, Del, Lock, Rollback, PessimisticLock, and so on
  • Mutation KEY-VALUE. The key-value pairs that need to be locked

  • Mutation assertion. Properties to verify, for example:

    • NotExist (verify that the locked KEY has no committed write record)
    • Exist (verify that the locked KEY definitely has a write record)
    • DoPessimisticCheck (in a pessimistic transaction, verify that the locked KEY already holds a pessimistic lock, guarding against lost locks)
  • Primary KEY. The primary key of the whole distributed transaction

    • This primary key is crucial: once the primary key finishes its two-phase commit successfully, the whole distributed transaction is considered committed. Committing a distributed transaction may involve Prewrite calls to multiple Regions.
  • try_one_pc. Try to finish the two-phase commit in a single RPC.

    • If the whole distributed transaction can be completed with a single Prewrite call, this flag is set, collapsing the Prewrite + Commit pair of calls into one call
  • for_update_ts. Needed for pessimistic transactions with fair locking.

    • In a pessimistic transaction without fair locking, it is enough to compare lock.ts with the transaction's start_ts; if they differ, the lock does not belong to this transaction and an error is returned.

    • With fair locking enabled, the following case can occur (thanks to TiKV's thorough comments):

      • Transaction t1 enables the fair-locking optimization and calls the PessimisticLockRequest interface
      • For some reason the lock is lost: the lock record disappears
      • Transaction t2 performs a full two-phase commit on the KEY and successfully changes the data
      • Transaction t1, for some reason (perhaps a network retry), calls the PessimisticLockRequest interface again, still with the fair-locking optimization enabled.
      • PessimisticLockRequest sees the new write record; normally it should return a WriteConflict error. But thanks to the fair-locking optimization, the pessimistic lock still succeeds, so lock.ts is still t1.start_ts. What differs is that lock.for_update_ts no longer matches the one in the original PessimisticLockRequest.
      • If the client does not verify the for_update_ts in the PessimisticLockRequest Response and sends the Prewrite request directly, the record committed by t2 would be overwritten by t1.
      • Therefore, when the fair-locking optimization was hit, the Prewrite request must check for_update_ts consistency once more.
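The consistency check described above can be sketched in a few lines. This is a simplified model for illustration only; the type and function names are hypothetical, not TiKV's actual code:

```rust
// Simplified model of the for_update_ts consistency check: when the
// fair-locking optimization was hit, the client sends the for_update_ts
// it observed, and Prewrite rejects the mutation if the lock currently
// on the key carries a different one.
#[derive(Debug, PartialEq)]
enum PrewriteCheck {
    Ok,
    // Treat a mismatch the same as a lost/inconsistent pessimistic lock.
    PessimisticLockNotFound,
}

fn check_for_update_ts(lock_for_update_ts: u64, expected: Option<u64>) -> PrewriteCheck {
    match expected {
        // The client sent no constraint for this mutation: nothing to verify.
        None => PrewriteCheck::Ok,
        Some(ts) if ts == lock_for_update_ts => PrewriteCheck::Ok,
        // The lock was re-acquired at a different for_update_ts: fail.
        Some(_) => PrewriteCheck::PessimisticLockNotFound,
    }
}
```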

INSERT

The SQL is:

mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)

mysql> INSERT INTO MANAGERS_UNIQUE(MANAGER_ID,FIRST_NAME, LAST_NAME, LEVEL) VALUES (14276,'Brad10','Craven10',10);
Query OK, 1 row affected (0.01 sec)

mysql> commit;
Query OK, 0 rows affected (0.01 sec)

After the user calls Commit, TiDB starts the two-phase commit. Log output shows that TiKV actually receives two Prewrite requests:

sched_txn_command kv::command::pessimistic_prewrite
mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000 
     assertion:NotExist, DoPessimisticCheck)
]) 

primary(74800000000000006A5F698000000000000002014272616431300000FD) 
secondary_len(Some(2))

@ start_ts:448078770886148097 lock_ttl:23155 txn_size:1 min_commit_ts:448078776168087554 max_commit_ts:448078776694210561 
try_one_pc:false assertion_level:Fast 

(for_update_ts constraints: []) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }
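As an aside, timestamps such as start_ts:448078770886148097 in the log above are TSO timestamps allocated by PD: the upper bits hold a physical wall-clock time in milliseconds and the low 18 bits hold a logical counter. A minimal decoding sketch:

```rust
// Decode a TiDB/TiKV TSO timestamp into its physical (ms) and logical
// parts. TSO layout: ts = physical_ms << 18 | logical.
const TSO_LOGICAL_BITS: u32 = 18;

fn decode_tso(ts: u64) -> (u64, u64) {
    (ts >> TSO_LOGICAL_BITS, ts & ((1 << TSO_LOGICAL_BITS) - 1))
}
```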





sched_txn_command kv::command::pessimistic_prewrite 
mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD value:007D0180000100000001010000 
     assertion:NotExist, SkipPessimisticCheck), 
(Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000B0013001400313432373642726164313043726176656E31300A 
     assertion:NotExist, DoPessimisticCheck)
]) 

primary(74800000000000006A5F698000000000000002014272616431300000FD) 
secondary_len(Some(0))

@ 448078770886148097 23155 2 448078776168087554 448078776694210561 
try_one_pc:false assertion_level:Fast  

(for_update_ts constraints: []) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }


Decoding the request shows that the key in the first Prewrite request is the unique index entry for Brad10, and its primary key is that entry itself:

mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000 
     assertion:NotExist, DoPessimisticCheck)
]) 
primary(74800000000000006A5F698000000000000002014272616431300000FD) 


"7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC"
└─## decode hex key
  └─"t\200\000\000\000\000\000\000\377j_i\200\000\000\000\000\377\000\000\002\001Brad\37710\000\000\375\000\000\000\374"
    ├─## decode mvcc key
    │ └─"t\200\000\000\000\000\000\000j_i\200\000\000\000\000\000\000\002\001Brad10\000\000\375"
    │   ├─## table prefix
    │   │ └─table: 106
    │   └─## table index key
    │     ├─table: 106
    │     ├─index: 2
    │     └─"\001Brad10\000\000\375"
    │       └─## decode index values
    │         └─kind: Bytes, value: Brad10
    └─## table prefix
      └─table: 255
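The layout shown in the decode tree ('t' + 8-byte table id + "_i" + 8-byte index id + encoded index values) can be reproduced with a few lines of code. This is only a sketch of the prefix layout with hypothetical function names, not TiDB's actual codec; signed integers are stored in a memcomparable form by flipping the sign bit:

```rust
// Decode a memcomparable big-endian i64 (sign bit flipped).
fn decode_comparable_i64(b: &[u8; 8]) -> i64 {
    (u64::from_be_bytes(*b) ^ 0x8000_0000_0000_0000) as i64
}

/// Returns (table_id, index_id) for a key of the form t{table}_i{index}...
fn decode_index_key_prefix(key: &[u8]) -> Option<(i64, i64)> {
    // 't' + 8 bytes table id + "_i" + 8 bytes index id = 19 bytes minimum.
    if key.len() < 19 || key[0] != b't' || key[9..11] != *b"_i" {
        return None;
    }
    let table: [u8; 8] = key[1..9].try_into().ok()?;
    let index: [u8; 8] = key[11..19].try_into().ok()?;
    Some((decode_comparable_i64(&table), decode_comparable_i64(&index)))
}
```

Feeding it the decoded MVCC key above would yield table 106 and index 2, matching the tree.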

The keys in the second Prewrite request are the secondary index LEVEL entry, whose value 10_14276 has the indexValue_RowID format, and the row key with RowID 14276; the primary key is still the unique index entry for Brad10:

mutations([
(Put key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD value:007D0180000100000001010000 
     assertion:NotExist, SkipPessimisticCheck), 
(Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000B0013001400313432373642726164313043726176656E31300A 
     assertion:NotExist, DoPessimisticCheck)
]) 
primary(74800000000000006A5F698000000000000002014272616431300000FD) 


"7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD"
└─## decode hex key
  └─"t\200\000\000\000\000\000\000\377j_i\200\000\000\000\000\377\000\000\003\003\200\000\000\000\377\000\000\000\n\001142\37776\000\000\000\374\000\000\375"
    ├─## decode mvcc key
    │ └─"t\200\000\000\000\000\000\000j_i\200\000\000\000\000\000\000\003\003\200\000\000\000\000\000\000\n\00114276\000\000\000\374"
    │   ├─## table prefix
    │   │ └─table: 106
    │   └─## table index key
    │     ├─table: 106
    │     ├─index: 3
    │     └─"\003\200\000\000\000\000\000\000\n\00114276\000\000\000\374"
    │       └─## decode index values
    │         ├─kind: Int64, value: 10
    │         └─kind: Bytes, value: 14276
    └─## table prefix
      └─table: 255

"7480000000000000FF6A5F720131343237FF36000000FC000000FC"
└─## decode hex key
  └─"t\200\000\000\000\000\000\000\377j_r\0011427\3776\000\000\000\374\000\000\000\374"
    ├─## decode mvcc key
    │ └─"t\200\000\000\000\000\000\000j_r\00114276\000\000\000\374"
    │   ├─## table prefix
    │   │ └─table: 106
    │   └─## table row key
    │     ├─table: 106
    │     └─"\00114276\000\000\000\374"
    │       └─## decode index values
    │         └─kind: Bytes, value: 14276
    └─## table prefix
      └─table: 255

Because the pessimistic transaction already called the PessimisticLockRequest interface during the INSERT and locked the unique index and the RowID, the Prewrite must perform DoPessimisticCheck on them to guard against lost locks. The secondary index LEVEL was never pessimistically locked, so it uses SkipPessimisticCheck.

At the same time, the uniqueness of the unique index and of the RowID must be guaranteed, so the NotExist assertion is added. Since a secondary index entry has the format indexValue_RowID and therefore contains the RowID, it is also unique and likewise needs the NotExist assertion.

The INSERT did not hit the fair-locking path, so no for_update_ts consistency verification is needed.
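The assertion semantics boil down to a check against whether the key has a committed write record. A simplified model for illustration (hypothetical names, not TiKV's actual check_assertion code):

```rust
// Whether a mutation's assertion holds, given whether the key currently
// has a committed (non-rollback) write record.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Assertion {
    None,
    NotExist, // the key must have no committed write record (e.g. INSERT)
    Exist,    // the key must have one (e.g. DELETE/UPDATE of an old entry)
}

fn assertion_holds(assertion: Assertion, has_committed_write: bool) -> bool {
    match assertion {
        Assertion::None => true,
        Assertion::NotExist => !has_committed_write,
        Assertion::Exist => has_committed_write,
    }
}
```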

The PessimisticLockRequest parameters of the INSERT are as follows, where allow_lock_with_conflict is the flag that enables fair locking:

(screenshot of the request parameters omitted)

DELETE

The SQL is:

mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)

mysql> DELETE from MANAGERS_UNIQUE where FIRST_NAME='Brad10';

mysql> commit;
Query OK, 0 rows affected (0.01 sec)

After the user calls Commit, TiDB starts the two-phase commit. Log output shows that TiKV actually receives two Prewrite requests:

sched_txn_command kv::command::pessimistic_prewrite mutations([
(Delete key:7480000000000000FF6A5F698000000000FF0000030380000000FF0000000A01313432FF3736000000FC0000FD 
        assertion:Exist, SkipPessimisticCheck), 
(Delete key:7480000000000000FF6A5F720131343237FF36000000FC000000FC 
        assertion:Exist, DoPessimisticCheck)
]) 

primary(74800000000000006A5F698000000000000002014272616431300000FD) 
secondary_len(Some(0))

@ 448099356132769793 40939 2 448099366081134600 448099366602539009 
try_one_pc:false assertion_level:Fast 

(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint index: 1 expected_for_update_ts: 448099356132769793]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }






sched_txn_command kv::command::pessimistic_prewrite mutations([
(Delete key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC 
        assertion:Exist, DoPessimisticCheck)]) 

primary(74800000000000006A5F698000000000000002014272616431300000FD) 
secondary_len(Some(2))

@ 448099356132769793 40939 1 448099366081134600 448099366602539009 
false Fast 

(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099356132769793]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }

As you can see, DELETE is very similar to INSERT: still two Prewrite requests, and still with the unique index entry 'Brad10' as the Primary Key.

The slight difference is that this time the assertion changes from NotExist to Exist: deleting data naturally presumes that the data exists.

Also, because the DELETE's PessimisticLockRequest triggered the fair-locking feature, the for_update_ts verification must be included.

The PessimisticLockRequest parameters of the DELETE statement are as follows, where allow_lock_with_conflict is the flag that enables fair locking:

(screenshot of the request parameters omitted)

UPDATE

The SQL is:

mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)

mysql> UPDATE MANAGERS_UNIQUE SET FIRST_NAME="Brad9" where FIRST_NAME='Brad10';

mysql> commit;
Query OK, 0 rows affected (0.01 sec)

After the user calls Commit, TiDB starts the two-phase commit. Log output shows that TiKV actually receives two Prewrite requests:

sched_txn_command kv::command::pessimistic_prewrite mutations([
(Delete key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC 
        assertion:Exist, DoPessimisticCheck), 
(Put key:7480000000000000FF6A5F698000000000FF0000020142726164FF39000000FC000000FC value:007D017F000A013134323736000000FC8000020000000102010002000000 
     assertion:NotExist, DoPessimisticCheck)
]) 

primary(74800000000000006A5F698000000000000002014272616431300000FD) 
secondary_len(Some(2))


@ 448099651396042753 44613 2 448099662328233986 448099662829191169 
false Fast 

(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099651396042753]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }







sched_txn_command kv::command::pessimistic_prewrite mutations([
(Put key:7480000000000000FF6A5F720131343237FF36000000FC000000FC value:8000040000000102030505000A00120013003134323736427261643943726176656E31300A assertion:Exist, DoPessimisticCheck)
]) 

primary(74800000000000006A5F698000000000000002014272616431300000FD) 
secondary_len(Some(0))

@ 448099651396042753 44613 1 448099662328233986 448099662829191169 
false Fast 

(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099651396042753]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981801 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }

The UPDATE also involves 3 keys: the RowID, the old unique index value, and the new unique index value.

The first request contains two keys, the old and new values of the unique index: a Delete of the old unique index entry, asserting that it exists, and a Put of the new unique index entry, asserting that it does not exist.

The second request carries the RowID key.

Because the UPDATE triggers the fair-locking optimization, both Prewrite requests carry the for_update_ts consistency check.

The PessimisticLockRequest parameters of the UPDATE statement are as follows, where allow_lock_with_conflict is the flag that enables fair locking:

(screenshot of the request parameters omitted)

SELECT FOR UPDATE

The SQL is:

mysql> BEGIN PESSIMISTIC;
Query OK, 0 rows affected (0.00 sec)

mysql> SELECT * FROM MANAGERS_UNIQUE WHERE FIRST_NAME='Brad10' FOR UPDATE;
+------------+------------+-----------+-------+
| MANAGER_ID | FIRST_NAME | LAST_NAME | LEVEL |
+------------+------------+-----------+-------+
| 14276      | Brad10     | Craven10  |    10 |
+------------+------------+-----------+-------+
1 row in set (0.02 sec)

mysql> commit;
Query OK, 0 rows affected (0.01 sec)

After the user calls Commit, TiDB starts the two-phase commit. Log output shows that TiKV actually receives two Prewrite requests:

sched_txn_command kv::command::pessimistic_prewrite mutations([
(Lock key:7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC 
      assertion:None, DoPessimisticCheck)
]) 

primary(74800000000000006A5F698000000000000002014272616431300000FD) 
secondary_len(Some(1))

@ 448099845197266945 44379 1 448099856050028546 448099856569073665 
false Fast 


(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099845197266945]) | region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }





sched_txn_command kv::command::pessimistic_prewrite mutations([
(Lock key:7480000000000000FF6A5F720131343237FF36000000FC000000FC 
      assertion:None, DoPessimisticCheck)
]) 

primary(74800000000000006A5F698000000000000002014272616431300000FD) 
secondary_len(Some(0))


@ 448099845197266945 44379 1 448099856050028546 448099856569073665 
false Fast 


(for_update_ts constraints: [PrewriteRequestForUpdateTsConstraint expected_for_update_ts: 448099845197266945]) | region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981801 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295 source_stmt { connection_id: 3382706204 }

This time the Prewrite mutation type is Lock, operating on the RowID and the unique index entry 'Brad10'; the primary key is still the unique index value Brad10.

The rest is similar to the previous cases and is not repeated here.

The PessimisticLockRequest parameters of the SELECT statement are as follows, where allow_lock_with_conflict is the flag that enables fair locking:

(screenshot of the request parameters omitted)

Source Code: Quick Overview

Overall Flow

Prewrite
    for range Prewrite_Action:
        Prewrite_Action returned no error:
            update final_min_commit_ts
        Prewrite_Action returned an error:
            WriteConflict && commit_ts > start_ts:
                try to find this txn's commit record: get_txn_commit_record:
                             -> this txn already committed: write.start_ts == t1.start_ts found
                        ------------------------------------------------> return ok, Prewrite ends
                    Others:
                             -> write conflict: no write.start_ts == t1.start_ts found
                        ------------------------------------------------> return WriteConflict, Prewrite ends

            KeyIsLocked:
                try to find this txn's commit record: get_txn_commit_record:
                             -> this txn already committed: (lock.ts != start_ts && write.start_ts == t1.start_ts)
                        ------------------------------------------------> return ok, Prewrite ends
                    Others:
                             -> lock conflict: no write.start_ts == t1.start_ts record found
                        ------------------------------------------------> append the lock to the locks array
            Others:
                --------------------------------------------------------> return Err, Prewrite ends

         -> loop finished, return the result
    --------------------------------------------------------------------> return (locks, final_min_commit_ts)



Prewrite_Action:
    load_lock
        lock found:
            check_lock
                -> the key was locked by another txn first: lock.ts != start_ts
            ----------------------------------------------------> return ErrorInner::KeyIsLocked
                -> duplicate Prewrite command: lock.ts == start_ts
            ----------------------------------------------------> return LockStatus::Locked
        no lock:
            check_for_newer_version:
                            -> standard write conflict: commit_ts > start_ts
                    --------------------------------------------> return WriteConflict::Optimistic
                            -> a rollback record:  commit_ts == start_ts && (write_type == WriteType::Rollback || write.has_overlapped_rollback)
                    --------------------------------------------> return WriteConflict::SelfRolledBack

                            -> an older record exists: commit_ts < start_ts
                    -------------------------------------------->
                                       write_lock: write the lock, return the latest min_commit_ts
                        ----------------------------------------> return (final_min_commit_ts)
                            -> none: no record found
                    -------------------------------------------->
                                       write_lock: write the lock, return the latest min_commit_ts
                        ----------------------------------------> return (final_min_commit_ts)




Prewrite interface scenarios:
    expected:   no lock in lock_cf, no newer record in write_cf ---------------------------------> ok, write the lock into lock_cf to lock the key
    exceptions:
              lock_cf holds another txn's lock, write_cf has this txn's commit record -----------> ok, return commit_ts
              lock_cf holds another txn's lock, write_cf has this txn's rollback record ---------> return locks
              lock_cf holds another txn's lock, write_cf has no record --------------------------> return locks
              lock_cf holds this txn's lock ---------------------------------------------------> return LockStatus::Locked
              lock_cf has no lock, write_cf has a newer txn record and this txn's commit record -> ok, return commit_ts
              lock_cf has no lock, write_cf has a newer txn record and this txn's rollback record -> return the conflicting commit_ts
              lock_cf has no lock, write_cf has a newer txn record, none from this txn ---------> return the conflicting commit_ts
              lock_cf has no lock, write_cf has a txn rollback record -------------------------> return the conflicting commit_ts
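The scenario table above can be condensed into a single match over what lock_cf and write_cf hold for the key. This is a rough model with hypothetical enums, not TiKV code; the rows that return a conflicting commit_ts all collapse into one write-conflict outcome here:

```rust
// What does Prewrite do, given the key's state in lock_cf and write_cf?
#[derive(Clone, Copy, PartialEq, Debug)]
enum LockCf { NoLock, OwnLock, OtherLock }
#[derive(Clone, Copy, PartialEq, Debug)]
enum WriteCf { NoRecord, OwnCommit, OwnRollback, NewerFromOther }
#[derive(PartialEq, Debug)]
enum Outcome { WriteLock, AlreadyLocked, ReturnCommitTs, ReturnLocks, WriteConflict }

fn prewrite_outcome(lock: LockCf, write: WriteCf) -> Outcome {
    match (lock, write) {
        // Our own lock is already in place: a duplicate Prewrite.
        (LockCf::OwnLock, _) => Outcome::AlreadyLocked,
        // Someone else's lock, but we already committed: report commit_ts.
        (LockCf::OtherLock, WriteCf::OwnCommit) => Outcome::ReturnCommitTs,
        // Someone else's lock otherwise: hand the conflicting locks back.
        (LockCf::OtherLock, _) => Outcome::ReturnLocks,
        // The expected path: nothing in the way, write the lock.
        (LockCf::NoLock, WriteCf::NoRecord) => Outcome::WriteLock,
        // No lock but we already committed: report commit_ts.
        (LockCf::NoLock, WriteCf::OwnCommit) => Outcome::ReturnCommitTs,
        // Newer record or our own rollback: a write conflict.
        (LockCf::NoLock, _) => Outcome::WriteConflict,
    }
}
```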

Source Code Walkthrough

commands::Prewrite

Source path: src/storage/txn/commands/prewrite.rs

The simplified code of commands::Prewrite is shown below. The real logic also covers the Async Commit / 1PC decisions and error handling for lock conflicts, write conflicts, and so on. If these concepts are unfamiliar, first read: TiKV Distributed Transactions: The Prewrite Interface for Optimistic Transactions in Detail.

  • First, for requests that carry secondary_keys, the Async Commit optimization is attempted. Whether to attempt 1PC is decided by the client and passed to TiKV as a parameter of the Prewrite interface.

  • For each mutation of the Prewrite, the actions::prewrite function is called once.

    • Before the call, check whether the current mutation's KEY is the Primary KEY. If it is, Async Commit needs to record the secondary keys' information in the Lock when locking the Primary Key
  • If actions::prewrite returns OK, update final_min_commit_ts and continue the FOR loop, calling actions::prewrite on the next mutation

  • If actions::prewrite returns an error:

    • A PessimisticLockNotFound error means a pessimistic lock taken during the pessimistic transaction was lost. Use check_committed_record_on_err to check whether the write records already contain a successful commit record of this transaction.

      • If the transaction has indeed committed, the Prewrite flow can end early, returning OK
      • If no commit record is found, or a rollback record is found, abort the Prewrite flow and return the PessimisticLockNotFound error to the client
    • On a write conflict (WriteConflict), use check_committed_record_on_err to check whether the write records already contain a successful commit record of this transaction.

      • If the transaction has indeed committed, the Prewrite flow can end early, returning OK
      • If no commit record is found, or a rollback record is found, abort the Prewrite flow and return the WriteConflict error to the client
    • On a lock conflict (KeyIsLocked), use check_committed_record_on_err to check whether the write records already contain a successful commit record of this transaction.

      • If the transaction has indeed committed, the Prewrite flow can end early, returning OK
      • If no commit record is found, or a rollback record is found, this time the Prewrite flow is NOT aborted; the conflicting lock is recorded in the locks array, and the loop continues with actions::prewrite on the next mutation
    • If the Async Commit/1PC optimization was attempted but actions::prewrite could not return a min_commit_ts, Async Commit is downgraded to the traditional 2PC Prewrite: final_min_commit_ts in the Prewrite Response is reset to 0, so that after reading it the client will not commit the transaction via Async Commit/1PC

    • If the Async Commit/1PC optimization was attempted but actions::prewrite returned a CommitTsTooLarge error, the transaction has quite possibly already committed and this is a duplicate commit. Again use check_committed_record_on_err to check

      • If the transaction has indeed committed, the Prewrite flow can end early, returning OK
      • If no commit record is found, or a rollback record is found, still downgrade Async Commit and fall back to the traditional 2PC Prewrite.

fn prewrite(
        &mut self,
        txn: &mut MvccTxn,
        reader: &mut SnapshotReader<impl Snapshot>,
        extra_op: ExtraOp,
    ) -> Result<(Vec<std::result::Result<(), StorageError>>, TimeStamp)> {
        let commit_kind = match (&self.secondary_keys, self.try_one_pc) {
            (_, true) => CommitKind::OnePc(self.max_commit_ts),
            (&Some(_), false) => CommitKind::Async(self.max_commit_ts),
            (&None, false) => CommitKind::TwoPc,
        }; 

        ...

        let async_commit_pk = self
            .secondary_keys
            .as_ref()
            .filter(|keys| !keys.is_empty())
            .map(|_| Key::from_raw(&self.primary));
        let mut async_commit_pk = async_commit_pk.as_ref();

        let mut final_min_commit_ts = TimeStamp::zero();
        let mut locks = Vec::new();

        fn check_committed_record_on_err(
            prewrite_result: MvccResult<(TimeStamp, OldValue)>,
            txn: &mut MvccTxn,
            reader: &mut SnapshotReader<impl Snapshot>,
            key: &Key,
        ) -> Result<(Vec<std::result::Result<(), StorageError>>, TimeStamp)> {
            match reader.get_txn_commit_record(key)? {
                TxnCommitRecord::SingleRecord { commit_ts, write }
                    if write.write_type != WriteType::Rollback =>
                {
                    txn.clear();
                    Ok((vec![], commit_ts))
                }
                _ => Err(prewrite_result.unwrap_err().into()),
            }
        }

        for m in mem::take(&mut self.mutations) {
            ...
            
            let mut secondaries = &self.secondary_keys.as_ref().map(|_| vec![]);
            if Some(m.key()) == async_commit_pk {
                secondaries = &self.secondary_keys;
            }

            let need_min_commit_ts = secondaries.is_some() || self.try_one_pc;
            let prewrite_result = prewrite(
                txn,
                reader,
                &props,
                m,
                secondaries,
                pessimistic_action,
                expected_for_update_ts,
            );
            match prewrite_result {
                Ok((ts, old_value)) if !(need_min_commit_ts && ts.is_zero()) => {
                    if need_min_commit_ts && final_min_commit_ts < ts {
                        final_min_commit_ts = ts;
                    }
                    ...
                }
                Ok((..)) => {
                    props.commit_kind = CommitKind::TwoPc;
                    ...
                    fallback_1pc_locks(txn);
                    final_min_commit_ts = TimeStamp::zero();
                }
                Err(MvccError(box MvccErrorInner::WriteConflict {
                    start_ts,
                    conflict_commit_ts,
                    ..
                })) if conflict_commit_ts > start_ts => {
                    return check_committed_record_on_err(prewrite_result, txn, reader, &key);
                }
                Err(MvccError(box MvccErrorInner::PessimisticLockNotFound { .. })) => {
                    return check_committed_record_on_err(prewrite_result, txn, reader, &key);
                }
                Err(MvccError(box MvccErrorInner::CommitTsTooLarge { .. })) => {
                    // The prewrite might be a retry and the record may have been committed.
                    // So, we need to prevent the fallback to avoid duplicate commits.
                    if let Ok(res) =
                        check_committed_record_on_err(prewrite_result, txn, reader, &key)
                    {
                        return Ok(res);
                    }
                    // fallback to not using async commit or 1pc
                    props.commit_kind = CommitKind::TwoPc;
                    ...
                    fallback_1pc_locks(txn);
                    final_min_commit_ts = TimeStamp::zero();
                }
                Err(MvccError(box MvccErrorInner::KeyIsLocked { .. })) => {
                    match check_committed_record_on_err(prewrite_result, txn, reader, &key) {
                        Ok(res) => return Ok(res),
                        Err(e) => locks.push(Err(e.into())),
                    }
                }
                Err(e @ MvccError(box MvccErrorInner::AssertionFailed { .. })) => {
                    if assertion_failure.is_none() {
                        assertion_failure = Some(e);
                    }
                }
                Err(e) => return Err(Error::from(e)),
            }
        }

        if let Some(e) = assertion_failure {
            return Err(Error::from(e));
        }
        Ok((locks, final_min_commit_ts))
    }

get_txn_commit_record: scan the key's write records between max_ts and t1.start_ts to determine t1's status
    return TxnCommitRecord::SingleRecord:                 found a write record with write.start_ts == t1.start_ts; its type may be Rollback or a normal commit record
    return TxnCommitRecord::OverlappedRollback:           found t1.start_ts == t3.commit_ts with has_overlapped_rollback == true; effectively a rollback record
    return TxnCommitRecord::None(Some(OverlappedWrite)):  found t1.start_ts == t3.commit_ts with has_overlapped_rollback == false; that record overlaps where t1's rollback would sit, so has_overlapped_rollback needs to be set on it
    return TxnCommitRecord::None:                         no commit record of t1 found
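The scan above can be sketched as a small classifier over one key's write records, ordered from newest to oldest. This is a simplified model (hypothetical types, and it omits the None(Some(OverlappedWrite)) case), not TiKV's actual reader:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum WriteType { Put, Rollback }

struct WriteRecord {
    commit_ts: u64,
    start_ts: u64,
    write_type: WriteType,
    has_overlapped_rollback: bool,
}

#[derive(PartialEq, Debug)]
enum TxnCommitRecord {
    SingleRecord { commit_ts: u64, write_type: WriteType },
    OverlappedRollback,
    None,
}

// `records` must be sorted by commit_ts descending (newest first).
fn get_txn_commit_record(records: &[WriteRecord], t1_start_ts: u64) -> TxnCommitRecord {
    for w in records {
        // Anything below t1's start_ts cannot be t1's commit record.
        if w.commit_ts < t1_start_ts {
            break;
        }
        if w.start_ts == t1_start_ts {
            // Found t1's own record: may be a commit or a rollback.
            return TxnCommitRecord::SingleRecord { commit_ts: w.commit_ts, write_type: w.write_type };
        }
        if w.commit_ts == t1_start_ts && w.has_overlapped_rollback {
            // Another txn's record sits exactly at t1.start_ts and carries
            // t1's collapsed rollback flag.
            return TxnCommitRecord::OverlappedRollback;
        }
    }
    TxnCommitRecord::None
}
```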

commands::write_result

After commands::Prewrite finishes, TiKV calls write_result:

  • If the 1PC path was successfully triggered, the transaction is committed directly; there is no need to write the LOCK
  • If no lock conflict or write conflict occurred during the Prewrite flow, txn.into_modifies() is called and the data is replicated to the TiKV cluster through the Raft protocol
  • If lock conflicts were encountered, return the result, handing the locks array back to the client
fn write_result(
        self,
        locks: Vec<std::result::Result<(), StorageError>>,
        mut txn: MvccTxn,
        final_min_commit_ts: TimeStamp,
        rows: usize,
        async_apply_prewrite: bool,
    ) -> WriteResult {
        let async_commit_ts = if self.secondary_keys.is_some() {
            final_min_commit_ts
        } else {
            TimeStamp::zero()
        };

        let mut result = if locks.is_empty() {
            let (one_pc_commit_ts, released_locks) =
                one_pc_commit(self.try_one_pc, &mut txn, final_min_commit_ts);

            ...
            let mut to_be_write = WriteData::new(txn.into_modifies(), extra);
            to_be_write.set_disk_full_opt(self.ctx.get_disk_full_opt());

            WriteResult {...
            }
        } else {
            ...
            WriteResult {...
        };

        // Currently if `try_one_pc` is set, it must have succeeded here.
        if (!async_commit_ts.is_zero() || self.try_one_pc) && async_apply_prewrite {
            result.response_policy = ResponsePolicy::OnCommitted
        }

        result
    }
}

commands::handle_1pc_locks

If the 1PC flag is true and all of Prewrite's Lock and write checks have passed, handle_1pc_locks performs the second-phase commit directly: it skips writing the LOCK, writes the KEY-VALUE records straight into the write CF, and deletes the pessimistic locks written earlier in the pessimistic transaction, completing the commit.

fn handle_1pc_locks(txn: &mut MvccTxn, commit_ts: TimeStamp) -> ReleasedLocks {
    let mut released_locks = ReleasedLocks::new();

    for (key, lock, delete_pessimistic_lock) in std::mem::take(&mut txn.locks_for_1pc) {
        let write = Write::new(
            WriteType::from_lock_type(lock.lock_type).unwrap(),
            txn.start_ts,
            lock.short_value,
        )
        .set_last_change(lock.last_change)
        .set_txn_source(lock.txn_source);
        // Transactions committed with 1PC should be impossible to overwrite rollback
        // records.
        txn.put_write(key.clone(), commit_ts, write.as_ref().to_bytes());
        if delete_pessimistic_lock {
            released_locks.push(txn.unlock_key(key, true, commit_ts));
        }
    }

    released_locks
}

actions::Prewrite

Source path: src/storage/txn/actions/prewrite.rs

The actions::prewrite function is the other core function. Its main flow is:

  • The load_lock function checks whether the KEY is already locked

    • If a lock exists, check_lock inspects the lock's state further

      • check_lock may return LockStatus::Locked, meaning the lock is already in place; this should be a duplicate call of the interface, so actions::prewrite ends and returns OK directly
      • check_lock may return LockStatus::Pessimistic, meaning this is a pessimistic transaction and the lock found is a pessimistic lock, as expected; the flow continues and later turns the pessimistic lock into an ordinary lock.
      • check_lock may return ERR:KeyIsLocked, meaning the lock was taken by another transaction; actions::prewrite ends
      • check_lock may return ERR:PessimisticLockNotFound, meaning the pessimistic lock was not found (start_ts did not match) or for_update_ts did not match; the lock may have been lost, and actions::prewrite ends
      • check_lock may return ERR:LockTypeNotMatch, meaning the transaction is pessimistic but an ordinary lock was found; actions::prewrite ends
    • If no lock exists

      • For a pessimistic mutation, load_lock was expected to find a lock, so this is unexpected: the lock may have been lost. amend_pessimistic_lock is called to see whether the situation can still be salvaged

        • If the KEY has no newer commit record so far, it can be salvaged: take an ordinary lock on the KEY directly and continue the Prewrite flow, also running check_assertion on the KEY to verify it meets the Prewrite parameters' requirements.
        • If a newer transaction has already updated the KEY, the only option is to return ERR:PessimisticLockNotFound, terminating actions::prewrite
      • For a non-pessimistic mutation this is expected, and the actions::prewrite flow continues

  • If skip_constraint_check is true, skip the consistency checks

  • If skip_constraint_check is false, check_for_newer_version inspects the write records for further checks

    • If this transaction's rollback record is found (commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)), return a write-conflict error, terminating the actions::prewrite flow

    • If a record with commit_ts > txn.start_ts is found:

      • In an optimistic transaction, return a write-conflict error directly, terminating the actions::prewrite flow
      • In a pessimistic transaction, return the write-conflict error and terminate only if the mutation carries the DoConstraintCheck action
    • If a record with commit_ts > for_update_ts is found, a pessimistic transaction must also return a PessimisticLockNotFound error, terminating the actions::prewrite flow (rollback records excepted; the author does not yet fully understand this scenario)

    • If the write record matches expectations, the should_not_exist assertion must still be verified to ensure the transaction's correctness; otherwise an AlreadyExist error is reported, terminating the actions::prewrite flow

    • If no record is found at all, that matches expectations: return OK

  • Next comes the check_assertion step

    • If the pessimistic lock was taken through the amend path above (amend_pessimistic_lock), check_assertion has in effect already run and can be skipped

    • If skip_constraint_check is false, the write record has in fact already been loaded, so the Assertion can be verified against it directly

      • Of course, the write record may not be a PUT or DELETE record, in which case the write record must be reloaded from the underlying storage
    • If skip_constraint_check is true, the write record is queried and the Assertion verified only when assertion_level=Strict is enabled; otherwise OK is returned directly

  • If the current mutation's type is Mutation::CheckNotExists, then by this point the LOCK, the write records, and the Assertion have all been checked and match the request's expectations; no lock is needed, so simply return OK, terminating the actions::prewrite flow

  • The write_lock function builds the LOCK record. In particular, for the Primary KEY under Async Commit, the secondary keys' information must also be written into the LOCK

pub fn prewrite<S: Snapshot>(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<S>,
    txn_props: &TransactionProperties<'_>,
    mutation: Mutation,
    secondary_keys: &Option<Vec<Vec<u8>>>,
    pessimistic_action: PrewriteRequestPessimisticAction,
    expected_for_update_ts: Option<TimeStamp>,
) -> Result<(TimeStamp, OldValue)> {
    let mut mutation =
        PrewriteMutation::from_mutation(mutation, secondary_keys, pessimistic_action, txn_props)?;

    let mut lock_amended = false;

    let lock_status = match reader.load_lock(&mutation.key)? {
        Some(lock) => mutation.check_lock(lock, pessimistic_action, expected_for_update_ts)?,
        None if matches!(pessimistic_action, DoPessimisticCheck) => {
            amend_pessimistic_lock(&mut mutation, reader)?;
            lock_amended = true;
            LockStatus::None
        }
        None => LockStatus::None,
    };

    if let LockStatus::Locked(ts) = lock_status {
        return Ok((ts, OldValue::Unspecified));
    }

    // Note that the `prev_write` may have invalid GC fence.
    let (mut prev_write, mut prev_write_loaded) = if !mutation.skip_constraint_check() {
        (mutation.check_for_newer_version(reader)?, true)
    } else {
        (None, false)
    };

    
    if !lock_amended {
        let (reloaded_prev_write, reloaded) =
            mutation.check_assertion(reader, &prev_write, prev_write_loaded)?;
        if reloaded {
            prev_write = reloaded_prev_write;
            prev_write_loaded = true;
        }
    }

   
    if mutation.should_not_write {
        ...
        return Ok((min_commit_ts, OldValue::Unspecified));
    }

    ...

    let is_new_lock = !matches!(pessimistic_action, DoPessimisticCheck) || lock_amended;
    let final_min_commit_ts = mutation.write_lock(lock_status, txn, is_new_lock)?;

    Ok((final_min_commit_ts, ...))
}

actions::check_lock

源码路径:src/storage/txn/actions/prewrite.rs

  • check_lock 可能返回 LockStatus::Locked,说明锁已经加锁成功,属于接口重复调用,actions::prewrite 函数流程终结,直接返回 OK
  • check_lock 可能返回 LockStatus::Pessimistic,说明本事务是悲观事务,同时查到的锁是本事务的悲观锁,符合预期,后续继续 actions::prewrite 流程,将悲观锁改写为普通锁。
  • check_lock 可能返回 ERR:KeyIsLocked,说明该锁是其他事务加的,actions::prewrite 函数流程终结
  • check_lock 可能返回 ERR:PessimisticLockNotFound,说明悲观锁未能找到 (start_ts 没有匹配上),或者 for_update_ts 不匹配,可能发生了锁丢失问题,actions::prewrite 函数流程终结
  • check_lock 可能返回 ERR:LockTypeNotMatch,说明查到的是悲观锁,但本事务并不是悲观事务,actions::prewrite 函数流程终结

简化代码如下:

fn check_lock(
        &mut self,
        lock: Lock,
        pessimistic_action: PrewriteRequestPessimisticAction,
        expected_for_update_ts: Option<TimeStamp>,
    ) -> Result<LockStatus> {
        if lock.ts != self.txn_props.start_ts {
            if matches!(pessimistic_action, DoPessimisticCheck) {
                return ErrorInner::PessimisticLockNotFound...
            }

            return Err(ErrorInner::KeyIsLocked(self.lock_info(lock)?).into());
        }

       
        if lock.is_pessimistic_lock() {
            if !self.txn_props.is_pessimistic() {
                return Err(ErrorInner::LockTypeNotMatch...
            }

            if let Some(ts) = expected_for_update_ts
                && lock.for_update_ts != ts
            {
                return Err(ErrorInner::PessimisticLockNotFound...;
            }

            self.lock_ttl = std::cmp::max(self.lock_ttl, lock.ttl);
            self.min_commit_ts = std::cmp::max(self.min_commit_ts, lock.min_commit_ts);

            return Ok(LockStatus::Pessimistic(lock.for_update_ts));
        }

        let min_commit_ts = if lock.use_async_commit {
            lock.min_commit_ts
        } else {
            TimeStamp::zero()
        };
        Ok(LockStatus::Locked(min_commit_ts))
    }
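上述 check_lock 的五种判定结果,可以抽成下面这个可独立验证的示意函数。注意:其中的枚举与函数签名均为本文虚构的简化版,只保留判定顺序,与 TiKV 真实定义并不相同:

```rust
// 虚构的简化类型:只为演示 check_lock 的判定顺序
#[derive(Clone, Copy, PartialEq, Debug)]
enum LockKind {
    Pessimistic, // 悲观锁
    Normal,      // 普通 (prewrite) 锁
}

#[derive(Debug, PartialEq)]
enum CheckLockResult {
    Locked,                  // 本事务已加普通锁,属于重复请求,直接返回 OK
    Pessimistic,             // 本事务的悲观锁,继续流程改写为普通锁
    KeyIsLocked,             // 锁属于其他事务
    PessimisticLockNotFound, // start_ts / for_update_ts 不匹配,疑似锁丢失
    LockTypeNotMatch,        // 锁是悲观锁,但事务不是悲观事务
}

fn check_lock(
    lock_ts: u64,
    lock_kind: LockKind,
    lock_for_update_ts: u64,
    txn_start_ts: u64,
    txn_is_pessimistic: bool,
    do_pessimistic_check: bool,
    expected_for_update_ts: Option<u64>,
) -> CheckLockResult {
    // 1. 锁不是本事务加的
    if lock_ts != txn_start_ts {
        return if do_pessimistic_check {
            CheckLockResult::PessimisticLockNotFound
        } else {
            CheckLockResult::KeyIsLocked
        };
    }
    // 2. 本事务的悲观锁
    if lock_kind == LockKind::Pessimistic {
        if !txn_is_pessimistic {
            return CheckLockResult::LockTypeNotMatch;
        }
        // 公平锁优化命中后,需要再校验 for_update_ts 一致性
        if let Some(ts) = expected_for_update_ts {
            if lock_for_update_ts != ts {
                return CheckLockResult::PessimisticLockNotFound;
            }
        }
        return CheckLockResult::Pessimistic;
    }
    // 3. 本事务的普通锁:重复的 prewrite 请求
    CheckLockResult::Locked
}
```

例如,公平锁场景下 expected_for_update_ts 与锁上记录的 for_update_ts 不一致时返回 PessimisticLockNotFound,对应前言中描述的 for_update_ts 校验。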

actions::amend_pessimistic_lock

源码路径:src/storage/txn/actions/prewrite.rs

amend_pessimistic_lock 继续查看是否存在补救的可能

  • 如果目前为止,KEY 还没有新的提交记录,那么可以补救:直接对 KEY 加普通锁,继续 Prewrite 流程即可,同时对 KEY 进行 check_assertion,验证其是否符合 Prewrite 的参数要求。
  • 如果已经有新事务更新了 KEY,那么只能返回 ERR:PessimisticLockNotFound,终止 actions::prewrite 函数

简化代码如下:

fn amend_pessimistic_lock<S: Snapshot>(
    mutation: &mut PrewriteMutation<'_>,
    reader: &mut SnapshotReader<S>,
) -> Result<()> {
    let write = reader.seek_write(&mutation.key, TimeStamp::max())?;
    if let Some((commit_ts, write)) = write.as_ref() {
        if *commit_ts >= reader.start_ts {
            return Err(ErrorInner::PessimisticLockNotFound...
        }
    } 

    // Check assertion after amending.
    mutation.check_assertion(reader, &write.map(|(w, ts)| (ts, w)), true)?;

    Ok(())
}
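amend_pessimistic_lock 能否补救,核心只取决于一个条件,可以抽成下面的示意 (本文虚构的简化函数,仅表达判定条件):

```rust
// 虚构的简化示意:悲观锁丢失后能否补救,
// 只看 KEY 上最新一条 write 记录的 commit_ts 是否早于本事务的 start_ts。
fn can_amend(latest_commit_ts: Option<u64>, txn_start_ts: u64) -> bool {
    match latest_commit_ts {
        // 锁丢失期间已有新事务提交 (commit_ts >= start_ts),
        // 补救失败,返回 PessimisticLockNotFound
        Some(commit_ts) => commit_ts < txn_start_ts,
        // KEY 上没有任何提交记录,可以直接补一把普通锁继续 prewrite
        None => true,
    }
}
```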

actions::check_for_newer_version

源码路径:src/storage/txn/actions/prewrite.rs

  • 如果找到了本事务的回滚记录,返回写冲突错误,终止actions::prewrite 流程

  • 如果找到了 commit_ts > txn.start_ts 记录,

    • 乐观事务下,直接返回写冲突错误,终止 actions::prewrite 流程
    • 悲观事务下,如果 pessimistic_action 参数为 DoConstraintCheck,同样返回写冲突错误,终止 actions::prewrite 流程
  • 如果找到了 commit_ts > for_update_ts 的记录,悲观事务下,也需要返回 PessimisticLockNotFound 错误,终止 actions::prewrite 流程 (回滚记录除外,详见下文)

  • write 记录符合预期时,还需要根据 should_not_exist 进行约束检查 (check_data_constraint),确保事务的正确性,否则会报 AlreadyExist 错误,终止 actions::prewrite 流程

  • 没有找到任何记录,符合预期,返回 OK

对于悲观事务来说,如果找到了 commit_ts > t.for_update_ts 的回滚记录,一般是符合预期的,根据 PR:pingcap/tidb#35525 的描述,出现这种情况的场景如下:

The sequence of events can be like:

  1. Pessimistic txn-A locks row key using for_update_ts = 1.
  2. Optimistic txn-B with start_ts = 2 prewrites the index key.
  3. txn-A finds the locks on index keys and rolls back txn-B, leaving rollback records of ts=2 on index keys.
  4. txn-A prewrite index keys, and find a write conflict.

个人理解:因为悲观锁是在事务执行过程中加的,持锁时间较长,悲观事务、乐观事务混用时就比较容易出现这种情况,因此可以特意忽略其他事务的回滚记录,来减少写冲突重试的成本。

对于乐观事务来说,加锁都是 prewrite 过程中发生的,时间比较短,出现这种回滚记录导致的写冲突的概率比较小。

关于其正确性的讨论,可以参考 https://github.com/tikv/tikv/pull/13426 :

The only risk of newer rollback records is that we may succeed in prewriting even if the lock was rolled back and collapsed.

  • If it's not an async-commit transaction, rollback records are written in two cases:

    •  The transaction aborts itself during 2PC. The primary lock must not be committed. In this case, the wrongly prewritten lock will be finally rolled back by others.
    •  A lock is rolled back by ResolveLock. Before that, the primary lock must have been rolled back. So, the new lock will also be rolled back by others.
  • If it's an async-commit transaction:

    •  The transaction aborts itself during 2PC. This means some other key must fail to prewrite. In this case, the wrongly prewritten lock will be finally rolled back by others.
    •  A lock is rolled back by ResolveLock. Before that, some other key must have been rolled back. So, the new lock will also be rolled back by other readers.
    •  The rollback is written by CheckSecondaryLocks. But CheckSecondaryLocks always writes a protected rollback. The rollback must be detected by our retried prewrite.

下面是我个人的理解:

其实回滚记录的存在,就是为了防止由于网络等原因,本来已经执行过回滚的 key 又被重复调用 Prewrite:

  • t1 事务二阶段提交开始,唯一索引 UK、普通索引 iKey 被执行 Prewrite

  • 唯一索引 UK、普通索引iKey 由于写冲突被执行 cleanuprollbackresolveLockCheckSecondaryLocks 清除了 LOCK,并且被提交了回滚记录 rollback_record_uk, rollback_record_iKey

  • rollback_record_uk 的回滚记录是 Protected 模式的

  • 由于 TIKV 的优化策略,普通索引的回滚记录 rollback_record_iKey 可能是 Non-Protected 模式 (一般对于非唯一索引都是这个模式):

    • overlap 记录,可能根本不会真正写入到 TIKV 存储层(为了减少写入压力,如果恰好有其他事务的提交记录 commitTS 和事务 t1 的 startTS 相同,非保护模式下并不会修改其 overlap 属性,所以实际上根本没有写 rollback 记录;保护模式下,则会将其 overlap 属性修改为 true)
    • 也可能被更大 commitTS 的回滚记录 collapsed 掉,也就是删除掉(为了能够让 mvcc 更快的找到 KEY 的最新记录,TIKV 会删除中间大量的回滚记录,只保留最新的 rollback 记录)
  • 那么如果由于网络原因,这个 t1 事务对UKiKeyprewrite 请求又被重复发送到了 TIKV

  • UK 由于回滚记录是被保护的,因此会被这个判断语句拦截: commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)

  • 由于 iKeyt1 回滚记录已被删除,即使有回滚记录,那么其 commitTS 也不是 t1startTS, 因此通过了判定: commit_ts == self.txn_props.start_ts && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)

  • 悲观事务中,唯一索引的话会被 DoConstraintCheck 检验,非唯一索引一般是不会做 DoConstraintCheck 检验的。

  • 对于 commit_ts > t.for_update_ts 的记录,如果忽略了 commitTS 更高的回滚记录的话,实际上对于非唯一索引来说是可以 prewrite 成功的

  • 但是这也仅仅是二阶段提交的第一阶段成功了,等到真正 commit 的时候,由于唯一索引、rowID 实际上是 Protected 模式的回滚,因此 commit 时肯定会找到回滚记录,从而阻止已经回滚的事务被再次提交成功。

因此,这个优化可以在避免悲观事务大量的写冲突同时,还可以保障正确性。

但是代价是:对于普通索引来说,可能会 prewrite 成功后再次被回滚;同时非保护模式回滚、prewrite 忽略回滚这些优化,也增加了逻辑的复杂度。

简化代码如下:

fn check_for_newer_version<S: Snapshot>(
        &mut self,
        reader: &mut SnapshotReader<S>,
    ) -> Result<Option<(Write, TimeStamp)>> {
        let mut seek_ts = TimeStamp::max();
        while let Some((commit_ts, write)) = reader.seek_write(&self.key, seek_ts)? {
            
        if commit_ts == self.txn_props.start_ts
                && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)
        {
            self.write_conflict_error(&write, commit_ts, WriteConflictReason::SelfRolledBack)?;
        }

            match self.txn_props.kind {
                TransactionKind::Optimistic(_) => {
                    if commit_ts > self.txn_props.start_ts {
                        self.write_conflict_error(...)?;
                    }
                }
                // Note: PessimisticLockNotFound can happen on a non-pessimistically locked key,
                // if it is a retrying prewrite request.
                TransactionKind::Pessimistic(for_update_ts) => {
                    if let DoConstraintCheck = self.pessimistic_action {
                        // Do the same as optimistic transactions if constraint checks are needed.
                        if commit_ts > self.txn_props.start_ts {
                            self.write_conflict_error(...)?;
                        }
                    }
                    if commit_ts > for_update_ts {
                        // Don't treat newer Rollback records as write conflicts. They can cause
                        // false positive errors because they can be written even if the pessimistic
                        // lock of the corresponding row key exists.
                        // Rollback records are only used to prevent retried prewrite from
                        // succeeding. Even if the Rollback record of the current transaction is
                        // collapsed by a newer record, it is safe to prewrite this non-pessimistic
                        // key because either the primary key is rolled back or it's protected
                        // because it's written by CheckSecondaryLocks.
                        if write.write_type == WriteType::Rollback {
                            seek_ts = commit_ts.prev();
                            continue;
                        }

                       
                        return Err(ErrorInner::PessimisticLockNotFound...
                    }
                }
            }

            check_data_constraint(reader, self.should_not_exist, &write, commit_ts, &self.key)?;

            return Ok(Some((write, commit_ts)));
        }

        Ok(None)
    }
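上面 check_for_newer_version 中"悲观事务跳过更新回滚记录"的分支,可以抽成下面这个可独立验证的示意 (类型与函数均为本文虚构的简化版;writes 按 commit_ts 从大到小给出,模拟 seek_write 的迭代顺序):

```rust
// 虚构的简化类型,仅为演示 check_for_newer_version 的判定顺序
#[derive(Clone, Copy, PartialEq)]
enum WriteType {
    Put,
    Rollback,
}

#[derive(Clone, Copy)]
struct WriteRec {
    commit_ts: u64,
    write_type: WriteType,
    has_overlapped_rollback: bool,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Ok,                      // 没有冲突,可以继续 prewrite
    SelfRolledBack,          // 命中本事务的 (保护) 回滚记录
    WriteConflict,           // 乐观事务写冲突
    PessimisticLockNotFound, // 悲观事务发现更新的数据版本
}

// writes 必须按 commit_ts 从大到小排列;
// for_update_ts 为 Some 表示悲观事务 (且未携带 DoConstraintCheck)
fn check_for_newer_version(
    writes: &[WriteRec],
    start_ts: u64,
    for_update_ts: Option<u64>,
) -> Outcome {
    for w in writes {
        // 本事务的回滚记录 (Rollback 或 overlapped rollback):拦截重试的 prewrite
        if w.commit_ts == start_ts
            && (w.write_type == WriteType::Rollback || w.has_overlapped_rollback)
        {
            return Outcome::SelfRolledBack;
        }
        match for_update_ts {
            // 乐观事务:任何更新的提交都算写冲突
            None if w.commit_ts > start_ts => return Outcome::WriteConflict,
            // 悲观事务:更新的回滚记录不算冲突,跳过继续向前找
            Some(fut) if w.commit_ts > fut => {
                if w.write_type == WriteType::Rollback {
                    continue;
                }
                return Outcome::PessimisticLockNotFound;
            }
            _ => {}
        }
        // 找到的第一条有效记录符合预期 (此后还会做 should_not_exist 约束检查)
        return Outcome::Ok;
    }
    Outcome::Ok
}
```

其中"commit_ts=2 的回滚记录 + for_update_ts=1 的悲观事务仍返回 Ok"这一组输入,即对应上文 PR 场景中 txn-A 遇到 txn-B 的回滚记录仍可继续 prewrite 的情况。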

actions::check_assertion

源码路径:src/storage/txn/actions/prewrite.rs

  • 如果 skip_constraint_checkfalse,实际上 write 记录已经加载完毕,那么直接对 write 记录进行 Assertion 验证即可

    • 当然也有可能 write 记录并不是 PUT 或者 DELETE 记录,这个时候需要从底层重新加载 write 记录
  • 如果 skip_constraint_checktrue,那么需要开启 assertion_level=Strict 模式才会查询 write 记录,并且进行 Assertion 验证。否则的话,直接返回 OK

  • 分别对 Assertion::ExistAssertion::NotExist 进行验证

  • 如果验证失败的话,

    • skip_constraint_checkfalse,那么可能需要由 check_for_newer_version 函数报错,报错信息更准确
    • 否则的话,返回 AssertionError 错误
  • 验证成功,返回 OK

简化代码如下:

fn check_assertion<S: Snapshot>(
        &mut self,
        reader: &mut SnapshotReader<S>,
        write: &Option<(Write, TimeStamp)>,
        write_loaded: bool,
    ) -> Result<(Option<(Write, TimeStamp)>, bool)> {
        if self.assertion == Assertion::None
            || self.txn_props.assertion_level == AssertionLevel::Off
        {  
            return Ok((None, false));
        }

        if self.txn_props.assertion_level != AssertionLevel::Strict && !write_loaded {
            return Ok((None, false));
        }

        let mut reloaded_write = None;
        let mut reloaded = false;

        // To pass the compiler's lifetime check.
        let mut write = write;

        if write_loaded
            && write.as_ref().map_or(
                false,
                |(w, _)| matches!(w.gc_fence, Some(gc_fence_ts) if !gc_fence_ts.is_zero()),
            )
        {
            // The previously-loaded write record has an invalid gc_fence. Regard it as
            // none.
            write = &None;
        }

        // Load the most recent version if prev write is not loaded yet, or the prev
        // write is not a data version (`Put` or `Delete`)
        let need_reload = !write_loaded
            || write.as_ref().map_or(false, |(w, _)| {
                w.write_type != WriteType::Put && w.write_type != WriteType::Delete
            });
        if need_reload {
            let reload_ts = write.as_ref().map_or(TimeStamp::max(), |(_, ts)| *ts);
            reloaded_write = reader.get_write_with_commit_ts(&self.key, reload_ts)?;
            write = &reloaded_write;
            reloaded = true;
        } 

        let assertion_err = match (self.assertion, write) {
            (Assertion::Exist, None) => {
                self.assertion_failed_error(TimeStamp::zero(), TimeStamp::zero())
            }
            (Assertion::Exist, Some((w, commit_ts))) if w.write_type == WriteType::Delete => {
                self.assertion_failed_error(w.start_ts, *commit_ts)
            }
            (Assertion::NotExist, Some((w, commit_ts))) if w.write_type == WriteType::Put => {
                self.assertion_failed_error(w.start_ts, *commit_ts)
            }
            _ => Ok(()),
        };

        // Assertion error can be caused by a rollback. So make up a constraint check if
        // the check was skipped before.
        if assertion_err.is_err() {
            if self.skip_constraint_check() {
                self.check_for_newer_version(reader)?;
            }
            let (write, commit_ts) = write
                .as_ref()
                .map(|(w, ts)| (Some(w), Some(ts)))
                .unwrap_or((None, None));
            error!("assertion failure"; "assertion" => ?self.assertion, "write" => ?write,
            "commit_ts" => commit_ts, "mutation" => ?self);
            assertion_err?;
        }

        Ok((reloaded_write, reloaded))
    }
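check_assertion 最终的验证规则本身很简单,可以抽成如下示意 (虚构的简化类型,latest_data_write 表示重新加载后 KEY 最近一条 PUT/DELETE 数据版本):

```rust
// 虚构的简化类型,仅为演示 Exist / NotExist 的验证规则
#[derive(Clone, Copy)]
enum WriteType {
    Put,
    Delete,
}

#[derive(Clone, Copy)]
enum Assertion {
    None,
    Exist,
    NotExist,
}

// latest_data_write: KEY 最近一条数据版本 (PUT/DELETE),None 表示不存在
fn assertion_ok(assertion: Assertion, latest_data_write: Option<WriteType>) -> bool {
    match (assertion, latest_data_write) {
        // 断言存在,但 KEY 没有数据版本,或最新版本是 DELETE -> 失败
        (Assertion::Exist, None) | (Assertion::Exist, Some(WriteType::Delete)) => false,
        // 断言不存在,但最新版本是 PUT -> 失败
        (Assertion::NotExist, Some(WriteType::Put)) => false,
        // 其他情况 (包括 Assertion::None) 均通过
        _ => true,
    }
}
```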

actions::write_lock

源码路径:src/storage/txn/actions/prewrite.rs

write_lock 函数将会构建 LOCK 信息。

特别的,如果是 Async CommitPrimary KEY 的话,还需要把 secondary key 的信息写入到 LOCK

同时 Async Commit 下,还需要调用 async_commit_timestamps 计算 final_min_commit_ts,其值是 :

max(max(concurrency_manager.max_ts(), start_ts, for_update_ts).next(), lock.min_commit_ts)

但是如果这个值大于 Prewrite 请求的 max_commit_ts 的话,会返回 CommitTsTooLarge,回退为普通 2PC.

简化代码如下:

fn write_lock(
        self,
        lock_status: LockStatus,
        txn: &mut MvccTxn,
        is_new_lock: bool,
    ) -> Result<TimeStamp> {
        let mut try_one_pc = self.try_one_pc();

        let for_update_ts_to_write = match (self.txn_props.for_update_ts(), lock_status) {
            (from_prewrite_req, LockStatus::Pessimistic(from_pessimistic_lock)) => {
                std::cmp::max(from_prewrite_req, from_pessimistic_lock)
            }
            (for_update_ts_from_req, _) => for_update_ts_from_req,
        };

        let mut lock = Lock::new(
            self.lock_type.unwrap(),
            self.txn_props.primary.to_vec(),
            self.txn_props.start_ts,
            self.lock_ttl,
            None,
            for_update_ts_to_write,
            self.txn_props.txn_size,
            self.min_commit_ts,
            false,
        )
        .set_txn_source(self.txn_props.txn_source);
        ...

        if let Some(value) = self.value {
            if is_short_value(&value) {
                // If the value is short, embed it in Lock.
                lock.short_value = Some(value);
            } else {
                // value is long
                txn.put_value(self.key.clone(), self.txn_props.start_ts, value);
            }
        }

        if let Some(secondary_keys) = self.secondary_keys {
            lock.use_async_commit = true;
            lock.secondaries = secondary_keys.to_owned();
        }

        let final_min_commit_ts = if lock.use_async_commit || try_one_pc {
            let res = async_commit_timestamps(
                &self.key,
                &mut lock,
                self.txn_props.start_ts,
                self.txn_props.for_update_ts(),
                self.txn_props.max_commit_ts(),
                txn,
            );

            if let Err(Error(box ErrorInner::CommitTsTooLarge { .. })) = &res {
                try_one_pc = false;
                lock.use_async_commit = false;
                lock.secondaries = Vec::new();
            }
            res
        } else {
            Ok(TimeStamp::zero())
        };

        if try_one_pc {
            txn.put_locks_for_1pc(self.key, lock, lock_status.has_pessimistic_lock());
        } else {
            txn.put_lock(self.key, &lock, is_new_lock);
        }

        final_min_commit_ts
    }

fn async_commit_timestamps(
    key: &Key,
    lock: &mut Lock,
    start_ts: TimeStamp,
    for_update_ts: TimeStamp,
    max_commit_ts: TimeStamp,
    txn: &mut MvccTxn,
) -> Result<TimeStamp> {
    // This operation should not block because the latch makes sure only one thread
    // is operating on this key.
    let key_guard = ::futures_executor::block_on(txn.concurrency_manager.lock_key(key));

    let final_min_commit_ts = key_guard.with_lock(|l| {
        let max_ts = txn.concurrency_manager.max_ts();
        fail_point!("before-set-lock-in-memory");
        let min_commit_ts = cmp::max(cmp::max(max_ts, start_ts), for_update_ts).next();
        let min_commit_ts = cmp::max(lock.min_commit_ts, min_commit_ts);

        if !max_commit_ts.is_zero() && min_commit_ts > max_commit_ts {
            return Err(ErrorInner::CommitTsTooLarge...
        }

        lock.min_commit_ts = min_commit_ts;
        *l = Some(lock.clone());
        Ok(min_commit_ts)
    })?;

    txn.guards.push(key_guard);

    Ok(final_min_commit_ts)
}
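async_commit_timestamps 中的时间戳计算与回退判定,可以抽成如下纯函数示意 (函数名与签名为本文虚构的简化版,用 u64 代替 TimeStamp,0 表示 zero):

```rust
// 虚构的简化示意:async commit / 1PC 下 final_min_commit_ts 的计算。
// 返回 Err(()) 表示 CommitTsTooLarge,调用方需清空 secondaries 并回退为普通 2PC。
fn async_commit_timestamps(
    max_ts: u64,        // concurrency_manager 观察到的最大时间戳
    start_ts: u64,
    for_update_ts: u64,
    lock_min_commit_ts: u64,
    max_commit_ts: u64, // 0 表示不限制
) -> Result<u64, ()> {
    // 对应 cmp::max(cmp::max(max_ts, start_ts), for_update_ts).next()
    let min_commit_ts = max_ts.max(start_ts).max(for_update_ts) + 1;
    // 再与锁上已有的 min_commit_ts 取大
    let min_commit_ts = lock_min_commit_ts.max(min_commit_ts);
    if max_commit_ts != 0 && min_commit_ts > max_commit_ts {
        return Err(()); // CommitTsTooLarge
    }
    Ok(min_commit_ts)
}
```

例如 max_ts=20、start_ts=10 时 final_min_commit_ts 为 21;若 Prewrite 请求的 max_commit_ts 只有 15,则触发 CommitTsTooLarge 回退。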


版权声明:本文为 TiDB 社区用户原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接和本声明。
