CREATE ROUTINE LOAD
在本 快速入门 中尝试 Routine Load。
Routine Load 可以持续从 Apache Kafka® 消费消息并将数据导入到 StarRocks。Routine Load 可以从 Kafka 集群中消费 CSV、JSON 和 Avro(自 v3.0.1 起支持)格式的数据,并通过多种安全协议访问 Kafka,包括 plaintext、ssl、sasl_plaintext 和 sasl_ssl。
本主题介绍 CREATE ROUTINE LOAD 语句的语法、参数和示例。
- 有关 Routine Load 的应用场景、原理和基本操作的信息,请参见 使用 Routine Load 导入数据。
- 只有具有 StarRocks 表 INSERT 权限的用户才能将数据导入到 StarRocks 表中。如果您没有 INSERT 权限,请按照 GRANT 中提供的说明授予您用于连接 StarRocks 集群的用户 INSERT 权限。
语法
CREATE ROUTINE LOAD <database_name>.<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
参数
database_name, job_name, table_name
database_name
可选。StarRocks 数据库的名称。
job_name
必需。Routine Load 作业的名称。一个表可以从多个 Routine Load 作业接收数据。我们建议您使用可识别的信息(例如 Kafka 主题名称和大致的作业创建时间)设置一个有意义的 Routine Load 作业名称,以区分多个 Routine Load 作业。Routine Load 作业的名称在同一数据库中必须唯一。
table_name
必需。数据导入的 StarRocks 表的名称。
load_properties
可选。数据的属性。语法:
[COLUMNS TERMINATED BY '<column_separator>'],
[ROWS TERMINATED BY '<row_separator>'],
[COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ... ])],
[WHERE <expr>],
[PARTITION (<partition1_name>[, <partition2_name>, ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name>, ...])]
COLUMNS TERMINATED BY
CSV 格式数据的列分隔符。默认列分隔符是 \t(Tab)。例如,您可以使用 COLUMNS TERMINATED BY "," 将列分隔符指定为逗号。
- 确保此处指定的列分隔符与要导入的数据中的列分隔符相同。
- 您可以使用 UTF-8 字符串(如逗号(,)、制表符或管道符号(|)),其长度不超过 50 字节,作为文本分隔符。
- 空值用
\N表示。例如,一个数据记录由三列组成,数据记录在第一列和第三列中有数据,但在第二列中没有数据。在这种情况下,您需要在第二列中使用\N表示空值。这意味着记录必须编译为a,\N,b而不是a,,b。a,,b表示记录的第二列包含一个空字符串。
ROWS TERMINATED BY
CSV 格式数据的行分隔符。默认行分隔符是 \n。
COLUMNS
源数据中的列与 StarRocks 表中的列之间的映射。有关更多信息,请参见本主题中的 列映射。
column_name:如果源数据中的某列可以直接映射到 StarRocks 表中的某列,则只需指定列名。这些列可以称为映射列。column_assignment:如果源数据中的某列不能直接映射到 StarRocks 表中的某列,并且该列的值必须在数据导入之前通过函数进行计算,则必须在expr中指定计算函数。这些列可以称为派生列。建议将派生列放在映射列之后,因为 StarRocks 首先解析映射列。
WHERE
过滤条件。只有满足过滤条件的数据才能导入到 StarRocks。例如,如果您只想导入 col1 值大于 100 且 col2 值等于 1000 的行,可以使用 WHERE col1 > 100 and col2 = 1000。
过滤条件中指定的列可以是源列或派生列。
PARTITION
如果 StarRocks 表分布在分区 p0、p1、p2 和 p3 上,并且您只想将数据导入到 StarRocks 中的 p1、p2 和 p3,并过滤掉将存储在 p0 中的数据,则可以指定 PARTITION(p1, p2, p3) 作为过滤条件。默认情况下,如果您未指定此参数,数据将导入到所有分区。示例:
PARTITION (p1, p2, p3)
TEMPORARY PARTITION
要导入数据的 临时分区 的名称。您可以指定多个临时分区,必须用逗号(,)分隔。
job_properties
必需。导入作业的属性。语法:
PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
desired_concurrent_number
必需:否
描述:单个 Routine Load 作业的期望任务并行度。默认值:3。实际任务并行度由多个参数的最小值决定:min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)。
alive_be_number:存活的 BE 节点数。partition_number:要消费的分区数。desired_concurrent_number:单个 Routine Load 作业的期望任务并行度。默认值:3。max_routine_load_task_concurrent_num:Routine Load 作业的默认最大任务并行度,为5。请参见 FE 动态参数。
max_batch_interval
必需:否
描述:任务的调度间隔,即任务执行的频率。单位:秒。取值范围:5 ~ 60。默认值:10。建议设置大于 10 的值。如果调度时间小于 10 秒,由于导入频率过高,会生成过多的 tablet 版本。
max_batch_rows
必需:否
描述:此属性仅用于定义错误检测窗口。窗口是单个 Routine Load 任务消费的数据行数。值为 10 * max_batch_rows。默认值为 10 * 200000 = 2000000。Routine Load 任务在错误检测窗口中检测错误数据。错误数据是指 StarRocks 无法解析的数据,例如无效的 JSON 格式数据。
max_error_number
必需:否
描述:错误检测窗口内允许的最大错误数据行数。如果错误数据行数超过此值,导入作业将暂停。您可以执行 SHOW ROUTINE LOAD 并使用 ErrorLogUrls 查看错误日志。之后,您可以根据错误日志在 Kafka 中纠正错误。默认值为 0,表示不允许错误行。
注意
- 当错误数据行数过多时,最后一批任务将在导入作业暂停之前成功。也就是说,合格的数据将被导入,不合格的数据将被过 滤。如果您不想过滤太多不合格的数据行,请设置参数
max_filter_ratio。 - 错误数据行不包括 WHERE 子句过滤掉的数据行。
- 此参数与下一个参数
max_filter_ratio一起控制最大错误数据记录数。当未设置max_filter_ratio时,此参数的值生效。当设置了max_filter_ratio时,一旦错误数据记录数达到此参数或max_filter_ratio参数设置的阈值,导入作业将暂停。
max_filter_ratio
必需:否
描述:导入作业的最大错误容忍度。错误容忍度是指由于数据质量不足而被过滤掉的数据记录在所有请求导入作业的数据记录中的最大百分比。有效值:0 到 1。默认值:1(这意味着它实际上不会生效)。
建议您将其设置为 0。这样,如果检测到不合格的数据记录,导入作业将暂停,从而确保数据的正确性。
如果您想忽略不合格的数据记录,可以将此参数设置为大于 0 的值。这样,即使数据文件包含不合格的数据记录,导入作业也可以成功。
注意
- 当错误数据行数大于
max_filter_ratio时,最后一批任务将失败。这与max_error_number的效果有些不同。 - 不合格的数据记录不包括 WHERE 子句过滤掉的数据记录。
- 此参数与上一个参数
max_error_number一起控制最大错误数据记录数。当未设置此参数(与设置max_filter_ratio = 1的效果相同)时,max_error_number参数的值生效。当设置了此参数时,一旦错误数据记录数达到此参数或max_error_number参数设置的阈值,导入作业将暂停。
strict_mode
必需:否
描述:指定是否启用 strict mode。有效值:true 和 false。默认值:false。当启用严格模式时,如果导入数据中某列的值为 NULL,但目标表不允许该列的 NULL 值,则该数据行将被过滤掉。
log_rejected_record_num
必需:否
描述:指定可以记录的最大不合格数据行数。此参数自 v3.1 起支持。有效值:0、-1 和任何非零正整数。默认值:0。
- 值
0指定被过滤掉的数据行不会被记录。 - 值
-1指定所有被过滤掉的数据行都会被记录。 - 非零正整数如
n指定每个 BE 上最多可以记录n个被过滤掉的数据行。
information_schema.loads 视图中的 REJECTED_RECORD_PATH 字段返回的路径访问导入作业中被过滤掉的所有不合格数据行。