跳到主要内容

TiCDC系列分享 Open API与业务系统集成

作者:dapan3927 发表于 2022-05-18

前言

公司准备将内部运行的部分业务系统进行升级,将后台的MySQL数据库迁移至TiDB。在正式升级之前,先通过测试环境进行模拟操作。目前在测试环境中部署了两个单机的TiDB集群,分别部署在移动云和腾讯云的服务器上。移动云TiDB实例的数据库中部署TiCDC将数据同步至腾讯云TiDB数据库。为了便于在业务系统直接对TiCDC进行管理,希望在业务系统中集成TiCDC的管理。

TiCDC 提供 OpenAPI 功能,用户可以通过 OpenAPI 对 TiCDC 集群进行查询和运维操作。通过 OpenAPI 可以完成如下 TiCDC 集群的运维操作:

具体参见官方文档 TiCDC Open API。在业务系统中主要需要实现查询同步任务列表、查询特定同步任务、创建/删除/暂停/恢复同步任务。下文的内容主要围绕这几个接口的具体实现展开。

部署架构 & 硬件环境

两台服务器的部署架构相同。部署的topo.yaml文件如下:

# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
user: "tidb"
ssh_port: 22
deploy_dir: "/tidb-deploy"
data_dir: "/tidb-data"

# # Monitored variables are applied to all the machines.
monitored:
node_exporter_port: 9100
blackbox_exporter_port: 9115

server_configs:
tidb:
log.slow-threshold: 300
tikv:
readpool.storage.use-unified-pool: false
readpool.coprocessor.use-unified-pool: true
pd:
replication.enable-placement-rules: true
replication.location-labels: ["host"]
tiflash:
logger.level: "info"

pd_servers:
- host: 192.168.0.6

tidb_servers:
- host: 192.168.0.6

tikv_servers:
- host: 192.168.0.6
port: 20160
status_port: 20180
config:
server.labels: { host: "logic-host-1" }

- host: 192.168.0.6
port: 20161
status_port: 20181
config:
server.labels: { host: "logic-host-2" }

- host: 192.168.0.6
port: 20162
status_port: 20182
config:
server.labels: { host: "logic-host-3" }

tiflash_servers:
- host: 192.168.0.6

monitoring_servers:
- host: 192.168.0.6

grafana_servers:
- host: 192.168.0.6

TiCDC在TiDB集群部署之后通过scale-out部署,具体参考:使用 TiUP 扩容缩容 TiDB 集群

扩容yaml文件如下:

cdc_servers:
- host: 192.168.0.6
gc-ttl: 86400
deploy_dir: "tidb-deploy/cdc-8300"
log_dir: "tidb-deploy/cdc-8300/log"
data_dir: "tidb-data/cdc-8300"

两台服务器的主要配置信息如下:

服务器OSCPU & 内存磁盘网络带宽
移动云Ununtu 20.04 LTS4核 32GSSD 100G1M
腾讯云Ununtu 20.04 LTS4核 8GSSD 100G10M

功能开发

现有的业务系统采用前后端分离的方式开发,前端基于Vue+Element UI,后端基于Spring Boot,构建RESTful API提供给前端访问。在后端Controller中新增ticdc的控制器,主要代码如下:

