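Preface
This article walks through the interfaces commonly used to resolve lock conflicts, based on my own reading and understanding of the code. I hope it helps readers understand distributed transactions in TiKV.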
CheckTxnStatus
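The purpose of this interface is fairly clear. When a transaction runs into a lock conflict (typically a pessimistic lock acquired during a pessimistic transaction, or a lock written during prewrite), it reads the primary key from the lock information and then calls this interface to check the current transaction state on that primary key. The transaction may already be committed, already rolled back, or still in progress.
In particular, if the lock on the primary key turns out to have expired, CheckTxnStatus also actively rolls it back in protected mode.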
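Parameters
Suppose there are two concurrent transactions, t1 and t2. While running, t1 finds a lock left by t2, so t1 calls the CheckTxnStatus interface to check the current state of t2.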
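- primary_key: the primary key of t2 that needs to be checked
- lock_ts: the start_ts of t2
- caller_start_ts: the start_ts of t1
- current_ts: the current ts
- rollback_if_not_exist: when neither a commit record nor a rollback record is found, whether to roll back directly or return an error
- force_sync_commit: in the async_commit case, whether to force the async_commit transaction forward; otherwise uncommitted is returned
- resolving_pessimistic_lock: false means the caller intends to resolve a prewrite lock, true means the caller intends to resolve a pessimistic lock
- verify_is_primary: verify that the lock on the primary key really is a primary lock (issue 42937); this check is currently enabled by default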
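To make the parameter list concrete, here is a minimal sketch of how t1 might fill in such a request. The struct and the helper `request_from_t1` are hypothetical and only mirror the parameter names above; timestamps are modeled as plain `u64` values, the key as raw bytes, and the flag values are illustrative choices rather than anything mandated by TiKV.

```rust
/// Hypothetical, simplified mirror of the CheckTxnStatus parameters.
/// The real request uses TiKV's own key and timestamp types.
#[derive(Debug, Clone, PartialEq)]
pub struct CheckTxnStatusParams {
    pub primary_key: Vec<u8>,
    pub lock_ts: u64,
    pub caller_start_ts: u64,
    pub current_ts: u64,
    pub rollback_if_not_exist: bool,
    pub force_sync_commit: bool,
    pub resolving_pessimistic_lock: bool,
    pub verify_is_primary: bool,
}

/// Builds the request t1 would send after running into a prewrite lock of t2.
/// `now` stands for a freshly fetched timestamp (an assumption of this sketch).
pub fn request_from_t1(
    primary_key: &[u8],
    t2_start_ts: u64,
    t1_start_ts: u64,
    now: u64,
) -> CheckTxnStatusParams {
    CheckTxnStatusParams {
        primary_key: primary_key.to_vec(),
        // lock_ts identifies the lock owner (t2), not the caller.
        lock_ts: t2_start_ts,
        caller_start_ts: t1_start_ts,
        current_ts: now,
        // Illustrative flag values only.
        rollback_if_not_exist: true,
        force_sync_commit: false,
        resolving_pessimistic_lock: false,
        verify_is_primary: true,
    }
}
```

For example, `request_from_t1(b"pk", 100, 105, 110)` describes t1 (start_ts 105) asking about the lock written by t2 (start_ts 100).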
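Simplified code
- As with the other interfaces, the first step is to load the lock information of the primary key.
- If the lock found is the expected lock of t2, check_txn_status_lock_exists is called.
  - First the lock is validated. If verify_is_primary is true and the primary key recorded in the lock does not match the primary in the request, the error PrimaryMismatch is returned; this can happen when the primary key has been replaced. Corner case: there is one exception (the lock is a pessimistic lock && resolving_pessimistic_lock is false). In other words, the caller meant to resolve a prewrite lock but found a pessimistic lock whose primary key does not match. In this case no error is reported. Instead, the pessimistic lock is cleaned up and check_txn_status_missing_lock is used to examine the transaction state further. This situation may be caused by a stale call to the acquire_pessimistic_lock interface of a pessimistic transaction.
  - If the lock has use_async_commit set, then in non-forced mode (force_sync_commit is false) Ok(TxnStatus::uncommitted) is returned directly, and the caller may retry later. Otherwise, processing continues.
  - If the lock is a prewrite lock, which is the expected case, its expiration time is checked next.
    - If it has expired, it is rolled back directly and TxnStatus::TtlExpire is returned. (This rollback appears to be non-protected? In principle a primary lock should be rolled back in protected mode, so I am puzzled by this.)
    - If it has not expired, the min_commit_ts of the lock is updated and TxnStatus::uncommitted is returned.
  - If the lock is a pessimistic lock, check_txn_status_from_pessimistic_primary_lock handles it further.
    - In particular, if the lock information shows that the lock was written through the fair locking feature, the lock needs additional verification to guard against issue 43540: check whether the transaction that owns the lock has already been committed or rolled back, so that a stale lock is not trusted. Only when there is no commit record and no rollback record can the pessimistic lock be considered valid. Otherwise, the pessimistic lock is cleared and the transaction status is returned.
    - If the pessimistic lock has expired:
      - If the lock the caller expected to check is a pessimistic lock, only the pessimistic lock needs to be cleaned up and Ok(TxnStatus::PessimisticRollBack) is returned; no rollback is needed.
      - If the lock the caller expected to check is a prewrite lock, the pessimistic lock is cleaned up and a rollback record is left behind as well (in non-protected mode; I do not yet understand why it is still non-protected).
    - If the pessimistic lock has not expired, the min_commit_ts of the lock is updated and TxnStatus::uncommitted is returned.
- If no lock is found, or the lock found is not the expected lock of t2, this is an unexpected scenario and check_txn_status_missing_lock is called.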
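The primary-key validation step, including its corner case, can be condensed into a small pure function. This is only a sketch: `PrimaryCheck` and `check_primary` are hypothetical names introduced for illustration, and the booleans stand in for the real lock and request objects.

```rust
/// Hypothetical outcome of validating the lock found on the primary key.
#[derive(Debug, PartialEq)]
pub enum PrimaryCheck {
    /// The lock passes validation; continue with the normal checks.
    Proceed,
    /// Corner case: clean the pessimistic lock and fall back to the
    /// missing-lock path instead of reporting an error.
    CleanPessimisticLockThenCheckMissingLock,
    /// The lock's primary differs from the requested one.
    PrimaryMismatch,
}

pub fn check_primary(
    verify_is_primary: bool,
    primary_matches: bool,
    resolving_pessimistic_lock: bool,
    lock_is_pessimistic: bool,
) -> PrimaryCheck {
    // Validation is skipped when disabled, and passes when the keys agree.
    if !verify_is_primary || primary_matches {
        return PrimaryCheck::Proceed;
    }
    match (resolving_pessimistic_lock, lock_is_pessimistic) {
        // Wanted a prewrite lock, found a mismatching pessimistic lock.
        (false, true) => PrimaryCheck::CleanPessimisticLockThenCheckMissingLock,
        _ => PrimaryCheck::PrimaryMismatch,
    }
}
```

Only the combination (resolving_pessimistic_lock = false, pessimistic lock) is tolerated; every other mismatch is reported as an error.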
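We should be quite familiar with check_txn_status_missing_lock by now. It is also called from the rollback and cleanup functions, but the logic varies slightly depending on MissingLockAction:
- If an OverlappedRollback record or a rollback record (SingleRecord::Rollback) of this transaction is found, the rollback has already completed and OK is returned directly.
- If a commit record of this transaction is found, ErrorInner::Committed is returned.
- If no write record of this transaction is found at all, this is an unexpected scenario.
  - If rollback_if_not_exist is false, ErrorInner::TxnNotFound is returned directly.
  - If resolving_pessimistic_lock is true, meaning the caller wanted to resolve a pessimistic lock but the lock was not found, Ok(TxnStatus::LockNotExistDoNothing) is returned.
  - If rollback_if_not_exist is true, a protected-mode rollback is performed:
    - Call mark_rollback_on_mismatching_lock to add a rollback LockTS mark to this lock, so that when the transaction that owns the lock commits and finds that its commitTS overlaps with the LockTS, it sets the overlap flag.
    - Call make_rollback to write a protected-mode rollback record, which ensures that this rollback record will not be deleted.
    - Collapse (delete) the earlier non-protected rollback records.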
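The decision order described above can be sketched as a pure function. All names here (`WriteRecord`, `MissingLockOutcome`, `decide_missing_lock`) are hypothetical and deliberately neutral; they are not TiKV's types, and the OverlappedRollback case is folded into `WriteRecord::Rollback` for brevity.

```rust
/// Hypothetical summary of what the write CF says about the transaction.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WriteRecord {
    None,
    /// Covers both a plain rollback record and an overlapped rollback.
    Rollback,
    Committed { commit_ts: u64 },
}

/// Hypothetical outcomes of the missing-lock path.
#[derive(Debug, PartialEq)]
pub enum MissingLockOutcome {
    AlreadyRolledBack,
    Committed { commit_ts: u64 },
    TxnNotFound,
    LockNotExistDoNothing,
    WriteProtectedRollback,
}

pub fn decide_missing_lock(
    record: WriteRecord,
    rollback_if_not_exist: bool,
    resolving_pessimistic_lock: bool,
) -> MissingLockOutcome {
    match record {
        WriteRecord::Rollback => MissingLockOutcome::AlreadyRolledBack,
        WriteRecord::Committed { commit_ts } => MissingLockOutcome::Committed { commit_ts },
        WriteRecord::None => {
            if !rollback_if_not_exist {
                // The caller asked for an error instead of a rollback.
                MissingLockOutcome::TxnNotFound
            } else if resolving_pessimistic_lock {
                // A pessimistic lock was expected but is gone: do nothing.
                MissingLockOutcome::LockNotExistDoNothing
            } else {
                // Leave a protected rollback record behind.
                MissingLockOutcome::WriteProtectedRollback
            }
        }
    }
}
```

Note that the existing write record always wins: the two flags only matter when no record is found.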
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
    ...
    let (txn_status, released) = match reader.load_lock(&self.primary_key)? {
        Some(lock) if lock.ts == self.lock_ts => check_txn_status_lock_exists(
            &mut txn,
            &mut reader,
            self.primary_key,
            lock,
            self.current_ts,
            self.caller_start_ts,
            self.force_sync_commit,
            self.resolving_pessimistic_lock,
            self.verify_is_primary,
            self.rollback_if_not_exist,
        )?,
        l => (
            check_txn_status_missing_lock(
                &mut txn,
                &mut reader,
                self.primary_key,
                l,
                MissingLockAction::rollback(self.rollback_if_not_exist),
                self.resolving_pessimistic_lock,
            )?,
            None,
        ),
    };
    ...
    Ok(WriteResult {
        ...
    })
}
pub fn check_txn_status_lock_exists(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<impl Snapshot>,
    primary_key: Key,
    mut lock: Lock,
    current_ts: TimeStamp,
    caller_start_ts: TimeStamp,
    force_sync_commit: bool,
    resolving_pessimistic_lock: bool,
    verify_is_primary: bool,
    rollback_if_not_exist: bool,
) -> Result<(TxnStatus, Option<ReleasedLock>)> {
    if verify_is_primary && !primary_key.is_encoded_from(&lock.primary) {
        return match (resolving_pessimistic_lock, lock.is_pessimistic_lock()) {
            (false, true) => {
                ...
                let txn_status = check_txn_status_missing_lock(
                    ...
                    MissingLockAction::rollback(rollback_if_not_exist),
                    resolving_pessimistic_lock,
                )?;
                Ok((txn_status, released))
            }
            _ => {
                Err(
                    ErrorInner::PrimaryMismatch... )
            }
        };
    }
    // Never rollback or push forward min_commit_ts in check_txn_status if it's
    // using async commit. Rollback of async-commit locks are done during
    // ResolveLock.
    if lock.use_async_commit {
        if force_sync_commit {
            ...
        } else {
            return Ok((TxnStatus::uncommitted(lock, false), None));
        }
    }
    let is_pessimistic_txn = !lock.for_update_ts.is_zero();
    if lock.is_pessimistic_lock() {
        let check_result = check_txn_status_from_pessimistic_primary_lock(
            ...
            resolving_pessimistic_lock,
        )?;
        ...
    } else if lock.ts.physical() + lock.ttl < current_ts.physical() {
        let released = rollback_lock(txn, reader, primary_key, &lock, is_pessimistic_txn, true)?;
        return Ok((TxnStatus::TtlExpire, released));
    }
    if !lock.min_commit_ts.is_zero()
        && !caller_start_ts.is_max()
        // Push forward the min_commit_ts so that reading won't be blocked by locks.
        && caller_start_ts >= lock.min_commit_ts
    {
        lock.min_commit_ts = ...
    }
    Ok((TxnStatus::uncommitted(lock, min_commit_ts_pushed), None))
}
fn check_txn_status_from_pessimistic_primary_lock(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<impl Snapshot>,
    primary_key: Key,
    lock: &Lock,
    current_ts: TimeStamp,
    resolving_pessimistic_lock: bool,
) -> Result<(Option<TxnStatus>, Option<ReleasedLock>)> {
    if lock.is_pessimistic_lock_with_conflict() {
        if let Some(txn_status) = check_determined_txn_status(reader, &primary_key)? {
            ...
            let released = txn.unlock_key(primary_key, true, TimeStamp::zero());
            return Ok((Some(txn_status), released));
        }
    }
    if lock.ts.physical() + lock.ttl < current_ts.physical() {
        return if resolving_pessimistic_lock {
            let released = txn.unlock_key(primary_key, true, TimeStamp::zero());
            Ok((Some(TxnStatus::PessimisticRollBack), released))
        } else {
            let released = rollback_lock(txn, reader, primary_key, lock, true, true)?;
            Ok((Some(TxnStatus::TtlExpire), released))
        };
    }
    Ok((None, None))
}
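Both functions above decide expiry with the comparison lock.ts.physical() + lock.ttl < current_ts.physical(). The sketch below illustrates that comparison with plain integers. It assumes a TSO-style timestamp whose physical part is a millisecond clock stored above 18 logical bits, and a TTL expressed in milliseconds; the helper names are hypothetical and are not the TiKV TimeStamp API.

```rust
/// Assumed number of low bits reserved for the logical counter.
const PHYSICAL_SHIFT_BITS: u32 = 18;

/// Composes a timestamp from a physical part (ms) and a logical counter.
pub fn compose_ts(physical_ms: u64, logical: u64) -> u64 {
    (physical_ms << PHYSICAL_SHIFT_BITS) | logical
}

/// Extracts the physical (millisecond) part of a timestamp.
pub fn physical(ts: u64) -> u64 {
    ts >> PHYSICAL_SHIFT_BITS
}

/// A lock is expired once its start time plus TTL lies strictly before
/// the physical part of the current timestamp.
pub fn is_lock_expired(lock_ts: u64, ttl_ms: u64, current_ts: u64) -> bool {
    physical(lock_ts) + ttl_ms < physical(current_ts)
}
```

Because the comparison is strict, a lock written at physical time 1000 with a TTL of 3000 is still alive at physical time 4000 and only counts as expired from 4001 onward.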
CheckSecondaryLocks
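The CheckSecondaryLocks interface is mainly used by Async Commit. During an async commit, it uses the secondary keys recorded on the primary lock to inspect all the prewrite locks and thereby determine whether the transaction has actually been committed.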
/// Check secondary locks of an async commit transaction.
///
/// If all prewritten locks exist, the lock information is returned.
/// Otherwise, it returns the commit timestamp of the transaction.
///
/// If the lock does not exist or is a pessimistic lock, to prevent the
/// status being changed, a rollback may be written.
Parameters
- keys: the keys locked by the transaction
- start_ts: the start ts of the transaction
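Simplified code
- For each key, look up the corresponding lock.
  - If a commit record or a rollback record is found through any one key, the loop can break immediately and return the result.
  - If there is no record and no lock either, a rollback is required, and it is performed in protected mode; then break and return the result.
  - Otherwise, all keys are traversed and their lock information is collected.
- If the lock of the transaction is found as expected, check_status_from_lock is used for a further check.
  - If the lock is a pessimistic lock:
    - As in CheckTxnStatus, if the lock was acquired with conflict by the fair locking feature, the commit, rollback, or no-record state must be checked further. If the transaction is committed or rolled back, CheckSecondaryLocks can terminate and return the result. If there is no record, the lock can be treated as an ordinary pessimistic lock.
    - A pessimistic lock is an unexpected state. The pessimistic lock is cleaned up and the situation is treated like the case with no record and no lock, that is, a rollback is performed and CheckSecondaryLocks terminates.
  - If the lock is a prewrite lock, which is the expected case, the lock information is returned and the remaining keys are checked.
- If there is no lock, or the lock of the expected transaction is not found, check_determined_txn_status further looks for a commit or rollback record.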
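The per-key loop and its early exit can be sketched as follows. The enums are hypothetical stand-ins for the per-key and overall statuses, and lock information is reduced to a `String` for brevity.

```rust
/// Hypothetical status of a single secondary key.
#[derive(Debug, Clone, PartialEq)]
pub enum KeyStatus {
    Locked(String),
    Committed(u64),
    RolledBack,
}

/// Hypothetical overall result of checking all secondary keys.
#[derive(Debug, PartialEq)]
pub enum OverallStatus {
    Locked(Vec<String>),
    Committed(u64),
    RolledBack,
}

/// Collects lock info while every key is still locked, and stops at the
/// first key that proves the transaction committed or rolled back.
pub fn aggregate(statuses: impl IntoIterator<Item = KeyStatus>) -> OverallStatus {
    let mut locks = Vec::new();
    for status in statuses {
        match status {
            KeyStatus::Locked(info) => locks.push(info),
            KeyStatus::Committed(commit_ts) => return OverallStatus::Committed(commit_ts),
            KeyStatus::RolledBack => return OverallStatus::RolledBack,
        }
    }
    OverallStatus::Locked(locks)
}
```

A single committed or rolled-back key is enough to decide the whole transaction, which is why the remaining keys never need to be examined in that case.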
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
    ...
    let mut released_locks = ReleasedLocks::new();
    let mut result = SecondaryLocksStatus::Locked(Vec::new());
    for key in self.keys {
        let mut released_lock = None;
        let mut mismatch_lock = None;
        // Checks whether the given secondary lock exists.
        let (status, need_rollback, rollback_overlapped_write) = match reader.load_lock(&key)? {
            // The lock exists, the lock information is returned.
            Some(lock) if lock.ts == self.start_ts => {
                let (status, need_rollback, rollback_overlapped_write, lock_released) =
                    check_status_from_lock(&mut txn, &mut reader, lock, &key, region_id)?;
                released_lock = lock_released;
                (status, need_rollback, rollback_overlapped_write)
            }
            // Searches the write CF for the commit record of the lock and returns the commit
            // timestamp (0 if the lock is not committed).
            l => {
                mismatch_lock = l;
                check_determined_txn_status(&mut reader, &key)?
            }
        };
        if need_rollback {
            if let Some(l) = mismatch_lock {
                txn.mark_rollback_on_mismatching_lock(&key, l, true);
            }
            // We must protect this rollback in case this rollback is collapsed and a stale
            // acquire_pessimistic_lock and prewrite succeed again.
            if let Some(write) = make_rollback(self.start_ts, true, rollback_overlapped_write) {
                txn.put_write(key.clone(), self.start_ts, write.as_ref().to_bytes());
                collapse_prev_rollback(&mut txn, &mut reader, &key)?;
            }
        }
        released_locks.push(released_lock);
        match status {
            SecondaryLockStatus::Locked(lock) => {
                result.push(lock.into_lock_info(key.to_raw()?));
            }
            SecondaryLockStatus::Committed(commit_ts) => {
                result = SecondaryLocksStatus::Committed(commit_ts);
                break;
            }
            SecondaryLockStatus::RolledBack => {
                result = SecondaryLocksStatus::RolledBack;
                break;
            }
        }
    }
    ...
}
}
fn check_status_from_lock<S: Snapshot>(
    txn: &mut MvccTxn,
    reader: &mut ReaderWithStats<'_, S>,
    lock: Lock,
    key: &Key,
    region_id: u64,
) -> Result<(
    SecondaryLockStatus,
    bool,
    Option<OverlappedWrite>,
    Option<ReleasedLock>,
)> {
    let mut overlapped_write = None;
    if lock.is_pessimistic_lock_with_conflict() {
        let (status, need_rollback, rollback_overlapped_write) =
            check_determined_txn_status(reader, key)?;
        if !need_rollback {
            let released_lock = txn.unlock_key(key.clone(), true, TimeStamp::zero());
            return Ok((
                ...
            ));
        }
        overlapped_write = rollback_overlapped_write;
    }
    if lock.is_pessimistic_lock() {
        let released_lock = txn.unlock_key(key.clone(), true, TimeStamp::zero());
        let overlapped_write_res = if lock.is_pessimistic_lock_with_conflict() {
            overlapped_write
        } else {
            reader.get_txn_commit_record(key)?.unwrap_none(region_id)
        };
        Ok((
            ...
        ))
    } else {
        Ok((SecondaryLockStatus::Locked(lock), false, None, None))
    }
}
fn check_determined_txn_status<S: Snapshot>(
    reader: &mut ReaderWithStats<'_, S>,
    key: &Key,
) -> Result<(SecondaryLockStatus, bool, Option<OverlappedWrite>)> {
    match reader.get_txn_commit_record(key)? {
        TxnCommitRecord::SingleRecord { commit_ts, write } => {
            let status = if write.write_type != WriteType::Rollback {
                SecondaryLockStatus::Committed(commit_ts)
            } else {
                SecondaryLockStatus::RolledBack
            };
            // We needn't write a rollback once there is a write record for it:
            // If it's a committed record, it cannot be changed.
            // If it's a rollback record, it either comes from another
            // check_secondary_lock (thus protected) or the client stops commit
            // actively. So we don't need to make it protected again.
            Ok((status, false, None))
        }
        TxnCommitRecord::OverlappedRollback { .. } => {
            Ok((SecondaryLockStatus::RolledBack, false, None))
        }
        TxnCommitRecord::None { overlapped_write } => {
            Ok((SecondaryLockStatus::RolledBack, true, overlapped_write))
        }
    }
}
ResolveLock
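Once CheckTxnStatus has determined the transaction state of the primary key, ResolveLock is needed to commit or roll back the secondary keys. If the primary key has been committed, ResolveLock commits the secondary keys. If the primary key has been rolled back, ResolveLock rolls back the secondary keys.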
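Parameters
- start_ts: the start ts of the transaction
- commit_ts: the commit ts of the transaction. It is 0 when a rollback is required and non-zero otherwise
- resolve_keys: the secondary keys that need to be committed or rolled back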
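The link between the primary key's status and the commit_ts convention above can be sketched like this. `PrimaryStatus`, `SecondaryAction`, and both functions are hypothetical names used only for illustration; the one fact taken from the parameter list is that a commit_ts of 0 means rollback.

```rust
/// Hypothetical summary of what CheckTxnStatus reported for the primary key.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PrimaryStatus {
    Committed { commit_ts: u64 },
    RolledBack,
    Uncommitted,
}

/// Hypothetical action applied to each secondary key.
#[derive(Debug, PartialEq)]
pub enum SecondaryAction {
    Commit { commit_ts: u64 },
    Rollback,
}

/// Picks the commit_ts to pass along; None means the transaction is still
/// in progress and there is nothing to resolve yet.
pub fn resolve_commit_ts(status: PrimaryStatus) -> Option<u64> {
    match status {
        PrimaryStatus::Committed { commit_ts } => Some(commit_ts),
        PrimaryStatus::RolledBack => Some(0),
        PrimaryStatus::Uncommitted => None,
    }
}

/// Interprets commit_ts the way the parameter list describes it.
pub fn action_for(commit_ts: u64) -> SecondaryAction {
    if commit_ts != 0 {
        SecondaryAction::Commit { commit_ts }
    } else {
        SecondaryAction::Rollback
    }
}
```

For instance, a primary key committed at ts 130 yields `Some(130)` and therefore a commit of every secondary key, while a rolled-back primary key yields `Some(0)` and a rollback.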
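Simplified code
The code is very simple: it directly calls the commit or rollback function. Note that for secondary keys the rollback is performed in non-protected mode.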
impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for ResolveLockLite {
    fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
        ...
        for key in self.resolve_keys {
            released_locks.push(if !self.commit_ts.is_zero() {
                commit(&mut txn, &mut reader, key, self.commit_ts)?
            } else {
                cleanup(&mut txn, &mut reader, key, TimeStamp::zero(), false)?
            });
        }
        Ok(WriteResult {
            ...
        })
    }
}