0
1
1
0
专栏/.../

TiSpark 3.0.0 新特性实践

 数据小黑  发表于  2022-06-21

背景

TiSpark 3.0.0 于 6 月 15 号发布了,新的版本中提到了很多期望已久的功能,本文对几个新特性做了对比测试,验证新版本的特性是否符合线上要求。本文基础运行环境为 Spark On kubernetes,Spark 镜像打包时,已包含 TiSpark 必要的依赖。

阅读收益

TiSpark 3.0.0 兼容性更改与新特性解析

下面的兼容性更改与新特性摘自官方:

兼容性更改

  • TiSpark without catalog plugin is no more supported. You must configure catalog configs and use tidb_catalog now
    • 此特性简言之就是在 3.0.0 版本中,不再支持非 catalog plugin 的配置,更改说明如下: 在 TiSpark 2.5.0 中,如下配置时是可以正常读取数据的:
      .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
      .set("spark.tispark.pd.addresses", pd_addr);
      
      数据读取:
      spark.sql("use sbtest");
      String source_sql = "select id,avg(k),max(c),count(`pad`) from sbtest_o so group by id order by id";
      spark.sql(source_sql).show();
      
      注意 use sbtest 中的 sbtest 是 tidb 的数据库。 在 TiSpark 3.0.0 中需按照如下配置:
      .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
      .set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
      .set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
      .set("spark.tispark.pd.addresses", pd_addr);
      
      数据读取:
      spark.sql("use tidb_catalog.sbtest");
      String source_sql = "select id,avg(k),max(c),count(`pad`) from sbtest_o so group by id order by id";
      spark.sql(source_sql).show();
      
    • 此案例中 use tidb_catalog.sbtest 中 tidb_catalog 是 spark.sql.catalog.tidb_catalog 配置中指定的,如果配置时使用.set("spark.sql.catalog.tidb_catalog2", "org.apache.spark.sql.catalyst.catalog.TiCatalog"),那么 use 时,需使用 use tidb_catalog2.sbtest
  • TiSpark's jar has a new naming rule like tispark-assembly-{$spark_version}_{$scala_version}-{$tispark_verison}
    • 此特性把 scala_version 版本号体现在版本命名中,版本包命名由 tispark-assembly-3.0-2.5.1.jar 变化成了 tispark-assembly-3.0_2.12-3.0.0.jar。

新功能

  • Support DELETE statement
    • 基于兼容性更改中 Datasource API 版本的替换,在 Spark 3.0.0 中可以支持 DELETE 特性,例如可以执行如下语句:spark.sql("delete from tidb_catalog.db.table where xxx")。
  • Support Spark 3.2
    • 支持 Spark 3.2 运行环境。
  • Support telemetry to collect information
    • 支持遥测信息收集
  • Support stale read to read historical versions of data
    • 支持过时读取特性( Stale Read ),使用此特性时需要在配置中指定毫秒级时间戳,指定时间戳后,数据读取时,程序按照指定时间戳读取一个 Snapshot,所有的 SELECT 语句都会从这个 Snapshot 中读取数据。如果每个 SQL 都需要读取不同的 Snapshot,需要在每个 SQL 之前配置不同的时间戳 (java 示例):
      val spark = SparkSession.builder.config(sparkConf).getOrCreate();
      
      spark.conf().set("spark.tispark.stale_read", 1651766410000L); //"2022-05-06 00:00:10"
      spark.sql("select * from test.t");
      
      spark.conf().set("spark.tispark.stale_read", 1651766420000L); //"2022-05-06 00:00:20"
      spark.sql("select * from test.t");
      
      spark.conf().set("spark.tispark.stale_read", "");
      spark.sql("select * from test.t");
      
  • Support TLS with reload capability
    • 支持 TLS 并具备动态更新证书的能力

特性评测

我们有个 tidb 实验环境,是在 k8s 里面的,这个环境配套了一个 spark on k8s 环境。这次测试是在 spark on k8s+tidb on k8s 中测试的。

非 catalog plugin 的配置运行情况

基于 spark 3.0.3 on k8s 测试非 catalog plugin 的配置运行情况。 因为以前的项目都是基于 maven 的,本次已经在 maven 项目下进行测试,首先修改项目依赖:

<properties>
    <tispark.spark.version>3.0_2.12</tispark.spark.version>
    <tispark.version>3.0.0</tispark.version>
</properties>
<dependency>
    <groupId>com.pingcap.tispark</groupId>
    <artifactId>tispark-assembly-${tispark.spark.version}</artifactId>
    <version>${tispark.version}</version>
</dependency>