package com.javaweb.admin.controller;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.javaweb.common.common.BaseController;
import com.javaweb.common.constant.CommonConstants;
import com.javaweb.common.utils.CommonUtils;
import com.javaweb.common.utils.DBUtils;
import com.javaweb.common.utils.HttpUtils;
import com.javaweb.common.utils.JsonResult;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.web.bind.annotation.*;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/ticdc")
public class TicdcController extends BaseController {
/**
* 获取同步任务列表
* @return
*/
@GetMapping("/changefeeds")
public JsonResult changefeeds() {

String result = HttpUtils.doGet(CommonConstants.QUERY_TASK_LIST_URL,null);
result = "{\"records\":"+result+",\"total\":1,\"size\":10,\"current\":1,\"orders\":[],\"searchCount\":true,\"pages\":1}";
return JsonResult.success(CommonUtils.strToJson(result));

}

/**
* 查询特定同步任务
* @param feed_id 同步任务id
* @return
*/
@GetMapping("/changefeedsinfo/{feed_id}")
public JsonResult changefeedsinfo(@PathVariable("feed_id") String feed_id) {

String url = CommonConstants.QUERY_TASK_INFO_URL;
url = url.replace("{0}",feed_id);
String result = HttpUtils.doGet(url,null);
//获取同步的表名
JSONObject obj = CommonUtils.strToJson(result);
String json = obj.getString("task_status");
JSONArray jsonArray = JSONArray.parseArray(json);
JSONObject jsonObject = jsonArray.getJSONObject(0);
JSONArray array = jsonObject.getJSONArray("table_ids");
String table_ids = array.toJSONString().replace("[","").replace("]","");

//根据table_id获取对应的表名
String sql = "select tidb_table_id,table_name from INFORMATION_SCHEMA.`TABLES` where tidb_table_id in ("+table_ids+")";
String temp = "";
try {
List list= DBUtils.convertList(DBUtils.GetResultSet(sql));
temp = JSON.toJSONString(list);

} catch (SQLException throwables) {
throwables.printStackTrace();
}
result = "{\"tables\":"+temp+",\"records\":"+result+",\"total\":1,\"size\":10,\"current\":1,\"orders\":[],\"searchCount\":true,\"pages\":1}";
return JsonResult.success(CommonUtils.strToJson(result));

}

/**
* 暂停同步任务
* @param feed_id 同步任务id
* @return
*/
@PostMapping("/pause/{feed_id}")
public JsonResult pause(@PathVariable("feed_id") String feed_id) {

String url = CommonConstants.PAUSE_TASK_URL;
url = url.replace("{0}",feed_id);
String result = HttpUtils.doPost(url,null);
JsonResult<T> obj = new JsonResult<>();
obj.setCode(result.equals("202")?202:200);
obj.setData(null);
obj.setMsg(result);
return JsonResult.success(obj);

}

/**
* 恢复同步任务
* @param feed_id 同步任务id
* @return
*/
@PostMapping("/resume/{feed_id}")
public JsonResult resume(@PathVariable("feed_id") String feed_id) {

String url = CommonConstants.RESUME_TASK_URL;
url = url.replace("{0}",feed_id);
String result = HttpUtils.doPost(url,null);
JsonResult<T> obj = new JsonResult<>();
obj.setCode(result.equals("202")?202:200);
obj.setData(null);
obj.setMsg(result);
return JsonResult.success(obj);

}

/**
* 创建同步任务
* @param feed_id 同步任务id
* @return
*/
@PostMapping("/create/{feed_id}")
public JsonResult create(@PathVariable("feed_id") String feed_id) {

String url = CommonConstants.CREATE_TASK_URL;
Map<String,Object> param = new HashMap<>();
param.put("changefeed_id",feed_id);
param.put("sink_uri","blackhole://");
param.put("ignore_ineligible_table",true);
url = url.replace("{0}",feed_id);
String result = HttpUtils.doPostJson(url,param);
JsonResult<T> obj = new JsonResult<>();
obj.setCode(result.equals("202")?202:200);
obj.setData(null);
obj.setMsg(result);
return JsonResult.success(obj);

}

/**
* 删除同步任务
* @param feed_id 同步任务id
* @return
*/
@DeleteMapping("/delete/{feed_id}")
public JsonResult delete(@PathVariable("feed_id") String feed_id) {

String url = CommonConstants.DELETE_TASK_URL;
url = url.replace("{0}",feed_id);
String result = HttpUtils.doDelete(url,null);
JsonResult<T> obj = new JsonResult<>();
obj.setCode(result.equals("202")?202:200);
obj.setData(null);
obj.setMsg(result);
return JsonResult.success(obj);

}

}

代码写的比较简单,出于简化考虑,没有进行Service分层设计。编译通过后启动,通过ApiPost测试一下新增的ticdc API接口,以调用获取同步任务列表为例,若服务端正常响应,输出信息如下图:

