メインコンテンツまでスキップ
バージョン: 3.1

ロードによるデータ変更

StarRocks が提供する 主キーテーブル を使用すると、Stream LoadBroker Load、または Routine Load ジョブを実行して StarRocks テーブルにデータ変更を加えることができます。これらのデータ変更には、挿入、更新、および削除が含まれます。ただし、主キーテーブルは Spark Load または INSERT を使用したデータ変更をサポートしていません。

StarRocks は部分更新と条件付き更新もサポートしています。

StarRocks テーブルにデータを ロード できるのは、これらの StarRocks テーブルに対して INSERT 権限を持つユーザーのみです。INSERT 権限を持っていない場合は、 GRANT に記載されている手順に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。

このトピックでは、CSV データを例にして、ロードを通じて StarRocks テーブルにデータ変更を加える方法を説明します。サポートされるデータファイル形式は、選択したロード方法によって異なります。

注意

CSV データの場合、UTF-8 文字列(カンマ (,) 、タブ、またはパイプ (|) など)をテキスト区切り文字として使用できますが、その長さは 50 バイトを超えないようにしてください。

実装

StarRocks が提供する主キーテーブルは、UPSERT および DELETE 操作をサポートしており、INSERT 操作と UPDATE 操作を区別しません。

ロードジョブを作成する際、StarRocks は __op というフィールドをジョブ作成ステートメントまたはコマンドに追加することをサポートしています。__op フィールドは、実行したい操作の種類を指定するために使用されます。

注意

テーブルを作成する際、__op という名前のカラムをそのテーブルに追加する必要はありません。

__op フィールドの定義方法は、選択したロード方法によって異なります。

  • Stream Load を選択した場合、columns パラメータを使用して __op フィールドを定義します。

  • Broker Load を選択した場合、SET 句を使用して __op フィールドを定義します。

  • Routine Load を選択した場合、COLUMNS カラムを使用して __op フィールドを定義します。

データ変更に基づいて __op フィールドを追加するかどうかを決定できます。__op フィールドを追加しない場合、操作の種類はデフォルトで UPSERT になります。主なデータ変更シナリオは次のとおりです。

  • ロードしたいデータファイルが UPSERT 操作のみを含む場合、__op フィールドを追加する必要はありません。

  • ロードしたいデータファイルが DELETE 操作のみを含む場合、__op フィールドを追加し、操作の種類を DELETE と指定する必要があります。

  • ロードしたいデータファイルが UPSERT と DELETE 操作の両方を含む場合、__op フィールドを追加し、データファイルに 0 または 1 の値を持つカラムが含まれていることを確認する必要があります。値が 0 の場合は UPSERT 操作を示し、値が 1 の場合は DELETE 操作を示します。

使用上の注意

  • データファイルの各行が同じ数のカラムを持っていることを確認してください。

  • データ変更に関わるカラムには、主キーのカラムが含まれている必要があります。

基本操作

このセクションでは、ロードを通じて StarRocks テーブルにデータ変更を加える方法の例を示します。詳細な構文とパラメータの説明については、STREAM LOADBROKER LOAD、および CREATE ROUTINE LOAD を参照してください。

UPSERT

ロードしたいデータファイルが UPSERT 操作のみを含む場合、__op フィールドを追加する必要はありません。

注意

__op フィールドを追加する場合:

  • 操作の種類を UPSERT と指定できます。
  • __op フィールドを空のままにしておくことができます。操作の種類はデフォルトで UPSERT になります。

データ例

  1. データファイルを準備します。

    a. ローカルファイルシステムに example1.csv という名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つのカラムで構成されています。

    101,Lily,100
    102,Rose,100

    b. example1.csv のデータを Kafka クラスターの topic1 に公開します。

  2. StarRocks テーブルを準備します。

    a. StarRocks データベース test_dbtable1 という名前の主キーテーブルを作成します。このテーブルは、idnamescore の 3 つのカラムで構成されており、id が主キーです。

    CREATE TABLE `table1`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NOT NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    注意

    v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際に、バケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細情報については、バケット数の決定 を参照してください。

    b. table1 にレコードを挿入します。

    INSERT INTO table1 VALUES
    (101, 'Lily',80);

