3
1
0
2
专栏/.../

上游sql通过drainer同步到kafka时在kafka中是什么样子的

 qhd2004  发表于  2022-08-19

环境:上游为v5.4.1版本tidb集群,下游为2.12-2.4.1版本kafka集群,使用drainer进行同步数据

本文对上游中的ddl、dml在下游是如何体现,以及是否会对同步产生影响,做个抛砖引玉的介绍,相关测试过程如下:

drainer的配置

- host: 10.103.236.178
  ssh_port: 22
  port: 8239
  deploy_dir: /data/tidb-deploy/drainer-8239
  data_dir: /data/tidb-data/drainer-8239
  log_dir: log
  config:
    syncer.db-type: kafka
    syncer.ignore-schemas: INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql,db_name
    syncer.to.kafka-addrs: 10.xxx.xxx.10:9092,10.xxx.xxx.11:9092,10.xxx.xxx.59:9092
    syncer.to.kafka-max-messages: 1024
    syncer.to.kafka-version: 2.4.1
    syncer.to.topic-name: syk-test-binlog-to-kafka
  arch: amd64
  os: linux

注意:也可以设置replicate-do-db来指定只复制哪些库,但是这个replicate-do-db参数我在测试时,是不成功的,体现在drainer这个服务根本就启动不起来。

1,上游执行create

create table moe_test
(
    id int(3) auto_increment not null primary key,
    name char(10) not null,
    address varchar(50) default 'beijing',
    year date
);

drainer日志

[2022/08/19 11:17:06.836 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105937] [binlog="tp:Commit start_ts:435389471196708868 commit_ts:435389471196708871 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTable:10\\3775936\\000\\000\\000\\000\\373\" ddl_query:\"create table moe_test\\n(\\n    id int(3) auto_increment not null primary key,\\n    name char(10) not null,\\n    address varchar(50) default 'beijing',\\n    year date\\n)\" ddl_job_id:105937 ddl_schema_state:5 "]
[2022/08/19 11:17:06.863 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105937, Type:create table, State:synced, SchemaState:public, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 11:17:05.256 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 11:17:06.973 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="create table moe_test\n(\n    id int(3) auto_increment not null primary key,\n    name char(10) not null,\n    address varchar(50) default 'beijing',\n    year date\n)"] ["commit ts"=435389471196708871] [shouldSkip=false]
[2022/08/19 11:17:06.999 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.xxx.xxx.59:9092] from broker %!s(MISSING)\n"]
[2022/08/19 11:17:07.023 +08:00] [INFO] [sarama.go:122] ["[sarama] Connected to broker at [10.xxx.xxx.59:9092] (unregistered)\n"]
[2022/08/19 11:17:07.096 +08:00] [INFO] [sarama.go:122] ["[sarama] client/brokers registered new broker #[2 %!d(string=10.xxx.xxx.10:9092)] at %!s(MISSING)"]
[2022/08/19 11:17:07.096 +08:00] [INFO] [sarama.go:122] ["[sarama] client/brokers registered new broker #[3 %!d(string=10.xxx.xxx.11:9092)] at %!s(MISSING)"]
[2022/08/19 11:17:07.096 +08:00] [INFO] [sarama.go:122] ["[sarama] client/brokers registered new broker #[1 %!d(string=10.xxx.xxx.59:9092)] at %!s(MISSING)"]
[2022/08/19 11:17:07.096 +08:00] [INFO] [client.go:902] ["[sarama] client/metadata found some partitions to be leaderless"]
[2022/08/19 11:17:07.096 +08:00] [INFO] [client.go:870] ["[sarama] client/metadata retrying after 500ms... (10000 attempts remaining)\n"]
[2022/08/19 11:17:07.598 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.xxx.xxx.59:9092] from broker %!s(MISSING)\n"]
[2022/08/19 11:17:07.612 +08:00] [INFO] [async_producer.go:744] ["[sarama] producer/broker/2 starting up\n"]
[2022/08/19 11:17:07.613 +08:00] [INFO] [async_producer.go:760] ["[sarama] producer/broker/2 state change to [open] on syk-test-binlog-to-kafka/0\n"]
[2022/08/19 11:17:07.626 +08:00] [INFO] [sarama.go:122] ["[sarama] Connected to broker at [10.xxx.xxx.10:9092 %!s(int32=2)] (registered as #%!d(MISSING))\n"]

