从 Apache Flink® 持续导入
StarRocks 提供 Apache Flink® 连接器 (以下简称 Flink connector),可以通过 Flink 导入数据至 StarRocks表。
基本原理是 Flink connector 在内存中积攒小批数据,再通过 Stream Load 一次性导入 StarRocks。
Flink Connector 支持 DataStream API,Table API & SQL 和 Python API。
StarRocks 提供的 Flink connector,相比于 Flink 提供的 flink-connector-jdbc,性能更优越和稳定。
注意
使用 Flink connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。
版本要求
| Connector | Flink | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 及以上 | 8 | 2.11,2.12 |
| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 及以上 | 8 | 2.11,2.12 |
| 1.2.9 | 1.15,1.16,1.17,1.18 | 2.1 及以上 | 8 | 2.11,2.12 |
| 1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1 及以上 | 8 | 2.11,2.12 |
| 1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 及以上 | 8 | 2.11,2.12 |
获取 Flink connector
您可以通过以下方式获取 Flink connector JAR 文件:
- 直接下载已经编译好的 JAR 文件。
- 在 Maven 项目的 pom 文件添加 Flink connector 为依赖项,作为依赖下载。
- 通过源码手动编译成 JAR 文件。
Flink connector JAR 文件的命名格式如下:
- 适用于 Flink 1.15 版本及以后的 Flink connector 命名格式为
flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar。例如您安装了 Flink 1.15,并且想要使用 1.2.7 版本的 Flink connector,则您可以使用flink-connector-starrocks-1.2.7_flink-1.15.jar。 - 适用于 Flink 1.15 版本之前的 Flink connector 命名格式为
flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar。例如您安装了 Flink 1.14 和 Scala 2.12,并且您想要使用 1.2.7 版本的 Flink connector,您可以使用flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar。
注意
一般情况下最新版本的 Flink connector 只维护最近 3 个版本的 Flink。
直接下载
可以在 Maven Central Repository 获取不同版本的 Flink connector JAR 文件。
Maven 依赖
在 Maven 项目的 pom.xml 文件中,根据以下格式将 Flink connector 添加为依赖项。将 flink_version、scala_version 和 connector_version 分别替换为相应的版本。
-
适用于 Flink 1.15 版本及以后的 Flink connector
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency> -
适用于 Flink 1.15 版本之前的 Flink connector
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
手动编译
-
执行以下命令将 Flink connector 的源代码编译成一个 JAR 文件。请注意,将
flink_version替换为相应的Flink 版本。sh build.sh <flink_version>例如,如果您的环境中的 Flink 版本为1.15,您需要执行以下命令:
sh build.sh 1.15 -
前往
target/目录,找到编译完成的 Flink connector JAR 文件,例如flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar,该文件在编译过程中生成。注意:
未正式发布的 Flink connector 的名称包含
SNAPSHOT后缀。
参数说明
| 参数 | 是否必填 | 默认值 | 描述 |
|---|---|---|---|
| connector | Yes | NONE | 固定设置为 starrocks。 |
| jdbc-url | Yes | NONE | 用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式:jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>。 |
| load-url | Yes | NONE | 用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式:<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>。 |
| database-name | Yes | NONE | StarRocks 数据库名。 |
| table-name | Yes | NONE | StarRocks 表名。 |
| username | Yes | NONE | StarRocks 集群的用户名。使用 Flink connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。 |
| password | Yes | NONE | StarRocks 集群的用户密码。 |
| sink.semantic | No | at-least-once | sink 保证的语义。有效值:at-least-once 和 exactly-once。 |
| sink.version | No | AUTO | 导入数据的接口。此参数自 Flink connector 1.2.4 开始支持。
|
| sink.label-prefix | No | NONE | 指定 Stream Load 使用的 label 的前缀。 如果 Flink connector 版本为 1.2.8 及以上,并且 sink 保证 exactly-once 语义,则建议配置 label 前缀。详细信息,参见exactly once。 |
| sink.buffer-flush.max-bytes | No | 94371840(90M) | 积攒在内存的数据大小,达到该阈值后数据通过 Stream Load 一次性导入 StarRocks。取值范围:[64MB, 10GB]。将此参数设置为较大的值可以提高导入性能,但可能会增加导入延迟。 该参数只在 sink.semantic 为at-least-once才会生效。 sink.semantic 为 exactly-once,则只有 Flink checkpoint 触发时 flush 内存的数据,因此该参数不生效。 |
| sink.buffer-flush.max-rows | No | 500000 | 积攒在内存的数据条数,达到该阈值后数据通过 Stream Load 一次性导入 StarRocks。取值范围:[64000, 5000000]。该参数只在 sink.version 为 V1,sink.semantic 为 at-least-once 才会生效。 |
| sink.buffer-flush.interval-ms | No | 300000 | 数据发送的间隔,用于控制数据写入 StarRocks 的延迟,取值范围:[1000, 3600000]。该参数只在 sink.semantic 为 at-least-once才会生效。 |
| sink.max-retries | No | 3 | Stream Load 失败后的重试次数。超过该数量上限,则数据导入任务报错。取值范围:[0, 10]。该参数只在 sink.version 为 V1 才会生效。 |
| sink.connect.timeout-ms | No | 30000 | 与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。 Flink connector v1.2.9 之前,默认值为 1000。 |
| sink.socket.timeout-ms | No | -1 | 此参数自 Flink connector 1.2.10 开始支持。HTTP 客户端等待数据的超时时间。单位:毫秒。默认值 -1 表示没有超时时间。 |
| sink.sanitize-error-log | No | false | 此参数自 Flink connector 1.2.12 开始支持。用于控制是否对错误日志中的敏感数据进行清理,保护生产环境数据安全。当此项设置为 true 时,Stream Load 错误日志中的敏感行数据和列值将在连接器和 SDK 日志中被屏蔽。为保持向后兼容性,默认值为 false。 |
| sink.wait-for-continue.timeout-ms | No | 10000 | 此参数自 Flink connector 1.2.7 开始支持。等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。 |
| sink.ignore.update-before | No | TRUE | 此参数自 Flink connector 1.2.8 开始支持。将数据导入到主键表时,是否忽略来自 Flink 的 UPDATE_BEFORE 记录。如果将此参数设置为 false,则将该记录在主键表中视为 DELETE 操作。 |
| sink.parallelism | No | NONE | 写入的并行度。仅适用于 Flink SQL。如果未设置, Flink planner 将决定并行度。在多并行度的场景中,用户需要确保数据按正确顺序写入。 |
| sink.properties.* | No | NONE | Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 sink.properties.format 表示 Stream Load 所导入的数据格式,如 CSV 或者 JSON。全部参数和解释,请参见 STREAM LOAD。 |
| sink.properties.format | No | csv | Stream Load 导入时的数据格式。Flink connector 会将内存的数据转换为对应格式,然后通过 Stream Load 导入至 StarRocks。取值为 CSV 或者 JSON。 |
| sink.properties.column_separator | No | \t | CSV 数据的列分隔符。 |
| sink.properties.row_delimiter | No | \n | CSV 数据的行分隔符。 |
| sink.properties.max_filter_ratio | No | 0 | 导入作业的最大容错率,即导入作业能够容忍的因数据质量不合格而过滤掉的数据行所占的最大比例。取值范围:0~1。默认值:0 。详细信息,请参见 STREAM LOAD。 |
| sink.properties.partial_update | No | false | 是否使用部分更新。取值包括 TRUE 和 FALSE。默认值:FALSE。 |
| sink.properties.partial_update_mode | No | row | 指定部分更新的模式,取值包括 row 和 column。
|
| sink.properties.strict_mode | No | false | 是否为 Stream Load 启用严格模式。在导入数据中出现不合格行(如列值不一致)时,严格模式会影响导入行为。有效值: true 和 false。具体参考 STREAM LOAD。 |
| sink.properties.compression | No | NONE | 用于 Stream Load 的压缩算法。有效值:lz4_frame。压缩 JSON 格式需要 Flink Connector 1.2.10+ 和 StarRocks v3.2.7+。压缩 CSV 格式仅需要 Flink Connector 1.2.11+。 |
| sink.properties.prepared_timeout | No | NONE | 自 Flink Connector 1.2.12版本起支持,且仅当 sink.version 为 V2 时生效。需StarRocks 3.5.4 及以上版本。设置事务 Stream Load 阶段从 PREPARED 到 COMMITTED 的超时时间(单位:秒)。通常仅需在 exactly-once 模式下设置;at-least-once 模式通常无需设置(Connector 默认值为 300 秒)。若在精确一次模式下未设置,则采用 StarRocks FE 配置项 prepared_transaction_default_timeout_second(默认 86400 秒)。详见StarRocks 事务超时管理。 |
数据类型映射
| Flink 数据类型 | StarRocks 数据类型 |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY<T> | ARRAY<T> |
| MAP<KT,VT> | JSON STRING |
| ROW<arg T...> | JSON STRING |
使用说明
Exactly Once
-
如果您希望 sink 保证 exactly-once 语义,则建议升级 StarRocks 到 2.5 或更高版本,并将 Flink connector 升级到 1.2.4 或更高版本。
- 自 2.4 版本 StarRocks 开始支持 Stream Load 事务接口。自 Flink connector 1.2.4 版本起, Sink 基于 Stream Load 事务接口重新设计 exactly-once 的实现,相较于原来基于 Stream Load 非事务接口实现的 exactly-once,降低了内存使用和 checkpoint 耗时,提高了作业的实时性和稳定性。
- 自 Flink connector 1.2.4 版本起,如果 StarRocks 支持 Stream Load 事务接口,则 Sink 默认使用 Stream Load 事务接口,如果需要使用 Stream Load 非事务接口实现,则需要配置
sink.version为V1。
注意
如果只升级 StarRocks 或 Flink connector,sink 会自动选择 Stream Load 非事务接口实现。
-
sink 保证 exactly-once 语义相关配置
-
sink.semantic的值必须为exactly-once. -
如果 Flink connector 版本为 1.2.8 及更高,则建议指定
sink.label-prefix的值。需要注意的是,label 前缀在 StarRocks 的所有类型的导入作业中必须是唯一的,包括 Flink job、Routine Load 和 Broker Load。-
如果指定了 label 前缀,Flink connector 将使用 label 前缀清理因为 Flink job 失败而生成的未完成事务,例如在checkpoint 进行过程中 Flink job 失败。如果使用
SHOW PROC '/transactions/<db_id>/running';查看这些事务在 StarRock 的状态,则返回结果会显示事务通常处于PREPARED状态。当 Flink job 从 checkpoint 恢复时,Flink connector 将根据 label 前缀和 checkpoint 中的信息找到这些未完成的事务,并中止事务。当 Flink job 因某种原因退出时,由于采用了两阶段提交机制来实现 exactly-once语义,Flink connector 无法中止事务。当 Flink 作业退出时,Flink connector 尚未收到来自 Flink checkpoint coordinator 的通知,说明这些事务是否应包含在成功的 checkpoint 中,如果中止这些事务,则可能导致数据丢失。您可以在这篇文章中了解如何在 Flink 中实现端到端的 exactly-once。 -
若未指定 label 前缀,StarRocks 仅会在超时后清理滞留事务。但若 Flink 作业在事务超时前频繁失败,运行中的事务数量可能达到 StarRocks
max_running_txn_num_per_db的限制。当标签前缀未指定时,可为PREPARED事务设置更短的超时时间使其更快失效。关于预备状态超时设置方法,请参阅以下说明。
-
-
-
如果您确定 Flink job 将在长时间停止后最终会使用 checkpoint 或 savepoint 恢复,则为避免数据丢失,请调整以下 StarRocks 配置:
-
调整
PREPARED事务超时。关于如何设置超时,请参阅以下说明。该超时时间需大于 Flink 作业的停机时间。否则,在重启 Flink 作业前,包含在成功 checkpoint 中的滞留事务可能因超时而被中止,导致数据丢失。
请注意:当您将此配置值设为较大数值时,建议同时指定
sink.label-prefix的值,以便根据标签前缀和检查点中的信息清理滞留事务,而非依赖超时机制(后者可能导致数据丢失)。 -
label_keep_max_second和label_keep_max_num:StarRocks FE 参数,默认值分别为259200和1000。更多信息,参见FE 配置。label_keep_max_second的值需要大于 Flink job 的停止时间。否 则,Flink connector 无法使用保存在 Flink 的 savepoint 或 checkpoint 中的事务 label 来检查事务在 StarRocks 中的状态,并判断这些事务是否已提交,最终可能导致数据丢失。
-
-
如何设置
PREPARED事务的超时时间-
对于 Connector 1.2.12+ 和 StarRocks 3.5.4+,可通过配置连接器参数
sink.properties.prepared_timeout设置超时值。默认情况下该值未设置,此时将回退至 StarRocks FE 的全局配置prepared_transaction_default_timeout_second(默认值为86400)。 -
对于其他版本的连接器或 StarRocks,可通过配置 StarRocks FE 的全局配置项
prepared_transaction_default_timeout_second(默认值为86400)来设置超时。
-
Flush 策略
Flink connector 先在内存中 buffer 数据,然后通过 Stream Load 将其一次性 flush 到 StarRocks。在 at-least-once 和 exactly-once 场景中使用不同的方式触发 flush 。
对于 at-least-once,在满足以下任何条件时触发 flush:
- buffer 数据的字节达到限制
sink.buffer-flush.max-bytes - buffer 数据行数达到限制
sink.buffer-flush.max-rows。(仅适用于版本 V1) - 自上次 flush 以来经过的时间达到限制
sink.buffer-flush.interval-ms - 触发了 checkpoint
对于 exactly-once,仅在触发 checkpoint 时触发 flush。