データのロード

example1.csvid101 のレコードを table1 に更新し、example1.csvid102 のレコードを table1 に挿入するためにロードジョブを実行します。

  • Stream Load ジョブを実行します。

    • __op フィールドを含めたくない場合、次のコマンドを実行します。

      curl --location-trusted -u <username>:<password> \
      -H "Expect:100-continue" \
      -H "label:label1" \
      -H "column_separator:," \
      -T example1.csv -XPUT \
      http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
    • __op フィールドを含めたい場合、次のコマンドを実行します。

      curl --location-trusted -u <username>:<password> \
      -H "Expect:100-continue" \
      -H "label:label2" \
      -H "column_separator:," \
      -H "columns:__op ='upsert'" \
      -T example1.csv -XPUT \
      http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
  • Broker Load ジョブを実行します。

    • __op フィールドを含めたくない場合、次のコマンドを実行します。

      LOAD LABEL test_db.label1
      (
      data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
      into table table1
      columns terminated by ","
      format as "csv"
      )
      WITH BROKER;
    • __op フィールドを含めたい場合、次のコマンドを実行します。

      LOAD LABEL test_db.label2
      (
      data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
      into table table1
      columns terminated by ","
      format as "csv"
      set (__op = 'upsert')
      )
      WITH BROKER;
  • Routine Load ジョブを実行します。

    • __op フィールドを含めたくない場合、次のコマンドを実行します。

      CREATE ROUTINE LOAD test_db.table1 ON table1
      COLUMNS TERMINATED BY ",",
      COLUMNS (id, name, score)
      PROPERTIES
      (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows"= "250000",
      "max_error_number" = "1000"
      )
      FROM KAFKA
      (
      "kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
      "kafka_topic" = "test1",
      "property.kafka_default_offsets" ="OFFSET_BEGINNING"
      );
    • __op フィールドを含めたい場合、次のコマンドを実行します。

      CREATE ROUTINE LOAD test_db.table1 ON table1
      COLUMNS TERMINATED BY ",",
      COLUMNS (id, name, score, __op ='upsert')
      PROPERTIES
      (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows"= "250000",
      "max_error_number" = "1000"
      )
      FROM KAFKA
      (
      "kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
      "kafka_topic" = "test1",
      "property.kafka_default_offsets" ="OFFSET_BEGINNING"
      );

データのクエリ

ロードが完了したら、table1 のデータをクエリしてロードが成功したことを確認します。

SELECT * FROM table1;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 101 | Lily | 100 |
| 102 | Rose | 100 |
+------+------+-------+
2 rows in set (0.02 sec)

上記のクエリ結果に示されているように、example1.csvid101 のレコードは table1 に更新され、example1.csvid102 のレコードは table1 に挿入されました。

DELETE

ロードしたいデータファイルが DELETE 操作のみを含む場合、__op フィールドを追加し、操作の種類を DELETE と指定する必要があります。

データ例

  1. データファイルを準備します。

    a. ローカルファイルシステムに example2.csv という名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つのカラムで構成されています。

    101,Jack,100

    b. example2.csv のデータを Kafka クラスターの topic2 に公開します。

  2. StarRocks テーブルを準備します。

    a. StarRocks テーブル test_dbtable2 という名前の主キーテーブルを作成します。このテーブルは、idnamescore の 3 つのカラムで構成されており、id が主キーです。

    CREATE TABLE `table2`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NOT NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    注意

    v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際に、バケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細情報については、バケット数の決定 を参照してください。

    b. table2 に 2 つのレコードを挿入します。

    INSERT INTO table2 VALUES
    (101, 'Jack', 100),
    (102, 'Bob', 90);

