使用 Routine Load 导入数据
本文介绍 Routine Load 的基本原理、以及如何通过 Routine Load 持续消费 Apache Kafka® 的消息并导入至 StarRocks 中。
如果您需要将消息流不间断地导入至 StarRocks,则可以将消息流存储在 Kafka 的 Topic 中,并向 StarRocks 提交一个 Routine Load 导入作业。 StarRocks 会常驻地运行这个导入作业,持续生成一系列导入任务,消费 Kafka 集群中该 Topic 中的全部或部分分区的消息并导入到 StarRocks 中。
Routine Load 支持 Exactly-Once 语义,能够保证数据不丢不重。
Routine Load 支持在导入过程中做数据转换、以及通过 UPSERT 和 DELETE 操作实现数据变更。请参见导入过程中实现数据转换和通过导入实现数据变更。
支持的数据文件格式
Routine Load 目前支持从 Kakfa 集群中消费 CSV、JSON 格式的数据。
说明
对于 CSV 格式的数据,需要注意以下两点:
- StarRocks 支持设置长度最大不超过 50 个字节的 UTF-8 编码字符串作为列分隔符,包括常见的逗号 (,)、Tab 和 Pipe (|)。
- 空值 (null) 用
\N
表示。比如,数据文件一共有三列,其中某行数据的第一列、第三列数据分别为a
和b
,第二列没有数据,则第二列需要用\N
来表示空值,写作a,\N,b
,而不是a,,b
。a,,b
表示第二列是一个空字符串。
基本原理
概念
-
导入作业(Load job)
导入作业会常驻运行,当导入作业的状态为 RUNNING 时,会持续不断生成一个或多个并行的导入任务,不断消费 Kafka 集群中一个 Topic 的消息,并导入至 StarRocks 中。
-
导入任务(Load task)
导入作业会按照一定规则拆分成若干个导入任务。导入任务是执行导入的基本单位,作为一个独立的事务,通过 Stream Load 导入机制实现。若干个导入任务并行消费一个 Topic 中不同分区的消息,并导入至 StarRocks 中。
作业流程
-
创建常驻的导入作业
您需要向 StarRocks 提交创建导入作业的 SQL 语句。 FE 会解析 SQL 语句,并创建一个常驻的导入作业。
-
拆分导入作业为多个导入任务
FE 将导入作业按照一定规则拆分成若干个导入任务。一个导入任务作为一个独立的事务。
拆分规则如下:
- FE 根据期望任务并行度
desired_concurrent_number
、Kafka Topic 的分区数量、存活 BE 数量等,计算得出实际任务并行度。 - FE 根据实际任务并行度将导入作业分为若干导入任务,放入任务执行队列中。
每个 Topic 会有多个分区,分区与导入任务之间的对应关系为:
- 每个分区的消息只能由一个对应的导入任务消费。
- 一个导入任务可能会消费一个或多个分区。
- 分区会尽可能均匀地分配给导入任务。
- FE 根据期望任务并行度
-
多个导入任务并行进行,消费 Kafka 多个分区的消息,导入至 StarRocks
-
调度和提交导入任务:FE 定时调度任务执行队列中的导入任务,分配给选定的 Coordinator BE。调度导入任务的时间间隔由
max_batch_interval
参数,并且 FE 会尽可能均匀地向所有 BE 分配导入任务。有关max_batch_interval
参数的详细介绍,请参见 CREATE ROUTINE LOAD。 -
执行导入任务:Coordinator BE 执行导入任务,消费分区的消息,解析并过滤数据。导入任务需要消费足够多的消息,或者消费足够长时间。消费时间和消费的数据量由 FE 配置项
max_routine_load_batch_size
、routine_load_task_consume_second
决定,有关该配置项的更多说明,请参见 配置参数。然后,Coordinator BE 将消息分发至相关 Executor BE 节点, Executor BE 节点将消息写入磁盘。
说明
StarRocks 支持通过安全协议,包括 SASL_SSL、SAS_PLAINTEXT、SSL,或者 PLAINTEXT 来连接 Kafka。本文以 PLAINTEXT 方式连接 Kafka 为例进行演示,如果您需要通过其他安全协议连接 Kafka,请参见CREATE ROUTINE LOAD。
-
-
持续生成新的导入任务,不间断地导入数据
Executor BE 节点成功写入数据后, Coordonator BE 会向 FE 汇报导入结果。
FE 根据汇报结果,继续生成新的导入任务,或者对失败的导入任务进行重试,连续地导入数据,并且能够保证导入数据不丢不重。
准备工作
下载并安装 Kafka。创建 Topic 并且上传数据。操作步骤,请参见Apache Kafka quickstart。
创建导入作业
这里通过两个简单的示例,介绍如何通过 Routine Load 持续消费 Kafka 中 CSV 和 JSON 格式的数据,并导入至 StarRocks 中。有关创建 Routine Load 的详细语法和参数说明,请参见 CREATE ROUTINE LOAD。
导入 CSV 数据
本小节介绍如何创建一个 Routine Load 导入作业,持续不断地消费 Kafka 集群的 CSV 格式的数据,然后导入至 StarRocks 中。
数据集
假设 Kafka 集群的 Topic ordertest1
存在如下 CSV 格式的数据,其中 CSV 数据中列的含义依次是订单编号、支付日期、顾客姓名、国籍、性别、支付金额。
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina",Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924