暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

TiDB 悲观事务死锁检测

2021-11-18
446

作者:jiyf

原文来源:https://tidb.net/blog/9722d003

【是否原创】是\ 【首发渠道】TiDB 社区

死锁检测 leader

每个 tikv 都会开启死锁检测进程,开启的进程有 leader 和 follow 两种角色可以切换,默认为 follow 角色。

leader 角色:维护锁的 DAG 信息,接受检测请求,计算 DAG 检测是否有锁存在等

follow 角色:通过 GRPC 将检测请求发送给 leader,接受检测结果

死锁检测 的 leader 为:包含 key 为空字符串的 region,也就是集群按 key 排序第一个 region 的 region leader 所在的 store 将成为死锁检测的 leader。

const LEADER_KEY: &[u8] = b""; fn is_leader_region(region: &Region) -> bool { // The key range of a new created region is empty which misleads the leader // of the deadlock detector stepping down. // // If the peers of a region is not empty, the region info is complete. is_region_initialized(region) && region.get_start_key() <= LEADER_KEY && (region.get_end_key().is_empty() || LEADER_KEY < region.get_end_key()) }

通过订阅 region 变化的信息,包括 region 创建、更新、role 角色变更来捕获 leader region 的角色变化,然后同步死锁检测的角色。

pub(crate) fn register(self, host: &mut CoprocessorHost<RocksEngine>) { host.registry .register_role_observer(1, BoxRoleObserver::new(self.clone())); // role 变化的时候调用 host.registry .register_region_change_observer(1, BoxRegionChangeObserver::new(self)); // region 变化的时候改变 }

检测接口函数

死锁检测接口函数,当自己是 leader 时候,直接调用 local 函数接口,如果是 follower,那么通过 grpc 向 leader 查询。

/// Handles detect requests of itself. /// 处理锁 detect. fn handle_detect(&mut self, tp: DetectType, txn_ts: TimeStamp, lock: Lock) { if self.is_leader() { self.handle_detect_locally(tp, txn_ts, lock); } else { for _ in 0..2 { ... if self.send_request_to_leader(tp, txn_ts, lock) { return; } ... } ... } }

从这里接口对应三种查询类型:

  • DetectType::Detect,也就是死锁检测的接口,当悲观事务遇到一个锁的时候,就会通过这个接口来检测是否产生了死锁
  • DetectType::CleanUpWaitFor,删除事务等待的一个锁,事务对这个锁没有等待了
  • DetectType::CleanUp,删除这个事务所有等待的锁,比如事务回滚了,所以就清除这个事务的锁等待信息

fn handle_detect_locally(&self, tp: DetectType, txn_ts: TimeStamp, lock: Lock) { let detect_table = &mut self.inner.borrow_mut().detect_table; // 原子锁的。 match tp { DetectType::Detect => { // 检测死锁是否存在。 if let Some(deadlock_key_hash) = detect_table.detect(txn_ts, lock.ts, lock.hash) { self.waiter_mgr_scheduler .deadlock(txn_ts, lock, deadlock_key_hash); // 处理死锁吧? } } DetectType::CleanUpWaitFor => { // 清理一个索等待。 detect_table.clean_up_wait_for(txn_ts, lock.ts, lock.hash) } DetectType::CleanUp => detect_table.clean_up(txn_ts), // 删除这个事务的锁等待。 } }

死锁检测算法

当悲观事务过程中,尝试锁定一个 key,发现 key 已经被上锁,这时候会调用死锁检测接口,假设当前事务是 txn_1,持有锁的事务是 txn_lock,检测当事务 txn_1 等待 txn_lock 的锁的情况下,存不存在死锁。

检测算法是构建一个 DAG 有向无环图,如果目前集群存在的锁中存在一条从 txn_lock 到 txn_1 的边,那么就代表死锁将会存在。

`` ///Locks` is a set of locks belonging to one transaction. struct Locks { ts: TimeStamp, // 事务ts吧。 hashes: Vec, last_detect_time: Instant, }

/// Used to detect the deadlock of wait-for-lock in the cluster. pub struct DetectTable { /// Keeps the DAG of wait-for-lock. Every edge from txn_ts to lock_ts has a survival time -- ttl. /// When checking the deadlock, if the ttl has elpased, the corresponding edge will be removed. /// last_detect_time is the start time of the edge. Detect requests will refresh it. // txn_ts => (lock_ts => Locks) wait_for_map: HashMap>,

/// The ttl of every edge.
ttl: Duration,

/// The time of last `active_expire`.
last_active_expire: Instant,

now: Instant,

} 其中 wait_for_map 是个两层的 hashMap,第一层 key 是等待锁的事务 txn_ts,第二层 key 是等待的事务 txn_lock,第二层 value 是事务 txn_ts 等待事务 txn_lock 持有的锁列表。 ```

wait_for_map 描述了集群事务锁等待的关系,通过 txn_lock,可以查询出当前事务在等待哪些事务的锁、等待哪些锁。

``` /// Returns the key hash which causes deadlock. /// // 检查是否存在死锁。 pub fn detect(&mut self, txn_ts: TimeStamp, lock_ts: TimeStamp, lock_hash: u64) -> Option { let _timer = DETECT_DURATION_HISTOGRAM.start_coarse_timer(); TASK_COUNTER_METRICS.detect.inc();

    self.now = Instant::now_coarse();
    self.active_expire();    // 清理过期的。

    // If `txn_ts` is waiting for `lock_ts`, it won't cause deadlock.
    // 已经有 txn_tx 等待 lock_ts,那么就不会存在 lock_ts 等待 txn_ts,也就是不会存在死锁。
    if self.register_if_existed(txn_ts, lock_ts, lock_hash) {
        return None;
    }

    if let Some(deadlock_key_hash) = self.do_detect(txn_ts, lock_ts) {
        ERROR_COUNTER_METRICS.deadlock.inc();
        return Some(deadlock_key_hash);    // 存在这个死锁。
    }
    self.register(txn_ts, lock_ts, lock_hash);
    None
}

```

算法流程:

  1. 清理过期锁(一般很少走这里,只有等待的事务数量达到100000,且距离上次清理达到1个小时才会执行)
  2. 检查是否存在 txn_ts 在等待事务 txn_lock 的锁,如果已经存在,那么必然不存在 txn_lock 到 txn_ts 的边,必然不会有死锁,那么加入新的锁,返回
  3. 调用 do_detect 函数,遍历构建所有 DAG 检查是否有存在 txn_lock 到 txn_ts 的边,如果存在那么死锁就存在
  4. 如果没有死锁存在,那么说明 txn_ts 等待 txn_lock 不会产生死锁,把 txn_ts 等待 txn_lock 的锁信息添加进去

`` /// Checks if there is an edge fromwait_for_tstotxn_ts`. /// 检查有没有从 wait_for_ts 到 txn_tx 的锁。 fn do_detect(&mut self, txn_ts: TimeStamp, wait_for_ts: TimeStamp) -> Option { let now = self.now; let ttl = self.ttl;

    let mut stack = vec![wait_for_ts];
    // Memorize the pushed vertexes to avoid duplicate search.
    let mut pushed: HashSet<TimeStamp> = HashSet::default();
    pushed.insert(wait_for_ts);
    while let Some(wait_for_ts) = stack.pop() {
        if let Some(wait_for) = self.wait_for_map.get_mut(&wait_for_ts) {
            // Remove expired edges.
            wait_for.retain(|_, locks| !locks.is_expired(now, ttl));    // 清理过期的。
            if wait_for.is_empty() {
                self.wait_for_map.remove(&wait_for_ts);    // 清理掉。
            } else {
                for (lock_ts, locks) in wait_for {
                    if *lock_ts == txn_ts {
                        return Some(locks.hashes[0]);
                    }
                    if !pushed.contains(lock_ts) {
                        stack.push(*lock_ts);
                        pushed.insert(*lock_ts);
                    }
                }
            }
        }
    }
    None
}

