使用 Flink Connector 读取数据
StarRocks 提供自研的 Apache Flink® Connector (StarRocks Connector for Apache Flink®),支持通过 Flink 批量读取某个 StarRocks 集群中的数据。
Flink Connector 支持两种数据读取方式:Flink SQL 和 Flink DataStream。推荐使用 Flink SQL。
说明
Flink Connector 还支持将 Flink 读取到的数据写入另外一个 StarRocks 集群或其他存储系统上。参见从 Apache Flink 持续导入。
功能简介
相较于 Flink 官方提供的 Flink JDBC Connector (简称 JDBC Connector),StarRocks 自研的 Flink Connector 具备从 StarRocks 集群中各 BE 节点并行读取数据的能力,大大提高了数据读取效率。以下是两种 Connector 的实现方案对比:
-
Flink Connector
Flink 先从 FE 节点获取查询计划 (Query Plan),然后将获取到的查询计划作为参数,下发至 BE 节点,最后获取 BE 节点返回的数据。
-
Flink JDBC Connector
Flink JDBC Connector 仅能从 FE 单点上串行读取数据,数据读取效率较低。
版本要求
Connector | Flink | StarRocks | Java | Scala |
---|---|---|---|---|
1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 及以上 | 8 | 2.11,2.12 |
1.2.9 | 1.15,1.16,1.17,1.18 | 2.1 及以上 | 8 | 2.11,2.12 |
1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1 及以上 | 8 | 2.11,2.12 |
1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 及以上 | 8 | 2.11,2.12 |
前提条件
已部署 Flink。若您尚未部署 Flink,请参照如下步骤完成部署:
-
在操作系统中安装 Java 8 或者 Java 11,以正常运行 Flink。您可以通过以下命令来检查已经安装的 Java 版本:
java -version
例如,命令回显如下,则说明已经安装 Java 8:
openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode) -
下载并解压 Flink。
说明
推荐使用 1.14 及以上版本,最低支持 1.11 版本。
# 下载 Flink
wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# 解压 Flink
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# 进入 Flink 目录
cd flink-1.14.5 -
启动 Flink 集群。
# 启动 Flink 集群
./bin/start-cluster.sh
# 返回如下信息,表示成功启动 flink 集群
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
您也可以参考 Flink 官方文档 完成部署。
准备工作
部署 Flink Connector
通过如下步骤完成 Flink Connector 的部署:
-
根据 Flink 的版本,选择和下载对应版本的 flink-connector-starrocks JAR 包。如需调试代码,可选择对应分支代码自行编译。
注意
推荐您下载 Flink Connector 版本在 1.2.x 及以上、并且配套的 Flink 版本与您的业务环境中安装的 Flink 版本前两位一致的 JAR 包。例如,如果您的业务环境中安装的 Flink 版本为 1.14.x,可以下载
flink-connector-starrocks-1.2.4_flink-1.14_x.yy.jar
。 -
将下载或者编译的 JAR 包放在 Flink 的
lib
目录中。 -
重启 Flink。