no-alt

好了,后台服务接口运行正常。接下来编写前端页面调用该接口进行数据展示。主要前端页面代码如下:

<template>
<div class="ele-body">
<el-card shadow="never">
<el-form
:model="form"
label-width="77px"
class="ele-form-search"
@keyup.enter.native="reload"
@submit.native.prevent>
<el-row :gutter="15">
<el-col :lg="6" :md="12">
<el-form-item label="FeedId:">
<el-input
clearable
v-model="form.feedid"
placeholder="请输入同步任务ID"/>
</el-form-item>
</el-col>
<el-col :lg="12" :md="12">
<div class="ele-form-actions">
<el-button
type="primary"
icon="el-icon-search"
class="ele-btn-icon"
@click="reload">查询同步列表
</el-button>
<el-button type="warning" @click="pause">暂停任务</el-button>
<el-button type="primary" @click="resume">恢复任务</el-button>
<el-button type="primary" @click="create">创建任务</el-button>
<el-button type="danger" @click="deljob">删除任务</el-button>
</div>
</el-col>
</el-row>
</el-form>
<!-- 数据表格 -->
<ele-data-table
ref="table"
:where="where"
:data="propTableData.col"
:choose.sync="selection"
height="calc(100vh - 315px)">
<el-table-column type="selection" width="45" align="center" fixed="left"/>
<el-table-column type="index" :index="index" label="编号" width="60" align="center" fixed="left" show-overflow-tooltip/>
<el-table-column prop="id" label="ID" :min-width="90" sortable="custom" show-overflow-tooltip />
<el-table-column prop="state" label="状态" :min-width="160" sortable="custom" show-overflow-tooltip />
<el-table-column prop="checkpoint_time" label="CheckPoint" :min-width="160" sortable="custom" show-overflow-tooltip />
<el-table-column prop="checkpoint_tso" label="TSO" :min-width="160" sortable="custom" show-overflow-tooltip />
<!-- 操作列 -->
<el-table-column label="操作" min-width="150px" align="center" :resizable="false" fixed="right">
<template slot-scope="{row}">
<el-link
type="primary"
:underline="false"
icon="el-icon-edit"
@click="openEdit(row)"
>查看任务
</el-link>
</template>
</el-table-column>
</ele-data-table>
</el-card>
<!-- 显示同步任务明细窗口 -->
<task
:visible.sync="showEdit"
:data="editData"
:tableList="tableData"
@done="reload"/>
</div>
</template>

