使用 Spark connector 导入数据(推荐)
StarRocks 提供 Apache Spark™ 连接器 (StarRocks Connector for Apache Spark™),可以通过 Spark 导入数据至 StarRocks(推荐)。 基本原理是对数据攒批后,通过 Stream Load 批量导入StarRocks。Connector 导入数据基于Spark DataSource V2 实现, 可以通过 Spark DataFrame 或 Spark SQL 创建 DataSource,支持 Batch 和 Structured Streaming。
注意
使用 Spark connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。
版本要求
Connector | Spark | StarRocks | Java | Scala |
---|---|---|---|---|
1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 及以上 | 8 | 2.12 |
1.1.1 | 3.2, 3.3, 3.4 | 2.5 及以上 | 8 | 2.12 |
1.1.0 | 3.2, 3.3, 3.4 | 2.5 及以上 | 8 | 2.12 |
注意
- 了解不同版本的 Spark connector 之间的行为变化,请查看升级 Spark connector。
- 自 1.1.1 版本起,Spark connector 不再提供 MySQL JDBC 驱动程序,您需要将驱动程序手动放到 Spark 的类路径中。您可以在 MySQL 官网或 Maven 中央仓库上找到该驱动程序。
获取 Connector
您可以通过以下方式获取 connector jar 包
- 直接下载已经编译好的jar
- 通过 Maven 添加 connector 依赖
- 通过源码手动编译
connector jar包的命名格式如下
starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar
比如,想在 Spark 3.2 和 scala 2.12 上使用 1.1.0 版本的 connector,可以选择 starrocks-spark-connector-3.2_2.12-1.1.0.jar
。
注意
一般情况下最新版本的 connector 只维护最近3个版本的 Spark。
直接下载
可以在 Maven Central Repository 获取不同版本的 connector jar。
Maven 依赖
依赖配置的格式如下,需要将 spark_version
、scala_version
和 connector_version
替换成对应的版本。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>
比如,想在 Spark 3.2 和 scala 2.12 上使用 1.1.0 版本的 connector,可以添加如下依赖
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
手动编译
-
下载 Spark 连接器代码。
-
通过如下命令进行编译,需要将
spark_version
替换成相应的 Spark 版本sh build.sh <spark_version>
比如,在 Spark 3.2 上使用,命令如下
sh build.sh 3.2
-
编译完成后,
target/
目录下会生成 connector jar 包,比如starrocks-spark-connector-3.2_2.12-1.1-SNAPSHOT.jar
。
注意
非正式发布的connector版本会带有
SNAPSHOT
后缀。
参数说明
参数 | 是否必填 | 默认值 | 描述 |
---|---|---|---|
starrocks.fe.http.url | 是 | 无 | FE 的 HTTP 地址,支持输入多个FE地址,使用逗号 , 分隔。格式为 <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2> 。自版本 1.1.1 开始,您还可以在 URL 中添加 http:// 前缀,例如 http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2> 。 |
starrocks.fe.jdbc.url | 是 | 无 | FE 的 MySQL Server 连接地址。格式为 jdbc:mysql://<fe_host>:<fe_query_port> 。 |
starrocks.table.identifier | 是 | 无 | StarRocks 目标表的名称,格式为 <database_name>.<table_name> 。 |
starrocks.user | 是 | 无 | StarRocks 集群账号的用户名。使用 Spark connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。 |
starrocks.password | 是 | 无 | StarRocks 集群账号的用户密码。 |
starrocks.write.label.prefix | 否 | spark- | 指定Stream Load使用的label的前缀。 |
starrocks.write.enable.transaction-stream-load | 否 | true | 是否使用 Stream Load 事务接口导入数据。要求 StarRocks 版本为 v2.5 或更高。此功能可以在一次导入事务中导入更多数据,同时减少内存使用量,提高性能。 注意: 自 1.1.1 版本以来,只有当 starrocks.write.max.retries 的值为非正数时,此参数才会生效,因为 Stream Load 事务接口不支持重试。 |
starrocks.write.buffer.size | 否 | 104857600 | 积攒在内存中的数据量,达到该阈值后数据一次性发送给 StarRocks,支持带单位k , m , g 。增大该值能提高导入性能,但会带来写入延迟。 |
starrocks.write.buffer.rows | 否 | Integer.MAX_VALUE | 自 1.1.1 版本起支持。积攒在内存中的数据行数,达到该阈值后数据一次性发送给 StarRocks。 |
starrocks.write.flush.interval.ms | 否 | 300000 | 数据攒批发送的间隔,用于控制数据写入StarRocks的延迟。 |
starrocks.write.max.retries | 否 | 3 | 自 1.1.1 版本起支持。如果一批数据导入失败,Spark connector 导入该批数据的重试次数上线。 **注意:**由于 Stream Load 事务接口不支持重试。如果此参数为正数,则 Spark connector 始终使用 Stream Load 接口,并忽略 starrocks.write.enable.transaction-stream-load 的值。 |
starrocks.write.retry.interval.ms | 否 | 10000 | 自 1.1.1 版本起支持。如果一批数据导入失败,Spark connector 尝试再次导入该批数据的时间间隔。 |
starrocks.columns | 否 | 无 | 支持向 StarRocks 表中写入部分列,通过该参数指定列名,多个列名之间使用逗号 (,) 分隔,例如"c0,c1,c2"。 |
starrocks.write.properties.* | 否 | 无 |