使用 Spark Load 批量导入数据
Spark Load 通过外部的 Spark 资源实现对导入数据的预处理,提高 StarRocks 大数据量的导入性能并且节省 StarRocks 集群的计算资源。主要用于 初次迁移、大数据量导入 StarRocks 的场景(数据量可到 TB 级别)。
本文介绍导入任务的操作流程(包括相关客户端配置、创建和查看任务等)、系统配置、最佳实践和常见问题。
注意
- Spark Load 操作需要目标表的 INSERT 权限。如果您的用户账号没有 INSERT 权限,请参考 GRANT 给用户赋权。
- Spark Load 不支持导入至主键表。
背景信息
在 StarRocks v2.4 及以前版本,Spark Load 需要借助 Broker 进程访问外部存储系统。配置执行 ETL 任务 的 Spark 集群时需要指定 Broker 组。Broker 是一个独立的无状态进程,封装了文件系统接口。通过 Broker 进程,StarRocks 能够访问和读取外部存储系统上的数据文件。 自 StarRocks v2.5 起,Spark Load 不再需要借助 Broker 进程即可访问外部存储系统。
说明
使用无 Broker 进程的方式导入在某些场景下会受限。如果您配置了多 HDFS 集群或多 Kerberos 用户时,暂时还不支持使用无 Broker 进程的方式导入。这种情况下,您必须继续通过 Broker 进程执行导入。
支持的数据格式
- CSV
- ORC(2.0 版本之后支持)
- PARQUET(2.0 版本之后支持)
基本原理
用户通过 MySQL 客户端提交 Spark 类型导入任务,FE记录元数据并返回用户提交成功。
Spark Load 任务的执行主要分为以下几个阶段:
- 用户向 FE 提交 Spark Load 任务;
- FE 调度提交 ETL 任务到 Spark 集群执行。
- Spark 集群执行 ETL 完成对导入数据的预处理。包括全局字典构建(BITMAP类型)、分区、排序、聚合等。预处理后的数据落盘 HDFS。
- ETL 任务完成后,FE 获取预处理过的每个分片的数据路径,并调度相关的 BE 执行 Push 任务。
- BE 通过 Broker 进程读取 HDFS 数据,转化为 StarRocks 存储格式。
说明
如果选择不使用 Broker 进程,则 BE 直接读取 HDFS 数据。
- FE 调度生效版本,完成导入任务。
下图展示了 Spark Load 的主要流程:
基本操作
使用 Spark Load导入数据,需要按照 创建资源 -> 配置 Spark 客户端 -> 配置 YARN 客户端 -> 创建 Spark Load 导入任务
流程执行,具体的各个部分介绍请参考下面描述。
配置 ETL 集群
Spark 作为一种外部计算资源在 StarRocks 中用来完成 ETL 工作,因此我们引入 Resource Management 来管理 StarRocks 使用的外部资源。
提交 Spark 导入任务之前,需要配置执行 ETL 任务的 Spark 集群。操作语法:
创建资源
示例:
-- yarn cluster 模式
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
"spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
"working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);
-- yarn HA cluster 模式
CREATE EXTERNAL RESOURCE "spark1"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1",
"spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2",
"spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
"working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
"broker" = "broker1"
);
-- HDFS HA cluster 模式
CREATE EXTERNAL RESOURCE "spark2"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
"spark.hadoop.fs.defaultFS" = "hdfs://myha",
"spark.hadoop.dfs.nameservices" = "myha",
"spark.hadoop.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
"spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
"spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
"spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"working_dir" = "hdfs://myha/tmp/starrocks",
"broker" = "broker2",
"broker.dfs.nameservices" = "myha",
"broker.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
"broker.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
"broker.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
spark0
、spark1
和 spark2
为 StarRocks 中配置的 Spark 资源的名字。
PROPERTIES 是 Spark 资源相关参数,以下对重要参数进行说明:
Spark 资源的全部参数和说明,请参见 CREATE RESOURCE。
- Spark 集群相关参数
type
:必填,资源类型,取值为spark
。spark.master
: 必填,Spark 的 cluster manager。当前仅支持 YARN,所以取值为yarn
。spark.submit.deployMode
: 必填,Spark driver 的部署模式。取值包括cluster
和client
。关于取值说明,参考 Launching Spark on YARN。spark.hadoop.fs.defaultFS
: 必填,HDFS 中 NameNode 的地址。格式为hdfs://namenode_host:port
。- YARN ResourceManager 相关参数。
- 如果 Spark 为单点 ResourceManager,则需要配置
spark.hadoop.yarn.resourcemanager.address
,表示单点 ResourceManager 地址。 - 如果 Spark 为 ResourceManager HA,则需要配置(其中 hostname 和 address 任选一个配置):
spark.hadoop.yarn.resourcemanager.ha.enabled
: ResourceManager 启用 HA,设置为true
。spark.hadoop.yarn.resourcemanager.ha.rm-ids
: ResourceManager 逻辑 ID 列表。spark.hadoop.yarn.resourcemanager.hostname.rm-id
: 对于每个 rm-id,指定 ResourceManager 对应的主机名。spark.hadoop.yarn.resourcemanager.address.rm-id
: 对于每个 rm-id,指定host:port
以供客户端提交作业。
- 如果 Spark 为单点 ResourceManager,则需要配置
- Broker 相关参数
broker
: Broker 组的名称。需要使用ALTER SYSTEM ADD BROKER
命令提前完成配置。broker.property_key
: Broker 读取 ETL 生成的中间文件时需要指定的认证信息等,详细可参考 BROKER LOAD。
- 其他参数
working_dir
: 必填,一个 HDFS 文件路径,用于存放 ETL 作业生成的文件。例如hdfs://host: port/tmp/starrocks
。
注意
以上为通过 Broker 进程执行导入时的参数说明,如果使用无 Broker 进程的方式导入,则需要注意如下事项:
-
无需传入
broker
。 -
如果您需要配置用户身份认证、NameNode 节点的 HA,则需要在 HDFS 集群中的 hdfs-site.xml 文件中配置参数,具体参数和说明,请参见 broker_properties。并且将 hdfs-site.xml 文件放到每一个 FE 的 $FE_HOME/conf 下以及每个 BE 的 $BE_HOME/conf 下。
说明
如果 HDFS 文件只能由特定用户访问,则您仍然需要传入 HDFS 用户名
broker.name
和 HDFS 用户密码broker.password
。