背景
https://docs.pingcap.com/zh/tidb/v6.5/sync-diff-inspector-overview
上述链接是TiDB官方校验工具sync_diff_inspector 的解释,其中功能的第一条就是:对比表结构和数据。
本人最近经历的一个迁移项目在使用该工具过程中,对迁移上下游 库表进行校验时,发现某张表上游无主键,下游有主键,其他结构信息相同时,sync_diff_inspector 工具显示校验无差异。基于上述情况,本人准备翻阅工具的代码,对结构校验的功能进行探索。
探索过程
01获取代码
首先该项目采用的是sync_diff_inspector 6.5.7 版本,因此需要从 github 获取对应版本的源码包:https://github.com/pingcap/tidb-tools/releases/tag/v6.5.7 。然后使用 go ide 工具打开分析。
02代码分析
对于 go 代码的分析,一般从 main 函数作为程序运行入口。我们 从main 函数代码开始,逐步分析结构校验的代码逻辑:
第一步:main.go 文件 ,程序主入口
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
//略
)
func main() {
//初始化和配置文件检查加载相关代码,略
if !checkSyncState(ctx, cfg) { //执行sync_diff 校验函数
log.Warn("check failed!!!")
os.Exit(1)
}
log.Info("check pass!!!")
}
func checkSyncState(ctx context.Context, cfg *config.Config) bool {//sync_diff 校验函数定义
beginTime := time.Now()
defer func() {
log.Info("check data finished", zap.Duration("cost", time.Since(beginTime)))
}()
d, err := NewDiff(ctx, cfg) //创建一个校验示例
if err != nil {
fmt.Printf("There is something error when initialize diff, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName))
log.Fatal("failed to initialize diff process", zap.Error(err))
return false
}
defer d.Close()
if !cfg.CheckDataOnly { //判断是否只校验数据,如果只校验数据为否,则会执行下面 检查结构 的代码。
err = d.StructEqual(ctx)//执行结构校验函数
if err != nil {
fmt.Printf("There is something error when compare structure of table, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName))
log.Fatal("failed to check structure difference", zap.Error(err))
return false
}
} else {
log.Info("Check table data only, skip struct check")
}
if !cfg.CheckStructOnly {
err = d.Equal(ctx)
if err != nil {
fmt.Printf("There is something error when compare data of table, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName))
log.Fatal("failed to check data difference", zap.Error(err))
return false
}
} else {
log.Info("Check table struct only, skip data check")
}
return d.PrintSummary(ctx)
}
第二步:diff.go 文件,校验准备工作,循环校验每张表,获取结构校验所需的表信息。
func (df *Diff) StructEqual(ctx context.Context) error {//结构校验函数定义
tables := df.downstream.GetTables()
tableIndex := 0
if df.startRange != nil {
tableIndex = df.startRange.ChunkRange.Index.TableIndex
}
for ; tableIndex < len(tables); tableIndex++ {
isEqual, isSkip, isAllTableExist := false, true, tables[tableIndex].TableLack
if common.AllTableExist(isAllTableExist) {
var err error
isEqual, isSkip, err = df.compareStruct(ctx, tableIndex) //循环调用比较结构的函数
if err != nil {
return errors.Trace(err)
}
}
progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip, isAllTableExist)
df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip, isAllTableExist)
}
return nil
}
func (df *Diff) compareStruct(ctx context.Context, tableIndex int) (isEqual bool, isSkip bool, err error) {//比较结构函数的定义
sourceTableInfos, err := df.upstream.GetSourceStructInfo(ctx, tableIndex)
if err != nil {
return false, true, errors.Trace(err)
}
table := df.downstream.GetTables()[tableIndex]
isEqual, isSkip = utils.CompareStruct(sourceTableInfos, table.Info) //调用 utils包的结构校验函数,参数是上下游的表信息
table.IgnoreDataCheck = isSkip
return isEqual, isSkip, nil
}
第三步骤:util.go 文件 做结构校验逻辑的函数。
// CompareStruct compare tables' columns and indices from upstream and downstream.
// There are 2 return values:
//
// isEqual : result of comparing tables' columns and indices
// isPanic : the differences of tables' struct can not be ignored. Need to skip data comparing.
func CompareStruct(upstreamTableInfos []*model.TableInfo, downstreamTableInfo *model.TableInfo) (isEqual bool, isPanic bool) {
// compare columns
for _, upstreamTableInfo := range upstreamTableInfos {
if len(upstreamTableInfo.Columns) != len(downstreamTableInfo.Columns) {
// the numbers of each columns are different, don't compare data
log.Error("column num not equal",//校验列数量不一致
zap.String("upstream table", upstreamTableInfo.Name.O),
zap.Int("column num", len(upstreamTableInfo.Columns)),
zap.String("downstream table", downstreamTableInfo.Name.O),
zap.Int("column num", len(downstreamTableInfo.Columns)),
)
return false, true
}
for i, column := range upstreamTableInfo.Columns {
if column.Name.O != downstreamTableInfo.Columns[i].Name.O {
// names are different, panic!
log.Error("column name not equal",//校验列名称不一致
zap.String("upstream table", upstreamTableInfo.Name.O),
zap.String("column name", column.Name.O),
zap.String("downstream table", downstreamTableInfo.Name.O),
zap.String("column name", downstreamTableInfo.Columns[i].Name.O),
)
return false, true
}
if !isCompatible(column.GetType(), downstreamTableInfo.Columns[i].GetType()) {
// column types are different, panic!
log.Error("column type not compatible",//校验列类型不一致
zap.String("upstream table", upstreamTableInfo.Name.O),
zap.String("column name", column.Name.O),
zap.Uint8("column type", column.GetType()),
zap.String("downstream table", downstreamTableInfo.Name.O),
zap.String("column name", downstreamTableInfo.Columns[i].Name.O),
zap.Uint8("column type", downstreamTableInfo.Columns[i].GetType()),
)
return false, true
}
if !sameProperties(column, downstreamTableInfo.Columns[i]) { //调用列属性校验函数
// column properties are different, panic!
log.Error("column properties not compatible",//校验列属性不一致
zap.String("upstream table", upstreamTableInfo.Name.O),
zap.String("column name", column.Name.O),
zap.Uint8("column type", column.GetType()),
zap.String("downstream table", downstreamTableInfo.Name.O),
zap.String("column name", downstreamTableInfo.Columns[i].Name.O),
zap.Uint8("column type", downstreamTableInfo.Columns[i].GetType()),
)
return false, true
}
}
}
// compare indices 校验索引的逻辑
deleteIndicesSet := make(map[string]struct{})
unilateralIndicesSet := make(map[string]struct{})
downstreamIndicesMap := make(map[string]*struct {
index *model.IndexInfo
cnt int
})
for _, index := range downstreamTableInfo.Indices {
downstreamIndicesMap[index.Name.O] = &struct {
index *model.IndexInfo
cnt int
}{index, 0}
}
for _, upstreamTableInfo := range upstreamTableInfos {
NextIndex:
for _, upstreamIndex := range upstreamTableInfo.Indices {
if _, ok := deleteIndicesSet[upstreamIndex.Name.O]; ok {
continue NextIndex
}
indexU, ok := downstreamIndicesMap[upstreamIndex.Name.O]
if ok {
if len(indexU.index.Columns) != len(upstreamIndex.Columns) {
// different index, should be removed
deleteIndicesSet[upstreamIndex.Name.O] = struct{}{}
continue NextIndex
}
for i, indexColumn := range upstreamIndex.Columns {
if indexColumn.Offset != indexU.index.Columns[i].Offset || indexColumn.Name.O != indexU.index.Columns[i].Name.O {
// different index, should be removed
deleteIndicesSet[upstreamIndex.Name.O] = struct{}{}
continue NextIndex
}
}
indexU.cnt = indexU.cnt + 1
} else {
unilateralIndicesSet[upstreamIndex.Name.O] = struct{}{}
}
}
}
existBilateralIndex := false
for _, indexU := range downstreamIndicesMap {
if _, ok := deleteIndicesSet[indexU.index.Name.O]; ok {
continue
}
if indexU.cnt < len(upstreamTableInfos) {
// Some upstreamInfos don't have this index.
unilateralIndicesSet[indexU.index.Name.O] = struct{}{}
} else {
// there is an index the whole tables have,
// so unilateral indices can be deleted.
existBilateralIndex = true
}
}
// delete indices
// If there exist bilateral index, unilateral indices can be deleted.
if existBilateralIndex {
for indexName := range unilateralIndicesSet {
deleteIndicesSet[indexName] = struct{}{}
}
} else {
log.Warn("no index exists in both upstream and downstream", zap.String("table", downstreamTableInfo.Name.O))
}
if len(deleteIndicesSet) > 0 {
newDownstreamIndices := make([]*model.IndexInfo, 0, len(downstreamTableInfo.Indices))
for _, index := range downstreamTableInfo.Indices {
if _, ok := deleteIndicesSet[index.Name.O]; !ok {
newDownstreamIndices = append(newDownstreamIndices, index)
} else {
log.Debug("delete downstream index", zap.String("name", downstreamTableInfo.Name.O), zap.String("index", index.Name.O))
}
}
downstreamTableInfo.Indices = newDownstreamIndices
for _, upstreamTableInfo := range upstreamTableInfos {
newUpstreamIndices := make([]*model.IndexInfo, 0, len(upstreamTableInfo.Indices))
for _, index := range upstreamTableInfo.Indices {
if _, ok := deleteIndicesSet[index.Name.O]; !ok {
newUpstreamIndices = append(newUpstreamIndices, index)
} else {
log.Debug("delete upstream index", zap.String("name", upstreamTableInfo.Name.O), zap.String("index", index.Name.O))
}
}
upstreamTableInfo.Indices = newUpstreamIndices
}
}
return len(deleteIndicesSet) == 0, false //当两边索引不一致时,返回否
}
func sameProperties(c1, c2 *model.ColumnInfo) bool {//列属性校验函数定义,
switch c1.GetType() {
case mysql.TypeVarString, mysql.TypeString, mysql.TypeVarchar:
if c1.FieldType.GetCharset() != c2.FieldType.GetCharset() {
log.Warn("Ignoring character set differences",
zap.String("column name", c1.Name.O),
zap.String("charset source", c1.FieldType.GetCharset()),
zap.String("charset target", c2.FieldType.GetCharset()),
)
}
if c1.FieldType.GetCollate() != c2.FieldType.GetCollate() {
log.Warn("Ignoring collation differences",
zap.String("column name", c1.Name.O),
zap.String("collation source", c1.FieldType.GetCollate()),
zap.String("collation target", c2.FieldType.GetCollate()),
)
}
return c1.FieldType.GetFlen() == c2.FieldType.GetFlen() //判断上下游列的长度定义是否相同
default:
return true
}
}
03功能总结
根据对上述结构校验代码的分析,我们可以总结出v6.5.7 sync_diff_inspector 工具可以校验 的表结构项目包括如下几项:
- 列的数量
- 列的名称
- 列的类型
- 列的长度定义
- 索引的差异