[Original work] Yes
[First published] TiDB Community
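Deadlock detector leader
Every TiKV node starts a deadlock detector, which runs in one of two switchable roles: leader or follower. The default role is follower.
Leader role: maintains the wait-for graph (DAG) of locks, accepts detection requests, and walks the graph to decide whether a deadlock exists.
Follower role: forwards detection requests to the leader over gRPC and receives the detection results.
The deadlock detector leader is chosen by region: the store that hosts the region leader of the region containing the empty-string key, that is, the first region of the cluster in key order, becomes the deadlock detector leader.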
const LEADER_KEY: &[u8] = b"";
fn is_leader_region(region: &Region) -> bool {
// The key range of a new created region is empty which misleads the leader
// of the deadlock detector stepping down.
//
// If the peers of a region is not empty, the region info is complete.
is_region_initialized(region)
&& region.get_start_key() <= LEADER_KEY
&& (region.get_end_key().is_empty() || LEADER_KEY < region.get_end_key())
}
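The range check above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical `KeyRange` struct in place of TiKV's `Region` and leaving out the `is_region_initialized` check; it only illustrates why a region whose start key is empty is the one that covers `LEADER_KEY`.

```rust
/// Hypothetical stand-in for a region's key range; this is not TiKV's `Region` type.
struct KeyRange<'a> {
    start: &'a [u8],
    /// An empty end key means the range is unbounded on the right.
    end: &'a [u8],
}

const LEADER_KEY: &[u8] = b"";

/// True when the range covers `LEADER_KEY`, i.e. it is the first region in key order.
fn covers_leader_key(r: &KeyRange) -> bool {
    r.start <= LEADER_KEY && (r.end.is_empty() || LEADER_KEY < r.end)
}
```

Because byte slices compare lexicographically and the empty slice is the smallest value, only a range that starts at the empty key can satisfy the first condition.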
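The detector subscribes to region change events, including region creation, region updates, and role changes, to capture role changes of the leader region, and then switches its own deadlock detection role to match.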
pub(crate) fn register(self, host: &mut CoprocessorHost<RocksEngine>) {
host.registry
.register_role_observer(1, BoxRoleObserver::new(self.clone())); // Called when the role changes.
host.registry
.register_region_change_observer(1, BoxRegionChangeObserver::new(self)); // Called when the region changes.
}
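Detection interface
This is the entry point for deadlock detection. When the current node is the leader, it calls the local handler directly; when it is a follower, it queries the leader over gRPC.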
/// Handles detect requests of itself.
fn handle_detect(&mut self, tp: DetectType, txn_ts: TimeStamp, lock: Lock) {
if self.is_leader() {
self.handle_detect_locally(tp, txn_ts, lock);
} else {
for _ in 0..2 {
...
if self.send_request_to_leader(tp, txn_ts, lock) {
return;
}
...
}
...
}
}
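This interface serves three request types:
- DetectType::Detect: the deadlock detection request. When a pessimistic transaction runs into a lock, it uses this request to check whether waiting would cause a deadlock.
- DetectType::CleanUpWaitFor: removes a single lock that the transaction was waiting for, because the transaction no longer waits for it.
- DetectType::CleanUp: removes all locks the transaction was waiting for, for example when the transaction is rolled back and its wait-for information has to be cleared.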
fn handle_detect_locally(&self, tp: DetectType, txn_ts: TimeStamp, lock: Lock) {
let detect_table = &mut self.inner.borrow_mut().detect_table; // Runtime-checked mutable borrow, not an atomic lock.
match tp {
DetectType::Detect => { // Check whether a deadlock exists.
if let Some(deadlock_key_hash) = detect_table.detect(txn_ts, lock.ts, lock.hash) {
self.waiter_mgr_scheduler
.deadlock(txn_ts, lock, deadlock_key_hash); // Report the deadlock to the waiter manager.
}
}
DetectType::CleanUpWaitFor => { // Remove one lock wait.
detect_table.clean_up_wait_for(txn_ts, lock.ts, lock.hash)
}
DetectType::CleanUp => detect_table.clean_up(txn_ts), // Remove all lock waits of this transaction.
}
}
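The two clean-up request types can be illustrated on a simplified table. This is a sketch under stated assumptions: `WaitForTable` is a hypothetical type, plain `u64` values stand in for `TimeStamp`, and TTL handling is left out. It is not TiKV's `DetectTable`, only an illustration of what "remove one lock wait" and "remove all lock waits" mean for a nested map.

```rust
use std::collections::HashMap;

/// Simplified wait-for table: waiting txn -> (lock-holding txn -> lock hashes).
#[derive(Default)]
struct WaitForTable {
    map: HashMap<u64, HashMap<u64, Vec<u64>>>,
}

impl WaitForTable {
    /// Records that `txn_ts` waits for the lock `lock_hash` held by `lock_ts`.
    fn register(&mut self, txn_ts: u64, lock_ts: u64, lock_hash: u64) {
        let hashes = self.map.entry(txn_ts).or_default().entry(lock_ts).or_default();
        if !hashes.contains(&lock_hash) {
            hashes.push(lock_hash);
        }
    }

    /// CleanUpWaitFor: drop one lock that `txn_ts` was waiting for.
    fn clean_up_wait_for(&mut self, txn_ts: u64, lock_ts: u64, lock_hash: u64) {
        if let Some(wait_for) = self.map.get_mut(&txn_ts) {
            if let Some(hashes) = wait_for.get_mut(&lock_ts) {
                hashes.retain(|h| *h != lock_hash);
                if hashes.is_empty() {
                    // No lock left on this edge, so the edge itself goes away.
                    wait_for.remove(&lock_ts);
                }
            }
            if wait_for.is_empty() {
                self.map.remove(&txn_ts);
            }
        }
    }

    /// CleanUp: drop every wait-for entry of `txn_ts`.
    fn clean_up(&mut self, txn_ts: u64) {
        self.map.remove(&txn_ts);
    }
}
```

Removing empty inner maps keeps the table from accumulating vertices that no longer have outgoing edges.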
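Deadlock detection algorithm
When a pessimistic transaction tries to lock a key and finds that the key is already locked, it calls the deadlock detection interface. Suppose the current transaction is txn_1 and the transaction holding the lock is txn_lock. The detector checks whether a deadlock would exist if txn_1 waited for the lock held by txn_lock.
The algorithm maintains a directed acyclic graph (DAG) of wait-for relationships. If the waits already recorded in the cluster form a path from txn_lock to txn_1, then letting txn_1 wait for txn_lock would close a cycle, which means a deadlock.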
/// `Locks` is a set of locks belonging to one transaction.
struct Locks {
ts: TimeStamp, // Timestamp of the transaction that owns these locks.
hashes: Vec<u64>,
last_detect_time: Instant,
}
/// Used to detect the deadlock of wait-for-lock in the cluster.
pub struct DetectTable {
/// Keeps the DAG of wait-for-lock. Every edge from `txn_ts` to `lock_ts` has a survival time -- `ttl`.
/// When checking the deadlock, if the ttl has elapsed, the corresponding edge will be removed.
/// `last_detect_time` is the start time of the edge. `Detect` requests will refresh it.
// txn_ts => (lock_ts => Locks)
wait_for_map: HashMap<TimeStamp, HashMap<TimeStamp, Locks>>,
/// The ttl of every edge.
ttl: Duration,
/// The time of last `active_expire`.
last_active_expire: Instant,
now: Instant,
}
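wait_for_map is a two-level HashMap. The first-level key is txn_ts, the transaction waiting for locks. The second-level key is txn_lock, the transaction being waited for. The second-level value is the list of locks held by txn_lock that txn_ts is waiting for.
wait_for_map describes the lock wait relationships among transactions in the cluster. Given a txn_ts, you can look up which transactions it is waiting for and which of their locks it is waiting on.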
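A small example may make the two-level layout easier to picture. This is a sketch with assumed names: `Ts`, `WaitForMap`, `waits_for`, and `sample_map` are hypothetical, `u64` stands in for `TimeStamp`, and the `Locks` value is reduced to a plain list of lock hashes.

```rust
use std::collections::HashMap;

/// Stand-in for `TimeStamp`.
type Ts = u64;
/// waiting txn -> (lock-holding txn -> hashes of the locks being waited on)
type WaitForMap = HashMap<Ts, HashMap<Ts, Vec<u64>>>;

/// Transactions whose locks `txn_ts` is waiting for, sorted for stable output.
fn waits_for(map: &WaitForMap, txn_ts: Ts) -> Vec<Ts> {
    let mut holders: Vec<Ts> = map
        .get(&txn_ts)
        .map(|m| m.keys().copied().collect())
        .unwrap_or_default();
    holders.sort_unstable();
    holders
}

/// Builds: txn 100 waits for txn 90 (locks 0xa, 0xb) and for txn 95 (lock 0xc).
fn sample_map() -> WaitForMap {
    let mut map = WaitForMap::new();
    map.entry(100).or_default().insert(90, vec![0xa, 0xb]);
    map.entry(100).or_default().insert(95, vec![0xc]);
    map
}
```

Looking up 100 in the sample map yields the holders 90 and 95, while looking up 90 yields nothing because txn 90 is not waiting for anyone.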
/// Returns the key hash which causes deadlock.
pub fn detect(&mut self, txn_ts: TimeStamp, lock_ts: TimeStamp, lock_hash: u64) -> Option<u64> {
let _timer = DETECT_DURATION_HISTOGRAM.start_coarse_timer();
TASK_COUNTER_METRICS.detect.inc();
self.now = Instant::now_coarse();
self.active_expire(); // Actively remove expired edges.
// If `txn_ts` is waiting for `lock_ts`, it won't cause deadlock.
// txn_ts already waits for lock_ts, so lock_ts cannot be waiting for txn_ts and no deadlock is possible.
if self.register_if_existed(txn_ts, lock_ts, lock_hash) {
return None;
}
if let Some(deadlock_key_hash) = self.do_detect(txn_ts, lock_ts) {
ERROR_COUNTER_METRICS.deadlock.inc();
return Some(deadlock_key_hash); // A deadlock exists.
}
self.register(txn_ts, lock_ts, lock_hash);
None
}
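Algorithm flow:
- Remove expired edges. This step rarely runs: it executes only when the number of waiting transactions reaches 100000 and at least one hour has passed since the last cleanup.
- Check whether txn_ts is already waiting for a lock of txn_lock. If it is, there cannot be a path from txn_lock to txn_ts, so no deadlock is possible; record the new lock and return.
- Call do_detect, which traverses the wait-for graph starting from txn_lock and checks whether txn_ts is reachable. If it is, a deadlock exists.
- If no deadlock is found, txn_ts can wait for txn_lock without causing a deadlock, so add the wait-for information from txn_ts to txn_lock.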
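The condition for the first step can be written down as a small predicate. This is a sketch only: the two thresholds come from the description above, while the constant names, the function names, and the `>=` comparisons are assumptions rather than TiKV's actual code.

```rust
use std::time::Duration;

// Thresholds taken from the text; the constant names are assumed.
const ACTIVE_EXPIRE_THRESHOLD: usize = 100_000;
const ACTIVE_EXPIRE_INTERVAL: Duration = Duration::from_secs(3600);

/// Whether the active cleanup pass should run on this `detect` call.
fn should_active_expire(waiting_txns: usize, since_last_expire: Duration) -> bool {
    waiting_txns >= ACTIVE_EXPIRE_THRESHOLD && since_last_expire >= ACTIVE_EXPIRE_INTERVAL
}

/// Whether a single wait-for edge has outlived its ttl.
fn is_expired(edge_age: Duration, ttl: Duration) -> bool {
    edge_age >= ttl
}
```

Both conditions must hold for the active pass to run, which is why it is rarely reached; expired edges are also dropped lazily while the graph is traversed.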
/// Checks if there is an edge from `wait_for_ts` to `txn_ts`.
fn do_detect(&mut self, txn_ts: TimeStamp, wait_for_ts: TimeStamp) -> Option<u64> {
let now = self.now;
let ttl = self.ttl;
let mut stack = vec![wait_for_ts];
// Memorize the pushed vertexes to avoid duplicate search.
let mut pushed: HashSet<TimeStamp> = HashSet::default();
pushed.insert(wait_for_ts);
while let Some(wait_for_ts) = stack.pop() {
if let Some(wait_for) = self.wait_for_map.get_mut(&wait_for_ts) {
// Remove expired edges.
wait_for.retain(|_, locks| !locks.is_expired(now, ttl)); // Drop expired edges.
if wait_for.is_empty() {
self.wait_for_map.remove(&wait_for_ts); // Drop the vertex once it has no edges left.
} else {
for (lock_ts, locks) in wait_for {
if *lock_ts == txn_ts {
return Some(locks.hashes[0]);
}
if !pushed.contains(lock_ts) {
stack.push(*lock_ts);
pushed.insert(*lock_ts);
}
}
}
}
}
None
}
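do_detect walks the wait-for graph along every path that starts from wait_for_ts (txn_lock) and checks whether any of them reaches txn_ts. If one does, it returns the hash of a lock on the edge that reaches txn_ts to report the deadlock.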
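The whole detection flow can be reproduced in a few dozen lines once TTL handling and metrics are removed. The following is a simplified sketch, not TiKV's implementation: `SimpleDetector` and `find_path` are hypothetical names, and `u64` stands in for `TimeStamp`.

```rust
use std::collections::{HashMap, HashSet};

/// Simplified detector without TTL handling.
#[derive(Default)]
struct SimpleDetector {
    // waiting txn -> (lock-holding txn -> hashes of the locks being waited on)
    wait_for_map: HashMap<u64, HashMap<u64, Vec<u64>>>,
}

impl SimpleDetector {
    /// Returns a lock hash on the cycle if `txn_ts` waiting for `lock_ts` would deadlock;
    /// otherwise records the wait-for edge and returns `None`.
    fn detect(&mut self, txn_ts: u64, lock_ts: u64, lock_hash: u64) -> Option<u64> {
        let edge_exists = self
            .wait_for_map
            .get(&txn_ts)
            .map_or(false, |m| m.contains_key(&lock_ts));
        // An existing edge txn_ts -> lock_ts was already checked, so skip the search.
        if !edge_exists {
            if let Some(hash) = self.find_path(lock_ts, txn_ts) {
                return Some(hash);
            }
        }
        let hashes = self.wait_for_map.entry(txn_ts).or_default().entry(lock_ts).or_default();
        if !hashes.contains(&lock_hash) {
            hashes.push(lock_hash);
        }
        None
    }

    /// Depth-first search from `from`, looking for a wait-for path that reaches `target`.
    fn find_path(&self, from: u64, target: u64) -> Option<u64> {
        let mut stack = vec![from];
        // Remember pushed vertices so each one is visited at most once.
        let mut pushed: HashSet<u64> = HashSet::new();
        pushed.insert(from);
        while let Some(ts) = stack.pop() {
            if let Some(wait_for) = self.wait_for_map.get(&ts) {
                for (next, hashes) in wait_for {
                    if *next == target {
                        return Some(hashes[0]);
                    }
                    if pushed.insert(*next) {
                        stack.push(*next);
                    }
                }
            }
        }
        None
    }
}
```

For example, after txn 1 waits for txn 2 and txn 2 waits for txn 3, a request for txn 3 to wait for txn 1 finds the path 1 -> 2 -> 3 and is rejected, and the rejected edge is never added to the map.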
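Waking up lock waiters
The deadlock detector keeps the information needed to find, starting from a transaction, all the locks it is waiting for. When a lock is released, a wait times out, or a deadlock is found, the transactions waiting for that lock must be woken up. This requires the reverse lookup: from a lock ID to the transactions waiting for it.
Lock wait information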
/// If a pessimistic transaction meets a lock, it will wait for the lock
/// released in `WaiterManager`.
///
/// `Waiter` contains the context of the pessimistic transaction. Each `Waiter`
/// has a timeout. Transaction will be notified when the lock is released
/// or the corresponding waiter times out.
pub(crate) struct Waiter {
pub(crate) start_ts: TimeStamp,
pub(crate) cb: StorageCallback,
/// The result of `Command::AcquirePessimisticLock`.
///
/// It contains a `KeyIsLocked` error at the beginning. It will be changed
/// to `WriteConflict` error if the lock is released or `Deadlock` error if
/// it causes deadlock.
pub(crate) pr: ProcessResult,
pub(crate) lock: Lock,
delay: Delay,
_lifetime_timer: HistogramTimer,
}
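- start_ts: the ts of the transaction waiting for the lock
- lock: the lock being waited for
- pr: the result of waiting for the lock, for example a write conflict or a deadlock
- delay: the wait timeout
- cb: the callback used for wake-up; it returns the wait result pr to the transaction waiting for the lock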
// NOTE: Now we assume `Waiters` is not very long.
// Maybe needs to use `BinaryHeap` or sorted `VecDeque` instead.
type Waiters = Vec<Waiter>;
struct WaitTable {
// Map lock hash to waiters.
wait_table: HashMap<u64, Waiters>,
waiter_count: Arc<AtomicUsize>,
}
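wait_table keeps the list of all transactions waiting for a given lock. The key is the hash ID of the lock, and the value is all the Waiters waiting for that lock.
With this table, when the lock on a key is released, all its Waiters can be found by the lock's hash ID, and the oldest one is selected and woken up directly.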
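The lookup from lock hash to waiters can be sketched as follows. The types here are hypothetical reductions of `Waiter` and `WaitTable`, and the sketch assumes that "oldest" means the waiter with the smallest `start_ts`; the source does not show how `remove_oldest_waiter` picks its candidate.

```rust
use std::collections::HashMap;

/// Minimal waiter with only the field needed for the lookup; not TiKV's `Waiter`.
#[derive(Debug, PartialEq)]
struct SimpleWaiter {
    start_ts: u64,
}

#[derive(Default)]
struct SimpleWaitTable {
    // lock hash -> transactions waiting for that lock
    wait_table: HashMap<u64, Vec<SimpleWaiter>>,
}

impl SimpleWaitTable {
    fn add_waiter(&mut self, lock_hash: u64, waiter: SimpleWaiter) {
        self.wait_table.entry(lock_hash).or_default().push(waiter);
    }

    /// Removes and returns the waiter with the smallest `start_ts` for `lock_hash`.
    fn remove_oldest_waiter(&mut self, lock_hash: u64) -> Option<SimpleWaiter> {
        let waiters = self.wait_table.get_mut(&lock_hash)?;
        let idx = waiters
            .iter()
            .enumerate()
            .min_by_key(|(_, w)| w.start_ts)
            .map(|(i, _)| i)?;
        let oldest = waiters.swap_remove(idx);
        if waiters.is_empty() {
            // Drop the entry so the table holds no empty waiter lists.
            self.wait_table.remove(&lock_hash);
        }
        Some(oldest)
    }
}
```

A linear scan is enough here because, as the note in the original code says, the waiter list for one lock is assumed to be short.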
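Lock wake-up
After a transaction commits or rolls back, the locks it holds are released. For each of those locks, a wake-up is performed on its Waiters, and only the Waiter that has waited longest is woken up immediately.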
fn handle_wake_up(&mut self, lock_ts: TimeStamp, hashes: Vec<u64>, commit_ts: TimeStamp) {
...
for hash in hashes { // Wake up waiters for each lock held by the transaction.
let lock = Lock { ts: lock_ts, hash };
// Find the oldest waiter and wake it up.
if let Some((mut oldest, others)) = wait_table.remove_oldest_waiter(lock) {
// Notify the oldest one immediately.
self.detector_scheduler
.clean_up_wait_for(oldest.start_ts, oldest.lock);
oldest.conflict_with(lock_ts, commit_ts);
oldest.notify();
// Others will be waked up after `wake_up_delay_duration`.
//
// NOTE: Actually these waiters are waiting for an unknown transaction.
// If there is a deadlock between them, it will be detected after timeout.
if others.is_empty() {
// Remove the empty entry here.
wait_table.remove(lock);
} else {
others.iter_mut().for_each(|waiter| {
waiter.conflict_with(lock_ts, commit_ts);
waiter.reset_timeout(new_timeout);
});
}
}
}
}
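The parameter lock_ts is the transaction that holds the locks, and hashes are the locks it holds.
- Find the Waiter that has waited longest and remove it from the Waiter list
- Remove the wait-for information from txn_ts to txn_lock kept by the deadlock detector
- Build the pr result for the wake-up and call the wake-up callback
- For the other waiters (everyone except the longest-waiting one), build the wake-up pr and wake them later by resetting their wait timeout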
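The split between the immediately notified waiter and the delayed ones can be expressed as a small planning function. This is an illustrative sketch: `WakeAction` and `plan_wake_up` are hypothetical names, the delay is passed in as plain milliseconds, and the oldest waiter is assumed to be the one with the smallest start_ts.

```rust
/// What to do with one waiter when the lock it waits for is released.
#[derive(Debug, PartialEq)]
enum WakeAction {
    /// Wake the waiter right away.
    NotifyNow { start_ts: u64 },
    /// Keep the waiter registered but shorten its timeout to the given delay.
    NotifyAfterDelay { start_ts: u64, delay_ms: u64 },
}

/// Plans the wake-up for all waiters of one released lock.
fn plan_wake_up(mut waiter_start_ts: Vec<u64>, wake_up_delay_ms: u64) -> Vec<WakeAction> {
    // Assumption: the oldest waiter is the one with the smallest start_ts.
    waiter_start_ts.sort_unstable();
    waiter_start_ts
        .into_iter()
        .enumerate()
        .map(|(i, start_ts)| {
            if i == 0 {
                WakeAction::NotifyNow { start_ts }
            } else {
                WakeAction::NotifyAfterDelay { start_ts, delay_ms: wake_up_delay_ms }
            }
        })
        .collect()
}
```

Waking only one waiter at once avoids having every waiter retry the same key at the same moment; the remaining waiters retry after the delay.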
Lock manager interface
/// `LockManager` has two components working in two threads:
/// * One is the `WaiterManager` which manages transactions waiting for locks.
/// * The other one is the `Detector` which detects deadlocks between transactions.
pub struct LockManager {
waiter_mgr_worker: Option<FutureWorker<waiter_manager::Task>>,
detector_worker: Option<FutureWorker<deadlock::Task>>,
waiter_mgr_scheduler: WaiterMgrScheduler,
detector_scheduler: DetectorScheduler,
waiter_count: Arc<AtomicUsize>,
/// Record transactions which have sent requests to detect deadlock.
detected: Arc<[CachePadded<Mutex<HashSet<TimeStamp>>>]>,
pipelined: Arc<AtomicBool>,
}
impl LockManagerTrait for LockManager {
fn wait_for(
&self,
start_ts: TimeStamp,
cb: StorageCallback,
pr: ProcessResult,
lock: Lock,
is_first_lock: bool,
timeout: Option<WaitTimeout>,
) {
let timeout = match timeout {
Some(t) => t,
None => {
cb.execute(pr);
return;
}
};
// Increase `waiter_count` here to prevent there is an on-the-fly WaitFor msg
// but the waiter_mgr haven't processed it, subsequent WakeUp msgs may be lost.
self.waiter_count.fetch_add(1, Ordering::SeqCst);
self.waiter_mgr_scheduler
.wait_for(start_ts, cb, pr, lock, timeout);
// If it is the first lock the transaction tries to lock, it won't cause deadlock.
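if !is_first_lock { // Only non-first locks are sent for detection. Open question: a first-lock wait is never registered, so will later detections miss this wait?
self.add_to_detected(start_ts);
self.detector_scheduler.detect(start_ts, lock); // Send a detect request to the detector.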
}
}
fn wake_up(
&self,
lock_ts: TimeStamp,
hashes: Vec<u64>,
commit_ts: TimeStamp,
is_pessimistic_txn: bool,
) {
// If `hashes` is some, there may be some waiters waiting for these locks.
// Try to wake up them.
if !hashes.is_empty() && self.has_waiter() {
self.waiter_mgr_scheduler
.wake_up(lock_ts, hashes, commit_ts);
}
// If a pessimistic transaction is committed or rolled back and it once sent requests to
// detect deadlock, clean up its wait-for entries in the deadlock detector.
if is_pessimistic_txn && self.remove_from_detected(lock_ts) {
self.detector_scheduler.clean_up(lock_ts);
}
}
fn has_waiter(&self) -> bool {
self.waiter_count.load(Ordering::SeqCst) > 0
}
}
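The `detected` field records which transactions have sent detect requests, so that `wake_up` sends a clean-up request only for those. A minimal sketch of such a sharded set follows; `DetectedSet`, its method names, and the modulo sharding are assumptions for illustration, and `CachePadded` is left out.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

/// Sharded set of transaction timestamps that have sent a detect request.
struct DetectedSet {
    shards: Vec<Mutex<HashSet<u64>>>,
}

impl DetectedSet {
    fn new(shard_count: usize) -> Self {
        assert!(shard_count > 0);
        let shards = (0..shard_count).map(|_| Mutex::new(HashSet::new())).collect();
        DetectedSet { shards }
    }

    /// Picks the shard for `ts`; sharding by modulo is an assumption of this sketch.
    fn shard(&self, ts: u64) -> &Mutex<HashSet<u64>> {
        &self.shards[(ts % self.shards.len() as u64) as usize]
    }

    /// Returns true if `ts` was newly recorded.
    fn add(&self, ts: u64) -> bool {
        self.shard(ts).lock().unwrap().insert(ts)
    }

    /// Returns true if `ts` had been recorded, meaning a clean-up request is needed.
    fn remove(&self, ts: u64) -> bool {
        self.shard(ts).lock().unwrap().remove(&ts)
    }
}
```

Splitting the set into several mutex-protected shards reduces contention when many threads record or remove timestamps at the same time.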