从 Apache Flink® 持续导入
StarRocks 提供 Apache Flink® 连接器 (以下简称 Flink connector),可以通过 Flink 导入数据至 StarRocks表。
基本原理是 Flink connector 在内存中积攒小批数据,再通过 Stream Load 一次性导入 StarRocks。
Flink Connector 支持 DataStream API,Table API & SQL 和 Python API。
StarRocks 提供的 Flink connector,相比于 Flink 提供的 flink-connector-jdbc,性能更优越和稳定。
注意
使用 Flink connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。
版本要求
| Connector | Flink | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及更高版本 | 8 | 2.11,2.12 |
| 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 及更高版本 | 8 | 2.11,2.12 |
| 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 及更高版本 | 8 | 2.11,2.12 |
| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 及更高版本 | 8 | 2.11,2.12 |
获取 Flink connector
您可以通过以下方式获取 Flink connector JAR 文件:
- 直接下载已经编译好的 JAR 文件。
- 在 Maven 项目的 pom 文件添加 Flink connector 为依赖项,作为依赖下载。
- 通过源码手动编译成 JAR 文件。
Flink connector JAR 文件的命名格式如下:
- 适用于 Flink 1.15 版本及以后的 Flink connector 命名格式为
flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar。例如您安装了 Flink 1.15,并且想要使用 1.2.7 版本的 Flink connector,则您可以使用flink-connector-starrocks-1.2.7_flink-1.15.jar。 - 适用于 Flink 1.15 版本之前的 Flink connector 命名格式为
flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar。例如您安装了 Flink 1.14 和 Scala 2.12,并且您想要使用 1.2.7 版本的 Flink connector,您可以使用flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar。
注意
一般情况下最新版本的 Flink connector 只维护最近 3 个版本的 Flink。
直接下载
可以在 Maven Central Repository 获取不同版本的 Flink connector JAR 文件。
Maven 依赖
在 Maven 项目的 pom.xml 文件中,根据以下格式将 Flink connector 添加为依赖项。将 flink_version、scala_version 和 connector_version 分别替换为相应的版本。
-
适用于 Flink 1.15 版本及以后的 Flink connector
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency> -
适用于 Flink 1.15 版本之前的 Flink connector
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
手动编译
-
执行以下命令将 Flink connector 的源代码编译成一个 JAR 文件。请注意,将
flink_version替换为相应的Flink 版本。sh build.sh <flink_version>例如,如果您的环境中的 Flink 版本为1.15,您需要执行以下命令:
sh build.sh 1.15 -
前往
target/目录,找到编译完成的 Flink connector JAR 文件,例如flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar,该文件在编译过程中生成。注意:
未正式发布的 Flink connector 的名称包含
SNAPSHOT后缀。
参数说明
常用选项
connector
- 是否必填: 是
- 默认值: NONE
- 描述: 您要使用的连接器。该值必须为 "starrocks"。
jdbc-url
- 是否必填: 是
- 默认值: NONE
- 描述: 用于连接 FE 的 MySQL 服务器的地址。您可以指定多个地址,地址之间必须使用英文逗号 (,) 分隔。格式:
jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>。
load-url
- 是否必填:是
- 默认值:无
- 描述:用于连接 FE 的 HTTP 服务的地址。您可以指定多个地址,地址之间使用分号 (;) 分隔。格式:
<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>。
database-name
- 是否必填:是
- 默认值:无
- 描述:您要将数据导入的 StarRocks 数据库的名称。
table-name
- 是否必填:是
- 默认值:无
- 描述:您要将数据导入到 StarRocks 中的表的名称。
username
- 是否必填:是
- 默认值:无
- 描述:用于将数据导入到 StarRocks 中的帐户的用户名。该帐户需要具有目标 StarRocks 表的 SELECT 和 INSERT 权限 。
password
- 是否必填:是
- 默认值:无
- 描述:上述账号的密码。
sink.version
- 是否必填:否
- 默认值:AUTO
- 描述:用于数据导入的接口。该参数自 Flink connector 1.2.4 版本起支持。取值范围:
V1: 使用 Stream Load 接口导入数据。1.2.4 之前的 Connector 仅支持此模式。V2: 使用 Stream Load transaction 接口导入数据。要求 StarRocks 版本至少为 2.4。推荐使用V2,因为它优化了内存使用,并提供了更稳定的 exactly-once 实现。AUTO: 如果 StarRocks 版本支持事务 Stream Load,则自动选择V2,否则选择V1。
sink.label-prefix
- 是否必填:否
- 默认值:无
- 描述:Stream Load使用的标签前缀。如果您正在使用connector 1.2.8及更高版本的exactly-once,建议您配置它。请参见 exactly-once 使用说明。
sink.semantic
- 是否必填: 否
- 默认值: at-least-once
- 描述: sink 提供的语义保障。有效值:at-least-once 和 exactly-once。
sink.buffer-flush.max-bytes
- 是否必须配置:否
- 默认值:94371840(90M)
- 描述:在一次性发送到 StarRocks 之前,可 以在内存中累积的最大数据量。最大值的范围是 64 MB 到 10 GB。将此参数设置为较大的值可以提高数据导入性能,但也可能会增加数据导入延迟。此参数仅在
sink.semantic设置为at-least-once时生效。如果sink.semantic设置为exactly-once,则会在触发 Flink checkpoint 时刷新内存中的数据。在这种情况下,此参数不生效。
sink.buffer-flush.max-rows
- 是否必填:否
- 默认值:500000
- 描述:一次发送到 StarRocks 之前可以在内存中累积的最大行数。此参数仅在
sink.version为V1且sink.semantic为at-least-once时可用。有效值:64000 到 5000000。
sink.buffer-flush.interval-ms
- 是否必填: 否
- 默认值: 300000
- 描述: 数据刷新的间隔。仅当
sink.semantic为at-least-once时,此参数才可用。单位:毫秒。有效取值范围:- v1.2.14 之前的版本:[1000, 3600000]
- v1.2.14 及更高版本:(0, 3600000]
sink.max-retries
- 是否必填:否
- 默认值:3
- 描述:系统重试执行 Stream Load 作业的次数。仅当您将
sink.version设置为V1时,此参数才可用。有效值:0 到 10。
sink.connect.timeout-ms
- 是否必填:否
- 默认值:30000
- 描述:建立 HTTP 连接的超时时间。有效值:100 到 60000。单位:毫秒。在 Flink connector v1.2.9 之前的版本中,默认值为
1000。
sink.socket.timeout-ms
- 是否必填:否
- 默认值:-1
- 描述:自 1.2.10 版本起支持。HTTP 客户端等待数据的时间。单位:毫秒。默认值
-1表示没有超时时间。
sink.sanitize-error-log
- Required: No
- Default value: false
- Description: 自 1.2.12 版本起支持。是否对生产环境安全相关的错误日志中的敏感数据进行脱敏。如果设置为
true,连接器和 SDK 日志中的 Stream Load 错误日志中的敏感行数据和列值将被删除。为了向后兼容,该值默认为false。
sink.wait-for-continue.timeout-ms
- 是否必填:否
- 默认值:10000
- 描述:自 1.2.7 版本起支持。等待 FE 返回 HTTP 100-continue 响应的超时时间。取值范围:
3000到60000。单位:毫秒(ms)。
sink.ignore.update-before
- 是否必填:否
- 默认值:true
- 描述:自 1.2.8 版本起支持。是否在向主键表导入数据时忽略来自 Flink 的
UPDATE_BEFORE类型记录。如果设置为 false,则该记录会被当做删除操作。
sink.parallelism
- 是否必填:否
- 默认值:NONE
- 描述:数据导入的并行度。仅 适用于 Flink SQL。如果未指定此参数,则由 Flink planner 决定并行度。在多并行度的情况下,用户需要保证数据以正确的顺序写入。
sink.properties.*
- 是否必填:否
- 默认值:无
- 描述:用于控制 Stream Load 行为的参数。例如,参数
sink.properties.format指定用于 Stream Load 的格式,例如 CSV 或 JSON。有关支持的参数及其描述的列表,请参见 STREAM LOAD 。
sink.properties.format
- 是否必填:否
- 默认值:csv
- 描述:用于 Stream Load 的数据格式。Flink Connector 会将每批数据转换为指定格式,然后再发送到 StarRocks。有效值:
csv和json。
sink.properties.column_separator
- 是否必填: 否
- 默认值: \t
- 描述: CSV 格式数据的列分隔符。
sink.properties.row_delimiter
- 是否必填:否
- 默认值:\n
- 描述:CSV 格式数据中的行分隔符。
sink.properties.max_filter_ratio
- 是否必填:否
- 默认值:0
- 描述:Stream Load 的最大容错率。表示因数据质量不合格而允许过滤掉的数据记录的最大百分比。取值范围:
0~1。默认值:0。更多信息,请参见 Stream Load 。
sink.properties.partial_update
- 是否必填:否
- 默认值:
FALSE - 描述:是否使用部分更新。有效值为
TRUE和FALSE。默认值为FALSE,表示禁用此功能。
sink.properties.partial_update_mode
- 是否必填:否
- 默认值:
row - 描述:指定部分更新的模式。有效值:
row和column。row(默认值)表示行模式下的部分更新,更适合多列、小批量的实时更新。column表示列模式下的部分更新,更适合少列、多行的批量更新。在这种情况下,启用列模式可以提供更快的更新速度。例如,在一张有 100 列的表中,如果只更新所有行的 10 列(总列数的 10%),那么列模式的更新速度会快 10 倍。
sink.properties.strict_mode
- 是否必填:否
- 默认值:false
- 描述:是否开启 Stream Load 的严格模式。它会影响存在不合格行(例如列值不一致)时的数据导入行为。有效值:
true和false。默认值:false。详情请参见 Stream Load 。