ロードによるデータの変更
StarRocks が提供する Primary Key tables を使用すると、Stream Load、Broker Load、または Routine Load ジョブを実行して StarRocks テーブルにデータ変更を加えることができます。これらのデータ変更には、挿入、更新、削除が含まれます。ただし、Primary Key tables は Spark Load または INSERT を使用したデータ変更をサポートしていません。
StarRocks は部分更新と条件付き更新もサポートしています。
You can load data into StarRocks tables only as a user who has the INSERT privilege on those StarRocks tables. If you do not have the INSERT privilege, follow the instructions provided in GRANT to grant the INSERT privilege to the user that you use to connect to your StarRocks cluster. The syntax is GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.
このトピックでは、CSV データを例にして、ロードを通じて StarRocks テーブルにデータ変更を加える方法を説明します。サポートされるデータファイル形式は、選択したロード方法によって異なります。
注意
CSV データの場合、UTF-8 文字列(カンマ(,)、タブ、パイプ(|)など)をテキスト区切り文字として使用できますが、その長さは 50 バイトを超えないようにしてください。
実装
StarRocks が提供する Primary Key tables は UPSERT および DELETE 操作をサポートしており、INSERT 操作と UPDATE 操作を区別しません。
ロードジョブを作成する際、StarRocks はジョブ作成文またはコマンドに __op という名前のフィールドを追加することをサポートしています。__op フィールドは、実行したい操作の種類を指定するために使用されます。
注意
テーブルを作成する際、そのテーブルに
__opという名前の列を追加する必要はありません。
__op フィールドの定義方法は、選択したロード方法によって異なります。
-
Stream Load を選択した場合、
columnsパラメータを使用して__opフィールドを定義します。 -
Broker Load を選択した場合、SET 句を使用して
__opフィールドを定義します。 -
Routine Load を選択した場合、
COLUMNS列を使用して__opフィールドを定義します。
__op フィールドを追加するかどうかは、行いたいデータ変更に基づいて決定できます。__op フィールドを追加しない場合、操作の種類はデフォルトで UPSERT になります。主なデータ変更シナリオは次のとおりです。
-
ロードしたいデータファイルが UPSERT 操作のみを含む場合、
__opフィールドを追加する必要はありません。 -
ロードしたいデータファイルが DELETE 操作のみを含む場合、
__opフィールドを追加し、操作の種類を DELETE と指定する必要があります。 -
ロードしたいデータファイルが UPSERT と DELETE 操作の両方を含む場合、
__opフィールドを追加し、データファイルに0または1の値を持つ列が含まれていることを確認する必要があります。値が0の場合は UPSERT 操作を示し、値が1の場合は DELETE 操作を示します。
使用上の注意
-
データファイルの各行が同じ数の列を持っていることを確認してください。
-
データ変更に関与する列には、主キー列が含まれている必要があります。
基本操作
このセクションでは、ロードを通じて StarRocks テーブルにデータ変更を加える方法の例を示します。詳細な構文とパラメータの説明については、STREAM LOAD、BROKER LOAD、および CREATE ROUTINE LOAD を参照してください。
UPSERT
ロードしたいデータファイルが UPSERT 操作のみを含む場合、__op フィールドを追加する必要はありません。
注意
__opフィールドを追加する場合:
操作の種類を UPSERT と指定できます。
__opフィールドを空のままにしておくことができます。操作の種類はデフォルトで UPSERT になります。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example1.csvという名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つの列で構成されています。101,Lily,100
102,Rose,100b.
example1.csvのデータを Kafka クラスターのtopic1に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_dbにtable1という名前の Primary Key table を作成します。このテーブルは、id、name、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table1`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b.
table1にレコードを挿入します。INSERT INTO table1 VALUES
(101, 'Lily',80);
データのロード
example1.csv の id が 101 のレコードを table1 に更新し、id が 102 のレコードを table1 に挿入するためにロードジョブを実行します。
-
Stream Load ジョブを実行します。
-
__opフィールドを含めたくない場合、次のコマンドを実行します。curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label1" \
-H "column_separator:," \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load -
__opフィールドを含めたい場合、次のコマンドを実行します。curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label2" \
-H "column_separator:," \
-H "columns:__op ='upsert'" \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
-
-
Broker Load ジョブを実行します。
-
__opフィールドを含めたくない場合、次のコマンドを実行します。LOAD LABEL test_db.label1
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
)
WITH BROKER; -
__opフィールドを含めたい場合、次のコマンドを実行します。LOAD LABEL test_db.label2
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
set (__op = 'upsert')
)
WITH BROKER;
-
-
Routine Load ジョブを実行します。
-
__opフィールドを含めたくない場合、次のコマンドを実行します。CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
); -
__opフィールドを含めたい場合、次のコマンドを実行します。CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score, __op ='upsert')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
-
データのクエリ
ロードが完了したら、table1 のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table1;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 101 | Lily | 100 |
| 102 | Rose | 100 |
+------+------+-------+
2 rows in set (0.02 sec)
上記のクエリ結果に示されているように、example1.csv の id が 101 のレコードは table1 に更新され、id が 102 のレコードは table1 に挿入されています。
DELETE
ロードしたいデータファイルが DELETE 操作のみを含む場合、__op フィールドを追加し、操作の種類を DELETE と指定する必要があります。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example2.csvという名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つの列で構成されています。101,Jack,100b.
example2.csvのデータを Kafka クラスターのtopic2に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks テーブル
test_dbにtable2という名前の Primary Key table を作成します。このテーブルは、id、name、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table2`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b.
table2に 2 つのレコードを挿入します。INSERT INTO table2 VALUES
(101, 'Jack', 100),
(102, 'Bob', 90);
データのロード
example2.csv の id が 101 のレコードを table2 から削除するためにロードジョブを実行します。
-
Stream Load ジョブを実行します。
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label3" \
-H "column_separator:," \
-H "columns:__op='delete'" \
-T example2.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load -
Broker Load ジョブを実行します。
LOAD LABEL test_db.label3
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example2.csv")
into table table2
columns terminated by ","
format as "csv"
set (__op = 'delete')
)
WITH BROKER; -
Routine Load ジョブを実行します。
CREATE ROUTINE LOAD test_db.table2 ON table2
COLUMNS(id, name, score, __op = 'delete')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test2",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
データのクエリ
ロードが完了したら、table2 のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table2;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Bob | 90 |
+------+------+-------+
1 row in set (0.00 sec)
上記のクエリ結果に示されているように、example2.csv の id が 101 のレコードは table2 から削除されています。
UPSERT と DELETE
ロードしたいデータファイルが UPSERT と DELETE 操作の両方を含む場合、__op フィールドを追加し、データファイルに 0 または 1 の値を持つ列が含まれていることを確認する必要があります。値が 0 の場合は UPSERT 操作を示し、値が 1 の場合は DELETE 操作を示します。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example3.csvという名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコア、操作の種類を順に表す 4 つの列で構成されています。101,Tom,100,1
102,Sam,70,0
103,Stan,80,0b.
example3.csvのデータを Kafka クラスターのtopic3に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_dbにtable3という名前の Primary Key table を作成します。このテーブルは、id、name、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table3`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);
注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b. table3 に 2 つのレコードを挿入します。
INSERT INTO table3 VALUES
(101, 'Tom', 100),
(102, 'Sam', 90);
データのロード
example3.csv の id が 101 のレコードを table3 から削除し、id が 102 のレコードを table3 に更新し、id が 103 のレコードを table3 に挿入するためにロードジョブを実行します。
-
Stream Load ジョブを実行します。
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label4" \
-H "column_separator:," \
-H "columns: id, name, score, temp, __op = temp" \
-T example3.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table3/_stream_load注意
上記の例では、
example3.csvの操作の種類を表す第 4 列が一時的にtempとして命名され、columnsパラメータを使用して__opフィールドがtemp列にマッピングされています。このようにして、StarRocks はexample3.csvの第 4 列の値が0または1であるかに応じて、UPSERT または DELETE 操作を実行するかどうかを決定できます。 -
Broker Load ジョブを実行します。
LOAD LABEL test_db.label4
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
(id, name, score, temp)
set (__op=temp)
)
WITH BROKER; -
Routine Load ジョブを実行します。
CREATE ROUTINE LOAD test_db.table3 ON table3
COLUMNS(id, name, score, temp, __op = temp)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test3",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
データのクエリ
ロードが完了したら、table3 のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table3;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Sam | 70 |
| 103 | Stan | 80 |
+------+------+-------+
2 rows in set (0.01 sec)
上記のクエリ結果に示されているように、example3.csv の id が 101 のレコードは table3 から削除され、id が 102 のレコードは table3 に更新され、id が 103 のレコードは table3 に挿入されています。
部分更新
Primary Key tables は部分更新もサポートしており、異なるデータ更新シナリオに対して行モードと列モードの 2 つの部分更新モードを提供します。これらの 2 つの部分更新モードは、クエリパフォーマンスを保証しながら、可能な限り部分更新のオーバーヘッドを最小限に抑え、リアルタイム更新を実現します。行モードは、多くの列と小さなバッチを含むリアルタイム更新シナリオに適しています。列モードは、少数の列と多数の行を含むバッチ処理更新シナリオに適しています。
注意
部分更新を実行する際、更新対象の行が存在しない場合、StarRocks は新しい行を挿入し、データ更新が挿入されていないフィールドにはデフォルト値を入力します。
このセクションでは、CSV を例にして部分更新を実行する方法を説明します。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example4.csvという名前の CSV ファイルを作成します。このファイルは、ユーザー ID とユーザー名を順に表す 2 つの列で構成されています。101,Lily
102,Rose
103,Aliceb.
example4.csvのデータを Kafka クラスターのtopic4に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_dbにtable4という名前の Primary Key table を作成します。このテーブルは、id、name、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table4`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b.
table4にレコードを挿入します。INSERT INTO table4 VALUES
(101, 'Tom',80);
データのロード
example4.csv の 2 つの列のデータを table4 の id 列と name 列に更新するためにロードを実行します。
-
Stream Load ジョブを実行します。
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label7" -H "column_separator:," \
-H "partial_update:true" \
-H "columns:id,name" \
-T example4.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table4/_stream_load注意
Stream Load を選択する場合、部分更新機能を有効にするために
partial_updateパラメータをtrueに設定する必要があります。デフォルトは行モードでの部分更新です。列モードで部分更新を行う必要がある場合は、partial_update_modeをcolumnに設定する必要があります。さらに、更新したい列を指定するためにcolumnsパラメータを使用する必要があります。 -
Broker Load ジョブを実行します。
LOAD LABEL test_db.table4
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example4.csv")
into table table4
format as "csv"
(id, name)
)
WITH BROKER
PROPERTIES
(
"partial_update" = "true"
);注意
Broker Load を選択する場合、部分更新機能を有効にするために
partial_updateパラメータをtrueに設定する必要があります。デフォルトは行モードでの部分更新です。列モードで部分更新を行う必要がある場合は、partial_update_modeをcolumnに設定する必要があります。さらに、更新したい列を指定するためにcolumn_listパラメータを使用する必要があります。 -
Routine Load ジョブを実行します。
CREATE ROUTINE LOAD test_db.table4 on table4
COLUMNS (id, name),
COLUMNS TERMINATED BY ','
PROPERTIES
(
"partial_update" = "true"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test4",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);注意
- Routine Load を選択する場合、部分更新機能を有効にするために
partial_updateパラメータをtrueに設定する必要があります。さらに、更新したい列を指定するためにCOLUMNSパラメータを使用する必要があります。 - Routine Load は行モードでの部分更新のみをサポートし、列モードでの部分更新はサポートしていません。
- Routine Load を選択する場合、部分更新機能を有効にするために
データのクエリ
ロードが完了したら、table4 のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table4;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 102 | Rose | 0 |
| 101 | Lily | 80 |
| 103 | Alice | 0 |
+------+-------+-------+
3 rows in set (0.01 sec)
上記のクエリ結果に示されているように、example4.csv の id が 101 のレコードは table4 に更新され、id が 102 と 103 のレコードは table4 に挿入されています。
条件付き更新
StarRocks v2.5 以降、Primary Key tables は条件付き更新をサポートしています。非主キー列を条件として指定し、更新が有効になるかどうかを決定できます。このようにして、ソースレコードから宛先レコードへの更新は、指定された列でソースデータレコードが宛先データレコードよりも大きいか等しい値を持つ場合にのみ有効になります。
条件付き更新機能はデータの順序の乱れを解決するために設計されています。ソースデータが順序が乱れている場合、この機能を使用して新しいデータが古いデータによって上書きされないようにすることができます。
注意
- 同じバッチのデータに対して異なる列を更新条件として指定することはできません。
- DELETE 操作は条件付き更新をサポートしていません。
- バージョン v3.1.3 より前のバージョンでは、部分更新と条件付き更新を同時に使用することはできません。v3.1.3 以降、StarRocks は部分更新と条件付き更新を同時に使用することをサポートしています。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example5.csvという名前の CSV ファイルを作成します。このファイルは、ユーザー ID、バージョン、ユーザースコアを順に表す 3 つの列で構成されています。101,1,100
102,3,100b.
example5.csvのデータを Kafka クラスターのtopic5に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_dbにtable5という名前の Primary Key table を作成します。このテーブルは、id、version、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table5`
(
`id` int(11) NOT NULL COMMENT "user ID",
`version` int NOT NULL COMMENT "version",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b.
table5にレコードを挿入します。INSERT INTO table5 VALUES
(101, 2, 80),
(102, 2, 90);
データのロード
example5.csv から id が 101 と 102 のレコードを table5 に更新し、それぞれのレコードの version 値が現在の version 値以上の場合にのみ更新が有効になるように指定します。
-
Stream Load ジョブを実行します。
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label10" \
-H "column_separator:," \
-H "merge_condition:version" \
-T example5.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table5/_stream_load -
Routine Load ジョブを実行します。
CREATE ROUTINE LOAD test_db.table5 on table5
COLUMNS (id, version, score),
COLUMNS TERMINATED BY ','
PROPERTIES
(
"merge_condition" = "version"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic5",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
); -
Broker Load ジョブを実行します。
LOAD LABEL test_db.table5
( DATA INFILE ("s3://xxx.csv")
INTO TABLE table5 COLUMNS TERMINATED BY "," FORMAT AS "CSV"
)
WITH BROKER
PROPERTIES
(
"merge_condition" = "version"
);
データのクエリ
ロードが完了したら、table5 のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table5;
+------+------+-------+
| id | version | score |
+------+------+-------+
| 101 | 2 | 80 |
| 102 | 3 | 100 |
+------+------+-------+
2 rows in set (0.02 sec)
上記のクエリ結果に示されているように、example5.csv の id が 101 のレコードは table5 に更新されておらず、id が 102 のレコードは table5 に挿入されています。