0
0
0
0
博客/.../

TiDB Binlog 源码阅读系列文章(八)Loader Package 介绍

 TiDB官方  发表于  2020-01-20

本文作者:赵一霖

上一篇文章中,我们介绍了 Drainer server,其中 Syncer 的实现之一 MySQLSyncer 就是对 Loader 的封装。Loader 主要用于将 binlog 数据写入到支持 MySQL 协议的数据库。由于 Drainer/Reparo 等组件都需要类似功能,因此我们将 Loader 抽离成独立的 Package,供其他应用程序调用。Loader 代码主要在 TiDB Binlog repo 的 pkg/loader 目录下。

本文将从 Loader 的接口定义开始,由上至下的介绍 Loader 将 binlog 写入下游数据库的过程。

Loader Interface

Loader 组件的接口定义

type Loader interface {
    // SetSafeMode 设置 SafeMode 状态
    SetSafeMode(bool)

    // GetSafeMode 获取 SafeMode 状态
    GetSafeMode() bool

    // Input 返回一个用于输入事务的 Channel
    Input() chan<- *Txn

    // Successes 事务执行成功后,将输出到 Successes
    Successes() <-chan *Txn

    // Close 安全关闭 Loader
    Close()

    // Run 运行 Loader 相关流程
    Run() error
}

Loader 将处理调用者传入的 Txn Cahnnel,Txn 的结构中包含 DDL、DML 等信息,将在下一篇中详细介绍。Loader 的使用者可以将待同步的事务传入 Input,并从 Successes 中接收事务提交成功的事件通知。Example Loader 是一个比较简单的 Loader 使用范例,可供大家参考。

SafeMode

了解 Binlog 组件的读者可能知道,drainer 有一个配置项 safe-mode。开启安全模式后,Loader 会将 Insert 语句换为 Replace 语句,将 Update 语句拆分为 Delete + Replace 语句,使得下游 MySQL/TiDB 可被重复写入。

在 Loader 中,我们通过 SetSafeModeGetSafeMode 来设置和获取 SafeMode 状态。SetSafeModeGetSafeMode 的实现如下:

func (s *loaderImpl) SetSafeMode(safe bool) {
    if safe {
        atomic.StoreInt32(&s.safeMode, 1)
    } else {
        atomic.StoreInt32(&s.safeMode, 0)
    }
}

func (s *loaderImpl) GetSafeMode() bool {
    v := atomic.LoadInt32(&s.safeMode)
    return v != 0
}

SetSafeModeGetSafeMode 实际上都是对 safeMode 这个变量进行原子操作,接下来我们来追踪一下 safeMode 变量,safeMode(*executor).singleExec 函数中被用到:

func (e *executor) singleExec(dmls []*DML, safeMode bool) error {
    tx, err := e.begin()
    if err != nil {
        return errors.Trace(err)
    }

    for _, dml := range dmls {
        if safeMode && dml.Tp == UpdateDMLType {
            sql, args := dml.deleteSQL()
            if _, err := tx.autoRollbackExec(sql, args...); err != nil {
                return errors.Trace(err)
            }

            sql, args = dml.replaceSQL()
            if _, err := tx.autoRollbackExec(sql, args...); err != nil {
                return errors.Trace(err)
            }
        } else if safeMode && dml.Tp == InsertDMLType {
            sql, args := dml.replaceSQL()
            if _, err := tx.autoRollbackExec(sql, args...); err != nil {
                return errors.Trace(err)
            }
        } else {
            sql, args := dml.sql()
            if _, err := tx.autoRollbackExec(sql, args...); err != nil {
                return errors.Trace(err)
            }
        }
    }

    err = tx.commit()
    return errors.Trace(err)
}

0
0
0
0

版权声明:本文为 TiDB 社区用户原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接和本声明。

评论
暂无评论