跳到主要内容

TiSpark 3.0.0 新特性实践

作者:数据小黑,Senior Architect,TiDB Fans。

背景

TiSpark 3.0.0 于 6 月 15 号发布了,新的版本中提到了很多期望已久的功能,本文对几个新特性做了对比测试,验证新版本的特性是否符合线上要求。本文基础运行环境为 Spark On Kubernetes,Spark 镜像打包时,已包含 TiSpark 必要的依赖。

TiSpark 3.0.0 兼容性更改与新特性解析

下面的兼容性更改与新特性摘自官方:

兼容性更改

  • TiSpark without catalog plugin is no more supported. You must configure catalog configs and use tidb_catalog now
    • 此特性简言之就是在 3.0.0 版本中,不再支持非 catalog plugin 的配置,更改说明如下: 在 TiSpark 2.5.0 中,如下配置时是可以正常读取数据的:
      .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
      .set("spark.tispark.pd.addresses", pd_addr);
      数据读取:
      spark.sql("use sbtest");
      String source_sql = "select id,avg(k),max(c),count(`pad`) from sbtest_o so group by id order by id";
      spark.sql(source_sql).show();
      注意 use sbtest 中的 sbtest 是 tidb 的数据库。 在 TiSpark 3.0.0 中需按照如下配置:
      .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
      .set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
      .set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
      .set("spark.tispark.pd.addresses", pd_addr);
      数据读取:
      spark.sql("use tidb_catalog.sbtest");
      String source_sql = "select id,avg(k),max(c),count(`pad`) from sbtest_o so group by id order by id";
      spark.sql(source_sql).show();
    • 此案例中 use tidb_catalog.sbtest 中 tidb_catalog 是 spark.sql.catalog.tidb_catalog 配置中指定的,如果配置时使用.set("spark.sql.catalog.tidb_catalog2", "org.apache.spark.sql.catalyst.catalog.TiCatalog"),那么 use 时,需使用 use tidb_catalog2.sbtest
  • TiSpark's jar has a new naming rule like tispark-assembly-{$spark_version}_{$scala_version}-{$tispark_verison}
    • 此特性把 scala_version 版本号体现在版本命名中,版本包命名由 tispark-assembly-3.0-2.5.1.jar 变化成了 tispark-assembly-3.0_2.12-3.0.0.jar。

新功能

  • Support DELETE statement

    • 基于兼容性更改中 Datasource API 版本的替换,在 Spark 3.0.0 中可以支持 DELETE 特性,例如可以执行如下语句:spark.sql("delete from tidb_catalog.db.table where xxx")。
  • Support Spark 3.2

    • 支持 Spark 3.2 运行环境。
  • Support telemetry to collect information

    • 支持遥测信息收集
  • Support stale read to read historical versions of data

    • 支持过时读取特性( Stale Read ),使用此特性时需要在配置中指定毫秒级时间戳,指定时间戳后,数据读取时,程序按照指定时间戳读取一个 Snapshot,所有的 SELECT 语句都会从这个 Snapshot 中读取数据。如果每个 SQL 都需要读取不同的 Snapshot,需要在每个 SQL 之前配置不同的时间戳 (java 示例):

      val spark = SparkSession.builder.config(sparkConf).getOrCreate();

      spark.conf().set("spark.tispark.stale_read", 1651766410000L); //"2022-05-06 00:00:10"
      spark.sql("select * from test.t");

      spark.conf().set("spark.tispark.stale_read", 1651766420000L); //"2022-05-06 00:00:20"
      spark.sql("select * from test.t");

      spark.conf().set("spark.tispark.stale_read", "");
      spark.sql("select * from test.t");
  • Support TLS with reload capability

    • 支持 TLS 并具备动态更新证书的能力

特性评测

我们有个 TiDB 实验环境,是在 K8s 里面的,这个环境配套了一个 Spark on K8s 环境。这次测试是在 Spark on K8s + TiDB on K8s 中测试的。

非 catalog plugin 的配置运行情况

基于 Spark 3.0.3 on K8s 测试非 catalog plugin 的配置运行情况。

因为以前的项目都是基于 maven 的,本次已经在 maven 项目下进行测试,首先修改项目依赖:

<properties>
<tispark.spark.version>3.0_2.12</tispark.spark.version>
<tispark.version>3.0.0</tispark.version>
</properties>
<dependency>
<groupId>com.pingcap.tispark</groupId>
<artifactId>tispark-assembly-${tispark.spark.version}</artifactId>
<version>${tispark.version}</version>
</dependency>

