跳到主要内容

DM 数据旅程 01:序言 + 第一次 start task

作者:okenJiang

背景

在此之前已经有官方很多关于 DM 的优秀文章了,比如

这些文章从原理方面非常详尽地介绍了 DM 的相关功能,是非常好的学习资料。但是

而外部的文章则大部分集中在 DM 的使用上而不是实现上。

基于此,我想开一个坑《DM 数据旅程系列》,每一篇文章将以一个个小功能为线索,带大家理解 DM 中的各种实现。如果要讲的功能太大,也会拆分成小模块放出。每一步都会尽量放出 GitHub 地址,方便大家跟踪学习~

数据旅程出自于龙少 PPT 中提到的用户旅程和数据旅程,指我们可以通过数据(字节)传输的途径。在看一段代码时,我们可以思考这个字节是从哪里来的,到哪里去,作用是什么,通过理解数据旅程来理解整个产品它的深层原理,并且可以通过改变数据规模(提升/降低数据数量级)和场景(不同的时间不同的位置)来理解产品的缺点(bug)。

以上都是个人拙见(废话),欢迎提意见~

当然,现在的 DM 正在飞速的发展迭代中,本系列的内容也可能马上就会过时,现在是 2021 年 10 月 31 日,本系列文章预计将会覆盖 DM v5.3.0-vx.x.x 的代码逻辑。

如果认为文章中有任何可以改进的地方, 欢迎大家提出自己的想法。同样地,因为 DM 还在快速迭代,还有很多地方都有改进的地方,如果大家对代码实现有任何疑问,也都可以去 repo 中直接提 issue。

读者要求

  • 能看懂 Golang 语法
  • 了解 grpc、etcd

计划章节

  1. Start task
  2. Stop task
  3. Pause task
  4. Resume task
  5. Full mode(dumpling)
  6. Incremental mode(syncer)
  7. Block-allow list
  8. Binlog-filter
  9. Enable relay log
  10. Permistic sharding ddl
  11. Optimistic sharding ddl
  12. 。。。

一、概述

本文以 start task 为目的,带着读者从 0 到 1 启动一个数据迁移任务,旨在让读者了解到最基础的 DM 逻辑。本文将直接参照集成测试 start_task 的过程,从以下几个方面展开:

  1. Start dm-master
  2. Start dm-worker
  3. 绑定 source 和 dm-worker
  4. Start task

注:为了专注于我们的目的(start task),本文不会对无关代码进行解读

大家可使用 start/stop 流程 辅助阅读

由于写这篇的文章的时间是 2021 年 12 月份,所以所有的链接都是原 DM repo 的😂

二、start dm-master

  1. ./dm-master(in run_dm_master) 启动二进制文件,即调用 main 函数,其中 master-server start
  2. go electionNotify:这个是为了等待 etcd election 成功,并在其成功后做⬇️

DM master 中内嵌了一个 etcd,用于存储各种元数据,并且借此保证 DM master 的高可用。后面非常多的数据存储都会用到 etcd。

  1. startLeaderComponent,其中我们这次只需要关注 s.scheduler.Start 中的go observeWorkerEvent,主要分为两部分

    1. go WatchWorkerEvent:该函数通过 etcd client 监听是否有 workerEvent 出现

    2. handleWorkerEv:有 workerEvent 出现时,handle it

      1. handleWorkerOffline
      2. handleWorkerOnline
  2. 这个时候,dm-master 等待 workerEvent 到来

