使用 Routine Load 导入数据
本文介绍 Routine Load 的基本原理、以及如何通过 Routine Load 持续消费 Apache Kafka® 的消息并导入至 StarRocks 中。
如果您需要将消息流不间断地导入至 StarRocks,则可以将消息流存储在 Kafka 的 Topic 中,并向 StarRocks 提交一个 Routine Load 导入作业。 StarRocks 会常驻地运行这个导入作业,持续生成一系列导入任务,消费 Kafka 集群中该 Topic 中的全部或部分分区的消息并导入到 StarRocks 中。
Routine Load 支持 Exactly-Once 语义,能够保证数据不丢不重。
Routine Load 支持在导入过程中做数据转换、以及通过 UPSERT 和 DELETE 操作实现数据变更。请参见导入过程中实现数据转换和通过导入实现数据变更。
注意
Routine Load 操作需要目标表的 INSERT 权限。如果您的用户账号没有 INSERT 权限,请参考 GRANT 给用户赋权。
支持的数据文件格式
Routine Load 目前支持从 Kakfa 集群中消费 CSV、JSON、Avro (自 v3.0.1) 格式的数据。
说明
对于 CSV 格式的数据,需要注意以下两点:
- StarRocks 支持设置长度最大不超过 50 个字节的 UTF-8 编码字符串作为列分隔符,包括常见的逗号 (,)、Tab 和 Pipe (|)。
- 空值 (null) 用
\N
表示。比如,数据文件一共有三列,其中某行数据的第一列、第三列数据分别为a
和b
,第二列没有数据,则第二列需要用\N
来表示空值,写作a,\N,b
,而不是a,,b
。a,,b
表示第二列是一个空字符串。
基本原理
概念
-
导入作业(Load job)
导入作业会常驻运行,当导入作业的状态为 RUNNING 时,会持续不断生成一个或多个并行的导入任务,不断消费 Kafka 集群中一个 Topic 的消息,并导入至 StarRocks 中。
-
导入任务(Load task)
导入作业会按照一定规则拆分成若干个导入任务。导入任务是执行导入的基本单位,作为一个独立的事务,通过 Stream Load 导入机制实现。若干个导入任务并行消费一个 Topic 中不同分区的消息,并导入至 StarRocks 中。
作业流程
-
创建常驻的导入作业
您需要向 StarRocks 提交创建导入作业的 SQL 语句。 FE 会解析 SQL 语句,并创建一个常驻的导入作业。
-
拆分导入作业为多个导入任务
FE 将导入作业按照一定规则拆分成若干个导入任务。一个导入任务作为一个独立的事务。
拆分规则如下:
- FE 根据期望任务并行度
desired_concurrent_number
、Kafka Topic 的分区数量、存活 BE 数量等,计算得出实际任务并行度。 - FE 根据实际任务并行度将导入作业分为若干导入任务,放入任务执行队列中。
每个 Topic 会有多个分区,分区与导入任务之间的对应关系为:
- 每个分区的消息只能由一个对应的导入任务消费。
- 一个导入任务可能会消费一个或多个分区。
- 分区会尽可能均匀地分配给导入任务。
- FE 根据期望任务并行度
-
多个导入任务并行进行,消费 Kafka 多个分区的消息,导入至 StarRocks
-
调度和提交导入任务:FE 定时调度任务执行队列中的导入任务,分配给选定的 Coordinator BE。调度导入任务的时间间隔由
max_batch_interval
参数,并且 FE 会尽可能均匀地向所有 BE 分配导入任务。有关max_batch_interval
参数的详细介绍,请参见 CREATE ROUTINE LOAD。 -
执行导入任务:Coordinator BE 执行导入任务,消费分区的消息,解析并过滤数据。导入任务需要消费足够多的消息,或者消费足够长时间。消费时间和消费的数据量由 FE 配置项
max_routine_load_batch_size
、routine_load_task_consume_second
决定,有关该配置项的更多说明,请参见 配置参数。然后,Coordinator BE 将消息分发至相关 Executor BE 节点, Executor BE 节点将消息写入磁盘。
说明
StarRocks 支持通过安全协议,包括 SASL_SSL、SAS_PLAINTEXT、SSL,或者 PLAINTEXT 来连接 Kafka。本文以 PLAINTEXT 方式连接 Kafka 为例进行演示,如果您需要通过其他安全协议连接 Kafka,请参见CREATE ROUTINE LOAD。
-
-
持续生成新的导入任务,不间断地导入数据
Executor BE 节点成功写入数据后, Coordonator BE 会向 FE 汇报导入结果。
FE 根据汇报结果,继续生成新的导入任务,或者对失败的导入任务进行重试,连续地导入数据,并且能够保证导入数据不丢不重。
准备工作
下载并安装 Kafka。创建 Topic 并且上传数据。操作步骤,请参见Apache Kafka quickstart。