前言
本文将探讨从 MongoDB 到 TiDB 的数据复制的实现方式。
Gravity 支持大数据总线,能够解析 MySQL Binlog、MongoDB Oplog 中的数据变更并发布到 kafka 供下游消费;在同步过程中,支持对数据进行在线变换。
MongoDB 作为 NoSQL 的代表,数据采用 json 的存储结构;MtySQL 5.7 以后也支持 json 类型,MySQL 5.5 和 5.6 则需要将 json 转换成字符串格式存储;所以如果不考虑性能,从MongoDB 复制数据到 MySQL 的方案是可行的。
TiDB 作为分布式 NewSQL 数据库的代表,实现了自动的水平伸缩,强一致性的分布式事务,基于 Raft 的多副本复制特性除了具备 NoSQL 的优点,还兼容 MySQL 协议,并支持大部分 MySQL 语法,包括 json 数据类型,使用场景上要比 NoSQL 更加丰富。
方案测试
下面部署 Gravity 并测试从 MongoDB 到 TiDB 的数据复制方案,主要分以下两个步骤进行:
- 配置 Gravity 从 MongoDB 获取解析 Oplog 并发布到 Kafka
- 从 Kafka 中解析输出的数据格式,构造 SQL将变更同步到 TiDB
配置 mongodb 到 kafka 的同步
(1)安装 MongoDB 副本集
(2)安装 Kafka
(3)配置 Go 语言环境
(4)配置 Gravity
安装启动
mkdir -p $GOPATH/src/github.com/moiot/ && cd $GOPATH/src/github.com/moiot
git clone https://github.com/moiot/gravity.git
cd gravity && make
nohup bin/gravity -config mongo2kafka.toml &
配置文件
name = "mongo2kafka"
#
# Input 插件的定义
#
#
# 源端 Mongo 连接配置
# - 必填
#
[input.mongooplog.source]
host = "127.0.0.1"
port = 27017
username = ""
password = ""
#
# 源端 Mongo Oplog 的起始点,若不配置,则从当前最新的 Oplog 开始同步
# - 默认为空
# - 可选
#
[input.mongooplog]
# start-position = 123456
#
# 源端 Mongo Oplog 并发相关配置
# - 默认分别为 false, 50, 512, "750ms"
# - 可选 (准备废弃)
[input.mongooplog.gtm-config]
use-buffer-duration = false
buffer-size = 50
channel-size = 512
buffer-duration-ms = "750ms"
#
# Output 插件的定义
#
#
# 目标端 Kafka 连接配置
# - 必填
#
[output.async-kafka.kafka-global-config]
# - 必填
broker-addrs = ["localhost:9092"]
mode = "async"
# 目标端 kafka SASL 配置
# - 可选
[output.async-kafka.kafka-global-config.net.sasl]
enable = false
user = ""
password = ""
#
# 目标端 Kafka 路由配置
# - 必填
#
[[output.async-kafka.routes]]
match-schema = "test"
match-table = "test_table"
dml-topic = "test.test_table"
#
# 目标端编码规则:输出类型和版本号
# - 可选
[output.async-kafka]
# 默认为 json
output-format = "json"
# 默认为 2.0 版本
schema-version = "0.1"
#
# scheduler 插件的定义,此处使用默认 scheduler
#
[scheduler.batch-table-scheduler]
nr-worker = 1
batch-size = 1
queue-size = 1024
sliding-window-size = 10240
(5)测试数据同步
- MongoDB 数据变更
use test;
# 插入数据
db.test_table.insert({name:'pingcap',age:3});
# 更新数据
db.test_table.update({name:'pingcap'},{$set:{age:4}})
# 删除数据
db.test_table.remove({name:'pingcap'})
Kafka Topic 订阅
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.test_table --from-beginning
Kafka 消息输出
# 插入数据
{
"version": "0.1",
"database": "test",
"collection": "test_table",
"unique_source_name": "127.0.0.1",
"oplog": {
"timestamp": 1547033910,
"ordinal": 2,
"_id": "5c35dd3667be48b14aeb46f6",
"operation": "i",
"namespace": "test.test_table",
"data": {
"_id": "5c35dd3667be48b14aeb46f6",
"age": 3,
"name": "pingcap"
},
"row": null,
"source": 0
}
}
# 更新数据
{
"version": "0.1",
"database": "test",
"collection": "test_table",
"unique_source_name": "127.0.0.1",
"oplog": {
"timestamp": 1547034515,
"ordinal": 1,
"_id": "5c35dd3667be48b14aeb46f6",
"operation": "u",
"namespace": "test.test_table",
"data": {
"$set": {
"age": 4
}
},
"row": {
"_id": "5c35dd3667be48b14aeb46f6",
"age": 4,
"name": "pingcap"
},
"source": 0
}
}
# 删除数据
{
"version": "0.1",
"database": "test",
"collection": "test_table",
"unique_source_name": "127.0.0.1",
"oplog": {
"timestamp": 1547035146,
"ordinal": 2,
"_id": "5c35dd3667be48b14aeb46f6",
"operation": "d",
"namespace": "test.test_table",
"data": null,
"row": null,
"source": 0
}
}
配置 kafka 到 TiDB 的同步
思路是根据 Kafka 输出的 json 数据格式,按照一定规则构造 DML 并在 TiDB 中执行 SQL。
(1)规则说明
- namespace 为 db.table
- _id 和 timestamp 映射为 mongo_id varchar(24) 和 mongo_ts timestamp
- operation 中的 i/u/d 分别对应 insert/update/delete
- data 中的外层的 key 解析为 column,value 解析为 int、varchar 或 json 类型存储
- 以 mongo_id 和 mongo_ts 为 where 条件完成 update 和 delete 操作
按照上述规则,将 Kafka 消息输出构造为如下 SQL:
# 插入数据
INSERT INTO test.test_table (mongo_id, mongo_ts, age, name)
VALUES ('5c35dd3667be48b14aeb46f6', from_unixtime(1547033910), 3, 'pingcap');
# 更新数据
UPDATE test.test_table
SET age = 4
WHERE mongo_id = '5c35dd3667be48b14aeb46f6'
AND mongo_ts = from_unix(1547034515);
# 删除数据
DELETE FROM test.test_table
WHERE mongo_id = '5c35dd3667be48b14aeb46f6'
AND mongo_ts = from_unix(1547035146);
(2)特殊处理
由于 MongoDB 使用 schema free 数据模型,database、table/collection、column/filed 都是隐式创建的,Oplog 中也并没有 DDL 操作;同步到 MySQL/TiDB 中需要先判断对应 database、table、column 是否存在,如果不存在则要先执行相应的 DDL。
例如上面的数据变更执行前,需要先执行
create database test;
create table test.test_table;
如果 insert/update 操作涉及到新的 column/field,需要先执行
alter table add column ...
对于更新操作的 unset,相当于把这个 column/field 删除,需要先执行
alter table drop column ...
此外 MongoDB 也是可以显式执行 create/drop 等 DDL 操作,但是这类操作 Kafka 消息中并没有解析输出。
MongoDB 语法转换到 SQL 规则可以参考 https://github.com/goodybag/mongo-sql 1
(3)规则实现
这里不考虑特殊处理,仅针对规则说明的部分,用 python 实现了一个简单的 demo 脚本,
功能是将来自 Kafka 订阅 Topic test.test_table 的 json 格式的 MongoDB DML 转换为 SQL 语句并在下游 TiDB 中执行。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# filename: json2sql.py
from kafka import KafkaConsumer
from kafka.client import KafkaClient
import MySQLdb
import json
import time
class KafkaPython:
consumer = server = topic = None
TOPIC = 'test.test_table'
BROKER_LIST = 'localhost:9092'
DB_USER = 'root'
DB_PASS = ''
DB_NAME = 'test_db'
DB_PORT = 4000
DB_IP = 'localhost'
DB_OPT = {'i':'insert', 'u':'update', 'd':'delete'}
def __init__(self):
print("init kafka consumer")
self.server = self.BROKER_LIST
self.topic = self.TOPIC
print("init mysql client")
self.db = MySQLdb.connect(self.DB_IP, self.DB_USER,
self.DB_PASS, self.DB_NAME, port=self.DB_PORT)
self.cursor = self.db.cursor()
def __del__(self):
print("end")
def getConnect(self):
self.consumer = KafkaConsumer(self.topic, bootstrap_servers = self.server)
def execSQL(self, sql):
rows = self.cursor.execute(sql)
# print sql
if rows > 0:
self.db.commit()
return rows
def beginConsumer(self):
for oneLog in self.consumer:
mlog = json.loads(oneLog.value).get('oplog', None)
if mlog is not None:
self.getDesc(mlog)
def getDesc(self, mlog):
desc = {}
for k,v in mlog.items():
if k == '_id':
desc['mongo_id'] = v
if k == 'timestamp':
desc['mongo_ts'] = v
if k == 'namespace':
desc['table'] = v
if k == 'operation':
desc['iud'] = v
if k == 'data':
desc['values'] = v
sql = self.ruleRoute(desc)
if sql is not None:
ret = self.execSQL(sql)
dt = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
print '%s %s %s rows' % (dt, self.DB_OPT[desc['iud']], ret)
def ruleRoute(self, desc):
if desc['iud'] not in self.DB_OPT.keys():
return
if desc['iud'] == 'i':
return self.stmtIns(desc)
elif desc['iud'] == 'u':
return self.stmtUpd(desc)
else:
return self.stmtDel(desc)
def stmtIns(self, desc):
mtbl = desc['table']
mid = desc['mongo_id']
mts = desc['mongo_ts']
data = desc['values']
data.pop('_id')
mlist = data.keys()
vlist = data.values()
mlist = mtbl + '(%s, %s, %s)' % ('mongo_id', 'mongo_ts', ', '.join(mlist))
clist = "('%s', from_unixtime(%s), " % (mid, mts)
for val in vlist:
if type(val).__name__ == 'unicode':
clist += "'%s', " % val
elif type(val).__name__ == 'dict':
clist += "'%s', " % json.dumps(val)
else:
clist += "%s, " % val
clist = clist[:-2] + ")"
sql = "insert into %s values %s" % (mlist, clist)
# print 'insert stmt: ',sql
return sql
def stmtUpd(self, desc):
mtbl = desc['table']
mid = desc['mongo_id']
mts = desc['mongo_ts']
data = desc['values']
stmt = []
for k, v in data.items():
if k != '$set':
return
for col, val in v.items():
if type(val).__name__ == 'unicode':
ln = "%s = '%s'" % (col, val)
elif type(val).__name__ == 'dict':
ln = "%s = '%s'" % (col, json.dumps(val))
else:
ln = "%s = %s" % (col, val)
stmt.append(ln)
col = ','.join(stmt)
sql = "update %s set %s where mongo_id = '%s' and "
"mongo_ts = from_unixtime(%s)" % (mtbl, col, mid, mts)
# print 'update stmt: ',sql
return sql
def stmtDel(self, desc):
mtbl = desc['table']
mid = desc['mongo_id']
mts = desc['mongo_ts']
sql = "delete from %s where mongo_id = '%s' and "
"mongo_ts = from_unixtime(%s)" % (mtbl, mid, mts)
# print 'delete stmt: ',sql
return sql
def disConnect(self):
self.consumer.close()
if __name__ == '__main__':
kp = KafkaPython()
kp.getConnect()
kp.beginConsumer()
(4)同步测试
启动脚本后首先初始化连接到 Kafka 以及下游 TiDB。
# python json2sql.py
init kafka consumer
init mysql client
当接收到 Kafka 的新消息后,会将其解析为 SQL ,打印出来的格式如下。
insert stmt: insert into test.test_table(mongo_id, mongo_ts, info, age, addr, name) values ('5c35dde567be48b14aeb46f8'
, from_unixtime(1547034085), {"employess": 80, "offce": "beijing"}3, 'dongsheng', 'pingcap01')
接着会在之前建立的 TiDB 连接中执行 SQL,如果返回的 rows 大于 0,提交操作;Kafka 消费的 Topic 对应某些库表的 DML,解析出来的是流式数据,到 TiDB 执行 SQL 也是串行的,不存在乐观锁事务冲突的问题;当上游 MongoDB 写入压力较大时,会存在一定的延迟。
关于全量数据复制 由于 mongodump 备份文件是 bson 格式的二进制数据,与 MySQL 不兼容,无法直接导入,
而 mongoexport 可以将数据导出格式为 csv、json 的文本,因此对于全量数据,通过 load data 命令导入 csv 文本是可行的;
如果数据量比较大,则可以根据各 collection 的大小分批导出多个文件,然后并行执行 load 将数据导入 MySQL/TiDB 中。
需要说明的是,不管是全量还是增量数据复制,都要将 MongoDB 中的 collection 映射为 MySQL/TiDB 中的 table,这部分工作要提前完成;
如果涉及到下游 DDL 变更,也要暂停同步,手动完成表的变更。
因此,对于 MongoDB 中的 collection,如果 schema 设计不够规范,各 document 的 field 的数量、类型不固定,
那么数据同步过程难免频繁中断,进行手工维护,甚至可能变得不可维护,这是对业务库表设计及对所要同步数据的考量;
此外数据同步时的失败处理、中断恢复、一致性校验等问题也需要进一步考虑。
总结
本文主要介绍了从 MongoDB 到 TiDB 的数据复制方案和一些注意要点 ,并结合 Gravity 工具实现从 MongoDB 到 Kafka 的数据同步,并简单展示了如何将 Kafka 的输出数据进行格式转换,构造 DML SQL 完成到 TiDB 的增量同步;由于 MongoDB 采用非关系型数据模型,且不支持标准 SQL 语法,除了简单的增删改操作,MongoDB 的丰富语法在 Oplog 中还会解析出更多格式,需要根据业务需要或借助专门的类库进一步完善。