<script>
import Task from './task';
export default {
name: 'TicdcChangefeeds',
components: {Task},
computed: {
},
data() {
return {
// 表格数据接口
url: '/ticdc/changefeeds',
// 表格列配置
columns: [
{
columnKey: 'selection',
type: 'selection',
width: 45,
align: 'center',
fixed: "left"
},
{
prop: 'id',
label: 'ID',
width: 60,
align: 'center',
showOverflowTooltip: true,
fixed: "left"
},
{
prop: 'state',
label: '状态',
align: 'center',
showOverflowTooltip: true,
minWidth: 110,
},
{
prop: 'checkpoint_time',
label: 'CheckPointTime',
align: 'center',
showOverflowTooltip: true,
minWidth: 150
},
{
prop: 'checkpoint_tso',
label: 'TSO',
align: 'center',
showOverflowTooltip: true,
minWidth: 100
},
{
columnKey: 'action',
label: '操作',
width: 130,
align: 'center',
resizable: false,
slot: 'action',
fixed: "right"
}
],
// 表格搜索条件
where: {},
form:{},
// 表格选中数据
selection: [],
// 当前编辑数据
current: null,
// 是否显示编辑弹窗
showEdit: false,
// 编辑回显数据
editData: null,
tableData: null,
// 同步任务数据
propTableData: {
sel: null, // 选中行
col: [] //同步任务列表
},
};
},
methods: {
/* 刷新表格 */
reload() {
this.$http.get('/ticdc/changefeeds').then(res => {
this.propTableData.col = res.data.data.records;
}).catch(e => {
this.$message.error(e.message);
});

},
/* 创建同步任务 */
create(){
this.$http.post('/ticdc/create/' + [this.form.feedid]).then(res => {
if (res.data.data.code === 202) {
this.$message.success(res.data.msg);
this.reload();
} else {
this.$message.error(res.data.msg);
}
}).catch(e => {
this.$message.error(e.message);
});


},
/* 暂停同步任务 */
pause(){
if (!this.selection.length) {
this.$message.error('请选择一个同步')
return;
}
let id = this.selection[0].id;
this.$http.post('/ticdc/pause/' + [id]).then(res => {
if (res.data.data.code === 202) {
this.$message.success(res.data.msg);
this.reload();
} else {
this.$message.error(res.data.msg);
}
}).catch(e => {
this.$message.error(e.message);
});


},
/* 恢复同步任务 */
resume(){
if (!this.selection.length) {
this.$message.error('请选择一个同步任务')
return;
}
let id = this.selection[0].id;
this.$http.post('/ticdc/resume/' + [id]).then(res => {
if (res.data.data.code === 202) {
this.$message.success(res.data.msg);
this.reload();
} else {
this.$message.error(res.data.msg);
}
}).catch(e => {
this.$message.error(e.message);
});

},
/* 删除任务 */
deljob(){
if (!this.selection.length) {
this.$message.error('请至少选择一条数据')
return;
}
let id = this.selection[0].id;
this.$http.delete('/ticdc/delete/' + [id]).then(res => {
if (res.data.data.code === 202) {
this.$message.success(res.data.msg);
this.reload();
} else {
this.$message.error(res.data.msg);
}
}).catch(e => {
this.$message.error(e.message);
});

},
reset() {
this.where = {};
this.reload();
},
/* 显示同步任务明细窗口 */
openEdit(row) {
this.current = row;
this.$http.get('/ticdc/changefeedsinfo/'+row.id).then(res => {
this.editData = res.data.data.records;
this.tableData = res.data.data.tables;
}).catch(e => {
this.$message.error(e.message);
});

this.showEdit = true;
}
}
}
</script>

同步任务详细信息页面代码如下:

<!-- 同步任务详细信息弹窗 -->
<template>
<el-dialog
:title="isUpdate ? '修改同步任务' : '同步任务详细信息'"
:visible="visible"
width="840px"
:destroy-on-close="true"
:lock-scroll="false"
@update:visible="updateVisible"
>
<el-form :model="form" ref="form" :rules="rules" label-width="120px">
<el-row :gutter="15">
<el-col :md="12" :sm="12">
<el-form-item label="id:" prop="id">
<el-input
:maxlength="20"
v-model="form.id"
placeholder=""
clearable
/>
</el-form-item>
</el-col>
<el-col :md="12" :sm="12">
<el-form-item label="sink_uri:" prop="sink_uri">
<el-input
:maxlength="20"
v-model="form.sink_uri"
placeholder=""
clearable
/>
</el-form-item>
</el-col>
</el-row>
<el-row :gutter="15">
<el-col :md="12" :sm="12">
<el-form-item label="create_time:" prop="create_time">
<el-input
v-model="form.create_time"
placeholder=""
class="ele-fluid ele-text-left"
/>
</el-form-item>
</el-col>
<el-col :md="12" :sm="12">
<el-form-item label="checkpoint_time:">
<el-input v-model="form.checkpoint_time" placeholder="" />
</el-form-item>
</el-col>
</el-row>
<el-row :gutter="15">
<el-col :md="12" :sm="12">
<el-form-item label="start_ts:" prop="start_ts">
<el-input
v-model="form.start_ts"
placeholder=""
class="ele-fluid ele-text-left"
/>
</el-form-item>
</el-col>
<el-col :md="12" :sm="12">
<el-form-item label="checkpoint_tso:">
<el-input v-model="form.checkpoint_tso" placeholder="" />
</el-form-item>
</el-col>
</el-row>
<el-row :gutter="15">
<el-col :md="12" :sm="12">
<el-form-item label="sort_engine:" prop="sort_engine">
<el-input
v-model="form.sort_engine"
placeholder=""
class="ele-fluid ele-text-left"
/>
</el-form-item>
</el-col>
<el-col :md="12" :sm="12">
<el-form-item label="state:">
<el-input v-model="form.state" placeholder="" />
</el-form-item>
</el-col>
</el-row>
</el-form>
<ele-data-table
ref="table"
:where="where"
:data="form.tables"
:choose.sync="selection"
height="calc(100vh - 515px)"
>
<el-table-column
type="selection"
width="45"
align="center"
fixed="left"
/>
<el-table-column
type="index"
:index="index"
label="编号"
width="60"
align="center"
fixed="left"
show-overflow-tooltip
/>
<el-table-column
prop="tidb_table_id"
label="同步表ID"
:min-width="60"
sortable="custom"
show-overflow-tooltip
/>
<el-table-column
prop="table_name"
label="同步表名称"
:min-width="120"
sortable="custom"
show-overflow-tooltip
/>
<!-- 操作列 -->
<el-table-column
label="操作"
min-width="60px"
align="center"
:resizable="false"
fixed="right"
>
<template slot-scope="{ row }">
<el-link
type="primary"
:underline="false"
icon="el-icon-edit"
@click="openEdit(row)"
v-if="false"
>查看任务
</el-link>
<el-popconfirm
class="ele-action"
title="确定要删除此会员吗?"
@confirm="remove(row)"
>
<el-link
type="danger"
slot="reference"
:underline="false"
icon="el-icon-delete"
>删除
</el-link>
</el-popconfirm>
</template>
</el-table-column>
</ele-data-table>