三、start dm-worker

  1. ./dm-worker(in run_dm_worker)启动二进制文件,即调用 main 函数,其中 worker-server start

  2. JoinMaster:先告诉 master,我来了!

    1. worker 先在这 RegisterWorker,然后会触发 master 调用 RegisterWorker
    2. Master 会调用 AddWorker,然后 PutWorkerInfo,把相应的 key-value 写到 etcd
    3. 可以看到写到 etcd 用的是 clientv3.OpPut(key, value),也就是说 kv 要执行 put 操作
    4. 之前的 go WatchWorkerEvent 中就监听到有事件来了,并且判断其为 mvccpb.PUT 类型,event 处理之后会通过 outCh 传到 handleWorkerEv 中进行具体的上线处理
    5. 刚上线的时候,就会去各种找 source 去 bound,但是现在我们还没有 create source,所以也找不到 source,暂时可以不关注这里
  3. Start task 还需要 bound source,那 worker 首先要做的就是 observeSourceBound,这里同 observeWorkerEvent 是类似的:

    1. go WatchSourceBound:通过 etcd client 监听是否有 sourceBound 出现
    2. handleSourceBound:上面监听到了之后,则 operateSourceBound
  4. 接下来,dm-worker 等待 source bound

四、operate-source create

DM 用的命令行工具是 cobra,有兴趣的读者可深入了解一下

  1. 命令行执行 operate-source create(in test_prepare),operate-source 这个命令在 NewOperateSourceCmd 注册,具体实现在 operateSourceFunc

  2. 读取到该命令后,开始解析第一个参数(即 create)并转换,最后被打包送到 master,开始执行 master 的 OperateSource 函数

  3. 该函数中,master 会从命令行中给出的配置文件路径

    1. 解析并调整 source config
    2. 把 source cfg 也存到 etcd 里,因为 worker 待会要用
    3. Try to bound it to a free worker:因为我们是第一次 start task,并且也没有开启 relay 功能(test 中是开启了,但本篇文章假设不开启),所以我们就只能 bound a free worker 了。
    4. 最终,通过 PutSourceBound,把 SourceBound 通过 etcd client 发送
  4. 发送之后,worker 就通过 go WatchSourceBound 监听到有 SourceBound 出现,然后进行 operateSourceBound

    1. 首先需要拿到 source cfg,因为上面的操作都是在 master 执行的,worker 这里并没有 source cfg
    2. Source cfg 也是通过 etcd 拿到的,正好上面存了
  5. 之后就可以开始 subtask 了吧

    1. 但是并没有。。。我们还没开始 start task 呢!
    2. 所以 fetchSubTasksAndAdjust 并不能拿到 subtask。拿到是空的
  6. 那没办法了,继续呗(又是同样的 watch/handle 机制)

    1. go WatchSubTaskStage
    2. handleSubTaskStage

五、start-task

  1. 命令行执行 start-task(in test_prepare),start-task 命令的注册和实现参考 operate-source,最后执行 master 的 StartTask 函数

  2. 直接开始就 generateSubTaskreq.Task 直接传递的就是解析好的 task.yaml 字符串,原来在命令的实现中就帮我们解析好啦)。简单的说,就是经过一些 adjust 和 check, 帮助我们生成了 SubTask struct

  3. 重点来了,AddSubTasks -> NewSubTaskStage,subTask 终于创建好了,stage=running;再 put 进 etcd,完美。可以看到我们分别把 SubTaskCfgSubTaskStage 都 put 进 etcd 了。

  4. 那上面就 watch 到 stage 来了,对 SubTaskCfg 进行处理,如果我们是要进行 run 的操作,我们还得先把 cfg 拿出来,最后 startSubTask

  5. startSubTask 中,会 NewSubTask,再 runSubTask。subTask 内部具体的执行组建是由 unit 负责的,所以它会

    1. initUnits
    2. st.run 其实也是由 currentUnitProcess

六、结语

在 unit Process 后,start-task 就结束啦!是不是还意犹未尽呢?到底有哪些 unit 呢?这些 unit 内部到底是怎么 Process 的呢?在后续的文章中会陆续和大家见面哦。

其实再复读一下全文,我们发现本篇文章并没有太多很难的东西,大部分篇幅都在描述一些「准备活动」,全程用 etcd watch——master 等待 worker 到来、worker 等待 source 到来、source-worker 等待 subtask 到来。等就完事了。

任何建议和反馈都欢迎告诉我。下期再见!