0
0
0
0
专栏/.../

sync_diff_inspector 表结构比较功能探索

 yangzhj  发表于  2024-07-23
原创迁移

背景

https://docs.pingcap.com/zh/tidb/v6.5/sync-diff-inspector-overview

上述链接是TiDB官方校验工具sync_diff_inspector 的解释,其中功能的第一条就是:对比表结构和数据。

本人最近经历的一个迁移项目在使用该工具过程中,对迁移上下游 库表进行校验时,发现某张表上游无主键,下游有主键,其他结构信息相同时,sync_diff_inspector 工具显示校验无差异。基于上述情况,本人准备翻阅工具的代码,对结构校验的功能进行探索。

探索过程

01获取代码

首先该项目采用的是sync_diff_inspector 6.5.7 版本,因此需要从 github 获取对应版本的源码包:https://github.com/pingcap/tidb-tools/releases/tag/v6.5.7 。然后使用 go ide 工具打开分析。

02代码分析

对于 go 代码的分析,一般从 main 函数作为程序运行入口。我们 从main 函数代码开始,逐步分析结构校验的代码逻辑:

第一步:main.go 文件 ,程序主入口

// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
//略
)

func main() {
	//初始化和配置文件检查加载相关代码,略
	if !checkSyncState(ctx, cfg) { //执行sync_diff 校验函数
		log.Warn("check failed!!!")
		os.Exit(1)
	}
	log.Info("check pass!!!")
}

