从 Apache Flink® 持续导入
StarRocks 提供 Apache Flink® 连接器 (以下简称 Flink connector),可以通过 Flink 导入数据至 StarRocks表。
基本原理是 Flink connector 在内存中积攒小批数据,再通过 Stream Load 一次性导入 StarRocks。
Flink Connector 支持 DataStream API,Table API & SQL 和 Python API。
StarRocks 提供的 Flink connector,相比于 Flink 提供的 flink-connector-jdbc,性能更优越和稳定。
注意
使用 Flink connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。
版本要求
Connector | Flink | StarRocks | Java | Scala |
---|---|---|---|---|
1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 及以上 | 8 | 2.11,2.12 |
1.2.9 | 1.15,1.16,1.17,1.18 | 2.1 及以上 | 8 | 2.11,2.12 |
1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1 及以上 | 8 | 2.11,2.12 |
1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 及以上 | 8 | 2.11,2.12 |
获取 Flink connector
您可以通过以下方式获取 Flink connector JAR 文件:
- 直接下载已经编译好的 JAR 文件。
- 在 Maven 项目的 pom 文件添加 Flink connector 为依赖项,作为依赖下载。
- 通过源码手动编译成 JAR 文件。
Flink connector JAR 文件的命名格式如下:
- 适用于 Flink 1.15 版本及以后的 Flink connector 命名格式为
flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar
。例如您安装了 Flink 1.15,并且想要使用 1.2.7 版本的 Flink connector,则您可以使用flink-connector-starrocks-1.2.7_flink-1.15.jar
。 - 适用于 Flink 1.15 版本之前的 Flink connector 命名格式为
flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar
。例如您安装了 Flink 1.14 和 Scala 2.12,并且您想要使用 1.2.7 版本的 Flink connector,您可以使用flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar
。
注意
一般情况下最新版本的 Flink connector 只维护最近 3 个版本的 Flink。
直接下载
可以在 Maven Central Repository 获取不同版本的 Flink connector JAR 文件。
Maven 依赖
在 Maven 项目的 pom.xml
文件中,根据以下格式将 Flink connector 添加为依赖项。将 flink_version
、scala_version
和 connector_version
分别替换为相应的版本。
-
适用于 Flink 1.15 版本及以后的 Flink connector
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency> -
适用于 Flink 1.15 版本之前的 Flink connector
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
手动编译
-
执行以下命令将 Flink connector 的源代码编译成一个 JAR 文件。请注意,将
flink_version
替换为相应的Flink 版本。sh build.sh <flink_version>
例如,如果您的环境中的 Flink 版本为1.15,您需要执行以下命令:
sh build.sh 1.15
-
前往
target/
目录,找到编译完成的 Flink connector JAR 文件,例如flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar
,该文件在编译过程中生成。注意:
未正式发布的 Flink connector 的名称包含
SNAPSHOT
后缀。
参数说明
参数 | 是否必填 | 默认值 | 描述 |
---|---|---|---|
connector | Yes | NONE | 固定设置为 starrocks 。 |
jdbc-url | Yes | NONE | 用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式:jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2> 。 |
load-url | Yes | NONE | 用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式:<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2> 。 |
database-name | Yes | NONE | StarRocks 数据库名。 |
table-name | Yes | NONE | StarRocks 表名。 |
username | Yes | NONE | StarRocks 集群的用户名。使用 Flink connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。 |
password | Yes | NONE | StarRocks 集群的用户密码。 |
sink.semantic | No | at-least-once | sink 保证的语义。有效值:at-least-once 和 exactly-once。 |
sink.version | No | AUTO | 导入数据的接口。此参数自 Flink connector 1.2.4 开始支持。
|
sink.label-prefix | No | NONE | 指定 Stream Load 使用的 label 的前缀。 如果 Flink connector 版本为 1.2.8 及以上,并且 sink 保证 exactly-once 语义,则建议配置 label 前缀。详细信息,参见exactly once。 |
sink.buffer-flush.max-bytes | No | 94371840(90M) | 积攒在内存的数据大小,达到该阈值后数据通过 Stream Load 一次性导入 StarRocks。取值范围:[64MB, 10GB]。将此参数设置为较大的值可以提高导入性能,但可能会增加导入延迟。 该参数只在 sink.semantic 为at-least-once 才会生效。 sink.semantic 为 exactly-once ,则只有 Flink checkpoint 触发时 flush 内存的数据,因此该参数不生效。 |
sink.buffer-flush.max-rows | No | 500000 | 积攒在内存的数据条数,达到该阈值后数据通过 Stream Load 一次性导入 StarRocks。取值范围:[64000, 5000000]。该参数只在 sink.version 为 V1 ,sink.semantic 为 at-least-once 才会生效。 |
sink.buffer-flush.interval-ms | No | 300000 | 数据发送的间隔,用于控制数据写入 StarRocks 的延迟,取值范围:[1000, 3600000]。该参数只在 sink.semantic 为 at-least-once 才会生效。 |
sink.max-retries | No | 3 | Stream Load 失败后的重试次数。超过该数量上限,则数据导入任务报错。取值范围:[0, 10]。该参数只在 sink.version 为 V1 才会生效。 |
sink.connect.timeout-ms | No | 30000 | 与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。 Flink connector v1.2.9 之前,默认值为 1000 。 |
sink.socket.timeout-ms | No | -1 | 此参数自 Flink connector 1.2.10 开始支持。HTTP 客户端等待数据的超时时间。单位:毫秒。默认值 -1 表示没有超时时间。 |
sink.wait-for-continue.timeout-ms | No | 10000 | 此参数自 Flink connector 1.2.7 开始支持。等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。 |
sink.ignore.update-before | No | TRUE | 此参数自 Flink connector 1.2.8 开始支持。将数据导入到主键表时,是否忽略来自 Flink 的 UPDATE_BEFORE 记录。如果将此参数设置为 false,则将该记录在主键表中视为 DELETE 操作。 |
sink.parallelism | No | NONE | 写入的并行度。仅适用于 Flink SQL。如果未设置, Flink planner 将决定并行度。在多并行度的场景中,用户需要确保数据按正确顺序写入。 |
sink.properties.* | No | NONE | Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 sink.properties.format 表示 Stream Load 所导入的数据格式,如 CSV 或者 JSON。全部参数和解释,请参见 STREAM LOAD。 |
sink.properties.format | No | csv | Stream Load 导入时的数据格式。Flink connector 会将内存的数据转换为对应格式,然后通过 Stream Load 导入至 StarRocks。取值为 CSV 或者 JSON。 |
sink.properties.column_separator | No | \t | CSV 数据的列分隔符。 |
sink.properties.row_delimiter | No | \n | CSV 数据的行分隔符。 |
sink.properties.max_filter_ratio | No | 0 |