```

do_detect 函数构建 DAG 遍历所有从 wait_for_ts(txn_lock)出发的可能,检查有没有到 txn_ts 的边,如果有,那么返回一个存在的锁的 hash,告诉死锁的存在。

唤醒锁等待

死锁检测中维护了从事务出发可以找到所有等待的锁的信息,当锁被释放、超时、死锁存在情况下,需要唤醒等待锁的事务,这里就需要根据锁 id 找到等待的事务,进行唤醒操作。

锁等待信息

/// If a pessimistic transaction meets a lock, it will wait for the lock /// released in `WaiterManager`. /// /// `Waiter` contains the context of the pessimistic transaction. Each `Waiter` /// has a timeout. Transaction will be notified when the lock is released /// or the corresponding waiter times out. pub(crate) struct Waiter { pub(crate) start_ts: TimeStamp, pub(crate) cb: StorageCallback, /// The result of `Command::AcquirePessimisticLock`. /// /// It contains a `KeyIsLocked` error at the beginning. It will be changed /// to `WriteConflict` error if the lock is released or `Deadlock` error if /// it causes deadlock. pub(crate) pr: ProcessResult, pub(crate) lock: Lock, delay: Delay, _lifetime_timer: HistogramTimer, }

  • start_ts: 代表等待锁的事务 ts
  • lock: 代表等待的锁
  • pr: 代表等待的锁的结果,例如锁冲突、死锁等
  • delay: 等待超时时间
  • cb: 回调函数,唤醒函数,把锁等待结果 pr 返回给等待锁的事务的钩子函数

`` // NOTE: Now we assumeWaitersis not very long. // Maybe needs to useBinaryHeapor sortedVecDeque` instead. type Waiters = Vec;

