1
0
2
1
专栏/.../

TiCDC 源码解读(6)- TiCDC Puller 模块介绍

 sdojjy  发表于  2023-03-14

分享概要

本文是 TiCDC 源码解读的第六篇, 主要是 TiCDC 中的 Puller 模块介绍,TiCDC 中的 Puller 通过创建 KV-Client 向 TiKV 发送 ChangeDataRequest 请求, 在 TiCDC 中实现从TiKV 接收变更数据功能。本期将详细分享 Puller 模块的功能实现原理,今天的分享将从以下四个方面展开,解答 TiCDC Puller 模块的四个关键问题:

  • TiCDC 中的 Puller 模块是什么?
  • Puller 如何初始化以及如何 从 TiKV 拉取数据?
  • Puller 如何处理数据变更事件?
  • Puller 如何推进拉取数据的进度?

Puller 是什么

Puller 创建一个 KV-Client 并向 TiKV 发送 ChangeDataRequest 请求,从而从 TiKV 接收变更数据。

  • TiKV 按照 Region 为单位,将数据变更事件和 Resolved Ts 事件发送给 TiCDC。Resolved Ts 事件按照 Region 为单位周期性地发送到 TiCDC,表明该 Region 中所有 Commit Ts 小于该 Resolved Ts 的事件都已被发送到 TiCDC。
  • Puller 从 KV-Client 接收数据,将其写入 Sorter 中,并持续推进表级别的 Resolved Ts,以标识该表当前接收数据的进度。

对于 DML 事件,从 table pipeline 的角度来看,pullerNodetable pipeline 中的一个节点,处理流程大概为

  1. pullerNode 构造并初始化了一个 Puller 接口。
  2. pullerNode 驱动 Puller 向 TiKV 发送 ChangeDataRequest grpc 请求。
  3. region worker模块则处理从 TiKV 收到的数据,将数据写到了 Puller 的 output Chan 中
  4. pullerNode 消费 outputCh 中的 RawKvEventry 数据。

对于 DDL 事件,其实底层也是一个 Puller,只是拉取的数据范围不一样,其就是 DDLJobPuller 模块运行并消费 Puller 输出的事件。今天我们要讲的 Puller 就是下图的 pullerImplcdc client 以及其他相关模块。

Puller 接口定义

下面是 Puller 接口的定义,它主要包含两个比较重要的方法:

  • Run 方法,这是一个阻塞的方法,上层调用者通过单独的 goroutine 来运行 Run 方法, 来驱动 Puller 向 TiKV 发送请求,并处理 TiKV 发送过来的事件,输出到 Puller 的 output channel 中;
  • Output 方法,这个方法比较简单直观,它返回一个 channel 以供上层消费数据。
// Puller pull data from tikv and push changes into a buffer.
type Puller interface {
        // Run the puller, continually fetch event from TiKV and add event into buffer.
        Run(ctx context.Context) error
        GetResolvedTs() uint64
        Output() <-chan *model.RawKVEntry
        Stats() Stats
}

ChangeDataRequest 请求

ChangeDataRequest 结构体定义了向 TiKV 发送请求的所有信息:

  • CheckpointTs 表示从哪一个时间点开始同步数据。
  • RegionId 表示向 TiKV 的哪一个 region 请求变更事件。
  • ExtraOp 定义了一些请求的扩展性属性,现在使用的 old value 输出标志就是附加在这个字段上的。
req := &cdcpb.ChangeDataRequest{
   Header:       header,
   RegionId:     regionID,
   RequestId:    requestID,
   RegionEpoch:  regionEpoch,
   CheckpointTs: sri.resolvedTs,
   StartKey:     sri.span.Start,
   EndKey:       sri.span.End,
   ExtraOp:      extraOp,
   FilterLoop:   s.client.filterLoop,
}