<div slot="footer">
<el-button @click="updateVisible(false)">取消 </el-button>
<el-button type="primary" @click="save" :loading="loading"
>保存
</el-button>
</div>
</el-dialog>
</template>

<script>
export default {
name: "Task",
props: {
// 弹窗是否打开
visible: Boolean,
// 修改回显的数据
data: Object,
tableList: Object,
},
data() {
return {
// 表单数据
form: Object.assign({}, this.data),
where: {},
// 表格选中数据
selection: [],
// 表单验证规则
rules: {
id: [{ required: true, message: "请输入同步id", trigger: "blur" }],
sink_uri: [
{ required: true, message: "请输入下游Uri", trigger: "blur" },
],
},
// 提交状态
loading: false,
// 是否是修改
isUpdate: false,
};
},
watch: {
data() {
if (this.data) {
this.form = Object.assign({}, this.data);
this.form.tables = this.tableList;
this.isUpdate = true;
} else {
this.form = {};
this.isUpdate = false;
}
},
},
methods: {
/* 更新visible */
updateVisible(value) {
this.$emit("update:visible", value);
},
},
};
</script>

功能测试

现在前端页面和后端服务都已经开发完成,通过Yarn启动前端后进入TiCDC管理页面。下面针对界面上的几个接口功能分别做测试:

  • 查询同步列表 点击查询同步按钮列表,显示目前TiCDC节点中配置的同步任务列表。界面刷新后显示同步任务列表如下:

    no-alt

点击查看任务显示同步任务详细信息窗口:

no-alt

  • 暂停/恢复/创建/删除任务的测试见如下动图:

    no-alt

好了,TiCDC Open API和业务系统的集成实践就到此结束了,希望能给感兴趣的同学提供些许参考。感谢你的阅读!

总结

  1. 参考TiCDC Open API官方文档后,在现有的开发框架中比较方便地集成了CDC的管理功能。感觉TiDB在后续的版本中会提供更多的开放接口,方便应用系统实现集成。
  2. 上述代码主要以测试和演示为目的,服务接口中的异常处理未实现。
  3. 根据官网文档中的提示,TiCDC OpenAPI 目前为实验功能,不建议在生产环境中使用该功能。希望在后续发布的LTS版本中得到完善,作为正式功能发布,实现应用系统在生产环境中的正式应用。