struct WaitTable { // Map lock hash to waiters. wait_table: HashMap, waiter_count: Arc, } ```

wait_table 维护了等待某个锁的所有事务列表,key 为锁的 hashId,value 是等待这个锁的所有 Waiter。

至此,当某个 key 上的锁被释放时候,根据锁的 hash ID 查找到所有的 Waiter,选择等待时间最早的事务进行直接唤醒。

锁唤醒

当事务提交或者回滚以后,事务持有的锁将会被释放,事务持有的每一个锁,都会对其 Waiter 进行唤醒操作(只唤醒等待锁最久的 Waiter)。

fn handle_wake_up(&mut self, lock_ts: TimeStamp, hashes: Vec<u64>, commit_ts: TimeStamp) { ... for hash in hashes { // 对于事务的每一个锁都进行唤醒操作 let lock = Lock { ts: lock_ts, hash }; // 找到最老的 waiter 进行唤醒 if let Some((mut oldest, others)) = wait_table.remove_oldest_waiter(lock) { // Notify the oldest one immediately. self.detector_scheduler .clean_up_wait_for(oldest.start_ts, oldest.lock); oldest.conflict_with(lock_ts, commit_ts); oldest.notify(); // Others will be waked up after `wake_up_delay_duration`. // // NOTE: Actually these waiters are waiting for an unknown transaction. // If there is a deadlock between them, it will be detected after timeout. if others.is_empty() { // Remove the empty entry here. wait_table.remove(lock); } else { others.iter_mut().for_each(|waiter| { waiter.conflict_with(lock_ts, commit_ts); waiter.reset_timeout(new_timeout); }); } } } }

参数 lock_ts 代表是有锁的事务,hashes 代表持有的锁信息。

  1. 找到等待时间最久的 Waiter,从 Waiter 列表中删除
  2. 删除死锁检测维护的 txn_ts 到 txn_lock 的锁等待信息
  3. 构建唤醒的 pr 结果,调用唤醒函数
  4. 对于其他的 waiter(除了等待最久剩余的),构建唤醒的 pr,通过等待超时方式唤醒

锁管理接口

`` ///LockManagerhas two components working in two threads: /// * One is theWaiterManagerwhich manages transactions waiting for locks. /// * The other one is theDetector` which detects deadlocks between transactions. pub struct LockManager { waiter_mgr_worker: Option>, detector_worker: Option>,

waiter_mgr_scheduler: WaiterMgrScheduler,
detector_scheduler: DetectorScheduler,

waiter_count: Arc<AtomicUsize>,

/// Record transactions which have sent requests to detect deadlock.
detected: Arc<[CachePadded<Mutex<HashSet<TimeStamp>>>]>,

pipelined: Arc<AtomicBool>,

}

impl LockManagerTrait for LockManager { fn wait_for( &self, start_ts: TimeStamp, cb: StorageCallback, pr: ProcessResult, lock: Lock, is_first_lock: bool, timeout: Option, ) { let timeout = match timeout { Some(t) => t, None => { cb.execute(pr); return; } };

    // Increase `waiter_count` here to prevent there is an on-the-fly WaitFor msg
    // but the waiter_mgr haven't processed it, subsequent WakeUp msgs may be lost.
    self.waiter_count.fetch_add(1, Ordering::SeqCst);
    self.waiter_mgr_scheduler
        .wait_for(start_ts, cb, pr, lock, timeout);

    // If it is the first lock the transaction tries to lock, it won't cause deadlock.
    if !is_first_lock {    // 不是第一个锁的时候,不检测?问题是不加入这个锁信息。那这个锁等待不会被以后的锁检查
        self.add_to_detected(start_ts);
        self.detector_scheduler.detect(start_ts, lock);    // 这里检测一下。
    }
}

fn wake_up(
    &self,
    lock_ts: TimeStamp,
    hashes: Vec<u64>,
    commit_ts: TimeStamp,
    is_pessimistic_txn: bool,
) {
    // If `hashes` is some, there may be some waiters waiting for these locks.
    // Try to wake up them.
    if !hashes.is_empty() && self.has_waiter() {
        self.waiter_mgr_scheduler
            .wake_up(lock_ts, hashes, commit_ts);
    }
    // If a pessimistic transaction is committed or rolled back and it once sent requests to
    // detect deadlock, clean up its wait-for entries in the deadlock detector.
    if is_pessimistic_txn && self.remove_from_detected(lock_ts) {
        self.detector_scheduler.clean_up(lock_ts);
    }
}

fn has_waiter(&self) -> bool {
    self.waiter_count.load(Ordering::SeqCst) > 0
}

} ```

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论