func checkSyncState(ctx context.Context, cfg *config.Config) bool {//sync_diff 校验函数定义
	beginTime := time.Now()
	defer func() {
		log.Info("check data finished", zap.Duration("cost", time.Since(beginTime)))
	}()

	d, err := NewDiff(ctx, cfg) //创建一个校验示例
	if err != nil {
		fmt.Printf("There is something error when initialize diff, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName))
		log.Fatal("failed to initialize diff process", zap.Error(err))
		return false
	}
	defer d.Close()

	if !cfg.CheckDataOnly { //判断是否只校验数据,如果只校验数据为否,则会执行下面 检查结构 的代码。
		err = d.StructEqual(ctx)//执行结构校验函数
		if err != nil {
			fmt.Printf("There is something error when compare structure of table, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName))
			log.Fatal("failed to check structure difference", zap.Error(err))
			return false
		}
	} else {
		log.Info("Check table data only, skip struct check")
	}
	if !cfg.CheckStructOnly {
		err = d.Equal(ctx)
		if err != nil {
			fmt.Printf("There is something error when compare data of table, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName))
			log.Fatal("failed to check data difference", zap.Error(err))
			return false
		}
	} else {
		log.Info("Check table struct only, skip data check")
	}
	return d.PrintSummary(ctx)
}

第二步:diff.go 文件,校验准备工作,循环校验每张表,获取结构校验所需的表信息。

func (df *Diff) StructEqual(ctx context.Context) error {//结构校验函数定义
	tables := df.downstream.GetTables()
	tableIndex := 0
	if df.startRange != nil {
		tableIndex = df.startRange.ChunkRange.Index.TableIndex
	}
	for ; tableIndex < len(tables); tableIndex++ {
		isEqual, isSkip, isAllTableExist := false, true, tables[tableIndex].TableLack
		if common.AllTableExist(isAllTableExist) {
			var err error
			isEqual, isSkip, err = df.compareStruct(ctx, tableIndex) //循环调用比较结构的函数
			if err != nil {
				return errors.Trace(err)
			}
		}
		progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip, isAllTableExist)
		df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip, isAllTableExist)
	}
	return nil
}

func (df *Diff) compareStruct(ctx context.Context, tableIndex int) (isEqual bool, isSkip bool, err error) {//比较结构函数的定义
	sourceTableInfos, err := df.upstream.GetSourceStructInfo(ctx, tableIndex)
	if err != nil {
		return false, true, errors.Trace(err)
	}
	table := df.downstream.GetTables()[tableIndex]
	isEqual, isSkip = utils.CompareStruct(sourceTableInfos, table.Info) //调用 utils包的结构校验函数,参数是上下游的表信息
	table.IgnoreDataCheck = isSkip
	return isEqual, isSkip, nil
}

第三步骤:util.go 文件 做结构校验逻辑的函数。

// CompareStruct compare tables' columns and indices from upstream and downstream.
// There are 2 return values:
//
//	isEqual	: result of comparing tables' columns and indices
//	isPanic	: the differences of tables' struct can not be ignored. Need to skip data comparing.
func CompareStruct(upstreamTableInfos []*model.TableInfo, downstreamTableInfo *model.TableInfo) (isEqual bool, isPanic bool) {
	// compare columns
	for _, upstreamTableInfo := range upstreamTableInfos {
		if len(upstreamTableInfo.Columns) != len(downstreamTableInfo.Columns) {
			// the numbers of each columns are different, don't compare data
			log.Error("column num not equal",//校验列数量不一致
				zap.String("upstream table", upstreamTableInfo.Name.O),
				zap.Int("column num", len(upstreamTableInfo.Columns)),
				zap.String("downstream table", downstreamTableInfo.Name.O),
				zap.Int("column num", len(downstreamTableInfo.Columns)),
			)
			return false, true
		}

		for i, column := range upstreamTableInfo.Columns {
			if column.Name.O != downstreamTableInfo.Columns[i].Name.O {
				// names are different, panic!
				log.Error("column name not equal",//校验列名称不一致
					zap.String("upstream table", upstreamTableInfo.Name.O),
					zap.String("column name", column.Name.O),
					zap.String("downstream table", downstreamTableInfo.Name.O),
					zap.String("column name", downstreamTableInfo.Columns[i].Name.O),
				)
				return false, true
			}

			if !isCompatible(column.GetType(), downstreamTableInfo.Columns[i].GetType()) {
				// column types are different, panic!
				log.Error("column type not compatible",//校验列类型不一致
					zap.String("upstream table", upstreamTableInfo.Name.O),
					zap.String("column name", column.Name.O),
					zap.Uint8("column type", column.GetType()),
					zap.String("downstream table", downstreamTableInfo.Name.O),
					zap.String("column name", downstreamTableInfo.Columns[i].Name.O),
					zap.Uint8("column type", downstreamTableInfo.Columns[i].GetType()),
				)
				return false, true
			}

			if !sameProperties(column, downstreamTableInfo.Columns[i]) { //调用列属性校验函数
				// column properties are different, panic!
				log.Error("column properties not compatible",//校验列属性不一致
					zap.String("upstream table", upstreamTableInfo.Name.O),
					zap.String("column name", column.Name.O),
					zap.Uint8("column type", column.GetType()),
					zap.String("downstream table", downstreamTableInfo.Name.O),
					zap.String("column name", downstreamTableInfo.Columns[i].Name.O),
					zap.Uint8("column type", downstreamTableInfo.Columns[i].GetType()),
				)
				return false, true
			}
		}
	}

	// compare indices 校验索引的逻辑
	deleteIndicesSet := make(map[string]struct{})
	unilateralIndicesSet := make(map[string]struct{})
	downstreamIndicesMap := make(map[string]*struct {
		index *model.IndexInfo
		cnt   int
	})
	for _, index := range downstreamTableInfo.Indices {
		downstreamIndicesMap[index.Name.O] = &struct {
			index *model.IndexInfo
			cnt   int
		}{index, 0}
	}
	for _, upstreamTableInfo := range upstreamTableInfos {

	NextIndex:
		for _, upstreamIndex := range upstreamTableInfo.Indices {
			if _, ok := deleteIndicesSet[upstreamIndex.Name.O]; ok {
				continue NextIndex
			}

			indexU, ok := downstreamIndicesMap[upstreamIndex.Name.O]
			if ok {
				if len(indexU.index.Columns) != len(upstreamIndex.Columns) {
					// different index, should be removed
					deleteIndicesSet[upstreamIndex.Name.O] = struct{}{}
					continue NextIndex
				}

				for i, indexColumn := range upstreamIndex.Columns {
					if indexColumn.Offset != indexU.index.Columns[i].Offset || indexColumn.Name.O != indexU.index.Columns[i].Name.O {
						// different index, should be removed
						deleteIndicesSet[upstreamIndex.Name.O] = struct{}{}
						continue NextIndex
					}
				}
				indexU.cnt = indexU.cnt + 1
			} else {
				unilateralIndicesSet[upstreamIndex.Name.O] = struct{}{}
			}
		}
	}

	existBilateralIndex := false
	for _, indexU := range downstreamIndicesMap {
		if _, ok := deleteIndicesSet[indexU.index.Name.O]; ok {
			continue
		}
		if indexU.cnt < len(upstreamTableInfos) {
			// Some upstreamInfos don't have this index.
			unilateralIndicesSet[indexU.index.Name.O] = struct{}{}
		} else {
			// there is an index the whole tables have,
			// so unilateral indices can be deleted.
			existBilateralIndex = true
		}
	}

	// delete indices
	// If there exist bilateral index, unilateral indices can be deleted.
	if existBilateralIndex {
		for indexName := range unilateralIndicesSet {
			deleteIndicesSet[indexName] = struct{}{}
		}
	} else {
		log.Warn("no index exists in both upstream and downstream", zap.String("table", downstreamTableInfo.Name.O))
	}
	if len(deleteIndicesSet) > 0 {
		newDownstreamIndices := make([]*model.IndexInfo, 0, len(downstreamTableInfo.Indices))
		for _, index := range downstreamTableInfo.Indices {
			if _, ok := deleteIndicesSet[index.Name.O]; !ok {
				newDownstreamIndices = append(newDownstreamIndices, index)
			} else {
				log.Debug("delete downstream index", zap.String("name", downstreamTableInfo.Name.O), zap.String("index", index.Name.O))
			}
		}
		downstreamTableInfo.Indices = newDownstreamIndices

		for _, upstreamTableInfo := range upstreamTableInfos {
			newUpstreamIndices := make([]*model.IndexInfo, 0, len(upstreamTableInfo.Indices))
			for _, index := range upstreamTableInfo.Indices {
				if _, ok := deleteIndicesSet[index.Name.O]; !ok {
					newUpstreamIndices = append(newUpstreamIndices, index)
				} else {
					log.Debug("delete upstream index", zap.String("name", upstreamTableInfo.Name.O), zap.String("index", index.Name.O))
				}
			}
			upstreamTableInfo.Indices = newUpstreamIndices
		}

	}

	return len(deleteIndicesSet) == 0, false //当两边索引不一致时,返回否
}

func sameProperties(c1, c2 *model.ColumnInfo) bool {//列属性校验函数定义,
	switch c1.GetType() {
	case mysql.TypeVarString, mysql.TypeString, mysql.TypeVarchar:
		if c1.FieldType.GetCharset() != c2.FieldType.GetCharset() {
			log.Warn("Ignoring character set differences",
				zap.String("column name", c1.Name.O),
				zap.String("charset source", c1.FieldType.GetCharset()),
				zap.String("charset target", c2.FieldType.GetCharset()),
			)
		}
		if c1.FieldType.GetCollate() != c2.FieldType.GetCollate() {
			log.Warn("Ignoring collation differences",
				zap.String("column name", c1.Name.O),
				zap.String("collation source", c1.FieldType.GetCollate()),
				zap.String("collation target", c2.FieldType.GetCollate()),
			)
		}
		return c1.FieldType.GetFlen() == c2.FieldType.GetFlen() //判断上下游列的长度定义是否相同
	default:
		return true
	}
}

03功能总结

根据对上述结构校验代码的分析,我们可以总结出v6.5.7 sync_diff_inspector 工具可以校验 的表结构项目包括如下几项:

  1. 列的数量
  2. 列的名称
  3. 列的类型
  4. 列的长度定义
  5. 索引的差异

0
0
0
0

版权声明:本文为 TiDB 社区用户原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接和本声明。

评论
暂无评论