前言
上一篇介绍了 Prewrite 接口,这篇我们继续介绍 Commit/Rollback
接口,Cleanup
接口实际上和 Rollback
接口类似。
除此之外,还有 CheckTxnStatus
/ ResolveLock
/ CheckSecondaryLocks
关键接口,由于篇幅有限,只能后面有机会再聊
Commit
参数
KEYS
:Commit
提交的涉及的 KEYS,相关的KEYS
和Prewrite
相同LOCK_TS
:Commit
需要消除的LOCK TS
,一般也是事务的start_ts
COMMIT_TS
: 提交的最终commit_ts
以 UPDATE
语句为例:
UPDATE MANAGERS_UNIQUE SET FIRST_NAME="Brad9" where FIRST_NAME='Brad10';
sched_txn_command kv::command::commit [
7480000000000000FF6A5F720131343237FF36000000FC000000FC
]
start_ts:448099651396042753 -> commit_ts:448099662328233986
| region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981801 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295
sched_txn_command kv::command::commit [
7480000000000000FF6A5F698000000000FF0000020142726164FF31300000FD000000FC,
7480000000000000FF6A5F698000000000FF0000020142726164FF39000000FC000000FC
]
start_ts:448099651396042753 -> commit_ts:448099662328233986
| region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295
其他语句类似,区别不大,这里不在赘述。
代码简读
-
对每个
KEY
都调用commit
函数进行提交操作 -
使用
load_lock
函数来检查是否含有KEY
对应的LOCK
,我们预期应该存在Prewrite
留下的LOCK
-
如果没有发现
LOCK
或者不是本事务的LOCK
,:- 调用
get_txn_commit_record
观察是否已经提交完毕,如果已经提交,那么可以提前返回OK
- 如果发现了回滚记录,或者没有找到任何记录,那么返回
ERR: TxnLockNotFound
- 调用
-
如果发现了本事务的
LOCK
,首先检查一下lock.min_commit_ts
必须大于commit_ts
-
如果
LOCK
类型是正常的锁,那么删除锁,并且添加新的write
记录即可正常返回 -
如果
LOCK
类型是悲观锁,这个是非预期的,这时候commit
操作实际上就是删除悲观锁即可 (并不需要write CF
上的回滚记录)。- 可能是因为
pessimistic rollback
请求未能发送到TIKV
,也可能是TIKV
由于某种情况下突然收到了pessimistic lock
请求,个人理解这些特殊场景可能并不是二阶段过程中会发生的,因为Prewrite
成功后不可能将常规的锁转为悲观锁,根据注释大概率应该是resolve lock
过程中可能遇到的场景
- 可能是因为
-
还有一种比较特殊的情况,那就是存在并发的两个事务,
t1
与t2
,t1
开启的时间很早,也就是t1.start_ts < t2.start_ts
t1
对KEY
调用了Prewrite
进行了加锁-
t2
开启的时间比较晚,也想对KEY
进行加锁,发现有并发事务的锁冲突,因此采取了回滚。 - 回滚的时候,会留下
write
记录,该write
记录的write.commit_ts = t2.start_ts
- 那么其实会有一个隐患,如果
t1
提交的时候,t1.commit_ts
恰好和t2.start_ts
相同的话,那么t1
提交write
记录就会覆盖t2
的回滚记录 - 正常来说,如果在提交
t1
事务的时候,先来看一眼write
现有记录的话,可以简单的避免这个问题。但是每次提交都查询write
记录的话,代价稍微有点高。 - 但是我们每次进行
commit
的时候,都避免不了去加载LOCK
信息 - 因此,引入了
lock.rollback_ts
,每当其他事务发生锁冲突因此需要回滚的时候,我们都会更新这个rollback_ts
数组。如果t1
发现自己的commit_ts
命中了lock.rollback_ts
,那么写write
记录的时候需要小心一些,设置overlapped_rollback
为true
,标志这个write
记录其实是叠加了两个事务的commit
和rollback
-
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
if self.commit_ts <= self.lock_ts {
return Err(Error::from(ErrorInner::InvalidTxnTso...
}
...
for k in self.keys {
released_locks.push(commit(&mut txn, &mut reader, k, self.commit_ts)?);
}
let mut write_data = WriteData::from_modifies(txn.into_modifies());
Ok(WriteResult {...}
)
pub fn commit<S: Snapshot>(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<S>,
key: Key,
commit_ts: TimeStamp,
) -> MvccResult<Option<ReleasedLock>> {
let (mut lock, commit) = match reader.load_lock(&key)? {
Some(lock) if lock.ts == reader.start_ts => {
if commit_ts < lock.min_commit_ts {
return Err(ErrorInner::CommitTsExpired...
}
if lock.is_pessimistic_lock() {
(lock, false)
} else {
(lock, true)
}
}
_ => {
return match reader.get_txn_commit_record(&key)?.info() {
Some((_, WriteType::Rollback)) | None => {
Err(ErrorInner::TxnLockNotFound...
}
Some((_, WriteType::Put))
| Some((_, WriteType::Delete))
| Some((_, WriteType::Lock)) => {
Ok(None)
}
};
}
};
if !commit {
// Rollback a stale pessimistic lock. This function must be called by
// resolve-lock in this case.
assert!(lock.is_pessimistic_lock());
return Ok(txn.unlock_key(key, lock.is_pessimistic_txn(), TimeStamp::zero()));
}
let mut write = Write::new(
WriteType::from_lock_type(lock.lock_type).unwrap(),
reader.start_ts,
lock.short_value.take(),
)...
for ts in &lock.rollback_ts {
if *ts == commit_ts {
write = write.set_overlapped_rollback(true, None);
break;
}
}
txn.put_write(key.clone(), commit_ts, write.as_ref().to_bytes());
Ok(txn.unlock_key(key, lock.is_pessimistic_txn(), commit_ts))
}
pub(crate) fn unlock_key(
&mut self,
key: Key,
pessimistic: bool,
commit_ts: TimeStamp,
) -> Option<ReleasedLock> {
let released = ReleasedLock::new(self.start_ts, commit_ts, key.clone(), pessimistic);
let write = Modify::Delete(CF_LOCK, key);
self.write_size += write.size();
self.modifies.push(write);
Some(released)
}
get_txn_commit_record:扫描从 max_ts 到 t1.start_ts 之间 key 的 write record 来判断 t1 状态
return TxnCommitRecord::SingleRecord: 找到了 write.start_ts = t1.ts1 的,WriteRecord 可能是回滚记录,也可能是提交记录
return TxnCommitRecord::OverlappedRollback: 找到了 t1.start_ts == t3.commit_ts, 而且 has_overlapped_write 是 true
return TxnCommitRecord::None(Some(OverlappedWrite)): 找到了 t1.start_ts == t3.commit_ts, 而且 has_overlapped_write 为 false。实际上该记录和 rollback 记录重叠了,需要设置 has_overlapped_write
return TxnCommitRecord::None: 没找到 t1 的 commit 记录
pub fn info(&self) -> Option<(TimeStamp, WriteType)> {
match self {
Self::None { .. } => None,
Self::SingleRecord { commit_ts, write } => Some((*commit_ts, write.write_type)),
Self::OverlappedRollback { commit_ts } => Some((*commit_ts, WriteType::Rollback)),
}
}
Rollback
场景
和直观认知可能不太一样,TIKV
的 Rollback
接口一般情况下并不是 sql
的 rollback
语句触发的。
对于乐观事务来说,由于事务过程中,没有加任何锁,因此 sql
rollback
语句实际上并不需要调用 tikv
的接口处理,只需要将 Buff
的 put
数据清空即可。
对于悲观事务来说,事务过程中加了悲观锁,但是 sql
rollback
语句实际触发的是 pessimistic_rollback
这个接口,专门用于清理悲观锁。
TIKV
的 Rollback
接口常见于乐观事务写冲突的时候,乐观事务在进行二阶段提交过程中,prewrite
过程中发现了写冲突,这时候就需要调用 TIKV
的 Rollback
。
t1: begin optimistic;
t1: DELETE FROM MANAGERS_UNIQUE where FIRST_NAME='Brad7';
t2: begin optimistic;
t2: DELETE FROM MANAGERS_UNIQUE where FIRST_NAME='Brad7';
t2: commit;
t1: commit; ERROR 9007 (HY000): Write conflict;
实际上对于写冲突的 Rollback
, 之前 prewrite
也大概率并没有加锁,因此 Rollback
不需要清理锁,也不需要清楚 default CF
的数据,只需要添加一个 Rollback write
记录。
参数
KEYS
:Commit
提交的涉及的KEYS
,相关的KEYS
和Prewrite
相同LOCK_TS
:Commit
需要消除的LOCK TS
,一般也是事务的start_ts
mysql> begin optimistic;
Query OK, 0 rows affected (0.00 sec)
mysql> DELETE FROM MANAGERS_UNIQUE where FIRST_NAME='Brad7';
Query OK, 1 row affected (0.00 sec)
mysql> commit;
ERROR 9007 (HY000): Write conflict, txnStartTS=448235833448988673, conflictStartTS=448235835493974020, conflictCommitTS=448235837853270018, key={tableID=106, tableName=test.MANAGERS_UNIQUE, indexID=2, indexValues={Brad7, }}, originalKey=74800000000000006a5f698000000000000002014272616437000000fc, primary={tableID=106, tableName=test.MANAGERS_UNIQUE, indexID=2, indexValues={Brad7, }}, originalPrimaryKey=74800000000000006a5f698000000000000002014272616437000000fc, reason=Optimistic [try again later
sched_txn_command kv::command::rollback keys([
7480000000000000FF6A5F698000000000FF0000020142726164FF37000000FC000000FC
])
@ start_ts:448235833448988673
| region_id: 129 region_epoch { conf_ver: 1 version: 61 } peer { id: 130 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295
sched_txn_command kv::command::rollback keys([
7480000000000000FF6A5F698000000000FF0000030380000000FF0000000701313432FF3733000000FC0000FD,
7480000000000000FF6A5F720131343237FF33000000FC000000FC
])
@ start_ts:448235833448988673
| region_id: 14 region_epoch { conf_ver: 1 version: 61 } peer { id: 15 store_id: 1 } isolation_level: Si max_execution_duration_ms: 20000 resource_group_tag: 0A209505CACB7C710ED17125FCC6CB3669E8DDCA6C8CD8AF6A31F6B3CD64604C30981802 request_source: "external_Commit" resource_control_context { resource_group_name: "default" penalty {} override_priority: 8 } keyspace_id: 4294967295
Overlapped Rollback 回滚记录
我们知道回滚记录是个比较特殊的 write
记录,不仅仅是 write.type
是 rollback
类型的,而且还因为其 write
记录的 commitTS
与事务的 startTS
是相同的,TIKV
这样设计应该是为了减少和 PD
的交互,少获取一次 TS
,节省系统消耗。
因此普通的提交记录是这样的 KEY-VALUE
格式:
{KEY_CommitTS
: { write.type=put,write.startTS=startTS
} }
而回滚记录一般是这样的 KEY-VALUE
格式:
{KEY_StartTS
: { write.type=rollback,write.startTS=startTS
} }
那么这样就会有一个问题,那就是很多事务的 commitTS
也不是 PD
获取的,而是通过计算得到的,例如 Async Commit
。那么就可能会遇到这个场景:
T1
事务启动startTS=start_t1
, 采用了Async Commit
的方式,计算出commitTS=commit_t1
,提交记录的KEY
是KEY_commit_t1
T2
事务启动startTS=start_t2
,然后被回滚,因此其回滚记录的KEY
是KEY_start_t2
- 由于
commit_t1
并不是 PD 获取的,而start_t2
是 PD 获取的 ts,因此就有概率commit_t1==start_t2
,也就是说两个事务的提交记录和回滚记录在write CF
上重叠了 - 这个时候,就需要一个属性值
Overlapped
,当一个提交记录的Overlapped
为true
的时候,就代表这其实是两个记录,一个提交记录一个回滚记录
保护模式和非保护模式回滚记录
当我们事务冲突很严重的时候,就容易有多条的回滚记录,这对于 TIKV
的 mvcc
扫描来说效率太慢了。因此 TIKV
有个优化,在 write CF
上面,对于一个 KEY
,只保留最新的那个回滚记录即可,其他回滚记录可以直接删除。
但是为了正确性考虑,必须防止已经对 KEY
进行了回滚操作,后面突然由于网络原因又出现对 KEY
调用了 prewrite
和 commit
,导致回滚的事件被错误的提交。
因此只能对部分 KEY
进行这种 collapse
删除优化。
具体的就是对于 rowID
、唯一索引来说,采用保护模式的回滚,该回滚记录不会被删除。这样每次事务被错误的 commit
的时候,都可以通过被保护的回滚记录了解到这个事务实际上已经被提交了。
对于普通索引,采用非保护模式,可能被其他事务更新的 rollback
记录删除,也可能遇到需要 Overlapped
的场景,并不设置 Overlapped
为 true
。
最后实际上最后结果就是:普通索引上面即使被回滚了,但是却找不到任何回滚的记录。
代码简读
-
对每个
KEY
都调用cleanup
函数进行回滚操作。(而且是以非保护模式下来调用) -
使用
load_lock
函数来检查是否含有KEY
对应的LOCK
,我们预期应该存在事务留下的LOCK
-
如果发现了本事务的锁,
lock.ts == txn.start_ts
,执行rollback_lock
进行回滚操作-
rollback_lock
为了保险起见,会再次通过get_txn_commit_record
函数查看write
的最新记录-
如果发现
write
上有当前事务的提交记录,直接panic
-
如果发现有
OverlappedRollback
的记录或者回滚记录 (SingleRecord::Rollback
),说明之前已经添加了write
回滚记录,删除了default
上面的value
数据,那么现在只需要把LOCK
记录删除即可 -
如果没有发现任何提交记录或者回滚记录,那么
-
如果
LOCK
是PUT
类型、且已经写入default CF Value
,那么需要删除default CF Value
-
非保护模式下利用
make_rollback
生成rollback
类型的write
记录- 特别需要注意的是,由于是非保护模式下,所以如果恰好
rollback
记录 {key_startTS
} 与其他事务的提交记录 {key_commitTS
} 重叠 (可能t1
的startTS
恰好是t2
事务的commitTS
),那么一般情况下可以省略rollback
记录的写入,为集群减少负担。 - 但是如果
KEY
是悲观事务的Primary KEY
的话,就需要将提交记录{key_commitTS
} 设置一个overlapped_rollback
标记
- 特别需要注意的是,由于是非保护模式下,所以如果恰好
-
删除
LOCK
记录
-
-
-
-
如果没有发现锁或者发现的锁并不是本事务的,而是其他事务的
LOCK
,那么需要调用check_txn_status_missing_lock
-
如果发现有本事务的
OverlappedRollback
的记录或者回滚记录 (SingleRecord::Rollback
),说明已经回滚完成,直接返回OK
即可终止回滚流程 -
如果发现有本事务提交记录的话,返回
ErrorInner::Committed
-
如果没有找到任何本事务
write
记录的话- 如果发现了其他事务的锁:首先需要调用
mark_rollback_on_mismatching_lock
在这个LOCK
上面添加回滚LockTS
标记,这样这个lock
所涉及的事务在提交后,如果发现自己的commitTS
和LockTS
重叠的话,需要设置一下overlap
标记 - 保护模式下调用
make_rollback
写入rollback
记录,确保这个回滚记录不会被删除 - 删除
collapse
以前的非保护rollback
记录
- 如果发现了其他事务的锁:首先需要调用
-
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
...
let rows = self.keys.len();
let mut released_locks = ReleasedLocks::new();
for k in self.keys {
// Rollback is called only if the transaction is known to fail. Under the
// circumstances, the rollback record needn't be protected.
let released_lock = cleanup(&mut txn, &mut reader, k, TimeStamp::zero(), false)?;
released_locks.push(released_lock);
}
let mut write_data = WriteData::from_modifies(txn.into_modifies());
Ok(WriteResult {
...
}
pub fn cleanup<S: Snapshot>(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<S>,
key: Key,
current_ts: TimeStamp,
protect_rollback: bool,
) -> MvccResult<Option<ReleasedLock>> {
match reader.load_lock(&key)? {
Some(ref lock) if lock.ts == reader.start_ts => {
...
rollback_lock(
txn,
reader,
key,
lock,
lock.is_pessimistic_txn(),
!protect_rollback,
)
}
l => match check_txn_status_missing_lock(
txn,
reader,
key.clone(),
l,
MissingLockAction::rollback_protect(protect_rollback),
false,
)? {
TxnStatus::Committed { commit_ts } => {
Err(ErrorInner::Committed...
}
TxnStatus::RolledBack => {
Ok(None)
}
TxnStatus::LockNotExist => Ok(None),
_ => unreachable!(),
},
}
}
rollback_lock
-
为了保险起见,会再次通过
get_txn_commit_record
函数查看write
的最新记录-
如果发现
write
上有当前事务的提交记录,直接panic
-
如果发现有
OverlappedRollback
的记录或者回滚记录 (SingleRecord::Rollback
),说明之前已经添加了write
回滚记录,删除了default
上面的value
数据,那么现在只需要把LOCK
记录删除即可 -
如果没有发现任何提交记录或者回滚记录,那么
-
如果
LOCK
是PUT
类型、且已经写入default CF Value
,那么需要删除default CF Value
-
非保护模式下利用
make_rollback
生成rollback
类型的write
记录- 特别需要注意的是,由于是非保护模式下,所以如果恰好
rollback
记录 {key_startTS
} 与其他事务的提交记录 {key_commitTS
} 重叠 (可能t1
的startTS
恰好是t2
事务的commitTS
),那么一般情况下可以省略rollback
记录的写入,为集群减少负担。 - 但是如果
KEY
是悲观事务的Primary KEY
的话,就需要将提交记录{key_commitTS
} 设置一个overlapped_rollback
标记
- 特别需要注意的是,由于是非保护模式下,所以如果恰好
-
删除
collapse
以前的非保护rollback
记录 -
删除
LOCK
记录
-
-
pub fn rollback_lock(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<impl Snapshot>,
key: Key,
lock: &Lock,
is_pessimistic_txn: bool,
collapse_rollback: bool,
) -> Result<Option<ReleasedLock>> {
let overlapped_write = match reader.get_txn_commit_record(&key)? {
TxnCommitRecord::None { overlapped_write } => overlapped_write,
TxnCommitRecord::SingleRecord { write, commit_ts }
if write.write_type != WriteType::Rollback =>
{
panic!(
...
}
_ => return Ok(txn.unlock_key(key, is_pessimistic_txn, TimeStamp::zero())),
};
// If prewrite type is DEL or LOCK or PESSIMISTIC, it is no need to delete
// value.
if lock.short_value.is_none() && lock.lock_type == LockType::Put {
txn.delete_value(key.clone(), lock.ts);
}
// Only the primary key of a pessimistic transaction needs to be protected.
let protected: bool = is_pessimistic_txn && key.is_encoded_from(&lock.primary);
if let Some(write) = make_rollback(reader.start_ts, protected, overlapped_write) {
txn.put_write(key.clone(), reader.start_ts, write.as_ref().to_bytes());
}
if collapse_rollback {
collapse_prev_rollback(txn, reader, &key)?;
}
Ok(txn.unlock_key(key, is_pessimistic_txn, TimeStamp::zero()))
}
pub fn make_rollback(
start_ts: TimeStamp,
protected: bool,
overlapped_write: Option<OverlappedWrite>,
) -> Option<Write> {
match overlapped_write {
Some(OverlappedWrite { write, gc_fence }) => {
assert!(start_ts > write.start_ts);
if protected {
Some(write.set_overlapped_rollback(true, Some(gc_fence)))
} else {
// No need to update the original write.
None
}
}
None => Some(Write::new_rollback(start_ts, protected)),
}
}
check_txn_status_missing_lock
-
如果发现有本事务的
OverlappedRollback
的记录或者回滚记录 (SingleRecord::Rollback
),说明已经回滚完成,直接返回OK
即可终止回滚流程 -
如果发现有本事务提交记录的话,返回
ErrorInner::Committed
-
如果没有找到任何本事务
write
记录的话 (这个场景可能比较少见)- 首先需要调用
mark_rollback_on_mismatching_lock
在这个LOCK
上面添加回滚LockTS
标记,这样这个lock
所涉及的事务在提交后,如果发现自己的commitTS
和LockTS
重叠的话,需要设置一下overlap
标记 - 保护模式下调用
make_rollback
写入rollback
记录,确保这个回滚记录不会被删除 - 删除
collapse
以前的非保护rollback
记录
- 首先需要调用
pub fn check_txn_status_missing_lock(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<impl Snapshot>,
primary_key: Key,
mismatch_lock: Option<Lock>,
action: MissingLockAction,
resolving_pessimistic_lock: bool,
) -> Result<TxnStatus> {
match reader.get_txn_commit_record(&primary_key)? {
TxnCommitRecord::SingleRecord { commit_ts, write } => {
if write.write_type == WriteType::Rollback {
Ok(TxnStatus::RolledBack)
} else {
Ok(TxnStatus::committed(commit_ts))
}
}
TxnCommitRecord::OverlappedRollback { .. } => Ok(TxnStatus::RolledBack),
TxnCommitRecord::None { overlapped_write } => {
...
let ts = reader.start_ts;
// collapse previous rollback if exist.
if action.collapse_rollback() {
collapse_prev_rollback(txn, reader, &primary_key)?;
}
if let (Some(l), None) = (mismatch_lock, overlapped_write.as_ref()) {
txn.mark_rollback_on_mismatching_lock(
&primary_key,
l,
action == MissingLockAction::ProtectedRollback,
);
}
// Insert a Rollback to Write CF in case that a stale prewrite
// command is received after a cleanup command.
if let Some(write) = action.construct_write(ts, overlapped_write) {
txn.put_write(primary_key, ts, write.as_ref().to_bytes());
}
Ok(TxnStatus::LockNotExist)
}
}
}
Cleanup
Cleanup
和 Rollback
实际上调用的代码区别不大,关键点就是调用 action::cleanup
函数的时候,传递的 protect_rollback
参数是 true
,也就是说 Cleanup
接口的回滚记录全部都是保护模式的。
Cleanup
比较重要的作用就是清理当前事务中,已经不需要的锁信息。因此,为了保险起见 ,Cleanup
接口会留下保护类型的回滚记录,防止网络异常原因导致的 stale
prewrite
请求,并且请求成功导致事务被错误提交。
关于何时调用 Cleanup
何时调用 Rollback
,需要具体看 tikv-client
的逻辑甚至看 TIDB
的逻辑,目前笔者对此了解不多。只能从 TIKV
的代码来猜测,Rollback
应该是用于非常确定的场景,即使出现了当前事务的 stale
prewrite
请求,也不会导致事务会被成功提交,因此其回滚记录可以是非保护模式的,即使被删除了也无所谓。其他场景都是需要 Cleanup
接口,把回滚记录保护起来,拦截阻止 stale
prewrite
请求的成功。
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
// It is not allowed for commit to overwrite a protected rollback. So we update
// max_ts to prevent this case from happening.
context.concurrency_manager.update_max_ts(self.start_ts);
...
let mut released_locks = ReleasedLocks::new();
released_locks.push(cleanup(
&mut txn,
&mut reader,
self.key,
self.current_ts,
true,
)?);
let new_acquired_locks = txn.take_new_locks();
let mut write_data = WriteData::from_modifies(txn.into_modifies());
Ok(WriteResult {
...
})