使用 Spark Connector 读取数据
StarRocks 提供 Apache Spark™ Connector (StarRocks Connector for Apache Spark™),支持通过 Spark 读取 StarRocks 中存储的数据。您可以使用 Spark 对读取到的数据进行复杂处理、机器学习等。
Spark Connector 支持三种数据读取方式:Spark SQL、Spark DataFrame 和 Spark RDD。
您可以使用 Spark SQL 在 StarRocks 表上创建临时视图,然后通过临时视图直接读取 StarRocks 表的数据。
您也可以将 StarRocks 表映射为 Spark DataFrame 或者 Spark RDD,然后从 Spark DataFrame 或者 Spark RDD 中读取数据。推荐使用 Spark DataFrame 来读取 StarRocks 中存储的数据。
使用说明
-
支持在 StarRocks 端完成数据过滤,从而减少数据传输量。
-
如果读取数据的开销比较大,可以通过合理的表设计和使用过滤条件,控制 Spark不要一次读取过多的数据,从而避免给磁盘和网络造成过大的 I/O 压力或影响正常的查询业务。
版本要求
Spark Connector | Spark | StarRocks | Java | Scala |
---|---|---|---|---|
1.1.2 | 3.2, 3.3, 3.4, 3.5 | 2.5 及以上 | 8 | 2.12 |
1.1.1 | 3.2, 3.3, 3.4 | 2.5 及以上 | 8 | 2.12 |
1.1.0 | 3.2, 3.3, 3.4 | 2.5 及以上 | 8 | 2.12 |
1.0.0 | 3.x | 1.18 及以上 | 8 | 2.12 |
1.0.0 | 2.x | 1.18 及以上 | 8 | 2.11 |
注意
- 了解不同版本的 Spark connector 之间的行为变化,请查看升级 Spark connector。
- 自 1.1.1 版本起,Spark connector 不再提供 MySQL JDBC 驱动程序,您需要将驱动程序手动放到 Spark 的类路径中。您可以在 MySQL 官网或 Maven 中央仓库上找到该驱动程序。
- 1.0.0 版本只支持读取 StarRocks,从 1.1.0 版本开始同时支持读写 StarRocks。
- 1.0.0 版本和 1.1.0 版本在参数和类型映射上存在差别,请查看升级 Spark connector。
- 1.0.0 版本一般情况下不再增加新功能,条件允许请尽快升级 Spark Connector。
获取 Spark Connector
您可以通过以下方式获取 Spark Connector Jar 包:
- 直接下载已经编译好的 Jar 包。
- 通过 Maven 添加 Spark Connector 的依赖 (仅支持 1.1.0 及以上版本)。
- 通过源码手动编译。
Spark Connector 1.1.0 及以上版本
Spark Connector Jar 包的命名格式如下:
starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar
比如,您想在 Spark 3.2 和 Scala 2.12 上使用 1.1.0 版本的 Spark Connector,可以选择 starrocks-spark-connector-3.2_2.12-1.1.0.jar
。
注意
一般情况下最新版本的 Spark Connector 只维护最近三个版本的 Spark。
直接下载
您可以在 Maven Central Repository 获取不同版本的 Spark Connector Jar 包。
添加 Maven 依赖
依赖配置的格式如下:
注意
需要将
spark_version
、scala_version
和connector_version
替换成对应的版本。
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
<version>${connector_version}</version>
</dependency>
比如,您想在 Spark 3.2 和 Scala 2.12 上使用 1.1.0 版本的 Spark Connector,可以添加如下依赖:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
手动编译
-
通过如下命令进行编译:
注意
需要将
spark_version
替换成相应的 Spark 版本。sh build.sh <spark_version>
比如,您想在 Spark 3.2 上使用 Spark Connector,可以通过如下命令进行编译:
sh build.sh 3.2
-
编译完成 后,进入
target/
路径查看。路径下会生成 Spark Connector Jar 包,比如starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar
。注意
如果使用的是非正式发布的 Spark Connector 版本,生成的 Spark Connector Jar 包名称中会带有
SNAPSHOT
后缀。
Spark Connector 1.0.0
直接下载
手动编译
-
注意
需要切换到
spark-1.0
分支。 -
通过如下命令对 Spark Connector 进行编译:
-
如果 Spark 版本是 2.x,则执行如下命令,默认编译的是配套 Spark 2.3.4 的 Spark Connector:
sh build.sh 2
-
如果 Spark 版本是 3.x,则执行如下命令,默认编译的是配套 Spark 3.1.2 的 Spark Connector:
sh build.sh 3
-
-
编译完成后,进入
output/
路径查看。路径下会生成starrocks-spark2_2.11-1.0.0.jar
文件。将该文件拷贝至 Spark 的类文件路径 (Classpath) 下:- 如果您的 Spark 以
Local
模式运行,需要把该文件放在jars/
路径下。 - 如果您的 Spark 以
Yarn
模式运行,需要把该文件放在预安装程序包 (Pre-deployment Package) 里。
- 如果您的 Spark 以
把文件放置到指定位置后,才可以开始使用 Spark Connector 读取数据。
参数说明
本小节描述您在使用 Spark Connector 读取数据的过程中需要配置的参数。
通用参数
以下参数适用于 Spark SQL、Spark DataFrame、Spark RDD 三种读取方式。
参数名称 | 默认值 | 说明 |
---|---|---|
starrocks.fenodes | 无 | StarRocks 集群中 FE 的 HTTP 地址,格式为 <fe_host>:<fe_http_port> 。支持输入多个地址,使用逗号 (,) 分隔。 |
starrocks.table.identifier | 无 | StarRocks 表的名称,格式为 <database_name>.<table_name> 。 |
starrocks.request.retries | 3 | Spark Connector 向 StarRocks 发送一个读请求的重试次数。 |
starrocks.request.connect.timeout.ms | 30000 | 一个读请求的连接建立超时时间。 |
starrocks.request.read.timeout.ms | 30000 | 一个读请求读 取 StarRocks 数据超时时间。 |
starrocks.request.query.timeout.s | 3600 | 从 StarRocks 查询数据的超时时间。默认超时时间为 1 小时。 |
starrocks.request.tablet.size | Integer.MAX_VALUE | 一个 Spark RDD 分区对应的 StarRocks Tablet 的个数。参数设置越小,生成的分区越多,Spark 侧的并行度也就越大,但与此同时会给 StarRocks 侧造成更大的压力。 |
starrocks.batch.size | 4096 | 单次从 BE 读取的最大行数。调大参数取值可减少 Spark 与 StarRocks 之间建立连接的次数,从而减轻网络延迟所带来的的额外时间开销。对于StarRocks 2.2及以后版本最小支持的batch size为4096,如果配置小于该值,则按4096处理 |
starrocks.exec.mem.limit | 2147483648 | 单个查询的内存限制。单位:字节。默认内存限制为 2 GB。 |
starrocks.deserialize.arrow.async | false | 是否支持把 Arrow 格式异步转换为 Spark Connector 迭代所需的 RowBatch。 |
starrocks.deserialize.queue.size | 64 | 异步转换 Arrow 格式时内部处理队列的大小,当 starrocks.deserialize.arrow.async 为 true 时生效。 |
starrocks.filter.query | 无 | 指定过滤条件。多个过滤条件用 and 连接。StarRocks 根据指定的过滤条件完成对待读取数据的过滤。 |
starrocks.timezone | JVM 默认时区 | 自 1.1.1 版本起支持。StarRocks 的时区。用于将 StarRocks 的 DATETIME 类型的值转换为 Spark 的 TimestampType 类型的值。默认为 ZoneId#systemDefault() 返回的 JVM 时区。格式可以是时区名称,例如 Asia/Shanghai,或时区偏移,例如 +08:00。 |