使用 Spark Connector 读取数据
StarRocks 提供 Apache Spark™ Connector (StarRocks Connector for Apache Spark™),支持通过 Spark 读取 StarRocks 中存储的数据。您可以使用 Spark 对读取到的数据进行复杂处理、机器学习等。
Spark Connector 支持三种数据读取方式:Spark SQL、Spark DataFrame 和 Spark RDD。
您可以使用 Spark SQL 在 StarRocks 表上创建临时视图,然后通过临时视图直接读取 StarRocks 表的数据。
您也可以将 StarRocks 表映射为 Spark DataFrame 或者 Spark RDD,然后从 Spark DataFrame 或者 Spark RDD 中读取数据。推荐使用 Spark DataFrame 来读取 StarRocks 中存储的数据。
使用说明
-
支持在 StarRocks 端完成数据过滤,从而减少数据传输量。
-
如果读取数据的开销比较大,可以通过合理的表设计和使用过滤条件,控制 Spark不要一次读取过多的数据, 从而避免给磁盘和网络造成过大的 I/O 压力或影响正常的查询业务。
版本要求
Spark Connector | Spark | StarRocks | Java | Scala |
---|---|---|---|---|
1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 及以上 | 8 | 2.12 |
1.1.1 | 3.2, 3.3, 3.4 | 2.5 及以上 | 8 | 2.12 |
1.1.0 | 3.2, 3.3, 3.4 | 2.5 及以上 | 8 | 2.12 |
1.0.0 | 3.x | 1.18 及以上 | 8 | 2.12 |
1.0.0 | 2.x | 1.18 及以上 | 8 | 2.11 |
注意
- 了解不同版本的 Spark connector 之间的行为变化,请查看升级 Spark connector。
- 自 1.1.1 版本起,Spark connector 不再提供 MySQL JDBC 驱动程序,您需要将驱动程序手动放到 Spark 的类路径中。您可以在 MySQL 官网或 Maven 中央仓库上找到该驱动程序。
- 1.0.0 版本只支持读取 StarRocks,从 1.1.0 版本开始同时支持读写 StarRocks。
- 1.0.0 版本和 1.1.0 版本在参数和类型映射上存在差别,请查看升级 Spark connector。
- 1.0.0 版本一般情况下不再增加新功能,条件允许请尽快升级 Spark Connector。
获取 Spark Connector
您可以通过以下方式获取 Spark Connector Jar 包:
- 直接下载已经编译好的 Jar 包。
- 通过 Maven 添加 Spark Connector 的依赖 (仅支持 1.1.0 及以上版本)。
- 通过源码手动编译。
Spark Connector 1.1.0 及以上版本
Spark Connector Jar 包的命名格式如下:
starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar
比如,您想在 Spark 3.2 和 Scala 2.12 上使用 1.1.0 版本的 Spark Connector,可以选择 starrocks-spark-connector-3.2_2.12-1.1.0.jar
。
注意
一般情况下最新版本的 Spark Connector 只维护最近三个版本的 Spark。
直接下载
您可以在 Maven Central Repository 获取不同版本的 Spark Connector Jar 包。
添加 Maven 依赖
依赖配置的格式如下:
注意
需要将
spark_version
、scala_version
和connector_version
替 换成对应的版本。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>
比如,您想在 Spark 3.2 和 Scala 2.12 上使用 1.1.0 版本的 Spark Connector,可以添加如下依赖:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
手动编译
-
通过如下命令进行编译:
注意
需要将
spark_version
替换成相应的 Spark 版本。sh build.sh <spark_version>
比如,您想在 Spark 3.2 上使用 Spark Connector,可以通过如下命令进行编译:
sh build.sh 3.2
-
编译完成后,进入
target/
路径查看。路径下会生成 Spark Connector Jar 包,比如starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar
。注意
如果使用的是非正式发布的 Spark Connector 版本,生成的 Spark Connector Jar 包名称中会带有
SNAPSHOT
后缀。
Spark Connector 1.0.0
直接下载
手动编译
-
注意
需要切换到
spark-1.0
分支。 -
通过如下命令对 Spark Connector 进行编译:
-
如果 Spark 版本是 2.x,则执行如下命令,默认编译的是配套 Spark 2.3.4 的 Spark Connector:
sh build.sh 2
-
如果 Spark 版本是 3.x,则执行如下命令,默认编译的是配套 Spark 3.1.2 的 Spark Connector:
sh build.sh 3
-
-
编译完成后,进入
output/
路径查看。路径下会生成starrocks-spark2_2.11-1.0.0.jar
文件。将该文件拷贝至 Spark 的类文件路径 (Classpath) 下:- 如果您的 Spark 以
Local
模式运行,需要把该文件放在jars/
路径下。 - 如果您的 Spark 以
Yarn
模式运行,需要把该文件放在预安装程序包 (Pre-deployment Package) 里。
- 如果您的 Spark 以
把文件放置到指定位置后,才可以开始使用 Spark Connector 读取数据。
参数说明
本小节描述您在使用 Spark Connector 读取数据的过程中需要配置的参数。
通用参数
以下参数适用于 Spark SQL、Spark DataFrame、Spark RDD 三种读取方式。
参数名称 | 默认值 | 说明 |
---|---|---|
starrocks.fenodes | 无 | StarRocks 集群中 FE 的 HTTP 地址,格式为 <fe_host>:<fe_http_port> 。支持输入多个地址,使用逗号 (,) 分隔。 |
starrocks.table.identifier | 无 | StarRocks 表的名称,格式为 <database_name>.<table_name> 。 |
starrocks.request.retries | 3 | Spark Connector 向 StarRocks 发送一个读请求的重试次数。 |
starrocks.request.connect.timeout.ms | 30000 | 一个读请求的连接建立超时时间。 |
starrocks.request.read.timeout.ms | 30000 | 一个读请求读取 StarRocks 数据超时时间。 |
starrocks.request.query.timeout.s | 3600 | 从 StarRocks 查询数据的超时时间。默认超时时间为 1 小时。 |
starrocks.request.tablet.size | Integer.MAX_VALUE | 一个 Spark RDD 分区对应的 StarRocks Tablet 的个数。参数设置越小,生成的分区越多,Spark 侧的并行度也就越大,但与此同时会 给 StarRocks 侧造成更大的压力。 |
starrocks.batch.size | 4096 | 单次从 BE 读取的最大行数。调大参数取值可减少 Spark 与 StarRocks 之间建立连接的次数,从而减轻网络延迟所带来的的额外时间开销。对于StarRocks 2.2及以后版本最小支持的batch size为4096,如果配置小于该值,则按4096处理 |
starrocks.exec.mem.limit | 2147483648 | 单个查询的内存限制。单位:字节。默认内存限制为 2 GB。 |
starrocks.deserialize.arrow.async | false | 是否支持把 Arrow 格式异步转换为 Spark Connector 迭代所需的 RowBatch。 |
starrocks.deserialize.queue.size | 64 | 异步转换 Arrow 格式时内部处理队列的大小,当 starrocks.deserialize.arrow.async 为 true 时生效。 |
starrocks.filter.query | 无 | 指定过滤条件。多个过滤条件用 and 连接。StarRocks 根据指定的过滤条件完成对待读取数据的过滤。 |
starrocks.timezone | JVM 默认时区 | 自 1.1.1 版本起支持。StarRocks 的时区。用于将 StarRocks 的 DATETIME 类型的值转换为 Spark 的 TimestampType 类型的值。默认为 ZoneId#systemDefault() 返回的 JVM 时区。格式可以是时区名称,例如 Asia/Shanghai,或时区偏移,例如 +08:00。 |
Spark SQL 和 Spark DataFrame 专有参数
以下参数仅适用于 Spark SQL 和 Spark DataFrame 读取方式。
参数名称 | 默认值 | 说明 |
---|---|---|
starrocks.fe.http.url | 无 | FE 的 HTTP 地址。从 Spark Connector 1.1.0 版本开始支持,与 starrocks.fenodes 等价,两者填一个即可。在 Spark Connector 1.1.0 及以后版本,推荐使用该参数,starrocks.fenodes 在后续版本可能会淘汰。 |
starrocks.fe.jdbc.url | 无 | FE 的 MySQL Server 连接地址。格式为 jdbc:mysql://<fe_host>:<fe_query_port> 。注意 在 Spark Connector 1.1.0 及以后版本,该参数必填。 |
user | 无 | StarRocks 集群账号的用户名。 |
starrocks.user | 无 | StarRocks 集群账号的用户名。从 Spark Connector 1.1.0 版本开始支持,与 user 等价,两者填一个即可。在 Spark Connector 1.1.0 及以后版本,推荐使用该参数,user 在后续版本可能会淘汰。 |
password | 无 | StarRocks 集群账号的用户密码。 |
starrocks.password | 无 | StarRocks 集群账号的用户密码。 从 Spark Connector 1.1.0 版本开始支持,与 password 等价,两者填一个即可。在 Spark Connector 1.1.0 及以后版本,推荐使用该参数,password 在后续版本可能会淘汰。 |
starrocks.filter.query.in.max.count | 100 | 谓词下推中,IN 表达式支持的取值数量上限。如果 IN 表达式中指定的取值数量超过该上限,则 IN 表达式中指定的条件过滤在 Spark 侧处理。 |
Spark RDD 专有参数
以下参数仅适用于 Spark RDD 读取方式。
参数名称 | 默认值 | 说明 |
---|---|---|
starrocks.request.auth.user | 无 | StarRocks 集群账号的用户名。 |
starrocks.request.auth.password | 无 | StarRocks 集群账号的用户密码。 |
starrocks.read.field | 无 | 指定从 StarRocks 表中读取哪些列的数据。多个列名之间使用逗号 (,) 分隔。 |
数据类型映射关系
Spark Connector 1.1.0 及以上版本
StarRocks 数据类型 | Spark 数据类型 |
---|---|
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
LARGEINT | DataTypes.StringType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
STRING | DataTypes.StringType |
DATE | DataTypes.DateType |
DATETIME | DataTypes.TimestampType |
JSON | DataTypes.StringType NOTE: 自 1.1.2 版本起支持该类型映射,并且 StarRocks 版本需要为 2.5.13、3.0.3、3.1.0 或更高版本。 |
ARRAY | Unsupported datatype |
HLL | Unsupported datatype |
BITMAP | Unsupported datatype |
Spark Connector 1.0.0 版本
StarRocks 数据类型 | Spark 数据类型 |
---|---|
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
LARGEINT | DataTypes.StringType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
DATE | DataTypes.StringType |
DATETIME | DataTypes.StringType |
ARRAY | Unsupported datatype |
HLL | Unsupported datatype |
BITMAP | Unsupported datatype |
Spark Connector 中,将 DATE 和 DATETIME 数据类型映射为 STRING 数据类型。因为 StarRocks 底层存储引擎处理逻辑,直接使用 DATE 和 DATETIME 数据类型时,覆盖的时间范围无法满足需求。所以,使用 STRING 数据类型直接返回对应的时间可读文本。
Spark Connector 升级
1.0.0 升级至 1.1.0
-
自 1.1.1 版本开始,Spark connector 不再提供 MySQL 官方 JDBC 驱动程序
mysql-connector-java
,因为该驱动程序使用 GPL 许可证,存在一些限制。然而,Spark连接器仍然需要 MySQL JDBC 驱动程序才能连接到 StarRocks 以获取表的元数据,因此您需要手动将驱动程序添加到 Spark 类路径中。您可以在 MySQL 官网 或 Maven 中央仓库上找到这个驱动程序。 -
1.1.0 版本需要通过 JDBC 访问 StarRocks 以获取更详细的表信息,因此必须配置
starrocks.fe.jdbc.url
。 -
1.1.0 版本调整了一些参数命名,目前同时保留了调整前后的参数,只需配置一个即可,但是推荐使用新的,旧的参数在之后的版本可能会淘汰:
starrocks.fenodes
调整为starrocks.fe.http.url
。user
调整为starrocks.user
。password
调整为starrocks.password
。
-
1.1.0 版本基于 Spark 3.x 调整了部分类型映射:
- StarRocks 的
DATE
映射为 Spark 的DataTypes.DateType
,原来是DataTypes.StringType
。 - StarRocks 的
DATETIME
映射为 Spark 的DataTypes.TimestampType
,原来是DataTypes.StringType
。
- StarRocks 的