本次依赖包的命名有变化,artifactId 由 tispark-assembly 改变成 tispark-assembly-3.0_2.12 也就是说对应每个版本的 spark,artifactId 不同,类似下表所列:

spark 版本 artifactid 版本
spark 3.0.X tispark-assembly-3.0_2.12
spark 3.1.X tispark-assembly-3.1_2.12
spark 3.2.X tispark-assembly-3.2_2.12

其次隐藏掉关于 catalog 的两行配置,修改后如下所示:

SparkConf conf = new SparkConf().set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
        //.set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
        //.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
        .set("spark.tispark.pd.addresses", pd_addr);

运行 spark 程序,具体运行配置过程参考:https://tidb.net/blog/30f417ad 运行日志如下:

22/06/21 01:42:49 ERROR TiExtensions$: TiSpark must work with TiCatalog. Please add TiCatalog in spark conf.
com.pingcap.tikv.exception.TiInternalException: TiSpark must work with TiCatalog. Please add TiCatalog in spark conf.
	at org.apache.spark.sql.TiExtensions$.validateCatalog(TiExtensions.scala:79)

代码中加了个检查,在没有配置 Catalog 的情况下,检查报错,提示 TiSpark must work with TiCatalog。

Spark 3.0.3 + stale read + delete 特性测试

本来测试的是 Spark 3.2.1+TiSpark 3.0.0,结果有报错,详见:https://asktug.com/t/topic/694197 更换成 Spark 3.0.3+TiSpark 3.0.0 stale read + delete 特性测试 测试构建代码如下:

String pd_addr = "basic-pd.tidb-cluster:2379";
String tidb_addr = "basic-tidb.tidb-cluster";

SparkConf conf = new SparkConf().set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
        .set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
        .set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
        .set("spark.tispark.pd.addresses", pd_addr);
SparkSession spark = SparkSession
        .builder().appName("RdbToRdbProcess")
        .config(conf)
        .getOrCreate();
// 通过 TiSpark 将 DataFrame 批量写入 TiDB
Map<String, String> tiOptionMap = new HashMap<String, String>();
tiOptionMap.put("tidb.addr", tidb_addr);
tiOptionMap.put("tidb.port", "4000");
tiOptionMap.put("tidb.user", username);
tiOptionMap.put("tidb.password", password);
tiOptionMap.put("replace", "true");
tiOptionMap.put("spark.tispark.pd.addresses", pd_addr);

spark.sql("use tidb_catalog.sbtest2");
// 获取当前时间戳
long ttl=System.currentTimeMillis();

System.out.println("删除前查询");
spark.sql("select * from sbtest_t_t  where id = 100").show();
System.out.println("删除");
spark.sql("delete from sbtest_t_t where id = 100").show();
System.out.println("删除后查询");
spark.sql("select * from sbtest_t_t  where id = 100").show();
System.out.println("stale read");
spark.conf().set("spark.tispark.stale_read", ttl);
spark.sql("select * from sbtest_t_t  where id = 100").show();
System.out.println("置空时间戳之后查询");
spark.conf().set("spark.tispark.stale_read", "");
spark.sql("select * from sbtest_t_t  where id = 100").show();

上述查询结果依次:

删除前查询:
+---+------+--------------------+--------------------+
| id|     k|                   c|                 pad|
+---+------+--------------------+--------------------+
|100|503013|72324218654-54342...|17648767791-53546...|
+---+------+--------------------+--------------------+
删除后查询:
+---+---+---+---+
| id|  k|  c|pad|
+---+---+---+---+
+---+---+---+---+
stale read:
+---+------+--------------------+--------------------+
| id|     k|                   c|                 pad|
+---+------+--------------------+--------------------+
|100|503013|72324218654-54342...|17648767791-53546...|
+---+------+--------------------+--------------------+
置空时间戳之后查询:
+---+---+---+---+
| id|  k|  c|pad|
+---+---+---+---+
+---+---+---+---+

由以上查询结果可知,数据能够执行删除,删除后正常查询是查询不到的,使用 stale read,利用删除之前的时间戳能够查询到数据,置空时间戳后,恢复普通查询,数据能够查询到。

总结

本次版本中 stale read + delete 让 tispark 具有更灵活的应用场景,经过验证 spark 3.2.1 跟 tidb 6.1.0 on k8s 通讯还有些问题仍需解决,另外一方面,也盼望着能兼容分区表的一些操作能发布出来,比如说导入数据之前能够 truncate 分区一类的操作。

0
1
1
0

版权声明:本文为 TiDB 社区用户原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接和本声明。

评论
暂无评论