StarRocks Spark Connector
使用 Spark connector 导入数据(推荐)
StarRocks 提供 Apache Spark™ 连接器 (StarRocks Connector for Apache Spark™),可以通过 Spark 导入数据至 StarRocks(推荐)。 基本原理是对数据攒批后,通过 Stream Load 批量导入StarRocks。Connector 导入数据基于Spark DataSource V2 实现, 可以通过 Spark DataFrame 或 Spark SQL 创建 DataSource,支持 Batch 和 Structured Streaming。
注意
使用 Spark connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。
版本要求
Connector | Spark | StarRocks | Java | Scala |
---|---|---|---|---|
1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 及以上 | 8 | 2.12 |
1.1.1 | 3.2, 3.3, 3.4 | 2.5 及以上 | 8 | 2.12 |
1.1.0 | 3.2, 3.3, 3.4 | 2.5 及以上 | 8 | 2.12 |
注意
- 了解不同版本的 Spark connector 之间的行为变化,请查看升级 Spark connector。
- 自 1.1.1 版本起,Spark connector 不再提供 MySQL JDBC 驱动程序,您需要将驱动程序手动放到 Spark 的类路径中。您可以在 MySQL 官网或 Maven 中央仓库上找到该驱动程序。
获取 Connector
您可以通过以下方式获取 connector jar 包
- 直接下载已经编译好的jar
- 通过 Maven 添加 connector 依赖
- 通过源码手动编译
connector jar包的命名格式如下
starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar
比如,想在 Spark 3.2 和 scala 2.12 上使用 1.1.0 版本的 connector,可以选择 starrocks-spark-connector-3.2_2.12-1.1.0.jar
。
注意
一般情况下最新版本的 connector 只维护最近3个版本的 Spark。
直接下载
可以在 Maven Central Repository 获取不同版本的 connector jar。
Maven 依赖
依赖配置的格式如下,需要将 spark_version
、scala_version
和 connector_version
替换成对应的版本。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>
比如,想在 Spark 3.2 和 scala 2.12 上使用 1.1.0 版本的 connector,可以添加如下依赖
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
手动编译
-
下载 Spark 连接器代码。
-
通过如下命令进行编译,需要将
spark_version
替换成相应的 Spark 版本sh build.sh <spark_version>
比如,在 Spark 3.2 上使用,命令如下
sh build.sh 3.2
-
编译完成后,
target/
目录下会生成 connector jar 包,比如starrocks-spark-connector-3.2_2.12-1.1-SNAPSHOT.jar
。
注意
非正式发布的connector版本会带有
SNAPSHOT
后缀。
参数说明
参数 | 是否必填 | 默认值 | 描述 |
---|---|---|---|
starrocks.fe.http.url | 是 | 无 | FE 的 HTTP 地址,支持输入多个FE地址,使用逗号 , 分隔。格式为 <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2> 。自版本 1.1.1 开始,您还可以在 URL 中添加 http:// 前缀,例如 http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2> 。 |
starrocks.fe.jdbc.url | 是 | 无 | FE 的 MySQL Server 连接地址。格式为 jdbc:mysql://<fe_host>:<fe_query_port> 。 |
starrocks.table.identifier | 是 | 无 | StarRocks 目标表的名称,格式为 <database_name>.<table_name> 。 |
starrocks.user | 是 | 无 | StarRocks 集群账号的用户名。使用 Spark connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。 |
starrocks.password | 是 | 无 | StarRocks 集群账号的用户密码。 |
starrocks.write.label.prefix | 否 | spark- | 指定Stream Load使用的label的前缀。 |
starrocks.write.enable.transaction-stream-load | 否 | true | 是否使用 Stream Load 事务接口导入数据。要求 StarRocks 版本为 v2.5 或更高。此功能可以在一次导入事务中导入更多数据,同时减少内存使用量,提高性能。 注意: 自 1.1.1 版本以来,只有当 starrocks.write.max.retries 的值为非正数时,此参数才会生效,因为 Stream Load 事务接口不支持重试。 |
starrocks.write.buffer.size | 否 | 104857600 | 积攒在内存中的数据量,达到该阈值后数据一次性发送给 StarRocks,支持带单位k , m , g 。增大该值能提高导入性能,但会带来写入延迟。 |
starrocks.write.buffer.rows | 否 | Integer.MAX_VALUE | 自 1.1.1 版本起支持。积攒在内存中的数据行数,达到该阈值后数据一次性发送给 StarRocks。 |
starrocks.write.flush.interval.ms | 否 | 300000 | 数据攒批发送的间隔,用于控制数据写入StarRocks的延迟。 |
starrocks.write.max.retries | 否 | 3 | 自 1.1.1 版本起支持。如果一批数据导入失败,Spark connector 导入该批数据的重试次数上线。 **注意:**由于 Stream Load 事务接口不支持重试。如果此参数为正数,则 Spark connector 始终使用 Stream Load 接口,并忽略 starrocks.write.enable.transaction-stream-load 的值。 |
starrocks.write.retry.interval.ms | 否 | 10000 | 自 1.1.1 版本起支持。如果一批数据导入失败,Spark connector 尝试再次导入该批数据的时间间隔。 |
starrocks.columns | 否 | 无 | 支持向 StarRocks 表中写入部分列,通过该参数指定列名,多个列名之间使用逗号 (,) 分隔,例如"c0,c1,c2"。 |
starrocks.write.properties.* | 否 | 无 | 指定 Stream Load 的参数,用于控制导入行为,例如使用 starrocks.write.properties.format 指定导入数据的格式为 CSV 或者 JSON。更多参数和说明,请参见 Stream Load。 |
starrocks.write.properties.format | 否 | CSV | 指定导入数据的格式,取值为 CSV 和 JSON。connector 会将每批数据转换 成相应的格式发送给 StarRocks。 |
starrocks.write.properties.row_delimiter | 否 | \n | 使用CSV格式导入时,用于指定行分隔符。 |
starrocks.write.properties.column_separator | 否 | \t | 使用CSV格式导入时,用于指定列分隔符。 |
starrocks.write.properties.partial_update | 否 | FALSE | 是否使用部分列更新。取值包括 TRUE 和 FALSE 。默认值:FALSE 。 |
starrocks.write.properties.partial_update_mode | 否 | row | 指定部分更新的模式,取值包括 row 和 column 。
|
starrocks.write.num.partitions | 否 | 无 | Spark用于并行写入的分区数,数据量小时可以通过减少分区数降低导入并发和频率,默认分区数由Spark决定。使用该功能可能会引入 Spark Shuffle cost。 |
starrocks.write.partition.columns | 否 | 无 | 用于Spark分区的列,只有指定 starrocks.write.num.partitions 后才有效,如果不指定则使用所有写入的列进行分区 |
starrocks.timezone | 否 | JVM 默认时区 | 自 1.1.1 版本起支持。StarRocks 的时区。用于将 Spark 的 TimestampType 类型的值转换为 StarRocks 的 DATETIME 类型的值。默认为 ZoneId#systemDefault() 返回的 JVM 时区。格式可以是时区名称,例如 Asia/Shanghai,或时区偏移,例如 +08:00。 |
数据类型映射
-
数据类型映射默认如下:
Spark 数据类型 StarRocks 数据类型 BooleanType BOOLEAN ByteType TINYINT ShortType SMALLINT IntegerType INT LongType BIGINT StringType LARGEINT FloatType FLOAT DoubleType DOUBLE DecimalType DECIMAL StringType CHAR StringType VARCHAR StringType STRING StringType JSON DateType DATE TimestampType DATETIME ArrayType ARRAY
说明:
自版本 1.1.1 开始支持。 详细步骤, 请参见 导入至 ARRAY 类型的列. -
您还可以自定义数据类型映射。
例如,一个 StarRocks 表包含了 BITMAP 和 HLL 类型的列,但 Spark 不支持这两种数据类型。则您需要在 Spark 中设置其支持的数据类型,并且自定义数据类型映射关系。详细步骤,参见导入至 BITMAP 和 HLL 类型的列。自版本 1.1.1 起支持导入至 BITMAP 和 HLL 类型的列。
升级 Spark connector
1.1.0 升级至 1.1.1
- 自 1.1.1 版本开始,Spark connector 不再提供 MySQL 官方 JDBC 驱动程序
mysql-connector-java
,因为该驱动程序使用 GPL 许可证,存在一些限制。然而,Spark连接器仍然需要 MySQL JDBC 驱动程序才能连接到 StarRocks 以获取表的元数据,因此您需要手动将驱动程序添加到 Spark 类路径中。您可以在 MySQL 官网 或 Maven 中央仓库上找到这个驱动程序。 - 自 1.1.1 版本开始,Spark connector 默认使用 Stream Load 接口,而不是 1.1.0 版本中的 Stream Load 事务接口。如果您仍然希望使用 Stream Load 事务接口,您可以将选项
starrocks.write.max.retries
设置为0
。详细信息,参见starrocks.write.enable.transaction-stream-load
和starrocks.write.max.retries
的说明。
使用示例
通过一个例子说明如何使用 connector 写入 StarRocks 表,包括使用 Spark DataFrame 和 Spark SQL,其中 DataFrame 包括 Batch 和 Structured Streaming 两种模式。
更多示例请参考 Spark Connector Examples,后续会补充更多例子。
准备工作
创建 StarRocks 表
创建数据库 test
,并在其中创建名为 score_board
的主键表。
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`)
;
Spark 环境
示例基于 Spark 3.2.4,使用 spark-shell
,pyspark
和 spark-sql
进行演示,运行前请将 connector jar放置在 $SPARK_HOME/jars
目录下。
网络配置
确保 Spark 所在机器能够访问 StarRocks 集群中 FE 节点的 http_port
(默认 8030
) 和 query_port
端口(默认 9030
),以及 BE 节点的 be_http_port
端口(默认 8040
)。