ロードによるデータの変更
StarRocks が提供する Primary Key tables を使用すると、Stream Load、Broker Load、または Routine Load ジョブを実行して StarRocks テーブルにデータ変更を加えることができます。これらのデータ変更には、挿入、更新、削除が含まれます。ただし、Primary Key tables は Spark Load または INSERT を使用したデータ変更をサポートしていません。
StarRocks は部分更新と条件付き更新もサポートしています。
You can load data into StarRocks tables only as a user who has the INSERT privilege on those StarRocks tables. If you do not have the INSERT privilege, follow the instructions provided in GRANT to grant the INSERT privilege to the user that you use to connect to your StarRocks cluster. The syntax is GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}.
このトピックでは、CSV データを例にして、ロードを通じて StarRocks テーブルにデータ変更を加える方法を説明します。サポートされるデータファイル形式は、選択したロード方法によって異なります。
注意
CSV データの場合、UTF-8 文字列(カンマ(,)、タブ、パイプ(|)など)をテキスト区切り文字として使用できますが、その長さは 50 バイトを超えないようにしてください。
実装
StarRocks が提供する Primary Key tables は UPSERT および DELETE 操作をサポートしており、INSERT 操作と UPDATE 操作を区別しません。
ロードジョブを作成する際、StarRocks はジョブ作成文またはコマンドに __op という名前のフィールドを追加することをサポートしています。__op フィールドは、実行したい操作の種 類を指定するために使用されます。
注意
テーブルを作成する際、そのテーブルに
__opという名前の列を追加する必要はありません。
__op フィールドの定義方法は、選択したロード方法によって異なります。
-
Stream Load を選択した場合、
columnsパラメータを使用して__opフィールドを定義します。 -
Broker Load を選択した場合、SET 句を使用して
__opフィールドを定義します。 -
Routine Load を選択した場合、
COLUMNS列を使用して__opフィールドを定義します。
__op フィールドを追加するかどうかは、行いたいデータ変更に基づいて決定できます。__op フィールドを追加しない場合、操作の種類はデフォルトで UPSERT になります。主なデータ変更シナリオは次のとおりです。
-
ロードしたいデータファイルが UPSERT 操作のみを含む場合、
__opフィールドを追加する必要はありません。 -
ロードしたいデータファイルが DELETE 操作のみを含む場合、
__opフィールドを追加し、操作の種類を DELETE と指定する必要があります。 -
ロードしたいデータファイルが UPSERT と DELETE 操作の両方を含む場合、
__opフィールドを追加し、データファイルに0または1の値を持つ列が含まれていることを確認する必要があります。値が0の場合は UPSERT 操作を示し、値が1の場合は DELETE 操作を示します。
使用上の注意
-
データファイルの各行が同じ数の列を持っていることを確認してください。
-
データ変更に関与する列には、主キー列が含まれている必要があります。
基本操作
このセクションでは、ロードを通じて StarRocks テーブルにデータ変更を加える方法の例を示します。詳細な構文とパラメータの説明については、STREAM LOAD、BROKER LOAD、および CREATE ROUTINE LOAD を参照してください。
UPSERT
ロードしたいデータファイルが UPSERT 操作のみを含む場合、__op フィールドを追加する必要はありません。
注意
__opフィールドを追加する場合:
操作の種類を UPSERT と指定できます。
__opフィールドを空のままにしておくことができます。操作の種類 はデフォルトで UPSERT になります。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example1.csvという名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つの列で構成されています。101,Lily,100
102,Rose,100b.
example1.csvのデータを Kafka クラスターのtopic1に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_dbにtable1という名前の Primary Key table を作成します。このテーブルは、id、name、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table1`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b.
table1にレコードを挿入します。INSERT INTO table1 VALUES
(101, 'Lily',80);
データのロード
example1.csv の id が 101 のレコードを table1 に更新し、id が 102 のレコードを table1 に挿入するためにロードジョブを実行します。
-
Stream Load ジョブを実行します。
-
__opフィールドを含めたくない場合、次 のコマンドを実行します。curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label1" \
-H "column_separator:," \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load -
__opフィールドを含めたい場合、次のコマンドを実行します。curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label2" \
-H "column_separator:," \
-H "columns:__op ='upsert'" \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
-
-
Broker Load ジョブを実行します。
-
__opフィールドを含めたくない場合、次のコマンドを実行します。LOAD LABEL test_db.label1
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
)
WITH BROKER; -
__opフィールドを含めたい場合、次のコマンドを実行します。LOAD LABEL test_db.label2
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
set (__op = 'upsert')
)
WITH BROKER;
-
-
Routine Load ジョブを実行します。
-
__opフィールドを含めたくない場合、次のコマンドを実行します。CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
); -
__opフィールドを含めたい場合 、次のコマンドを実行します。CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score, __op ='upsert')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
-
データのクエリ
ロードが完了したら、table1 のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table1;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 101 | Lily | 100 |
| 102 | Rose | 100 |
+------+------+-------+
2 rows in set (0.02 sec)
上記のクエリ結果に示されているように、example1.csv の id が 101 のレコードは table1 に更新され、id が 102 のレコードは table1 に挿入されています。
DELETE
ロードしたいデータファイルが DELETE 操作のみを含む場合、__op フィールドを追加し、操作の種類を DELETE と指定する必要があります。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example2.csvという名前の CSV ファイルを 作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つの列で構成されています。101,Jack,100b.
example2.csvのデータを Kafka クラスターのtopic2に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks テーブル
test_dbにtable2という名前の Primary Key table を作成します。このテーブルは、id、name、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table2`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b.
table2に 2 つのレコードを挿入します。INSERT INTO table2 VALUES
(101, 'Jack', 100),
(102, 'Bob', 90);
データのロード
example2.csv の id が 101 のレコードを table2 から削除するためにロードジョブを実行します。
-
Stream Load ジョブを実行します。
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label3" \
-H "column_separator:," \
-H "columns:__op='delete'" \
-T example2.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load -
Broker Load ジョブを実行します。
LOAD LABEL test_db.label3
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example2.csv")
into table table2
columns terminated by ","
format as "csv"
set (__op = 'delete')
)
WITH BROKER; -
Routine Load ジョブを実行します。
CREATE ROUTINE LOAD test_db.table2 ON table2
COLUMNS(id, name, score, __op = 'delete')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test2",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
データのクエリ
ロードが完了したら、table2 のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table2;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Bob | 90 |
+------+------+-------+
1 row in set (0.00 sec)
上記のクエリ結果に示されているように、example2.csv の id が 101 のレコードは table2 から削除されています。
UPSERT と DELETE
ロードしたいデータファイルが UPSERT と DELETE 操作の両方を含む場合、__op フィールドを追加し、データファイルに 0 または 1 の値を持つ列が含まれていることを確認する必要があります。値が 0 の場合は UPSERT 操作を示し、値が 1 の場合は DELETE 操作を示します。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example3.csvという名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコア、操作の種類を順に表す 4 つの列で構成されています。101,Tom,100,1
102,Sam,70,0
103,Stan,80,0b.
example3.csvのデータを Kafka クラスターのtopic3に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_dbにtable3という名前の Primary Key table を作成します。このテーブルは、id、name、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table3`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);
注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b. table3 に 2 つのレコードを挿入します。
INSERT INTO table3 VALUES
(101, 'Tom', 100),
(102, 'Sam', 90);