EMR with TiSpark(on EKS )
作者:王歌
背景描述
现有集群部署在 EKS 上,使用 TiDB Operator 部署的 TiDB 集群
使用 spark 主要想实现以下功能:
ETL(批处理数据,从 TiDB 读取数据进行加工,然后再写入到 TiDB )
加速 AP 查询
客户倾向于使用托管的 spark,在 AWS 上 Spark 有 3 种部署形式:emr serverless,EMR on EC2,EMR on EKS,考虑到 TiSpark 需要和 PD,TiKV 进行交互,使用 EMR on EKS 默认网络是连通的,以下的方案是基于 EMR on EKS 展开。
方案简介
在 EKS 上,已存在 TiDB Operator 部署的 TiDB 集群
启动 EMR on EKS 的集群访问并通过 EMR 注册 EKS 集群
自定义 docker 镜像
配置 spark pod 并启动任务
操作步骤
现有 TiDB 集群部署在 EKS 上
基于 EKS 部署 EMR
启用 Amazon EMR on EKS 的集群访问
eksctl create iamidentitymapping --cluster wg1 --namespace tidb-cluster --service-name "emr-containers"
namespace=tidb-cluster
cat - <<EOF | kubectl apply -f - --namespace "${namespace}"
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: emr-containers
namespace: ${namespace}
rules:
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get"]
- apiGroups: [""]
resources: ["serviceaccounts", "services", "configmaps", "events", "pods", "pods/log"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["create", "patch", "delete", "watch"]
- apiGroups: ["apps"]
resources: ["statefulsets", "deployments"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
- apiGroups: ["extensions"]
resources: ["ingresses"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["roles", "rolebindings"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"]
EOF
namespace=tidb-cluster
cat - <<EOF | kubectl apply -f - --namespace "${namespace}"
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: emr-containers
namespace: ${namespace}
subjects:
- kind: User
name: emr-containers
apiGroup: rbac.authorization.k8s.io
roleRef:
kind: Role
name: emr-containers
apiGroup: rbac.authorization.k8s.io
EOF
eksctl create iamidentitymapping --cluster wg1 --arn "arn:aws:iam::378955295993:role/AWSServiceRoleForAmazonEMRContainers" --username emr-containers
在 EKS 集群上为服务账户(IRSA)启用 IAM 角色
aws eks describe-cluster --name wg1 --query "cluster.identity.oidc.issuer" --output text https://oidc.eks.ap-northeast-1.amazonaws.com/id/965000E562F657CEFEBB5E681CB5A46F
eksctl utils associate-iam-oidc-provider --cluster wg1 --approve
create iam role for job execution
cat <<EoF > emr-trust-policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "elasticmapreduce.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EoF
aws iam create-role --role-name EMRContainers-JobExecutionRole --assume-role-policy-document file://emr-trust-policy.json
update relationship for job execution role
aws emr-containers update-role-trust-policy --cluster-name wg1 --namespace tidb-cluster --role-name EMRContainers-JobExecutionRole
通过 Amazon EMR 注册 Amazon EKS 集群
aws emr-containers create-virtual-cluster --name emr1009 --container-provider '{ "id": "wg1", "type": "EKS", "info": { "eksInfo": { "namespace": "tidb-cluster" } } }'
查看 emr
aws emr-containers list-virtual-clusters
"virtualClusters": [
{
"id": "afqacgyj4oo41apmbhsk2plpw",
"name": "emr1009",
"arn": "arn:aws:emr-containers:ap-northeast-1:378955295993:/virtualclusters/afqacgyj4oo41apmbhsk2plpw",
"state": "RUNNING",
"containerProvider": {
"type": "EKS",
"id": "wg1",
"info": {
"eksInfo": {
"namespace": "tidb-cluster"
}
}
},
"createdAt": "2022-10-09T03:19:28+00:00",
"tags": {}
},
删除 emr
aws emr-containers delete-virtual-cluster --id e5uoso9wwz5v1nilwe3yu92f7
demo
export VIRTUAL_CLUSTER_ID=$(aws emr-containers list-virtual-clusters --query "virtualClusters[?state=='RUNNING'].id" --output text)
export EMR_ROLE_ARN=$(aws iam get-role --role-name EMRContainers-JobExecutionRole --query Role.Arn --output text)
aws emr-containers start-job-run --virtual-cluster-id=$VIRTUAL_CLUSTER_ID --name=pi-2 --execution-role-arn=$EMR_ROLE_ARN --release-label=emr-6.2.0-latest --job-driver='{
"sparkSubmitJobDriver": {
"entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
"sparkSubmitParameters": "--conf spark.executor.instances=1 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
}
}'
运行 demo 之后,会启动 pod
kubectl get pod -ntidb-cluster NAME READY STATUS RESTARTS AGE 000000030s8cnaq04ql-tvlv6 2/2 Running 0 8m48s basic-discovery-84b7dd85dd-k5ljz 1/1 Running 0 45m basic-monitor-0 4/4 Running 0 45m basic-pd-0 1/1 Running 0 45m basic-pd-1 1/1 Running 0 45m basic-pd-2 1/1 Running 0 45m basic-tidb-0 2/2 Running 0 43m basic-tikv-0 1/1 Running 0 44m basic-tikv-1 1/1 Running 0 44m basic-tikv-2 1/1 Running 0 44m spark-000000030s8cnaq04ql-driver 0/2 Pending 0 8m11s
运行 demo 之后,会自动创建 EMR 运行所需的 SA,如下:
tidb-cluster emr-containers-sa-spark-client-378955295993-189nnyj7mn9w2lqiewgg1u0l3jhmo0z69yjkj9u6qhosj8l 1 7s
tidb-cluster emr-containers-sa-spark-driver-378955295993-189nnyj7mn9w2lqiewgg1u0l3jhmo0z69yjkj9u6qhosj8l 1 6s
tidb-cluster emr-containers-sa-spark-executor-378955295993-189nnyj7mn9w2lqiewgg1u0l3jhmo0z69yjkj9u6qhosj8l 1 6s
需要为 emr-containers-sa-spark-driver 加上以下额外权限:
cat > spark-driver-access.yaml <<EOF
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
namespace: tidb-cluster
name: spark-driver-reader
rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get", "watch", "list", "delete"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "watch", "list", "delete"]
EOF
kubectl apply -f spark-driver-access.yaml
kubectl get sa -n tidb-cluster
kubectl create clusterrolebinding tispark-access \
--clusterrole=spark-driver-reader \
--serviceaccount=tidb-cluster:emr-containers-sa-spark-driver-XXXX
自定义 docker 镜像
Dockerfile 需要将 tispark 和 mysql-connector 的 jar 包放入到 spark 的 jars 目录下,参考:
注意 TiSpark 的版本需要和 spark 匹配,否则 job 会报错。(emr-6.7 对应的 spark 版本是 3.2.1-amzn-0)
cat > Dockerfile <<EOF
FROM 059004520145.dkr.ecr.ap-northeast-1.amazonaws.com/spark/emr-6.7.0:latest
USER root
### Add customization commands here ####
COPY tispark-assembly-3.2_2.12-3.1.1.jar /usr/lib/spark/jars/
COPY mysql-connector-java-8.0.27.jar /usr/lib/spark/jars/
USER hadoop:hadoop
EOF
配置 spark job
创建节点组,并打上标签 dedicated: emr
cat newtidb.yaml
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
name: wg1
region: ap-northeast-1
availabilityZones: ['ap-northeast-1a','ap-northeast-1d']
nodeGroups:
- name: emr
instanceType: m5.xlarge
desiredCapacity: 3
privateNetworking: true
availabilityZones: ["ap-northeast-1a"]
labels:
dedicated: emr
taints:
dedicated: emr:NoSchedule
eksctl create nodegroup -f newtidb.yaml
Spark pod 模板
将以下示例 pod 模板和 python 脚本上传到 s3 存储桶。
cat > spark_executor_nyc_taxi_template.yml <<EOF
apiVersion: v1
kind: Pod
spec:
volumes:
- name: source-data-volume
emptyDir: {}
- name: metrics-files-volume
emptyDir: {}
nodeSelector:
dedicated: emr
tolerations:
- effect: NoSchedule
key: dedicated
operator: Equal
value: emr
containers:
- name: spark-kubernetes-executor # This will be interpreted as Spark executor container
EOF
cat > spark_driver_nyc_taxi_template.yml <<EOF
apiVersion: v1
kind: Pod
spec:
volumes:
- name: source-data-volume
emptyDir: {}
- name: metrics-files-volume
emptyDir: {}
nodeSelector:
dedicated: emr
tolerations:
- effect: NoSchedule
key: dedicated
operator: Equal
value: emr
containers:
- name: spark-kubernetes-driver # This will be interpreted as Spark driver container
EOF
以下是 spark+jdbc 的方式读取 TiDB
from __future__ import print_function
import sys
from time import sleep
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
spark = SparkSession\
.builder\
.appName("test")\
.getOrCreate()
# jdbcUrl = "jdbc:mysql://a0d9e6340ceb14267addbc87ce05e057-f6e62300101bf1da.elb.ap-northeast-1.amazonaws.com:4000/test"
# jdbcDriver = "com.mysql.jdbc.Driver"
# jdbcDF = spark.read.format("jdbc").option("url", jdbcUrl).option("dbtable", "t1").option("user", "root").option("password", "").option("database","test").option("driver", jdbcDriver).load()
# jdbcDF.show()
# print("end")
prop = {'user': 'root',
'password': '',
'driver': 'com.mysql.jdbc.Driver'}
# database 地址(需要修改)
url = 'jdbc:mysql://a0d9e6340ceb14267addbc87ce05e057-f6e62300101bf1da.elb.ap-northeast-1.amazonaws.com:4000/test'
# 读取表
data = spark.read.jdbc(url=url, table='t1', properties=prop)
# 打印data数据类型
print(type(data))
# 展示数据
data.show()
spark.stop()
以下是 TiSpark 读取 TiKV 并将数据写入到 TiDB 中
from __future__ import print_function
import sys
from time import sleep
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
spark = SparkSession\
.builder\
.appName("ti1")\
.getOrCreate()
spark.sql("use tidb_catalog")
df1 = spark.sql("select * from test.t1")
df1.write.format("tidb").option("tidb.addr", "a0d9e6340ceb14267addbc87ce05e057-f6e62300101bf1da.elb.ap-northeast-1.amazonaws.com").option("tidb.password","").option("tidb.port","4000").option("tidb.user","root").option("database","test").option("table","t2").mode("append").save()
spark.stop()
创建 spark job
aws emr-containers start-job-run --cli-input-json file://request-nytaxi.json
cat > request-nytaxi.json <<EOF
{
"name": "nytaxi",
"virtualClusterId": "${VIRTUAL_CLUSTER_ID}",
"executionRoleArn": "${EMR_ROLE_ARN}",
"releaseLabel": "emr-6.7.0-latest",
"jobDriver": {
"sparkSubmitJobDriver": {
"entryPoint": "${s3DemoBucket}/nytaxi.py",
"sparkSubmitParameters": "--conf spark.kubernetes.driver.podTemplateFile=${s3DemoBucket}/pod_templates/spark_driver_nyc_taxi_template.yml \
--conf spark.kubernetes.executor.podTemplateFile=${s3DemoBucket}/pod_templates/spark_executor_nyc_taxi_template.yml \
--conf spark.executor.instances=3 \
--conf spark.executor.memory=2G \
--conf spark.executor.cores=2 \
--conf spark.driver.cores=1"
}
},
"configurationOverrides": {
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.kubernetes.container.image": "自定义镜像的地址",
"spark.dynamicAllocation.enabled": "false",
"spark.kubernetes.executor.deleteOnTermination": "true",
"spark.tispark.pd.addresses": "pd-ip:port",
"spark.sql.extensions": "org.apache.spark.sql.TiExtensions",
"spark.sql.catalog.tidb_catalog": "org.apache.spark.sql.catalyst.catalog.TiCatalog",
"spark.sql.catalog.tidb_catalog.pd.addresses": "pd-ip:port"
}
}
],
"monitoringConfiguration": {
"cloudWatchMonitoringConfiguration": {
"logGroupName": "/emr-on-eks/eksworkshop-eksctl",
"logStreamNamePrefix": "nytaxi"
},
"s3MonitoringConfiguration": {
"logUri": "${s3DemoBucket}/"
}
}
}
}
EOF