データのロード

example2.csvid101 のレコードを table2 から削除するためにロードジョブを実行します。

  • Stream Load ジョブを実行します。

    curl --location-trusted -u <username>:<password> \
    -H "Expect:100-continue" \
    -H "label:label3" \
    -H "column_separator:," \
    -H "columns:__op='delete'" \
    -T example2.csv -XPUT \
    http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load
  • Broker Load ジョブを実行します。

    LOAD LABEL test_db.label3
    (
    data infile("hdfs://<hdfs_host>:<hdfs_port>/example2.csv")
    into table table2
    columns terminated by ","
    format as "csv"
    set (__op = 'delete')
    )
    WITH BROKER;
  • Routine Load ジョブを実行します。

    CREATE ROUTINE LOAD test_db.table2 ON table2
    COLUMNS(id, name, score, __op = 'delete')
    PROPERTIES
    (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows"= "250000",
    "max_error_number" = "1000"
    )
    FROM KAFKA
    (
    "kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
    "kafka_topic" = "test2",
    "property.kafka_default_offsets" ="OFFSET_BEGINNING"
    );

データのクエリ

ロードが完了したら、table2 のデータをクエリしてロードが成功したことを確認します。

SELECT * FROM table2;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Bob | 90 |
+------+------+-------+
1 row in set (0.00 sec)

上記のクエリ結果に示されているように、example2.csvid101 のレコードは table2 から削除されました。

UPSERT と DELETE

ロードしたいデータファイルが UPSERT と DELETE 操作の両方を含む場合、__op フィールドを追加し、データファイルに 0 または 1 の値を持つカラムが含まれていることを確認する必要があります。値が 0 の場合は UPSERT 操作を示し、値が 1 の場合は DELETE 操作を示します。

データ例

  1. データファイルを準備します。

    a. ローカルファイルシステムに example3.csv という名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコア、および操作タイプを順に表す 4 つのカラムで構成されています。

    101,Tom,100,1
    102,Sam,70,0
    103,Stan,80,0

    b. example3.csv のデータを Kafka クラスターの topic3 に公開します。

  2. StarRocks テーブルを準備します。

    a. StarRocks データベース test_dbtable3 という名前の主キーテーブルを作成します。このテーブルは、idnamescore の 3 つのカラムで構成されており、id が主キーです。

    CREATE TABLE `table3`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NOT NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

注意

v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際に、バケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細情報については、バケット数の決定 を参照してください。

b. table3 に 2 つのレコードを挿入します。

INSERT INTO table3 VALUES
(101, 'Tom', 100),
(102, 'Sam', 90);

データのロード

example3.csvid101 のレコードを table3 から削除し、example3.csvid102 のレコードを table3 に更新し、example3.csvid103 のレコードを table3 に挿入するためにロードジョブを実行します。

  • Stream Load ジョブを実行します。

    curl --location-trusted -u <username>:<password> \
    -H "Expect:100-continue" \
    -H "label:label4" \
    -H "column_separator:," \
    -H "columns: id, name, score, temp, __op = temp" \
    -T example3.csv -XPUT \
    http://<fe_host>:<fe_http_port>/api/test_db/table3/_stream_load

    注意

    上記の例では、example3.csv の操作タイプを表す第 4 カラムが一時的に temp と名付けられ、columns パラメータを使用して __op フィールドが temp カラムにマッピングされています。このようにして、StarRocks は example3.csv の第 4 カラムの値が 0 または 1 であるかに応じて、UPSERT または DELETE 操作を実行するかどうかを決定できます。

  • Broker Load ジョブを実行します。

    LOAD LABEL test_db.label4
    (
    data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
    into table table1
    columns terminated by ","
    format as "csv"
    (id, name, score, temp)
    set (__op=temp)
    )
    WITH BROKER;
  • Routine Load ジョブを実行します。

    CREATE ROUTINE LOAD test_db.table3 ON table3
    COLUMNS(id, name, score, temp, __op = temp)
    PROPERTIES
    (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows"= "250000",
    "max_error_number" = "1000"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
    "kafka_topic" = "test3",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );

データのクエリ

ロードが完了したら、table3 のデータをクエリしてロードが成功したことを確認します。

SELECT * FROM table3;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Sam | 70 |
| 103 | Stan | 80 |
+------+------+-------+
2 rows in set (0.01 sec)

上記のクエリ結果に示されているように、example3.csvid101 のレコードは table3 から削除され、example3.csvid102 のレコードは table3 に更新され、example3.csvid103 のレコードは table3 に挿入されました。

部分更新

v2.2 以降、StarRocks は主キーテーブルの指定されたカラムのみを更新することをサポートしています。このセクションでは、CSV を例にして部分更新を実行する方法を説明します。

注意

部分更新を実行する際、更新対象の行が存在しない場合、StarRocks は新しい行を挿入し、データ更新が挿入されないフィールドにはデフォルト値を埋め込みます。

データ例

  1. データファイルを準備します。

    a. ローカルファイルシステムに example4.csv という名前の CSV ファイルを作成します。このファイルは、ユーザー ID とユーザー名を順に表す 2 つのカラムで構成されています。

    101,Lily
    102,Rose
    103,Alice

    b. example4.csv のデータを Kafka クラスターの topic4 に公開します。

  2. StarRocks テーブルを準備します。

    a. StarRocks データベース test_dbtable4 という名前の主キーテーブルを作成します。このテーブルは、idnamescore の 3 つのカラムで構成されており、id が主キーです。

    CREATE TABLE `table4`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NOT NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    注意

    v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際に、バケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細情報については、バケット数の決定 を参照してください。

    b. table4 にレコードを挿入します。

    INSERT INTO table4 VALUES
    (101, 'Tom',80);

データのロード

example4.csv の 2 つのカラムのデータを table4idname カラムに更新するためにロードを実行します。

  • Stream Load ジョブを実行します。

    curl --location-trusted -u <username>:<password> \
    -H "Expect:100-continue" \
    -H "label:label7" -H "column_separator:," \
    -H "partial_update:true" \
    -H "columns:id,name" \
    -T example4.csv -XPUT \
    http://<fe_host>:<fe_http_port>/api/test_db/table4/_stream_load

    注意

    Stream Load を選択した場合、部分更新機能を有効にするために partial_update パラメータを true に設定する必要があります。さらに、更新したいカラムを指定するために columns パラメータを使用する必要があります。

  • Broker Load ジョブを実行します。

    LOAD LABEL test_db.table4
    (
    data infile("hdfs://<hdfs_host>:<hdfs_port>/example4.csv")
    into table table4
    format as "csv"
    (id, name)
    )
    WITH BROKER
    PROPERTIES
    (
    "partial_update" = "true"
    );

    注意

    Broker Load を選択した場合、部分更新機能を有効にするために partial_update パラメータを true に設定する必要があります。さらに、更新したいカラムを指定するために column_list パラメータを使用する必要があります。

  • Routine Load ジョブを実行します。

    CREATE ROUTINE LOAD test_db.table4 on table4
    COLUMNS (id, name),
    COLUMNS TERMINATED BY ','
    PROPERTIES
    (
    "partial_update" = "true"
    )
    FROM KAFKA
    (
    "kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
    "kafka_topic" = "test4",
    "property.kafka_default_offsets" ="OFFSET_BEGINNING"
    );

    注意

    Broker Load を選択した場合、部分更新機能を有効にするために partial_update パラメータを true に設定する必要があります。さらに、更新したいカラムを指定するために COLUMNS パラメータを使用する必要があります。

データのクエリ

ロードが完了したら、table4 のデータをクエリしてロードが成功したことを確認します。

