通过导入实现数据变更
StarRocks 的主键表支持通过 Stream Load、Broker Load 或 Routine Load 导入作业,对 StarRocks 表进行数据变更,包括插入、更新和删除数据。不支持通过 Spark Load 导入作业或 INSERT 语句对 StarRocks 表进行数据变更。
StarRocks 还支持部分更新 (Partial Update) 和条件更新 (Conditional Update)。
注意
导入操作需要目标表的 INSERT 权限。如果您的用户账号没有 INSERT 权限,请参考 GRANT 给用户赋权。
本文以 CSV 格式的数据文件为例介绍如何通过导入实现数据变更。具体支持的数据文件类型,跟您选择的导入方式有关。
说明
对于 CSV 格式的数据,StarRocks 支持设置长度最大不超过 50 个字节的 UTF-8 编码字符串作为列分隔符,包括常见的逗号 (,)、Tab 和 Pipe (|)。
内部实现
StarRocks 的主键表目前支持 UPSERT 和 DELETE 操作,不支持区分 INSERT 和 UPDATE 操作。
在创建导入作业时,StarRocks 支持在导入作业的创建语句或命令中添加 __op 字段,用于指定操作类型。
说明
不需要在创建 StarRocks 表时添加
__op列。
不同的导入方式,定义 __op 字段的方法也不相同:
-
如果使用 Stream Load 导入方式,需要通过
columns参数来定义__op字段。 -
如果使用 Broker Load 导入方式,需要通过 SET 子句来定义
__op字段。 -
如果使用 Routine Load 导入方式,需要通过
COLUMNS参数来定义__op字段。
根据要做的数据变更操作,您可以选择添加或者不添加 __op 字段。不添加 __op 字段的话,默认为 UPSERT 操作。主要涉及的数据变更操作场景如下:
-
当数据文件只涉及 UPSERT 操作时,可以不添加
__op字段。 -
当数据文件只涉及 DELETE 操作时,必须添加
__op字段,并且指定操作类型为 DELETE。 -
当数据文件中同时包含 UPSERT 和 DELETE 操作时,必须添加
__op字段,并且确保数据文件中包含一个代表操作类型的列,取值为0或1。其中,取值为0时代表 UPSERT 操作,取值为1时代表 DELETE 操作。
使用说明
-
必须确保待导入的数据文件中每一行的列数都相同。
-
所更新的列必须包含主键列。
前提条件
Broker Load
Routine Load
如果使用 Routine Load 导入数据,必须确保您的 Apache Kafka® 集群已创建 Topic。本文假设您已部署四个 Topic,分别为 topic1、topic2、topic3 和 topic4。
基本操作
下面通过几个示例来展示具体的导入操作。有关使用 Stream Load、Broker Load 和 Routine Load 导入数据的详细语法和参数介绍,请参见 STREAM LOAD、BROKER LOAD 和 CREATE ROUTINE LOAD。
UPSERT
当数据文件只涉及 UPSERT 操作时,可以不添加 __op 字段。
如果您添加 __op 字段:
-
可以指定
__op为 UPSERT 操作。 -
也可以不做任何指定,StarRocks 默认导入为 UPSERT 操作。
数据样例
-
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件
example1.csv。文件包含三列,分别代表用户 ID、用户姓名和用户得分,如下所示:101,Lily,100
102,Rose,100b. 把
example1.csv文件中的数据上传到 Kafka 集群的topic1中。 -
准备 StarRocks 表。
a. 在数据库
test_db中创建一张名为table1的主键表。表包含id、name和score三列,分别代表用户 ID、用户名称和用户得分,主键为id列,如下所示:CREATE TABLE `table1`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NOT NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
b. 向
table1表中插入一条数据,如下所示:INSERT INTO table1 VALUES
(101, 'Lily',80);
导入数据
通过导入,把 example1.csv 文件中 id 为 101 的数据更新到 table1 表中,并且把 example1.csv 文件中 id 为 102 的数据插入到 table1 表中。
-
通过 Stream Load 导入:
-
不添加
__op字段:curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label1" \
-H "column_separator:," \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load -
添加
__op字段:curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label2" \
-H "column_separator:," \
-H "columns:__op ='upsert'" \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
-
-
通过 Broker Load 导入:
-
不添加
__op字段:LOAD LABEL test_db.label1
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
)
WITH BROKER; -
添加
__op字段:LOAD LABEL test_db.label2
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
set (__op = 'upsert')
)
WITH BROKER;
-
-
通过 Routine Load 导入:
-
不添加
__op字段:CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
); -
添加
__op字段:CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score, __op ='upsert')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
-
查询数据
导入完成后,查询 table1 表的数据,如下所示:
SELECT * FROM table1;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 101 | Lily | 100 |
| 102 | Rose | 100 |
+------+------+-------+
2 rows in set (0.02 sec)
从查询结果可以看到,example1.csv 文件中 id 为 101 的数据已经更新到 table1 表中,并且 example1.csv 文件中 id 为 102 的数据已经插入到 table1 表中。
DELETE
当数据文件只涉及 DELETE 操作时,必须添加 __op 字段,并且指定操作类型为 DELETE。
数据样例
-
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件
example2.csv。文件包含三列,分别代表用户 ID、用户姓名和用户得分,如下所示:101,Jack,100b. 把
example2.csv文件中的数据上传到 Kafka 集群的topic2中。 -
准备 StarRocks 表。
a. 在数据库
test_db中创建一张名为table2的主键表。表包含id、name和score三列,分别代表用户 ID、用户名称和用户得分,主键为id列,如下所示:CREATE TABLE `table2`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NOT NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
b. 向
table2表中插入数据,如下所示:INSERT INTO table2 VALUES
(101, 'Jack', 100),
(102, 'Bob', 90);
导入数据
通过导入,把 example2.csv 文件中 id 为 101 的数据从 table2 表中删除。
-
通过 Stream Load 导入:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label3" \
-H "column_separator:," \
-H "columns:__op='delete'" \
-T example2.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load -
通过 Broker Load 导入:
LOAD LABEL test_db.label3
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example2.csv")
into table table2
columns terminated by ","
format as "csv"
set (__op = 'delete')
)
WITH BROKER; -
通过 Routine Load 导入:
CREATE ROUTINE LOAD test_db.table2 ON table2
COLUMNS(id, name, score, __op = 'delete')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test2",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
查询数据
导入完成后,查询 table2 表的数据,如下所示:
SELECT * FROM table2;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Bob | 90 |
+------+------+-------+
1 row in set (0.00 sec)
从查询结果可以看到,example2.csv 文件中 id 为 101 的数据已经从 table2 表中删除。
UPSERT 和 DELETE
当数据文件中同时包含 UPSERT 和 DELETE 操作时,必须添加 __op 字段,并且确保数据文件中包含一个代表操作类型的列,取值为 0 或 1。其中,取值为 0 时代表 UPSERT 操作,取值为 1 时代表 DELETE 操作。
数据样例
-
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件
example3.csv。文件包含四列,分别代表用户 ID、用户姓名、用户得分和操作类型,如下所示:101,Tom,100,1
102,Sam,70,0
103,Stan,80,0b. 把
example3.csv文件中的数据上传到 Kafka 集群的topic3中。 -
准备 StarRocks 表。
a. 在数据库
test_db中创建一张名为table3的主键表。表包含id、name和score三列,分别代表用户 ID、用户名称和用户得分,主键为id列,如下所示:CREATE TABLE `table3`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NOT NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
b. 向
table3表中插入数据,如下所示:INSERT INTO table3 VALUES
(101, 'Tom', 100),
(102, 'Sam', 90);
导入数据
通过导入,把 example3.csv 文件中 id 为 101 的数据从 table3 表中删除,把 example3.csv 文件中 id 为 102 的数据更新到 table3 表,并且把 example3.csv 文件中 id 为 103 的数据插入到 table3 表:
-
通过 Stream Load 导入:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label4" \
-H "column_separator:," \
-H "columns: id, name, score, temp, __op = temp" \
-T example3.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table3/_stream_load说明
上述示例中,通过
columns参数把example3.csv文件中代表组别代码的第四列临时命名为temp,然后定义__op字段等于临时命名的temp列。这样,StarRocks 可以根据example3.csv文件中第四列的取值是0还是1来确定执行 UPSERT 还是 DELETE 操作。 -
通过 Broker Load 导入:
LOAD LABEL test_db.label4
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
(id, name, score, temp)
set (__op=temp)
)
WITH BROKER; -
通过 Routine Load 导入:
CREATE ROUTINE LOAD test_db.table3 ON table3
COLUMNS(id, name, score, temp, __op = temp)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test3",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
查询数据
导入完成后,查询 table3 表的数据,如下所示:
SELECT * FROM table3;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Sam | 70 |
| 103 | Stan | 80 |
+------+------+-------+
2 rows in set (0.01 sec)
从查询结果可以看到,example3.csv 文件中 id 为 101 的数据已经从 table3 表中删除,example3.csv 文件中 id 为 102 的数据已经更新到 table3 表中,并且 example3.csv 文件中 id 为 103 的数据已经插入到 table3 表中。
部分更新
主键表还支持部分列更新(Partial Updates),并且针对不同的数据更新场景,提供了行模式和列模式两种部分列更新,在不影响查询性能的同时,尽可能地降低部分更新的开销,从而能够保证更新的实时性。行模式比较适用于较多列且小批量的实时更新场景。列模式适用于少数列并且大量行的批处理更新场景。
注意
部分更新时,如果要更新的行不存在,那么 StarRocks 会插入新的一行,并自动对缺失的列填充默认值。如果没有定义默认值,则自动填充
0。
如下以 CSV 格式的数据文件为例进行说明。
数据样例
-
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件
example4.csv。文件包含两列,分别代表用户 ID 和用户姓名,如下所示:101,Lily
102,Rose
103,Aliceb. 把
example4.csv文件中的数据上传到 Kafka 集群的topic4中。 -
准备 StarRocks 表。
a. 在数据库
test_db中创建一张名为table4的主键表。表包含id、name和score三列,分别代表用户 ID、用户名称和用户得分,主键为id列,如下所示:CREATE TABLE `table4`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NOT NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
b. 向
table4表中插入一条数据,如下所示:INSERT INTO table4 VALUES
(101, 'Tom',80);
导入数据
通过导入,把 example4.csv 里的两列数据更新到 table4 表的 id 和 name 两列。
-
通过 Stream Load 导入:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label7" -H "column_separator:," \
-H "partial_update:true" \
-H "columns:id,name" \
-T example4.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table4/_stream_load说明
使用 Stream Load 导入数据时,需要设置
partial_update为true,以开启部分更新特性,默认为行模式部分更新,如果需要使用列模式部分更新,则需要设置partial_update_mode为column。另外,还需要在columns中声明待更新数据的列的名称。 -
通过 Broker Load 导入:
LOAD LABEL test_db.table4
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example4.csv")
into table table4
format as "csv"
(id, name)
)
WITH BROKER
PROPERTIES
(
"partial_update" = "true"
);说明
使用 Broker Load 导入数据时,需要设置
partial_update为true,以开启部分更新特性,默认为行模式部分更新,如果需要使用列模式部分更新,则需要设置partial_update_mode为column。另外,还需要在column_list中声明待更新数据的列的名称。 -
通过 Routine Load 导入:
CREATE ROUTINE LOAD test_db.table4 on table4
COLUMNS (id, name),
COLUMNS TERMINATED BY ','
PROPERTIES
(
"partial_update" = "true"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test4",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);说明
- 使用 Routine Load 导入数据时,需要设置
partial_update为true,以开启部分更新特性。另外,还需要在COLUMNS中声明待更新数据的列的名称。 - Routine Load 仅支持行模式部分更新,不支持列模式部分更新。
- 使用 Routine Load 导入数据时,需要设置
查询数据
导入完成后,查询 table4 表的数据,如下所示:
SELECT * FROM table4;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 102 | Rose | 0 |
| 101 | Lily | 80 |
| 103 | Alice | 0 |
+------+-------+-------+
3 rows in set (0.01 sec)
从查询结果可以看到,example4.csv 文件中 id 为 101 的数据已经更新到 table4 表中,并且 example4.csv 文件中 id 为 102 和 103 的数据已经插入到 table4 表中。
条件更新
自 StarRocks v2.5 起,主键表支持条件更新 (Conditional Update)。您可以指定某一非主键列为更新条件,这样只有当导入的数据中该列的值大于等于当前值的时候,更新才会生效。
条件更新功能用于解决数据乱序的问题。如果上游数据发生乱序,可以使用条件更新功能保证新的数据不被老的数据覆盖。
说明
不支持给同一批导入的数据指定不同的条件。
不支持删除操作。
在 3.1.3 版本及以前,StarRocks 不支持条件更新同部分更新一并使用。自 3.1.3 版本起,StarRocks 才支持条件更新同部分更新一并使用。
数据样例
-
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件
example5.csv。文件包含三列,分别代表用户 ID、版本号和用户得分,如下所示:101,1,100
102,3,100b. 把
example5.csv文件中的数据上传到 Kafka 集群的topic5中。 -
准备 StarRocks 表。
a. 在数据库
test_db中创建一张名为table5的主键表。表包含id、version和score三列,分别代表用户 ID、版本号和用户得分,主键为id列,如下所示:CREATE TABLE `table5`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`version` int NOT NULL COMMENT "版本号",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`);说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
b. 向
table5表中插入两条数据,如下所示:INSERT INTO table5 VALUES
(101, 2, 80),
(102, 2, 90);
导入数据
通过导入,把 example5.csv 文件中 id 为 101、102 的数据更新到 table5 表中,指定 merge_condition 为 version 列,表示只有当导入的数据中 version 大于等于 table5 中对应行的version 值时,更新才会生效。
-
通过 Stream Load 导入:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label10" \
-H "column_separator:," \
-H "merge_condition:version" \
-T example5.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table5/_stream_load -
通过 Routine Load 导入:
CREATE ROUTINE LOAD test_db.table5 on table5
COLUMNS (id, version, score),
COLUMNS TERMINATED BY ','
PROPERTIES
(
"merge_condition" = "version"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic5",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
); -
通过 Broker Load 导入:
LOAD LABEL test_db.table5
( DATA INFILE ("s3://xxx.csv")
INTO TABLE table5 COLUMNS TERMINATED BY "," FORMAT AS "CSV"
)
WITH BROKER
PROPERTIES
(
"merge_condition" = "version"
);
查询数据
导入完成后,查询 table5 表的数据,如下所示:
SELECT * FROM table5;
+------+------+-------+
| id | version | score |
+------+------+-------+
| 101 | 2 | 80 |
| 102 | 3 | 100 |
+------+------+-------+
2 rows in set (0.02 sec)
从查询结果可以看到,example5.csv 文件中 id 为 101 的数据并没有被更新,而 id 为 102 已经被更新。