Apache Flink
从 Apache Flink® 持续导入
StarRocks 提供 Apache Flink® 连接器 (以下简称 Flink connector),可以通过 Flink 导入数据至 StarRocks表。
基本原理是 Flink connector 在内存中积攒小批数据,再通过 Stream Load 一次性导入 StarRocks。
Flink Connector 支持 DataStream API,Table API & SQL 和 Python API。
StarRocks 提供的 Flink connector,相比于 Flink 提供的 flink-connector-jdbc,性能更优越和稳定。
注意
使用 Flink connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。
版本要求
| Connector | Flink | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及更高版本 | 8 | 2.11,2.12 |
| 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及更高版本 | 8 | 2.11,2.12 |
| 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 及更高版本 | 8 | 2.11,2.12 |
| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 及更高版本 | 8 | 2.11,2.12 |
获取 Flink connector
您可以通过以下方式获取 Flink connector JAR 文件:
- 直接下载已经编译好的 JAR 文件。
- 在 Maven 项目的 pom 文件添加 Flink connector 为依赖项,作为依赖下载。
- 通过源码手动编译成 JAR 文件。
Flink connector JAR 文件的命名格式如下:
- 适用于 Flink 1.15 版本及以后的 Flink connector 命名格式为
flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar。例如您安装了 Flink 1.15,并且想要使用 1.2.7 版本的 Flink connector,则您可以使用flink-connector-starrocks-1.2.7_flink-1.15.jar。 - 适用于 Flink 1.15 版本之前的 Flink connector 命名格式为
flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar。例如您安装了 Flink 1.14 和 Scala 2.12,并且您想要使用 1.2.7 版本的 Flink connector,您可以使用flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar。
注意
一般情况下最新版本的 Flink connector 只维护最近 3 个版本的 Flink。
直接下载
可以在 Maven Central Repository 获取不同版本的 Flink connector JAR 文件。
Maven 依赖
在 Maven 项目的 pom.xml 文件中,根据以下格式将 Flink connector 添加为依赖项。将 flink_version、scala_version 和 connector_version 分别替换为相应的版本。
-
适用于 Flink 1.15 版本及以后的 Flink connector
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency> -
适用于 Flink 1.15 版本之前的 Flink connector
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
手动编译
-
执行以下命令将 Flink connector 的源代码编译成一个 JAR 文件。请注意,将
flink_version替换为相应的Flink 版本。sh build.sh <flink_version>例如,如果您的环境中的 Flink 版本为1.15,您需要执行以下命令:
sh build.sh 1.15 -
前往
target/目录,找到编译完成的 Flink connector JAR 文件,例如flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar,该文件在编译过程中生成。注意:
未正式发布的 Flink connector 的名称包含
SNAPSHOT后缀。
参数说明
常用选项
connector
- 是否必填: 是
- 默认值: NONE
- 描述: 您要使用的连接器。该值必须为 "starrocks"。
jdbc-url
- 是否必填: 是
- 默认值: NONE
- 描述: 用于连接 FE 的 MySQL 服务器的地址。您可以指定多个地址,地址之间必须使用英文逗号 (,) 分隔。格式:
jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>。
load-url
- 是否必填:是
- 默认值:无
- 描述:用于连接 FE 的 HTTP 服务的地址。您可以指定多个地址,地址之间使用分号 (;) 分隔。格式:
<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>。
database-name
- 是否必填:是
- 默认值:无
- 描述:您要将数据导入的 StarRocks 数据库的名称。
table-name
- 是否必填:是
- 默认值:无
- 描述:您要将数据导入到 StarRocks 中的表的名称。
username
- 是否必填:是
- 默认值:无
- 描述:用于将数据导入到 StarRocks 中的帐户的用户名。该帐户需要具有目标 StarRocks 表的 SELECT 和 INSERT 权限 。
password
- 是否必填:是
- 默认值:无
- 描述:上述账号的密码。
sink.version
- 是否必填:否
- 默认值:AUTO
- 描述:用于数据导入的接口。该参数自 Flink connector 1.2.4 版本起支持。取值范围:
V1: 使用 Stream Load 接口导入数据。1.2.4 之前的 Connector 仅支持此模式。V2: 使用 Stream Load transaction 接口导入数据。要求 StarRocks 版本至少为 2.4。推荐使用V2,因为它优化了内存使用,并提供了更稳定的 exactly-once 实现。AUTO: 如果 StarRocks 版本支持事务 Stream Load,则自动选择V2,否则选择V1。
sink.label-prefix
- 是否必填:否
- 默认值:无
- 描述:Stream Load使用的标签前缀。如果您正在使用connector 1.2.8及更高版本的exactly-once,建议您配置它。请参见 exactly-once 使用说明。
sink.semantic
- 是否必填: 否
- 默认值: at-least-once
- 描述: sink 提供的语义保障。有效值:at-least-once 和 exactly-once。
sink.buffer-flush.max-bytes
- 是否必须配置:否
- 默认值:94371840(90M)
- 描述:在一次性发送到 StarRocks 之前,可以在内存中累积的最大数据量。最大值的范围是 64 MB 到 10 GB。将此参数设置为较大的值可以提高数据导入性能,但也可能会增加数据导入延迟。此参数仅在
sink.semantic设置为at-least-once时生效。如果sink.semantic设置为exactly-once,则会在触发 Flink checkpoint 时刷新内存中的数据。在这种情况下,此参数不生效。
sink.buffer-flush.max-rows
- 是否必填:否
- 默认值:500000
- 描述:一次发送到 StarRocks 之前可以在内存中累积的最大行数。此参数仅在
sink.version为V1且sink.semantic为at-least-once时可用。有效值:64000 到 5000000。