从这里可以注意到在给 TiKV 发送的请求中是没有 table 的概念的,只有 region 相关的信息,而 table pipeline 又是一个表的概念,所以当我们要从 TiKV 拉取一个表的实时数据变更时,需要在 TiCDC 侧将表的概念映射到 TiKV 的 region 中,根据 关系模型到 Key-Value 模型的映射 这个文档 , 如 GetTableSpan 方法展示的一样,将一个 table 的数据存在 [ t<table-id>_r, t<table-id>_<r+1>) 这样一个左闭右开的区间里。

// GetTableSpan returns the span to watch for the specified table
func GetTableSpan(tableID int64) Span {
        tablePrefix := tablecodec.GenTablePrefix(tableID)
        sep := byte('_')
        recordMarker := byte('r')

        var start, end kv.Key
        // ignore index keys.
        start = append(tablePrefix, sep, recordMarker)
        end = append(tablePrefix, sep, recordMarker+1)
        return Span{
                Start: start,
                End:   end,
        }
}

Puller 请求流程

现在我们已经知道了 TiCDC 与 TiKV grpc 请求的格式,以及怎样将 table id 映射到 kv range。下面我们来介绍 Puller 请求的整体流程, 初始化 Puller 的时候,上层将 table id 转换成了 totalSpan 字段赋值给了 Puller ,从这以后,Puller 内部没有 table 的概念了,Puller 所做的事就是去捕获并输出这个 kv span 范围内的实时数据变更。

在 TiCDC 代码中,pullerImpl 结构体实现了 Puller 接口, 并通过 cdc client 为每一个 span 创建一个 eventSession , 而 eventSession 则会启动 5 个 goroutine 来协调完成该 span 的 kv 事件拉取任务,下面我们详细介绍一下这个 5 个 goroutine。。

        g.Go(func() error {
                return s.dispatchRequest(ctx)
        })

        g.Go(func() error {
                return s.requestRegionToStore(ctx, g, regionCount)
        })

        g.Go(func() error {
              ....
              go s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts)
              .....
        })

        g.Go(func() error {
            go s.handleError(ctx, errInfo)
        })

        g.Go(func() error {
                return s.regionRouter.Run(ctx)
        })
        s.requestRangeCh <- rangeRequestTask{span: s.totalSpan, ts: ts}
}

divideAndSendEventFeedToRegions

这个 goroutine 从 requestRangeCh 获取一个 span 信息, 把 span 拆分成为 region,可以注意这里的第一个 span 是整个 totalSpan, 其内部逻辑大致是:

  1. 利用 regionCache 来迭代出这个 span 中覆盖的所有 region信息
  2. 为每一个 region 生成一个 region 相关的 singleRegionInfo 任务
  3. 用 RegionRangeLock 模块来锁住这一个 region 的范围,表示这个 region 已经在被处理了,如果成功锁定,则将任务发送到下一级 chan 中,否则,重新把这个子 span 发送到 requestRangeCh 来重试失败的子 span

dispatchRequest

这个 goroutine 做的事比较简单, 就是从 chanel 中取出 singleRegionInfo,然后根据 region 信息获取 rpcCtx,并将 rpcCtx 赋值给 singleRegionInfo,最后再把 singleRegionInfo 任务传给 regionRouter。

regionRouter

regionRouter 是一个基于 token 请求限流模块,用来限制 TiCDC 向 TiKV 发送请求的频率。

regionRouter 周期性地按 token limit 的方式把 region 任务输出到他的 output 中,默认情况下,每个 TiKV store 的token 数是 40。在 Puller 真正向 TiKV 发送完请求后被会将该 store 的 token 减1,而在收到 TiKV 发送来的 INITIALIZED 事件后释放一个 token。

requestRegionToStore

requestRegionToStore 从上面的 regionRoute 的 routput chan 中读取出 singleRegionInfo,并根据其携带的信息来构造 ChangeDataRequest 发送给 TiKV,然后在单独的 goroutine 中调用接收从 TiKV 发送过来的事件。同时也创建并初始化运行一个 region worker,用于处理 TiKV 发送来数据。到这里,正常流程下,一个 span 的 region 拆分、region 任务分发,以及请求的发送过程都完成了。值得注意的是每个 TiKV store 只有一个 grpc stream, 建立后,ChangeDataRequest 都在这个 stream 上收发,并用同一个 region worker 处理。

