脚本内容
#!/bin/bash
# 全局变量存储配置信息
MASTER_DB=""
MASTER_USER=""
MASTER_PASSWORD=""
SLAVE_DB=""
SLAVE_USER=""
SLAVE_PASSWORD=""
MASTER_PD=""
SLAVE_PD=""
VERSION=""
RELOAD_LIST=""
CHANGEFEED_ID=""
CDC_CONFIG=""
LOCK_USER=""
MOD=""
clusterName=""
SINK_URI=""
# 参数解析函数
parserArgs() {
while [ "$1" != "" ]; do
case $1 in
--masterDB=*)
MASTER_DB="${1#*=}"
;;
--masterUser=*)
MASTER_USER="${1#*=}"
;;
--masterPassword=*)
MASTER_PASSWORD="${1#*=}"
;;
--slaveDB=*)
SLAVE_DB="${1#*=}"
;;
--slaveUser=*)
SLAVE_USER="${1#*=}"
;;
--slavePassword=*)
SLAVE_PASSWORD="${1#*=}"
;;
--masterPD=*)
MASTER_PD="${1#*=}"
;;
--slavePD=*)
SLAVE_PD="${1#*=}"
;;
--version=*)
VERSION="${1#*=}"
;;
--reloadList=*)
RELOAD_LIST="${1#*=}"
;;
--changefeedID=*)
CHANGEFEED_ID="${1#*=}"
;;
--cdcConfig=*)
CDC_CONFIG="${1#*=}"
;;
--lockUser=*)
LOCK_USER="${1#*=}"
;;
--mod=*)
MOD="${1#*=}"
;;
--clusterName=*)
clusterName="${1#*=}"
;;
--sink-uri=*)
SINK_URI="${1#*=}"
;;
*)
echo "无效参数:$1"
usage
exit 1
;;
esac
shift
done
# 检查是否所有必需的参数都已设置
if [ -z "$MASTER_DB" ] || [ -z "$MASTER_USER" ] || [ -z "$MASTER_PASSWORD" ] ||
[ -z "$SLAVE_DB" ] || [ -z "$SLAVE_USER" ] || [ -z "$SLAVE_PASSWORD" ] ||
[ -z "$MASTER_PD" ] || [ -z "$SLAVE_PD" ] || [ -z "$VERSION" ] || [ -z "$RELOAD_LIST" ] ||
[ -z "$CHANGEFEED_ID" ] || [ -z "$CDC_CONFIG" ] || [ -z "$LOCK_USER" ] ||
[ -z "$MOD" ] || [ -z "$clusterName" ] || [ -z "$SINK_URI" ]; then
echo "缺少必需的参数。"
usage
exit 1
fi
}
# 使用说明函数
usage() {
echo "使用方法: $0 --clusterName=<集群名称> --masterDB=<数据库主机:端口> --masterUser=<用户名> --masterPassword=<密码> --slaveDB=<数据库从机:端口> --slaveUser=<用户名> --slavePassword=<密码> --masterPD=<PD主机:端口> --slavePD=<PD从机:端口> --version=<版本> --reloadList=<主机列表> --changefeedID=<changefeed标识> --cdcConfig=<配置文件> --lockUser=<锁定用户列表> --mod=<执行模式> --sink-uri=<TiDB连接URI>"
# 逻辑代码
echo "主库数据库: $MASTER_DB"
echo "主库用户: $MASTER_USER"
echo "主库密码: $MASTER_PASSWORD"
echo "Master PD 服务: $MASTER_PD"
echo "从库数据库: $SLAVE_DB"
echo "从库用户: $SLAVE_USER"
echo "从库密码: $SLAVE_PASSWORD"
echo "Slave PD 服务: $SLAVE_PD"
echo "CDC 版本: $VERSION"
echo "重载列表: $RELOAD_LIST"
echo "Changefeed 标识: $CHANGEFEED_ID"
echo "CDC 配置文件: $CDC_CONFIG"
echo "锁定用户: $LOCK_USER"
echo "新建 cdc 同步 uri: $SINK_URI"
echo "执行模式: $MOD,
1. 用户锁定
2. 重启 tidb server 节点
3. 确认是否有留存连接
4. 等待 cdc 任务同步追上
5. 删除 master cdc 任务
6. 创建从库到主库的同步
7. 解锁从库应用用户"
}
# 锁定账户
userLock() {
# 获取用户
local userAccounts=$LOCK_USER
# 读取账户到数组
IFS=',' read -ra ACCOUNTS <<< "$userAccounts"
# 创建一个新的数组用于存储过滤后的账户
filtered_accounts=()
# 遍历 ACCOUNTS 数组
for account in "${ACCOUNTS[@]}"; do
# 过滤掉包含 root@'%'
if [[ "$account" != "root@'%'" ]]; then
filtered_accounts+=("$account")
fi
done
# 锁定操作开始
for account in "${filtered_accounts[@]}"; do
echo "Locking account: $account"
IFS=':' read -r ip port <<< "$MASTER_DB"
mysql -h "$ip" -u "$MASTER_USER" -p"$MASTER_PASSWORD" -P "$port" -e "ALTER USER $account ACCOUNT LOCK;FLUSH PRIVILEGES;"
if [ $? -eq 0 ]; then
echo "Successfully locked: $account"
else
echo "Failed to lock: $account"
exit 1
fi
done
}
# 重启 tidb-server 节点
restartTidb(){
local tidbList=$RELOAD_LIST
# 使用数组来安全地存储和执行命令
local command=(~/.tiup/bin/tiup cluster reload "$clusterName" -R tidb -N "$tidbList" -c 200 -y)
# 使用echo显示命令
echo "执行重启命令:${command[*]}"
# 执行命令
"${command[@]}"
# 检查命令执行的返回值
if [ $? -eq 0 ]; then
echo "Reload command executed successfully."
else
echo "Reload command failed."
exit 1
fi
}
# 确定是否有相关账户连接留存
checkConnect(){
# master 的 ip 端口
IFS=':' read -r ip port <<< "$MASTER_DB"
# 封的用户:kfc_login@'%',phhs_login@'%' 转为:'kfc_login','phhs_login'
local users=$(echo $LOCK_USER|sed "s/@'%'//g" | sed "s/,/','/g")
# count 行数
local command=(mysql -h "$ip" -u "$MASTER_USER" -p"$MASTER_PASSWORD" -P "$port" -e \
"select count(1) as count from information_schema.cluster_processlist where USER in ('','$users');")
# 输出命令
echo "查看连接 SQL:"
echo ${command[*]}
# 执行命令
local result
result=$("${command[@]}")
local status=$? # 保存 mysql 命令的退出状态
# 检查命令执行的返回值
if [ $status -eq 0 ]; then
echo "count command executed successfully."
else
echo "count command failed."
exit 1
fi
# 行数判断
echo "'$users' 账户的连接数为:" $result
# 去除结果 \n
local result=$(echo $result | tr -d '\n')
if [ "$result" = "count 0" ]; then
echo "无连接,继续脚本"
else
echo "尚存连接,即将退出"
exit 1
fi
}
# 获取 cdc tso
getCheckPoint(){
# 执行查询命令
local command=(~/.tiup/bin/tiup cdc:$VERSION cli changefeed query -s --pd=http://$MASTER_PD --changefeed-id=$CHANGEFEED_ID)
# 输出命令
echo "取 cdc 获取同步任务状态命令: "
echo ${command[*]}
# 执行命令
local result
result=$("${command[@]}")
## 这里是测试问题看结果
# local result
# local result=$(cat ./tmp.txt)
# 检查命令执行的返回值
if [ $? -eq 0 ]; then
echo "get CDC checkpoint tso command executed successfully."
else
echo "get CDC checkpoint tso command failed."
exit 1
fi
# 获取 tso
cdcCheckPoint=$(echo "$result" | tr '\r' '\n' | grep "checkpoint_tso" | sed -n 's/.*"checkpoint_tso": \([0-9]*\),.*/\1/p')
echo "CDC check Point TSO: $cdcCheckPoint"
echo "$cdcCheckPoint"
}
# 确定 cdc 任务
checkCdcStatus(){
# master 的 ip 端口
IFS=':' read -r ip port <<< "$MASTER_DB"
# 获取 master 当前 TSO
local masterCommand=(mysql -h "$ip" -u "$MASTER_USER" -p"$MASTER_PASSWORD" -P "$port" -e
"show master status;")
# 输出命令
echo ${masterCommand[*]}
# 执行命令
local masterResult
masterResult=$("${masterCommand[@]}")
local status=$?
# 检查命令执行的返回值
if [ $status -eq 0 ]; then
echo "get Master tso command executed successfully."
else
echo "get Master tso command failed."
exit 1
fi
# 结果格式化,取第二行最后一个字段内容,也就是 pos 值
local masterPos=$(echo "$masterResult" | awk 'NR==2 {print $NF}')
echo "master pos 为:$masterPos"
# 定义循环控制变量
ctlNum=5
ctmTmp=1
# 进入循环
while [ $ctmTmp -le $ctlNum ]; do
echo "循环 $ctmTmp 次"
# 等待一小段时间,获取 cdc 同步位点
waitTime=3
echo "等待 $waitTime s"
sleep $waitTime;
# 获取 checkpoint,获取 func 最后一个 echo
local checkPoint=$(getCheckPoint | tail -n 1)
# echo $checkPoint
if [[ "$checkPoint" =~ ^-?[0-9]+$ ]]; then
echo "Checkpoint: $checkPoint is a integer number."
else
echo "Checkpoint: $checkPoint is not a integer number. exit!!"
exit 1
fi
# 比较 checkpoint 和 master tso,如果 checkpoint 大于等于 master tso
if [[ $checkPoint -ge $masterPos ]]; then
echo "checkPoint ($checkPoint) is greater than or equal to masterPos ($masterPos). Breaking the loop."
break # 如果 checkPoint 大于等于 masterPos,跳出循环
else
echo "checkPoint ($checkPoint) is less than masterPos ($masterPos). Continuing the loop."
fi
# 增加迭代器
ctmTmp=$(( ctmTmp + 1 ))
done
# 添加如果循环判断到最后,同步都没有追上,退出脚本
if [[ $ctmTmp -le $ctlNum ]]; then
# 判断结束
echo "判断结束"
else
echo "超过判断次数,退出整个脚本"
exit 1;
fi
}
# master cdc 任务删除
deleteCdcTask(){
# 暂停任务
pauseTaskCommand=(~/.tiup/bin/tiup cdc:$VERSION cli changefeed pause --pd=http://$MASTER_PD --changefeed-id=$CHANGEFEED_ID)
# 输出命令
echo "暂停 cdc 同步任务命令: "
echo ${pauseTaskCommand[*]}
# 执行命令
"${pauseTaskCommand[@]}"
# 检查命令执行的返回值
if [ $? -eq 0 ]; then
echo "Pause CDC task command executed successfully."
else
echo "Pause CDC task command failed."
exit 1
fi
# 删除任务
removeTaskCommand=(~/.tiup/bin/tiup cdc:$VERSION cli changefeed remove --pd=http://$MASTER_PD --changefeed-id=$CHANGEFEED_ID)
# 输出命令
echo "暂停 cdc 同步任务命令: "
echo ${removeTaskCommand[*]}
# 执行命令
"${removeTaskCommand[@]}"
# 检查命令执行的返回值
if [ $? -eq 0 ]; then
echo "Remove CDC task command executed successfully."
else
echo "Remove CDC task command failed."
exit 1
fi
}
createCdcTask(){
# slave 的 ip 端口
IFS=':' read -r ip port <<< "$SLAVE_DB"
# 获取 slave 当前 TSO
local slaveCommand=(mysql -h "$ip" -u "$SLAVE_USER" -p"$SLAVE_PASSWORD" -P "$port" -e
"show master status;")
# 输出命令
echo ${slaveCommand[*]}
# 执行命令
local slaveResult
slaveResult=$("${slaveCommand[@]}")
local status=$?
# 检查命令执行的返回值
if [ $status -eq 0 ]; then
echo "get Slave tso command executed successfully."
else
echo "get Slave tso command failed."
exit 1
fi
# 结果格式化,取第二行最后一个字段内容,也就是 pos 值
local slavePos=$(echo "$slaveResult" | awk 'NR==2 {print $NF}')
echo "slave 的 pos 为:$slavePos"
# 从库创建任务
createTaskCommand=(~/.tiup/bin/tiup cdc:$VERSION cli changefeed create --pd=http://$SLAVE_PD \
--changefeed-id=$CHANGEFEED_ID --sink-uri="$SINK_URI" --config=$CDC_CONFIG --start-ts="$slavePos")
# 输出命令
echo "创建 cdc 同步任务命令: "
echo ${createTaskCommand[*]}
# 执行命令
"${createTaskCommand[@]}"
# 检查命令执行的返回值
if [ $? -eq 0 ]; then
echo "Create CDC task command executed successfully."
else
echo "Create CDC task command failed."
exit 1
fi
}
# 解锁从库 user
userUnLock() {
local userAccounts=$LOCK_USER
# 解析数据库用户名
IFS=',' read -ra ACCOUNTS <<< "$userAccounts"
for account in "${ACCOUNTS[@]}"; do
echo "UnLock account: $account"
# 解析数据库账号密码
IFS=':' read -r ip port <<< "$SLAVE_DB"
mysql -h "$ip" -u "$SLAVE_USER" -p"$SLAVE_PASSWORD" -P "$port" -e "ALTER USER $account ACCOUNT UNLOCK;FLUSH PRIVILEGES;"
if [ $? -eq 0 ]; then
echo "Successfully Unlocked: $account"
else
echo "Failed to Unlock: $account"
exit 1
fi
done
}
# 主逻辑函数
main() {
parserArgs "$@"
# 逻辑代码
echo "主库数据库: $MASTER_DB"
echo "主库用户: $MASTER_USER"
echo "主库密码: $MASTER_PASSWORD"
echo "Master PD 服务: $MASTER_PD"
echo "从库数据库: $SLAVE_DB"
echo "从库用户: $SLAVE_USER"
echo "从库密码: $SLAVE_PASSWORD"
echo "Slave PD 服务: $SLAVE_PD"
echo "CDC 版本: $VERSION"
echo "重载列表: $RELOAD_LIST"
echo "Changefeed 标识: $CHANGEFEED_ID"
echo "CDC 配置文件: $CDC_CONFIG"
echo "锁定用户: $LOCK_USER"
echo "新建 cdc 同步 uri: $SINK_URI"
echo "执行模式: $MOD,
1. 用户锁定
2. 重启 tidb server 节点
3. 确认是否有留存连接
4. 等待 cdc 任务同步追上
5. 删除 master cdc 任务
6. 创建从库到主库的同步
7. 解锁从库应用用户"
# 开始任务
echo "<-------->开始任务<-------->"
# 1. 用户锁定
# userLock;
# 2. 重启 tidb server 节点
# restartTidb;
# 3. 确认是否有留存连接
# checkConnect;
# 4. 等待 cdc 任务同步追上
# checkCdcStatus;
# 5. 删除 master cdc 任务
# deleteCdcTask;
# 6. 创建从库到主库的同步
# createCdcTask;
# 7. 解锁从库应用用户
# userUnLock;
# 根据 mod 模式来执行步骤
# 根据传入的模式执行对应的函数
# 根据传入的模式执行对应的函数
IFS=',' read -ra ADDR <<< "$MOD"
for i in "${ADDR[@]}"; do
case $i in
1)
echo "Step 1: 用户锁定"
userLock
;;
2)
echo "Step 2: 重启 TiDB server 节点"
restartTidb
;;
3)
echo "Step 3: 确认是否有留存连接"
checkConnect
;;
4)
echo "Step 4: 等待 CDC 任务同步追上"
checkCdcStatus
;;
5)
echo "Step 5: 删除 master CDC 任务"
deleteCdcTask
;;
6)
echo "Step 6: 创建从库到主库的同步"
createCdcTask
;;
7)
echo "Step 7: 解锁从库应用用户"
userUnLock
;;
*)
echo "无效的选项: $i"
;;
esac
done
}
# 脚本入口点
if [ "${BASH_SOURCE[0]}" -ef "$0" ]; then
main "$@"
fi
# 测试命令
# ./failoverByCDC.sh --masterDB='10.1xxx02.58.180:4000' --masterUser='root' --masterPassword='tidbxx' --masterPD='10.xxx:2379' --slaveDB='10.102.5xx:4000' --slaveUser='root' --slavePassword='tidb@123' --slavePD='10.1xxx1:2379' --version='v7.1.4' --reloadList='172.xxx:4000,172.xxx.47:4000' --changefeedID='usexbackup' --cdcConfig='cdc.conf' --lockUser="xin@'%',px@'%',tidb@'%'" --mod='1,2,3' --clusterName="tidb-test" --sink-uri="tidb://root:w@-xxxxxxR@172.20.xx:4000/&transaction-atomicity=none"
使用说明
这个脚本是一个用于管理 TiDB 集群和 CDC 同步任务的 Bash 脚本。它包含多项功能,例如数据库用户锁定、重启 TiDB 服务器节点、检查连接、管理 CDC 任务等。以下是该脚本的详细使用说明:
脚本功能
该脚本主要用于TiDB数据库的主从切换及数据同步任务的管理。它可以执行以下操作:
- 锁定指定的数据库用户
- 重启 TiDB 服务节点
- 检查是否有连接留存
- 等待 CDC 同步任务赶上当前数据状态
- 删除现有的 CDC 同步任务
- 在从库创建新的 CDC 同步任务
- 解锁数据库用户
使用方法
脚本通过命令行参数接收配置信息,并根据这些参数执行不同的操作。使用示例如下:
./failoverByCDC.sh [参数列表]
参数列表
脚本支持以下参数:
- --clusterName=<集群名称>: 指定操作的 TiDB 集群名称。
- --masterDB=<主数据库地址:端口>: 主数据库的地址和端口,格式为 IP:端口。
- --masterUser=<主数据库用户名>: 用于访问主数据库的用户名。
- --masterPassword=<主数据库密码>: 用于访问主数据库的密码。
- --slaveDB=<从数据库地址:端口>: 从数据库的地址和端口,格式同主数据库。
- --slaveUser=<从数据库用户名>: 用于访问从数据库的用户名。
- --slavePassword=<从数据库密码>: 用于访问从数据库的密码。
- --masterPD=<PD主服务地址:端口>: 主 PD (Placement Driver) 服务的地址和端口,PD 是 TiDB 集群的元数据管理组件。
- --slavePD=<PD从服务地址:端口>: 从 PD 服务的地址和端口。
- --version=<CDC版本>: 指定 TiCDC 的版本,TiCDC 是 TiDB 集群的变更数据捕获组件。
- --reloadList=<重启节点列表>: 需要重启的 TiDB 节点的地址和端口列表,多个地址用逗号分隔。
- --changefeedID=<changefeed标识>: CDC 同步任务的唯一标识。
- --cdcConfig=<CDC配置文件路径>: CDC 同步任务的配置文件路径。
- --lockUser=<锁定的用户列表>: 需要锁定的数据库用户列表,多个用户用逗号分隔,格式为 用户名@'主机'。
- --mod=<执行模式>: 指定脚本执行的具体步骤,用逗号分隔的数字序列表示,每个数字对应脚本中定义的一个操作步骤。
- --sink-uri=<新 CDC 同步的 TiDB 连接 URI>: 新的 CDC 同步任务的目标 TiDB 实例的连接 URI。
示例
以下是一个使用示例:
./failoverByCDC.sh --clusterName="tidb-test" \
--masterDB='10.xxx:4000' \
--masterUser='root' \
--masterPassword='tidbxxx' \
--masterPD='10.xxx:2379' \
--slaveDB='10.xxx:4000' \
--slaveUser='root' \
--slavePassword='tidb@123' \
--slavePD='10.102.173.121:2379' \
--version='v7.1.4' \
--reloadList='172.xxx:4000,172xxx:4000' \
--changefeedID='userxxkup' \
--cdcConfig='cdc.conf' \
--lockUser="xin@'%',pxlogin@'%',tidb@'%'" \
--mod='1,2,3' \
--sink-uri="tidb://root:w@-xxx@1xx.xxx:4000/&transaction-atomicity=none"
示例参数解释
-
--mod='1,2,3':这个参数决定了脚本中执行的操作序列。每个数字对应脚本中定义的一个操作步骤。例如,在上面的示例中,1,2,3 分别表示:
- 1 - 锁定用户
- 2 - 重启 TiDB server 节点
- 3 - 检查是否有留存连接
mod 参数的可选值及其对应功能
- 1 锁定用户
- 2 重启 TiDB server 节点
- 3 检查是否有留存连接
- 4 等待 CDC 任务同步追上
- 5 删除 master CDC 任务
- 6 创建从库到主库的同步
- 7 解锁从库应用用户
您可以根据需要任意组合这些数字来指定执行的步骤。例如,如果只想删除 CDC 任务并创建新的同步任务,可以设置 --mod='5,6'。
注意事项
- 确保所有涉及的数据库、用户及其他配置信息都是正确的。
- 用逗号分隔 --mod 参数中的数字,不要有空格。
- 确保脚本和相关命令行工具具有执行权限,并在可以访问 TiDB 和 CDC 的环境中执行。
- 确保脚本具有执行权限,可使用 chmod +x failoverByCDC.sh 命令来设置。
- 该脚本需要在具有访问数据库和执行 TiDB、CDC 命令的环境中运行。
- 传入的参数应根据实际环境进行调整。
通过适当配置这些参数,灵活地使用此脚本来管理 TiDB 集群和 CDC 任务。