本文作者:赵一霖
在上一篇文章中,我们介绍了 Drainer server,其中 Syncer 的实现之一 MySQLSyncer 就是对 Loader 的封装。Loader 主要用于将 binlog 数据写入到支持 MySQL 协议的数据库。由于 Drainer/Reparo 等组件都需要类似功能,因此我们将 Loader 抽离成独立的 Package,供其他应用程序调用。Loader 代码主要在 TiDB Binlog repo 的 pkg/loader 目录下。
本文将从 Loader 的接口定义开始,由上至下的介绍 Loader 将 binlog 写入下游数据库的过程。
Loader Interface
type Loader interface {
// SetSafeMode 设置 SafeMode 状态
SetSafeMode(bool)
// GetSafeMode 获取 SafeMode 状态
GetSafeMode() bool
// Input 返回一个用于输入事务的 Channel
Input() chan<- *Txn
// Successes 事务执行成功后,将输出到 Successes
Successes() <-chan *Txn
// Close 安全关闭 Loader
Close()
// Run 运行 Loader 相关流程
Run() error
}
Loader 将处理调用者传入的 Txn Cahnnel,Txn 的结构中包含 DDL、DML 等信息,将在下一篇中详细介绍。Loader 的使用者可以将待同步的事务传入 Input,并从 Successes 中接收事务提交成功的事件通知。Example Loader 是一个比较简单的 Loader 使用范例,可供大家参考。
SafeMode
了解 Binlog 组件的读者可能知道,drainer 有一个配置项 safe-mode。开启安全模式后,Loader 会将 Insert 语句换为 Replace 语句,将 Update 语句拆分为 Delete + Replace 语句,使得下游 MySQL/TiDB 可被重复写入。
在 Loader 中,我们通过 SetSafeMode 和 GetSafeMode 来设置和获取 SafeMode 状态。SetSafeMode 和 GetSafeMode 的实现如下:
func (s *loaderImpl) SetSafeMode(safe bool) {
if safe {
atomic.StoreInt32(&s.safeMode, 1)
} else {
atomic.StoreInt32(&s.safeMode, 0)
}
}
func (s *loaderImpl) GetSafeMode() bool {
v := atomic.LoadInt32(&s.safeMode)
return v != 0
}
SetSafeMode 和 GetSafeMode 实际上都是对 safeMode 这个变量进行原子操作,接下来我们来追踪一下 safeMode 变量,safeMode 在 (*executor).singleExec 函数中被用到:
func (e *executor) singleExec(dmls []*DML, safeMode bool) error {
tx, err := e.begin()
if err != nil {
return errors.Trace(err)
}
for _, dml := range dmls {
if safeMode && dml.Tp == UpdateDMLType {
sql, args := dml.deleteSQL()
if _, err := tx.autoRollbackExec(sql, args...); err != nil {
return errors.Trace(err)
}
sql, args = dml.replaceSQL()
if _, err := tx.autoRollbackExec(sql, args...); err != nil {
return errors.Trace(err)
}
} else if safeMode && dml.Tp == InsertDMLType {
sql, args := dml.replaceSQL()
if _, err := tx.autoRollbackExec(sql, args...); err != nil {
return errors.Trace(err)
}
} else {
sql, args := dml.sql()
if _, err := tx.autoRollbackExec(sql, args...); err != nil {
return errors.Trace(err)
}
}
}
err = tx.commit()
return errors.Trace(err)
}