最近有个任务要把上游的分库分表合并后同步到下游的TiDB,鉴于我司自己个儿的同步工具只能单线程的同步一张表,效率比较低,有时候高峰期会因为同步工具的瓶颈导致延迟。当然也不是我司的同步工具特别菜,是有点水土不服,毕竟之前那个同步工具上下游都是MySQL, MySQL的延迟是肯定比TiDB低的,TiDB的优势就是连接数可以搞很多,每次查询的延迟稍微大一些。所以这个工具就不那么好使了。
决定用DM试试。
用DM有个问题:
我司数据库权限管控比较严格,不能申请LOCK TABLE 权限,这样的话DM的extra-args: “–consistency ” 只能设置为none
用这个选项倒是能先跑起来,但是心里还是没底,到底这样同步数据能不能做到一致呢?会不会在dump开始到sync期间的修改丢掉?找了一通资料也没找到会不会,好在P社的代码都是开源的,那拿出代码看看吧。
下面就跟着这个 extra-args: “–consistency none” 选项走一走,看看到底是怎么处理的。
DM的代码在tiflow里面:
https://github.com/pingcap/tiflow
代码结构也很清晰:
从 DM 的配置文件也大概能看出来,分4个部分,mydumper、loader、syncer、checker
是不是真这样对应的我也没完全读所有代码,我关心的一致性的问题,主要在于备份那一块,我就是去dumpling里面去找找。
dumpling有这么几个函数,主要关注process
忽略里面的failpoint后,代码看起来很清晰。
if dumpling, err = export.NewDumper(newCtx, m.dumpConfig); err == nil {
m.mu.Lock()
m.core = dumpling
m.mu.Unlock()
err = dumpling.Dump()
dumpling.Close()
} else {
m.logger.Warn("error occurred during NewDumper", zap.Error(err))
}
func (d *Dumper) Dump() (dumpErr error) {
// 这里判断return consistency != ConsistencyTypeSnapshot || serverType != version.ServerTypeTiDB,
// 我的情况下: consistency=none,并且源也不是tidb,这里都是返回的true
repeatableRead := needRepeatableRead(conf.ServerInfo.ServerType, conf.Consistency)
conCtrl, err = NewConsistencyController(tctx, conf, pool)
if err = conCtrl.Setup(tctx); err != nil {
return errors.Trace(err)
}
// 这里执行:SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */
metaConn, err := createConnWithConsistency(tctx, pool, repeatableRead)
m.recordStartTime(time.Now())
// 注意看这段注释:对于 consistency 是 none 的情况,binlog 的 pos 可能早于dump的数据,所以要开启 safe-mode 确保数据安全。
// for consistency lock, we can write snapshot info after all tables are locked.
// the binlog pos may changed because there is still possible write between we lock tables and write master status.
// but for the locked tables doing replication that starts from metadata is safe.
// for consistency flush, record snapshot after whole tables are locked. The recorded meta info is exactly the locked snapshot.
// for consistency snapshot, we should use the snapshot that we get/set at first in metadata. TiDB will assure the snapshot of TSO.
// for consistency none, the binlog pos in metadata might be earlier than dumped data. We need to enable safe-mode to assure data safety.
// 这里面用 show master status 记录了 binlog的位置
err = m.recordGlobalMetaData(metaConn, conf.ServerInfo.ServerType, false)
// for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached
if conf.Consistency != ConsistencyTypeLock {
if err = prepareTableListToDump(tctx, conf, metaConn); err != nil {
return err
}
}
atomic.StoreInt64(&d.totalTables, int64(calculateTableCount(conf.Tables)))
// 这里启动 goroutine,处理接受的命令
writers, tearDownWriters, err := d.startWriters(writerCtx, wg, taskOut, rebuildConn)
// 这里再记录一次 binlog的位置,因为这里链接已经设置了RR,并且连接已经开启了事务。从这个位置往后就可以安全的退出safe-mode了
if conf.PosAfterConnect {
// record again, to provide a location to exit safe mode for DM
err = m.recordGlobalMetaData(metaConn, conf.ServerInfo.ServerType, true)
if err != nil {
tctx.L().Info("get global metadata (after connection pool established) failed", log.ShortError(err))
}
}
tableDataStartTime := time.Now()
baseConn := newBaseConn(metaConn, true, rebuildMetaConn)
if conf.SQL == "" {
// 这里开始 dump 数据,里面构造sql语句,发送到taskIn这个chan里面,然后由上面启动的goroutine执行。
// 包括show databases,select * from xxx,根据一系列条件构造sql,dump数据。
if err = d.dumpDatabases(writerCtx, baseConn, taskIn); err != nil && !errors.ErrorEqual(err, context.Canceled) {
return err
}
} else {
d.dumpSQL(writerCtx, baseConn, taskIn)
}
close(taskIn)
_ = baseConn.DBConn.Close()
if err := wg.Wait(); err != nil {
summary.CollectFailureUnit("dump table data", err)
return errors.Trace(err)
}
m.recordFinishTime(time.Now())
return nil
}
总的思路就是:对连接设置RR,然后开启事务,先记录binlog的位置,再dump数据,为了安全,会先开启一段时间的安全模式。
安全模式的意思是:
安全模式 (safe mode) 是 DM 在进行增量同步时候的一种运行模式,在安全模式中,DM 增量同步组件在同步 binlog event 时,将把所有
INSERT
和UPDATE
操作强制进行改写后再在下游执行。安全模式的目的是在增量同步过程中,同一条 binlog event 能够在下游被重复同步且保证幂等性,从而确保增量同步能够“安全”进行。
DM 从 checkpoint 恢复数据同步任务后,可能重复执行某些 binlog 事件而导致下述问题:
- 在进行增量同步过程中,执行 DML 的操作和写 checkpoint 的操作并不是同步的;写 checkpoint 的操作和写下游数据的操作也并不能保证原子性。因此,当 DM 异常退出时,checkpoint 可能只记录到退出时刻之前的一个恢复点。
- 当 DM 重启同步任务,并从 checkpoint 重新开始增量数据同步时,checkpoint 之后的部分数据可能已经在异常退出前被处理过了,从而导致部分 SQL 语句重复执行。
- 如果重复执行
INSERT
操作,会导致主键或唯一索引冲突,引发同步中断;如果重复执行UPDATE
操作,会导致不能根据筛选条件找到之前对应的更新记录。在安全模式下,通过改写 SQL 语句,DM 可以解决上述问题。
安全模式通过 SQL 语句改写来保证 binlog event 的幂等性。具体来说,在安全模式下:
INSERT
语句会被改写成REPLACE
语句。UPDATE
语句会被分析,得到该语句涉及的行的主键或唯一索引的值,然后改写成DELETE
+REPLACE
语句 :先根据主键或唯一索引的定位删除对应的行,然后使用REPLACE
语句插入一条最新值的行记录。
REPLACE
操作是 MySQL 特有的数据插入语法。使用REPLACE
语法插入数据时,如果新插入的数据和现有数据存在主键或唯一约束冲突,MySQL 会删除所有冲突的记录,然后再执行插入记录操作,相当于“强制插入”的操作。具体请参考 MySQL 官方文档的REPLACE
语句相关介绍。
也就是说即使设置了 extra-args: “–consistency none” ,DM dump的数据也是连续的,可靠的,放心用就行。
另外吐槽:专栏这里贴了代码再加注释真实很难用,还是先在别的记事本里改好再贴过来比较方便。