kafka中查询

小结:在drainer日志中create语句、kafka broker信息、topic都体现出来了。查看kafka的内容,发现的是create的sql语句。因此,对于create语句,不会产生大的binlog,也不会引起kafka server: Message was too large

2,上游执行insert

insert into moe_test (name,address,year) values('allen','大连一中','1976-10-10');
insert into moe_test (name,address,year) values('jack','大连二中','1975-12-23');
insert into moe_test (name,address,year) values('jordan','芝加哥公牛','1984-03-23');

drainer日志

[2022/08/19 12:25:57.918 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390554362609666] [version=130819]

kafka中查询

image.png

小结:对于insert,kafka中内容是表结构与行数据都有体现,因此,对于如果是大量insert语句,是可能产生大的binlog,也可能引起kafka server: Message was too large,也可能引起下面错误

[2022/08/18 18:16:30.214 +08:00] [INFO] [pump.go:166] ["receive big size binlog"] [size="624 MB"]
[2022/08/18 18:16:30.577 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373417798303755] [version=202807]
[2022/08/18 18:17:51.315 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373417811410954] [version=202807]
[2022/08/18 18:17:54.373 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373440801177602] [version=202807]
[2022/08/18 18:17:57.460 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373441535442957] [version=202807]
[2022/08/18 18:33:27.895 +08:00] [ERROR] [syncer.go:533] ["Failed to close syncer"] [error="fail to push msg to kafka after 30s, check if kafka is up and working"] [errorVerbose="fail to push msg to kafka after 30s, check if kafka is up and working\ngith
ub.com/pingcap/tidb-binlog/drainer/sync.(*KafkaSyncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/sync/kafka.go:236\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1371"]
[2022/08/18 18:33:27.895 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373669469650948] [version=202816]
[2022/08/18 18:33:31.311 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373669469650950] [version=202816]
[2022/08/18 18:33:37.896 +08:00] [INFO] [server.go:465] ["begin to close drainer server"]
[2022/08/18 18:33:37.896 +08:00] [ERROR] [util.go:69] ["Recovered from panic"] [err="\"Waiting too long for `Syncer.run` to quit.\""] ["real stack"="github.com/pingcap/tidb-binlog/drainer.(*taskGroup).start.func1.1\n\t/home/jenkins/agent/workspace/build-
common/go/src/github.com/pingcap/tidb-binlog/drainer/util.go:71\nruntime.gopanic\n\t/usr/local/go/src/runtime/panic.go:965\ngithub.com/pingcap/tidb-binlog/drainer.(*Syncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-
binlog/drainer/syncer.go:539\ngithub.com/pingcap/tidb-binlog/drainer.(*Syncer).Start\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/syncer.go:151\ngithub.com/pingcap/tidb-binlog/drainer.(*Server).Start.func4\n
\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/server.go:290\ngithub.com/pingcap/tidb-binlog/drainer.(*taskGroup).start.func1\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/
drainer/util.go:79"] [name=syncer]
[2022/08/18 18:33:37.896 +08:00] [INFO] [util.go:76] [Exit] [name=syncer]
[2022/08/18 18:33:37.927 +08:00] [INFO] [server.go:430] ["has already update status"] [id=10.xxx.xxx.59:8249]
[2022/08/18 18:33:37.927 +08:00] [INFO] [server.go:469] ["commit status done"]
[2022/08/18 18:33:37.927 +08:00] [INFO] [collector.go:136] ["publishBinlogs quit"]
[2022/08/18 18:33:37.927 +08:00] [INFO] [util.go:76] [Exit] [name=heartbeat]
[2022/08/18 18:33:37.927 +08:00] [INFO] [pump.go:77] ["pump is closing"] [id=10.xxx.xxx.xxx:8250]
[2022/08/18 18:33:37.927 +08:00] [INFO] [pump.go:77] ["pump is closing"] [id=10.xxx.xxx.xxx:8250]
[2022/08/18 18:33:37.927 +08:00] [INFO] [util.go:76] [Exit] [name=collect]
[2022/08/18 18:33:37.927 +08:00] [INFO] [main.go:73] ["drainer exit"]

3,上游执行delete

delete from moe_test where id=1;

drainer日志

[2022/08/19 12:30:42.362 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.99.110.11:9092] from broker %!s(MISSING)\n"]
[2022/08/19 12:30:45.497 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390629322424322] [version=130819]

kafka中查询

小结:对于delete来说,可能情况跟上面的insert一样。

4,上游执行update

update moe_test set address='xxxxxx' where id=2;

drainer日志

[2022/08/19 12:35:46.485 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390708280459266] [version=130819]

kafka中查询

小结:update在kafka中是把更新前的值与更新的值一起体现出来,其他可能的情况跟insert一样。

5,上游执行列操作

alter table moe_test drop column year;

drainer日志

[2022/08/19 12:38:02.341 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105938] [binlog="tp:Commit start_ts:435390744390795265 commit_ts:435390744403902465 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTable:10\\3775936\\000\\000\\000\\000\\373\" ddl_query:\"alter table moe_test drop column year\" ddl_job_id:105938 ddl_schema_state:1 "]
[2022/08/19 12:38:03.347 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105938, Type:drop column, State:synced, SchemaState:queueing, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 12:38:03.347 +08:00] [INFO] [schema.go:289] ["Got DeleteOnly Job"] [job="ID:105938, Type:drop column, State:synced, SchemaState:delete only, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 12:38:03.347 +08:00] [INFO] [syncer.go:454] ["Syncer skips DeleteOnly DDL"] [job="ID:105938, Type:drop column, State:synced, SchemaState:delete only, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"] [ts=435390744403902465]
[2022/08/19 12:38:07.549 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105938] [binlog="tp:Commit start_ts:435390744430116865 commit_ts:435390744430116866 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTable:10\\3775936\\000\\000\\000\\000\\373\" ddl_query:\"alter table moe_test drop column year\" ddl_job_id:105938 "]
[2022/08/19 12:38:07.551 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105938, Type:drop column, State:synced, SchemaState:queueing, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 12:38:07.552 +08:00] [INFO] [schema.go:501] ["Finished dropping column"] [job="ID:105938, Type:drop column, State:synced, SchemaState:queueing, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 12:38:07.563 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="alter table moe_test drop column year"] ["commit ts"=435390744430116866] [shouldSkip=false]
[2022/08/19 12:38:07.577 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390744430116866] [version=130823]
[2022/08/19 12:38:12.697 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390746579697665] [version=130823]

kafka中查询

image.png

小结:日志与kafka内容中都体现的是alter语句,不会产生大的binlog,也不会引起kafka server: Message was too large。

6,上游执行truncate

truncate table moe_test;

drainer日志

[2022/08/19 14:05:31.893 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105940] [binlog="tp:Commit start_ts:435392119781720073 commit_ts:435392119781720075 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTID:1059\\37736\\000\\000\\000\\000\\000\\000\\371\" ddl_query:\"truncate table moe_test\" ddl_job_id:105940 ddl_schema_state:5 "]
[2022/08/19 14:05:31.897 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105940, Type:truncate table, State:synced, SchemaState:public, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 14:05:28.806 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 14:05:31.926 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="truncate table moe_test"] ["commit ts"=435392119781720075] [shouldSkip=false]
[2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:1011] ["[sarama] producer/broker/2 state change to [closing] because kafka: broker not connected\n"]
[2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:611] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 state change to [retrying-1]\n"]
[2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:621] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 abandoning broker 2\n"]
[2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:750] ["[sarama] producer/broker/2 input chan closed\n"]
[2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:843] ["[sarama] producer/broker/2 shut down\n"]
[2022/08/19 14:05:32.427 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.99.110.10:9092] from broker %!s(MISSING)\n"]
[2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:744] ["[sarama] producer/broker/2 starting up\n"]
[2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:760] ["[sarama] producer/broker/2 state change to [open] on syk-test-binlog-to-kafka/0\n"]
[2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:594] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 selected broker 2\n"]
[2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:627] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 state change to [flushing-1]\n"]
[2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:649] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 state change to [normal]\n"]
[2022/08/19 14:05:32.454 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435392119781720075] [version=130824]

kafka中查询

image.png

小结:日志与kafka内容中都体现的是truncate语句,不会产生大的binlog,也不会引起kafka server: Message was too large。

7,上游执行drop

drop table syk_test;

drainer日志

[2022/08/19 14:10:03.681 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105941] [binlog="tp:Commit start_ts:435392191333924865 commit_ts:435392191347032067 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTID:4603\\3777\\000\\000\\000\\000\\000\\000\\000\\370\" ddl_query:\"drop table syk_test\" ddl_job_id:105941 "]
[2022/08/19 14:10:03.685 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105941, Type:drop table, State:synced, SchemaState:queueing, SchemaID:5840, TableID:46037, RowCount:0, ArgLen:0, start time: 2022-08-19 14:10:01.606 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 14:10:03.685 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="drop table syk_test"] ["commit ts"=435392191347032067] [shouldSkip=false]
[2022/08/19 14:10:03.700 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435392191347032067] [version=130827]
[2022/08/19 14:10:09.786 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435392192985169921] [version=130827]

kafka中查询

小结:日志与kafka内容中都体现的是drop语句,不会产生大的binlog,也不会引起kafka server: Message was too large。

根据上面测试可以得出结论:

1,只有dml才可能造成大的binlog。

2,大事务可能引起kafka server: Message was too large,此时可以调整下游kafka中参数(参考https://docs.pingcap.com/zh/tidb/v4.0/handle-tidb-binlog-errors#drainer-%E5%90%8C%E6%AD%A5%E6%95%B0%E6%8D%AE%E5%88%B0-kafka-%E6%97%B6%E6%8A%A5%E9%94%99-kafka-server-message-was-too-large-server-rejected-it-to-avoid-allocation-error),但更应该考虑上游业务,是否可以把大事务拆成多个小事务。

3,大binlog会引起下面错误(我们反复测试后发现binlog在500M-1G时会引起)。

[2022/08/18 18:33:27.895 +08:00] [ERROR] [syncer.go:533] ["Failed to close syncer"] [error="fail to push msg to kafka after 30s, check if kafka is up and working"] [errorVerbose="fail to push msg to kafka after 30s, check if kafka is up and working\ngith
ub.com/pingcap/tidb-binlog/drainer/sync.(*KafkaSyncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/sync/kafka.go:236\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1371"]
[2022/08/18 18:33:27.895 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373669469650948] [version=202816]
[2022/08/18 18:33:31.311 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373669469650950] [version=202816]
[2022/08/18 18:33:37.896 +08:00] [INFO] [server.go:465] ["begin to close drainer server"]
[2022/08/18 18:33:37.896 +08:00] [ERROR] [util.go:69] ["Recovered from panic"] [err="\"Waiting too long for `Syncer.run` to quit.\""] ["real stack"="github.com/pingcap/tidb-binlog/drainer.(*taskGroup).start.func1.1\n\t/home/jenkins/agent/workspace/build-
common/go/src/github.com/pingcap/tidb-binlog/drainer/util.go:71\nruntime.gopanic\n\t/usr/local/go/src/runtime/panic.go:965\ngithub.com/pingcap/tidb-binlog/drainer.(*Syncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-
binlog/drainer/syncer.go:539\ngithub.com/pingcap/tidb-binlog/drainer.(*Syncer).Start\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/syncer.go:151\ngithub.com/pingcap/tidb-binlog/drainer.(*Server).Start.func4\n
\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/server.go:290\ngithub.com/pingcap/tidb-binlog/drainer.(*taskGroup).start.func1\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/
drainer/util.go:79"] [name=syncer]
[2022/08/18 18:33:37.896 +08:00] [INFO] [util.go:76] [Exit] [name=syncer]
[2022/08/18 18:33:37.927 +08:00] [INFO] [server.go:430] ["has already update status"] [id=10.xxx.xxx.59:8249]
[2022/08/18 18:33:37.927 +08:00] [INFO] [server.go:469] ["commit status done"]
[2022/08/18 18:33:37.927 +08:00] [INFO] [collector.go:136] ["publishBinlogs quit"]
[2022/08/18 18:33:37.927 +08:00] [INFO] [util.go:76] [Exit] [name=heartbeat]
[2022/08/18 18:33:37.927 +08:00] [INFO] [pump.go:77] ["pump is closing"] [id=10.xxx.xxx.xxx:8250]
[2022/08/18 18:33:37.927 +08:00] [INFO] [pump.go:77] ["pump is closing"] [id=10.xxx.xxx.xxx:8250]
[2022/08/18 18:33:37.927 +08:00] [INFO] [util.go:76] [Exit] [name=collect]
[2022/08/18 18:33:37.927 +08:00] [INFO] [main.go:73] ["drainer exit"]

#end

3
1
0
2

版权声明:本文为 TiDB 社区用户原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接和本声明。

评论
暂无评论