handleError

最后一个 goroutine 是来到做错误处理的。

Region worker 数据处理

Region worker 对应一个 TiKV store,负责这个 TiKV store 上所有 region 的数据处理。 Region worker 从自己的 input chan 中读取 grpc response,经过处理后把数据写到 Puller 的 eventChan 中。

Region worker 的启动是在它的 Run 方法中进行的:

  1. 首先是 initPoolHandles 向 workerPool 注册 handle,workerPool 是 TiCDC 内部实现的一个线程池,向这个线程池中注册完 handle 后就可以向 workPool 提交任务了。当 region worker 的 input chan 里堆积了大量 grpc response 且超过了一个阈值来不及处理时, region worker 会向这个线程池提交任务,以加快任务处理,提高 Puller 的吞吐。
  2. Region worker 启动一个 eventHandler goroutine 来从 inputCh 读取 grpc 的接收到的数据,这是真正数据处理的地方。
  3. Region worker 还会启动一个goroutine 来 resolveLock,尝试解决上游 TiDB 崩溃时在 TiKV 中残留的 lock 信息,以免影响 TiKV resolved ts 的推进。

KV 事件处理

在讲 TiCDC 如何处理数据前,我们先来回顾一下 TiKV 如何捕获变更数据的,在TiKV 侧数据的捕获分成了两个阶段:

  1. 第一个阶段 我们称之为增量扫,这个阶段获取 Region Snapshot,读取某段时间范围内的数据更改,时间范围一般为 (start ts, current ts]。这一阶段会输出三种类型的事件,

    1. 一种是 prewrite 事件,表示发生在 start ts 到 current ts 期间的上锁记录
    2. 一种是 commited 事件,表示发生在 start ts 到 current ts 期间的提交记录,也就是 prewrite 加 commit 之后的完整结果
    3. 最后一种是 initilaized 事件,表示增量扫的过程结束了,后续不会有 committed 内容输出, TiKV 会向 TiCDC 发送 resolved ts 事件了。
  2. 第二个阶段是实时推流的过程,这个阶段贯穿于整个 TiCDC 连接生命周期,通过启动运行在 Raftstore Apply 线程中的 CdcObserver,实时捕捉上游写入。

总的来说就是 TiKV 会汇总两个阶段的 KV 数据,通过 Grpc 发送给 TiCDC。

TiCDC 在收到 Grpc 响应后,Grpc 响应数据被解析出来发送到了 region worker 的 inputChan 中, region worker 从 inputChan 中读出数据,并在 handleEventEntry 方法中处理上述各种类型的事件还原成完整事务:

  • Committed:已提交事务,直接输出到 Puller 的 event channel 中。

  • Prewrite: prewrite 事件, 需要缓存到 matcher 中, key 为 startTs 和 kv key。

  • Commit: 事务提交事件, 从 matcher 中找到缓存的 prewrite 事件,组装成一个提交好的事务,然后输出到 eventCh 中。

  • Rollback: 事务回滚,从matcher 的缓存中清理掉 prewriter 事件。

  • Initialized:增量扫完成,表示这个 region 可以处理了,这个时候 region worker 需要做这几件事:

    • 设置 region initialized 状态, handleResolvedTs 方法在检测到这个标志后会输出该 region 的 resolvedTs
    • 释放 regionRouter 的 token, region 增量扫完成了,可以允许发送别的 region 的 ChnageDataRequest 请求了
    • 处理所有的 cachedCommit 事件,并发送到 event channel 中
  • ResolvedTs: resolved ts 会被送到 event chan 中, 同时也会发送到 resolve lock goroutine 中 channel 中。

ResolvedTs 推进

