CREATE ROUTINE LOAD
在本 快速入门 中尝试 Routine Load。
Routine Load 可以持续从 Apache Kafka® 消费消息并将数据导入到 StarRocks。Routine Load 可以从 Kafka 集群中消费 CSV、JSON 和 Avro(自 v3.0.1 起支持)格式的数据,并通过多种安全协议访问 Kafka,包括 plaintext、ssl、sasl_plaintext 和 sasl_ssl。
本主题介绍 CREATE ROUTINE LOAD 语句的语法、参数和示例。
- 有关 Routine Load 的应用场景、原理和基本操作的信息,请参见 使用 Routine Load 导入数据。
- 只有具有 StarRocks 表 INSERT 权限的用户才能将数据导入到 StarRocks 表中。如果您没有 INSERT 权限,请按照 GRANT 中提供的说明授予您用于连接 StarRocks 集群的用户 INSERT 权限。
语法
CREATE ROUTINE LOAD <database_name>.<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
参数
database_name, job_name, table_name
database_name
可选。StarRocks 数据库的名称。
job_name
必需。Routine Load 作业的名称。一个表可以从多个 Routine Load 作业接收数据。我们建议您使用可识别的信息(例如 Kafka 主题名称和大致的作业创建时间)设置一个有意义的 Routine Load 作业名称,以区分多个 Routine Load 作业。Routine Load 作业的名称在同一数据库中必须唯一。
table_name
必需。数据导入的 StarRocks 表的名称。
load_properties
可选。数据的属性。语法:
[COLUMNS TERMINATED BY '<column_separator>'],
[ROWS TERMINATED BY '<row_separator>'],
[COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ... ])],
[WHERE <expr>],
[PARTITION (<partition1_name>[, <partition2_name>, ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name>, ...])]
COLUMNS TERMINATED BY
CSV 格式数据的列分隔符。默认列分隔符是 \t(Tab)。例如,您可以使用 COLUMNS TERMINATED BY "," 将列分隔符指定为逗号。
- 确保此处指定的列分隔符与要导入的数据中的列分隔符相同。
- 您可以使用 UTF-8 字符串(如逗号(,)、制表符或管道符号(|)),其长度不超过 50 字节,作为文本分隔符。
- 空值用
\N表示。例如,一个数据记录由三列组成,数据记录在第一列和第三列中有数据,但在第二列中没有数据。在这种情况下,您需要在第二列中使用\N表示空值。这意味着记录必须编译为a,\N,b而不是a,,b。a,,b表示记录的第二列包含一个空字符串。
ROWS TERMINATED BY
CSV 格式数据的行分隔符。默认行分隔符是 \n。
COLUMNS
源数据中的列与 StarRocks 表中的列之间的映射。有关更多信息,请参见本主题中的 列映射。
column_name:如果源数据中的某列可以直接映射到 StarRocks 表中的某列,则只需指定列名。这些列可以称为映射列。column_assignment:如果源数据中的某列不能直接映射到 StarRocks 表中的某列,并且该列的值必须在数据导入之前通过函数进行计算,则必须在expr中指定计算函数。这些列可以称为派生列。建议将派生列放在映射列之后,因为 StarRocks 首先解析映射列。
WHERE
过滤条件。只有满足过滤条件的数据才能导入到 StarRocks。例如,如果您只想导入 col1 值大于 100 且 col2 值等于 1000 的行,可以使用 WHERE col1 > 100 and col2 = 1000。
过滤条件中指定的列可以是源列或派生列。
PARTITION
如果 StarRocks 表分布在分区 p0、p1、p2 和 p3 上,并且您只想将数据导入到 StarRocks 中的 p1、p2 和 p3,并过滤掉将存储在 p0 中的数据,则可以指定 PARTITION(p1, p2, p3) 作为过滤条件。默认情况下,如果您未指定此参数,数据将导入到所有分区。示例:
PARTITION (p1, p2, p3)
TEMPORARY PARTITION
要导入数据的 临时分区 的名称。您可以指定多个临时分区,必须用逗号(,)分隔。
job_properties
必需。导入作业的属性。语法:
PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
desired_concurrent_number
必需:否
描述:单个 Routine Load 作业的期望任务并行度。默认值:3。实际任务并行度由多个参数的最小值决定:min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)。
alive_be_number:存活的 BE 节点数。partition_number:要消费的分区数。desired_concurrent_number:单个 Routine Load 作业的期望任务并行度。默认值:3。max_routine_load_task_concurrent_num:Routine Load 作业的默认最大任务并行度,为5。请参见 FE 动态参数。