本次依赖包的命名有变化,artifactId 由 tispark-assembly 改变成 tispark-assembly-3.0_2.12 也就是说对应每个版本的 Spark,artifactId 不同,类似下表所列:

spark 版本artifactid 版本
spark 3.0.Xtispark-assembly-3.0_2.12
spark 3.1.Xtispark-assembly-3.1_2.12
spark 3.2.Xtispark-assembly-3.2_2.12

其次隐藏掉关于 catalog 的两行配置,修改后如下所示:

SparkConf conf = new SparkConf().set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
//.set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
//.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
.set("spark.tispark.pd.addresses", pd_addr);

运行 Spark 程序,具体运行配置过程参考:TiSpark On Kubernetes实践 运行日志如下:

22/06/21 01:42:49 ERROR TiExtensions$: TiSpark must work with TiCatalog. Please add TiCatalog in spark conf.
com.pingcap.tikv.exception.TiInternalException: TiSpark must work with TiCatalog. Please add TiCatalog in spark conf.
at org.apache.spark.sql.TiExtensions$.validateCatalog(TiExtensions.scala:79)

代码中加了个检查,在没有配置 Catalog 的情况下,检查报错,提示 TiSpark must work with TiCatalog。

Spark 3.0.3 + stale read + delete 特性测试

实际运行时,建议最低使用 TiSpark 3.0.1 ,这个版本的已知问题比较少,各个 Spark 版本下也都测试通过。

Spark 3.2.1+TiSpark 3.0.1 stale read + delete 特性测试 测试构建代码如下:

String pd_addr = "basic-pd.tidb-cluster:2379";
String tidb_addr = "basic-tidb.tidb-cluster";

SparkConf conf = new SparkConf().set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
.set("spark.tispark.pd.addresses", pd_addr);
SparkSession spark = SparkSession
.builder().appName("RdbToRdbProcess")
.config(conf)
.getOrCreate();
// 通过 TiSpark 将 DataFrame 批量写入 TiDB
Map<String, String> tiOptionMap = new HashMap<String, String>();
tiOptionMap.put("tidb.addr", tidb_addr);
tiOptionMap.put("tidb.port", "4000");
tiOptionMap.put("tidb.user", username);
tiOptionMap.put("tidb.password", password);
tiOptionMap.put("replace", "true");
tiOptionMap.put("spark.tispark.pd.addresses", pd_addr);

spark.sql("use tidb_catalog.sbtest2");
// 获取当前时间戳
long ttl=System.currentTimeMillis();

System.out.println("删除前查询");
spark.sql("select * from sbtest_t_t where id = 100").show();
System.out.println("删除");
spark.sql("delete from sbtest_t_t where id = 100").show();
System.out.println("删除后查询");
spark.sql("select * from sbtest_t_t where id = 100").show();
System.out.println("stale read");
spark.conf().set("spark.tispark.stale_read", ttl);
spark.sql("select * from sbtest_t_t where id = 100").show();
System.out.println("置空时间戳之后查询");
spark.conf().set("spark.tispark.stale_read", "");
spark.sql("select * from sbtest_t_t where id = 100").show();

上述查询结果依次:

删除前查询:
+---+------+--------------------+--------------------+
| id| k| c| pad|
+---+------+--------------------+--------------------+
|100|503013|72324218654-54342...|17648767791-53546...|
+---+------+--------------------+--------------------+
删除后查询:
+---+---+---+---+
| id| k| c|pad|
+---+---+---+---+
+---+---+---+---+
stale read:
+---+------+--------------------+--------------------+
| id| k| c| pad|
+---+------+--------------------+--------------------+
|100|503013|72324218654-54342...|17648767791-53546...|
+---+------+--------------------+--------------------+
置空时间戳之后查询:
+---+---+---+---+
| id| k| c|pad|
+---+---+---+---+
+---+---+---+---+

由以上查询结果可知,数据能够执行删除,删除后正常查询是查询不到的,使用 stale read,利用删除之前的时间戳能够查询到数据,置空时间戳后,恢复普通查询,数据能够查询到。

总结

本次版本中 stale read + delete 让 TiSpark 具有更灵活的应用场景,经过验证 Spark 3.2.1 跟 TiDB 6.1.0 on K8s 通讯还有些问题仍需解决,另外一方面,也盼望着能兼容分区表的一些操作能发布出来,比如说导入数据之前能够 truncate 分区一类的操作。