KV 事件经过处理后可以输出了,但是 region worker 输出到 Puller event chan 中的 resolved ts 事件还只是 region 级别的 resolved ts, 而我们需要输出一个表级别的 resolved ts,更准确地说是整个 Puller span 范围的 resolved ts,来告诉下游这个 resolved ts 之前的所有 KV 数据可以处理了。 这里就需要用到一个叫 frointier 模块,这个模块的接口定义比较简单, Forward 方法用来接收某个 region 的 resolved ts 值以及其 kv 范围, 而 Frointier 则表示输出 Puller 级别的 resolved ts。

type Frontier interface {
        Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64)
        Frontier() uint64
        String() string
}

Puller 会在其 Run 方法中,消费由 region worker 输出到 event chan 中的数据:

  • 如果是 kv 事件,直接就输出到 output chan 中
  • 如果是 resolved ts,则会调用 frointer 的 Forward 的方法来调整并更新 frointier 内部维护的最小堆,重新计算并缓存所有 span 中最小的 resolved ts。Puller 最后调用 Frointier 方法拿到最小的 resolved ts,封装成 RawKvEntry 输出到 Puller 的 output chan 中。举个例子,一个 Puller 同步的表有 6 个 region,它们当前的 resolved ts 分别为 6,3,2,5,4,7, 现在 TiCDC 收到了其中一个 region 的 resolved ts 事件, 其值更新成了 4, 变成了 6,3,4, 5,4, 7,则 Puller 会对外会输出一个 3 的 resolved ts类型 RawKvEntry。

错误处理

Region worker 在处理事件的时候会发生一些错误,同时也可能会收到一些由 TiKV 发过来的错误,比如说 region not found、not leader 等等。这些错误都会由 Puller eventSession 统一的错误处理逻辑来处理:

  • 首先调用 handleSingleRegionError 方法统一输出到了一个 errChan 中
  • 然后由之前提到的 Puller eventSession 中的 handleError goroutine 消费处理, handleError 处理 errChan 时,根据错误的类型来判断是否需要重新调度 region 请求。一些错误是不预期或者不可重试的,比如 DuplicateRequest,Compatibility,ClusterIdMismatch,遇到这类错误时 Puller 就直接报错,而其他可重试的错误则会调用 scheduleRegionRequest 来重新调度 region 所覆盖的 span,再一次完整的走过 eventSeesion 处理 span 的流程。

DDL 事件捕获

最后,我们来了解一下 DDL 事件的拉取过程。DDL 也是存储在 TiKV 中,因此底层捕获数据的逻辑与 DML 相同,都是向 TiKV 中请求事件变更的 span。TiCDC 在 Puller 的上层定义了一个名为 DDLJobPuller 的接口。

这个接口的作用类似于 pullerNode,用于驱动 Puller 数据流动。DDLJobPuller 接口具有一个 Run 方法用于拉取数据,以及一个返回 channel 的 Output 方法, 用于向外暴露数据。与 DML 相比,底层 Puller 需要拉取的 span 的初始值不同,共需要拉取 3 个 span 的数据。

func GetAllDDLSpan() []Span {
        return []Span{getDDLSpan(), getAddIndexDDLSpan(), GetTableSpan(JobTableID)}
}

在 TiDB 支持 concurrent DDL 时, DDL 也被视为一个特殊表,表的 ID 为 MaxInt48 - 1,所以在这种情况下,它的 span 计算方式与 DML 保持一致,为了兼容性所有的场景,TiCDC 会同时拉取这 3 个 span 的数据。

总结

以上就是本文的全部内容,希望在阅读上面的内容之后,能够对 TiCDC Puller 模块的工作原理有一个基本了解,了解以下几个要点:

  • Puller 确定需要拉取哪些 KV region 的方式
  • Puller 初始化、还原事务的操作
  • Puller 推进 resolved ts 的过程

1
0
2
1

版权声明:本文为 TiDB 社区用户原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接和本声明。

评论
暂无评论