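Preface
This article walks through the interfaces commonly used to resolve lock conflicts in TiKV, with my own reading and analysis of the code. I hope it helps readers understand TiKV distributed transactions.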
CheckTxnStatus
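The purpose of this interface is fairly clear. When a transaction runs into a lock conflict (typically a lock written while another transaction acquires a pessimistic lock, or a lock written during prewrite), it reads the lock's primary key from the lock info and calls this interface to check the current status of the transaction at that primary key. The status may be committed, rolled back, or still in progress.
In particular, if the lock on the primary key has expired, CheckTxnStatus also actively rolls it back in protected mode.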
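In the simplified code, "expired" is decided by the comparison `lock.ts.physical() + lock.ttl < current_ts.physical()`. The sketch below shows what that comparison does, assuming the usual TSO layout in which the low 18 bits of a timestamp hold a logical counter and the high bits hold physical milliseconds. `Ts`, `LOGICAL_BITS` and `is_lock_expired` are hypothetical names, not TiKV's API.

```rust
/// Hypothetical stand-in for TiKV's `TimeStamp`: a 64-bit TSO value whose high
/// bits are physical milliseconds and whose low 18 bits are a logical counter
/// (assumed layout).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Ts(pub u64);

/// Number of low bits reserved for the logical counter (assumption).
pub const LOGICAL_BITS: u32 = 18;

impl Ts {
    /// Builds a timestamp from physical milliseconds and a logical counter.
    pub fn compose(physical_ms: u64, logical: u64) -> Ts {
        Ts((physical_ms << LOGICAL_BITS) | (logical & ((1 << LOGICAL_BITS) - 1)))
    }

    /// Physical part in milliseconds; the logical counter is discarded.
    pub fn physical(self) -> u64 {
        self.0 >> LOGICAL_BITS
    }
}

/// Same shape as `lock.ts.physical() + lock.ttl < current_ts.physical()`:
/// `ttl_ms` is the lock's time-to-live in milliseconds.
pub fn is_lock_expired(lock_ts: Ts, ttl_ms: u64, current_ts: Ts) -> bool {
    lock_ts.physical() + ttl_ms < current_ts.physical()
}
```

Because only physical parts are compared, the logical counter never changes the result, and the TTL is counted from the physical time of `lock.ts` (the transaction's start_ts).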
Parameters
Suppose there are two concurrent transactions, t1 and t2.
While running, t1 encounters a lock held by t2, so t1 calls CheckTxnStatus to check t2's current status.
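- primary_key: the primary key of t2 to be checked
- lock_ts: t2's start_ts
- caller_start_ts: t1's start_ts
- current_ts: the current timestamp
- rollback_if_not_exist: when neither a commit record nor a rollback record is found, whether to roll back directly or to return an error
- force_sync_commit: for async-commit locks, whether to force the async-commit process forward instead of returning uncommitted
- resolving_pessimistic_lock: false means the caller intends to resolve a prewrite lock; true means it intends to resolve a pessimistic lock
- verify_is_primary: verify that the lock on the primary key really is the primary lock (issue 42937); this check is currently enabled by default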
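To make the roles of t1 and t2 concrete, here is a hypothetical request struct mirroring the parameter list, plus a helper that fills it in from t1's start_ts and the lock t1 ran into. The type names, the plain `u64` timestamps, and the choice `rollback_if_not_exist = true` are illustrative assumptions, not TiKV's actual request type or TiDB's policy.

```rust
/// Hypothetical request mirroring the CheckTxnStatus parameters.
#[derive(Debug, Clone, PartialEq)]
pub struct CheckTxnStatusReq {
    pub primary_key: Vec<u8>,
    pub lock_ts: u64,
    pub caller_start_ts: u64,
    pub current_ts: u64,
    pub rollback_if_not_exist: bool,
    pub force_sync_commit: bool,
    pub resolving_pessimistic_lock: bool,
    pub verify_is_primary: bool,
}

/// Hypothetical minimal view of the lock (held by t2) that t1 ran into.
pub struct ConflictingLock {
    pub primary: Vec<u8>,
    pub start_ts: u64,
    pub is_pessimistic: bool,
}

/// Builds the request t1 would send after hitting t2's lock.
pub fn build_req(t1_start_ts: u64, t2_lock: &ConflictingLock, now: u64) -> CheckTxnStatusReq {
    CheckTxnStatusReq {
        // t2's status lives at its primary key, not at the key where t1 hit the lock.
        primary_key: t2_lock.primary.clone(),
        lock_ts: t2_lock.start_ts,
        caller_start_ts: t1_start_ts,
        current_ts: now,
        // Illustrative choice only: allow a protected rollback if nothing is found.
        rollback_if_not_exist: true,
        force_sync_commit: false,
        // Tell the server which kind of lock the caller meant to resolve.
        resolving_pessimistic_lock: t2_lock.is_pessimistic,
        // Enabled by default according to the text.
        verify_is_primary: true,
    }
}
```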
Simplified code
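- As with other interfaces, first load the lock on the primary key.
- If the lock found is the expected lock of t2 (its ts equals lock_ts), call check_txn_status_lock_exists:
  - First validate the lock. If verify_is_primary is true but the primary recorded in the lock does not match the primary_key in the request, return the error PrimaryMismatch; this can happen when the primary key has been replaced. Corner case: there is one exception, when the lock is a pessimistic lock && resolving_pessimistic_lock is false. That is, the caller meant to resolve a prewrite lock but found a pessimistic lock whose primary does not match either. In this case TiKV is lenient and returns no error; instead it cleans up the pessimistic lock and uses check_txn_status_missing_lock to further check the transaction status. This situation can be caused by a stale call to acquire_pessimistic_lock in a pessimistic transaction.
  - If the lock is a use_async_commit lock and the call is not in forced mode (force_sync_commit is false), return Ok(TxnStatus::uncommitted) directly; the caller may retry later. Otherwise, continue.
  - If the lock is a prewrite lock, which is the expected case, further check the lock's expiration:
    - If it has expired, roll it back directly and return TxnStatus::TtlExpire. (This rollback seems to be non-protected? In principle the primary lock should be rolled back in protected mode, so I am a bit puzzled here.)
    - If it has not expired, push forward the lock's min_commit_ts if necessary and return TxnStatus::uncommitted.
  - If the lock is a pessimistic lock, it is further handled by check_txn_status_from_pessimistic_primary_lock:
    - In particular, if the pessimistic lock info shows that the lock was written through the fair locking feature, it needs extra verification to guard against issue 43540: check whether the transaction owning this lock has already committed or rolled back, to rule out a stale lock. Only if there is neither a commit record nor a rollback record can the pessimistic lock be considered valid. Otherwise, clear the pessimistic lock and return the transaction status directly.
    - If the pessimistic lock has expired:
      - If the lock the caller expected to check is a pessimistic lock, just clean up the pessimistic lock and return Ok(TxnStatus::PessimisticRollBack); no rollback record is needed.
      - If the lock the caller expected to check is a prewrite lock, clean up the pessimistic lock and also leave a rollback record (in non-protected mode; I do not yet understand why it is non-protected here).
    - If the pessimistic lock has not expired, push forward the lock's min_commit_ts if necessary and return TxnStatus::uncommitted.
- If no lock is found, or the lock found is not the expected lock of t2, this is an unexpected case and check_txn_status_missing_lock is called.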
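The branch structure of check_txn_status_lock_exists, together with check_txn_status_from_pessimistic_primary_lock, can be summarized as a pure decision function. In this hypothetical sketch every fact the real code looks up (primary match, TTL check, fair-locking commit/rollback record) is pre-computed into a `LockView`; it models only which outcome is chosen, not the writes that accompany it.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    Prewrite,
    Pessimistic,
}

/// Hypothetical flattened view of the lock on the primary key.
#[derive(Debug, Clone, Copy)]
pub struct LockView {
    pub kind: LockKind,
    /// Does lock.primary match the requested primary_key?
    pub primary_matches: bool,
    pub use_async_commit: bool,
    /// Result of the TTL check.
    pub expired: bool,
    /// Pessimistic lock written by fair locking with a conflict.
    pub with_conflict: bool,
    /// A commit or rollback record already exists for this transaction.
    pub has_determined_record: bool,
}

#[derive(Debug, Clone, Copy)]
pub struct Flags {
    pub verify_is_primary: bool,
    pub force_sync_commit: bool,
    pub resolving_pessimistic_lock: bool,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    PrimaryMismatch,
    /// Clean up the pessimistic lock, then fall back to check_txn_status_missing_lock.
    CleanUpThenCheckMissingLock,
    /// Clear the stale fair-locking lock and report the recorded status.
    ReportRecordedStatus,
    TtlExpire,
    PessimisticRollBack,
    /// Still running; min_commit_ts may be pushed forward.
    Uncommitted,
}

pub fn decide(lock: &LockView, f: &Flags) -> Outcome {
    // 1. Primary validation, including the pessimistic-lock corner case.
    if f.verify_is_primary && !lock.primary_matches {
        return match (f.resolving_pessimistic_lock, lock.kind) {
            (false, LockKind::Pessimistic) => Outcome::CleanUpThenCheckMissingLock,
            _ => Outcome::PrimaryMismatch,
        };
    }
    // 2. Async-commit locks are not rolled back here unless forced.
    if lock.use_async_commit && !f.force_sync_commit {
        return Outcome::Uncommitted;
    }
    match lock.kind {
        LockKind::Pessimistic => {
            // 3a. Fair-locking lock: an existing commit/rollback record wins.
            if lock.with_conflict && lock.has_determined_record {
                return Outcome::ReportRecordedStatus;
            }
            if lock.expired {
                return if f.resolving_pessimistic_lock {
                    Outcome::PessimisticRollBack
                } else {
                    Outcome::TtlExpire
                };
            }
        }
        LockKind::Prewrite => {
            // 3b. An expired prewrite lock is rolled back.
            if lock.expired {
                return Outcome::TtlExpire;
            }
        }
    }
    // 4. Not expired: the transaction is still in progress.
    Outcome::Uncommitted
}
```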
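check_txn_status_missing_lock should already be familiar: it is also called by rollback and cleanup, but because the MissingLockAction differs, its logic varies slightly:
- If an OverlappedRollback record or a rollback record (SingleRecord::Rollback) of this transaction is found, the rollback has already completed, so it returns OK directly.
- If a commit record of this transaction is found, it returns ErrorInner::Committed.
- If no write record of this transaction is found at all, this is an unexpected case:
  - If rollback_if_not_exist is false, it returns ErrorInner::TxnNotFound directly.
  - If resolving_pessimistic_lock is true, meaning the goal was to resolve a pessimistic lock but that lock was not found, it returns Ok(TxnStatus::LockNotExistDoNothing).
  - If rollback_if_not_exist is true, it performs a protected rollback:
    - Call mark_rollback_on_mismatching_lock to add a rollback mark for LockTS on the lock currently on the key, so that when the transaction owning that lock commits and finds that its commitTS overlaps (is equal to) LockTS, it sets the overlap flag.
    - Call make_rollback to write a protected rollback record, which guarantees the record will not be deleted.
    - Collapse (delete) the earlier non-protected rollback records.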
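The missing-lock path can likewise be sketched as a match on what the write CF contains for this transaction. The enum names are hypothetical; the step strings name the functions from the text. The assumption that mark_rollback_on_mismatching_lock runs only when another transaction's lock sits on the key follows the `if let Some(l) = mismatch_lock` pattern in the CheckSecondaryLocks code of this article.

```rust
/// What the write CF says about this transaction at the key (simplified).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitRecord {
    /// SingleRecord whose write type is not Rollback.
    Committed,
    /// SingleRecord with a Rollback write.
    RolledBack,
    /// Another transaction's commit record carries this transaction's rollback flag.
    OverlappedRollback,
    /// No record of this transaction at all.
    None,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MissingLockResult {
    RolledBack,
    ErrCommitted,
    ErrTxnNotFound,
    LockNotExistDoNothing,
    /// Protected rollback, listing the steps in order.
    ProtectedRollback(Vec<&'static str>),
}

pub fn check_missing_lock(
    record: CommitRecord,
    mismatching_lock_present: bool,
    rollback_if_not_exist: bool,
    resolving_pessimistic_lock: bool,
) -> MissingLockResult {
    match record {
        CommitRecord::RolledBack | CommitRecord::OverlappedRollback => MissingLockResult::RolledBack,
        CommitRecord::Committed => MissingLockResult::ErrCommitted,
        CommitRecord::None => {
            if !rollback_if_not_exist {
                MissingLockResult::ErrTxnNotFound
            } else if resolving_pessimistic_lock {
                MissingLockResult::LockNotExistDoNothing
            } else {
                let mut steps = Vec::new();
                if mismatching_lock_present {
                    // Let the other transaction set the overlap flag if commit_ts == our start_ts.
                    steps.push("mark_rollback_on_mismatching_lock");
                }
                steps.push("make_rollback(protected)");
                steps.push("collapse_prev_rollback");
                MissingLockResult::ProtectedRollback(steps)
            }
        }
    }
}
```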
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
...
let (txn_status, released) = match reader.load_lock(&self.primary_key)? {
Some(lock) if lock.ts == self.lock_ts => check_txn_status_lock_exists(
&mut txn,
&mut reader,
self.primary_key,
lock,
self.current_ts,
self.caller_start_ts,
self.force_sync_commit,
self.resolving_pessimistic_lock,
self.verify_is_primary,
self.rollback_if_not_exist,
)?,
l => (
check_txn_status_missing_lock(
&mut txn,
&mut reader,
self.primary_key,
l,
MissingLockAction::rollback(self.rollback_if_not_exist),
self.resolving_pessimistic_lock,
)?,
None,
),
};
...
Ok(WriteResult {
...
})
}
pub fn check_txn_status_lock_exists(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<impl Snapshot>,
primary_key: Key,
mut lock: Lock,
current_ts: TimeStamp,
caller_start_ts: TimeStamp,
force_sync_commit: bool,
resolving_pessimistic_lock: bool,
verify_is_primary: bool,
rollback_if_not_exist: bool,
) -> Result<(TxnStatus, Option<ReleasedLock>)> {
if verify_is_primary && !primary_key.is_encoded_from(&lock.primary) {
return match (resolving_pessimistic_lock, lock.is_pessimistic_lock()) {
(false, true) => {
...
let txn_status = check_txn_status_missing_lock(
...
MissingLockAction::rollback(rollback_if_not_exist),
resolving_pessimistic_lock,
)?;
Ok((txn_status, released))
}
_ => {
Err(
ErrorInner::PrimaryMismatch... )
}
};
}
// Never rollback or push forward min_commit_ts in check_txn_status if it's
// using async commit. Rollback of async-commit locks are done during
// ResolveLock.
if lock.use_async_commit {
if force_sync_commit {
...
} else {
return Ok((TxnStatus::uncommitted(lock, false), None));
}
}
let is_pessimistic_txn = !lock.for_update_ts.is_zero();
if lock.is_pessimistic_lock() {
let check_result = check_txn_status_from_pessimistic_primary_lock(
...
resolving_pessimistic_lock,
)?;
...
} else if lock.ts.physical() + lock.ttl < current_ts.physical() {
let released = rollback_lock(txn, reader, primary_key, &lock, is_pessimistic_txn, true)?;
return Ok((TxnStatus::TtlExpire, released));
}
if !lock.min_commit_ts.is_zero()
&& !caller_start_ts.is_max()
// Push forward the min_commit_ts so that reading won't be blocked by locks.
&& caller_start_ts >= lock.min_commit_ts
{
lock.min_commit_ts = ...
}
Ok((TxnStatus::uncommitted(lock, min_commit_ts_pushed), None))
}
fn check_txn_status_from_pessimistic_primary_lock(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<impl Snapshot>,
primary_key: Key,
lock: &Lock,
current_ts: TimeStamp,
resolving_pessimistic_lock: bool,
) -> Result<(Option<TxnStatus>, Option<ReleasedLock>)> {
if lock.is_pessimistic_lock_with_conflict() {
if let Some(txn_status) = check_determined_txn_status(reader, &primary_key)? {
...
let released = txn.unlock_key(primary_key, true, TimeStamp::zero());
return Ok((Some(txn_status), released));
}
}
if lock.ts.physical() + lock.ttl < current_ts.physical() {
return if resolving_pessimistic_lock {
let released = txn.unlock_key(primary_key, true, TimeStamp::zero());
Ok((Some(TxnStatus::PessimisticRollBack), released))
} else {
let released = rollback_lock(txn, reader, primary_key, lock, true, true)?;
Ok((Some(TxnStatus::TtlExpire), released))
};
}
Ok((None, None))
}
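The guard at the end of check_txn_status_lock_exists pushes min_commit_ts "so that reading won't be blocked by locks". The reasoning: a reader at caller_start_ts sees only commits with commit_ts <= caller_start_ts, so while min_commit_ts <= caller_start_ts the lock's transaction might still commit inside the reader's snapshot and the reader must wait. Once min_commit_ts > caller_start_ts, the eventual commit_ts (never below min_commit_ts) is larger than the reader's start_ts, so the reader can ignore the lock. The sketch keeps the three guards from the code; the new value `caller_start_ts + 1` is an illustrative assumption because the actual assignment is elided in the snippet, and `u64::MAX` stands in for the maximum timestamp.

```rust
/// Hypothetical sketch of the min_commit_ts push; timestamps are plain u64.
/// Returns the new min_commit_ts, or None if the guards do not allow a push.
pub fn maybe_push_min_commit_ts(min_commit_ts: u64, caller_start_ts: u64) -> Option<u64> {
    // Same three guards as the snippet: min_commit_ts is set, caller_start_ts
    // is not the max timestamp, and the caller would otherwise be blocked.
    if min_commit_ts != 0 && caller_start_ts != u64::MAX && caller_start_ts >= min_commit_ts {
        // Illustrative choice: the smallest value strictly above the caller's start_ts.
        Some(caller_start_ts + 1)
    } else {
        None
    }
}

/// A reader at `read_ts` can skip the lock once every possible commit_ts exceeds `read_ts`.
pub fn reader_can_ignore_lock(min_commit_ts: u64, read_ts: u64) -> bool {
    min_commit_ts > read_ts
}
```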
CheckSecondaryLocks
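CheckSecondaryLocks is used mainly by Async Commit. While resolving an async-commit transaction, the secondaries recorded on the primary lock are used to look up all prewrite locks, and from them it is determined whether the transaction actually committed.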
/// Check secondary locks of an async commit transaction.
///
/// If all prewritten locks exist, the lock information is returned.
/// Otherwise, it returns the commit timestamp of the transaction.
///
/// If the lock does not exist or is a pessimistic lock, to prevent the
/// status being changed, a rollback may be written.
Parameters
- keys: the keys locked by the transaction
- start_ts: the transaction's start ts
Simplified code
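- Look up the lock of each key:
  - If a commit or rollback record is found through any key, break immediately and return the result.
  - If there is neither a record nor a lock, roll back in protected mode, then break and return the result.
  - Otherwise, iterate over all keys and collect their lock info.
- If the expected lock of the transaction is found, check_status_from_lock examines it further:
  - If the lock is a pessimistic lock:
    - As in CheckTxnStatus, if the lock was acquired with a conflict under fair locking, further check whether the transaction is committed, rolled back, or has no record. If it is committed or rolled back, CheckSecondaryLocks can stop and return the result. If there is no record, treat it as an ordinary pessimistic lock.
    - A pessimistic lock is unexpected here. The pessimistic lock is cleaned up and the key is handled like the "no record and no lock" case: a rollback is performed and CheckSecondaryLocks stops.
  - If the lock is a prewrite lock, which is expected, its lock info is returned and the remaining keys continue to be checked.
- If there is no lock, or the lock found does not belong to the expected transaction, check_determined_txn_status looks for a commit or rollback record.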
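The per-key loop of CheckSecondaryLocks can be modeled as a scan that either collects lock info or stops at the first key that determines the outcome. In this hypothetical sketch each key is already classified into a `KeyState`; a fair-locking conflict lock is assumed to have been mapped to `Committed`/`RolledBack` or to an ordinary `PessimisticLock` beforehand, as described in the list. The second element of the result is the index of the key where a protected rollback would be written.

```rust
/// Hypothetical classification of one key of the transaction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KeyState {
    /// Expected prewrite lock of this transaction (carries the key name).
    PrewriteLock(String),
    /// This transaction's pessimistic lock (unexpected for async commit).
    PessimisticLock,
    /// Commit record found with this commit_ts.
    Committed(u64),
    /// Rollback record found (single or overlapped); no new rollback needed.
    RolledBack,
    /// No lock of this transaction and no record at all.
    Nothing,
}

#[derive(Debug, PartialEq, Eq)]
pub enum SecondaryLocksStatus {
    Locked(Vec<String>),
    Committed(u64),
    RolledBack,
}

pub fn check_secondary_locks(keys: &[KeyState]) -> (SecondaryLocksStatus, Option<usize>) {
    let mut locked = Vec::new();
    for (i, state) in keys.iter().enumerate() {
        match state {
            // Expected case: remember the lock and keep checking.
            KeyState::PrewriteLock(key) => locked.push(key.clone()),
            // A determined status ends the scan without writing anything.
            KeyState::Committed(ts) => return (SecondaryLocksStatus::Committed(*ts), None),
            KeyState::RolledBack => return (SecondaryLocksStatus::RolledBack, None),
            // Unlock (if pessimistic) and write a protected rollback at this key.
            KeyState::PessimisticLock | KeyState::Nothing => {
                return (SecondaryLocksStatus::RolledBack, Some(i));
            }
        }
    }
    (SecondaryLocksStatus::Locked(locked), None)
}
```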
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
...
let mut released_locks = ReleasedLocks::new();
let mut result = SecondaryLocksStatus::Locked(Vec::new());
for key in self.keys {
let mut released_lock = None;
let mut mismatch_lock = None;
// Checks whether the given secondary lock exists.
let (status, need_rollback, rollback_overlapped_write) = match reader.load_lock(&key)? {
// The lock exists, the lock information is returned.
Some(lock) if lock.ts == self.start_ts => {
let (status, need_rollback, rollback_overlapped_write, lock_released) =
check_status_from_lock(&mut txn, &mut reader, lock, &key, region_id)?;
released_lock = lock_released;
(status, need_rollback, rollback_overlapped_write)
}
// Searches the write CF for the commit record of the lock and returns the commit
// timestamp (0 if the lock is not committed).
l => {
mismatch_lock = l;
check_determined_txn_status(&mut reader, &key)?
}
};
if need_rollback {
if let Some(l) = mismatch_lock {
txn.mark_rollback_on_mismatching_lock(&key, l, true);
}
// We must protect this rollback in case this rollback is collapsed and a stale
// acquire_pessimistic_lock and prewrite succeed again.
if let Some(write) = make_rollback(self.start_ts, true, rollback_overlapped_write) {
txn.put_write(key.clone(), self.start_ts, write.as_ref().to_bytes());
collapse_prev_rollback(&mut txn, &mut reader, &key)?;
}
}
released_locks.push(released_lock);
match status {
SecondaryLockStatus::Locked(lock) => {
result.push(lock.into_lock_info(key.to_raw()?));
}
SecondaryLockStatus::Committed(commit_ts) => {
result = SecondaryLocksStatus::Committed(commit_ts);
break;
}
SecondaryLockStatus::RolledBack => {
result = SecondaryLocksStatus::RolledBack;
break;
}
}
}
...
}
fn check_status_from_lock<S: Snapshot>(
txn: &mut MvccTxn,
reader: &mut ReaderWithStats<'_, S>,
lock: Lock,
key: &Key,
region_id: u64,
) -> Result<(
SecondaryLockStatus,
bool,
Option<OverlappedWrite>,
Option<ReleasedLock>,
)> {
let mut overlapped_write = None;
if lock.is_pessimistic_lock_with_conflict() {
let (status, need_rollback, rollback_overlapped_write) =
check_determined_txn_status(reader, key)?;
if !need_rollback {
let released_lock = txn.unlock_key(key.clone(), true, TimeStamp::zero());
return Ok((
...
));
}
overlapped_write = rollback_overlapped_write;
}
if lock.is_pessimistic_lock() {
let released_lock = txn.unlock_key(key.clone(), true, TimeStamp::zero());
let overlapped_write_res = if lock.is_pessimistic_lock_with_conflict() {
overlapped_write
} else {
reader.get_txn_commit_record(key)?.unwrap_none(region_id)
};
Ok((
...
))
} else {
Ok((SecondaryLockStatus::Locked(lock), false, None, None))
}
}
fn check_determined_txn_status<S: Snapshot>(
reader: &mut ReaderWithStats<'_, S>,
key: &Key,
) -> Result<(SecondaryLockStatus, bool, Option<OverlappedWrite>)> {
match reader.get_txn_commit_record(key)? {
TxnCommitRecord::SingleRecord { commit_ts, write } => {
let status = if write.write_type != WriteType::Rollback {
SecondaryLockStatus::Committed(commit_ts)
} else {
SecondaryLockStatus::RolledBack
};
// We needn't write a rollback once there is a write record for it:
// If it's a committed record, it cannot be changed.
// If it's a rollback record, it either comes from another
// check_secondary_lock (thus protected) or the client stops commit
// actively. So we don't need to make it protected again.
Ok((status, false, None))
}
TxnCommitRecord::OverlappedRollback { .. } => {
Ok((SecondaryLockStatus::RolledBack, false, None))
}
TxnCommitRecord::None { overlapped_write } => {
Ok((SecondaryLockStatus::RolledBack, true, overlapped_write))
}
}
}
ResolveLock
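Once CheckTxnStatus has determined the status of the transaction at the primary key, ResolveLock commits or rolls back the secondary keys accordingly: if the primary key has been committed, ResolveLock commits the secondary keys; if the primary key has been rolled back, ResolveLock rolls them back.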
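Putting the pieces together, the status found at the primary key decides the commit_ts handed to ResolveLock, and the per-key choice then depends only on whether commit_ts is zero, as in the ResolveLockLite snippet of this section. This is a hypothetical sketch; treating an uncommitted primary as "do not resolve yet" is an assumption about the caller, not something shown in the code here.

```rust
/// Hypothetical summary of what CheckTxnStatus reported for the primary key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrimaryStatus {
    Committed(u64),
    RolledBack,
    Uncommitted,
}

#[derive(Debug, PartialEq, Eq)]
pub enum SecondaryAction {
    Commit { key: String, commit_ts: u64 },
    Rollback { key: String },
}

/// commit_ts to pass to ResolveLock (0 means roll back), or None when the
/// primary is still uncommitted and the caller should wait or retry (assumption).
pub fn resolve_commit_ts(status: PrimaryStatus) -> Option<u64> {
    match status {
        PrimaryStatus::Committed(ts) => Some(ts),
        PrimaryStatus::RolledBack => Some(0),
        PrimaryStatus::Uncommitted => None,
    }
}

/// Per-key choice: commit when commit_ts != 0, otherwise roll back.
pub fn plan_resolve(commit_ts: u64, resolve_keys: &[&str]) -> Vec<SecondaryAction> {
    resolve_keys
        .iter()
        .map(|k| {
            if commit_ts != 0 {
                SecondaryAction::Commit { key: k.to_string(), commit_ts }
            } else {
                SecondaryAction::Rollback { key: k.to_string() }
            }
        })
        .collect()
}
```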
Parameters
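- start_ts: the transaction's start ts
- commit_ts: the transaction's commit ts; it is 0 when the transaction must be rolled back and non-zero otherwise
- resolve_keys: the secondary keys to commit or roll back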
Simplified code
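The code is very simple: for each key it just calls the commit or rollback function. Note that for secondary keys the rollback is non-protected. The snippet comes from ResolveLockLite, which receives the keys to resolve explicitly through resolve_keys.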
impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for ResolveLockLite {
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> {
...
for key in self.resolve_keys {
released_locks.push(if !self.commit_ts.is_zero() {
commit(&mut txn, &mut reader, key, self.commit_ts)?
} else {
cleanup(&mut txn, &mut reader, key, TimeStamp::zero(), false)?
});
}
Ok(WriteResult {
...
})
}
}