SELECT * FROM table4;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 102 | Rose | 0 |
| 101 | Lily | 80 |
| 103 | Alice | 0 |
+------+-------+-------+
3 rows in set (0.01 sec)

上記のクエリ結果に示されているように、example4.csvid101 のレコードは table4 に更新され、example4.csvid102103 のレコードは table4 に挿入されました。

条件付き更新

StarRocks v2.5 以降、主キーテーブルは条件付き更新をサポートしています。非主キーのカラムを条件として指定し、更新が有効になるかどうかを決定できます。このようにして、ソースレコードからデスティネーションレコードへの更新は、指定されたカラムにおいてソースデータレコードがデスティネーションデータレコードよりも大きいか等しい値を持つ場合にのみ有効になります。

条件付き更新機能はデータの無秩序を解決するために設計されています。ソースデータが無秩序である場合、この機能を使用して新しいデータが古いデータによって上書きされないようにすることができます。

注意

  • 同じバッチのデータに対して異なるカラムを更新条件として指定することはできません。
  • DELETE 操作は条件付き更新をサポートしていません。
  • v3.1.3 より前のバージョンでは、部分更新と条件付き更新を同時に使用することはできません。v3.1.3 以降、StarRocks は部分更新と条件付き更新の同時使用をサポートしています。

データ例

  1. データファイルを準備します。

    a. ローカルファイルシステムに example5.csv という名前の CSV ファイルを作成します。このファイルは、ユーザー ID、バージョン、およびユーザースコアを順に表す 3 つのカラムで構成されています。

    101,1,100
    102,3,100

    b. example5.csv のデータを Kafka クラスターの topic5 に公開します。

  2. StarRocks テーブルを準備します。

    a. StarRocks データベース test_dbtable5 という名前の主キーテーブルを作成します。このテーブルは、idversionscore の 3 つのカラムで構成されており、id が主キーです。

    CREATE TABLE `table5`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `version` int NOT NULL COMMENT "version",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`);

    注意

    v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際に、バケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細情報については、バケット数の決定 を参照してください。

    b. table5 にレコードを挿入します。

    INSERT INTO table5 VALUES
    (101, 2, 80),
    (102, 2, 90);

データのロード

example5.csvid101102 のレコードを table5 に更新し、それぞれのレコードの version 値が現在の version 値以上である場合にのみ更新が有効になるように指定します。

  • Stream Load ジョブを実行します。

    curl --location-trusted -u <username>:<password> \
    -H "Expect:100-continue" \
    -H "label:label10" \
    -H "column_separator:," \
    -H "merge_condition:version" \
    -T example5.csv -XPUT \
    http://<fe_host>:<fe_http_port>/api/test_db/table5/_stream_load
  • Routine Load ジョブを実行します。

    CREATE ROUTINE LOAD test_db.table5 on table5
    COLUMNS (id, version, score),
    COLUMNS TERMINATED BY ','
    PROPERTIES
    (
    "merge_condition" = "version"
    )
    FROM KAFKA
    (
    "kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
    "kafka_topic" = "topic5",
    "property.kafka_default_offsets" ="OFFSET_BEGINNING"
    );
  • Broker Load ジョブを実行します。

    LOAD LABEL test_db.table5
    ( DATA INFILE ("s3://xxx.csv")
    INTO TABLE table5 COLUMNS TERMINATED BY "," FORMAT AS "CSV"
    )
    WITH BROKER
    PROPERTIES
    (
    "merge_condition" = "version"
    );

データのクエリ

ロードが完了したら、table5 のデータをクエリしてロードが成功したことを確認します。

SELECT * FROM table5;
+------+------+-------+
| id | version | score |
+------+------+-------+
| 101 | 2 | 80 |
| 102 | 3 | 100 |
+------+------+-------+
2 rows in set (0.02 sec)

上記のクエリ結果に示されているように、example5.csvid101 のレコードは table5 に更新されず、example5.csvid102 のレコードは table5 に挿入されました。