Routine Load を使用したデータロード
このトピックでは、Kafka メッセージ(イベント)を StarRocks にストリームするための Routine Load ジョブの作成方法を紹介し、Routine Load に関する基本的な概念を説明します。
ストリームのメッセージを StarRocks に継続的にロードするには、メッセージストリーム を Kafka トピックに保存し、Routine Load ジョブを作成してメッセージを消費します。Routine Load ジョブは StarRocks に保持され、トピック内のすべてまたは一部のパーティションのメッセージを消費する一連のロードタスクを生成し、メッセージを StarRocks にロードします。
Routine Load ジョブは、StarRocks にロードされたデータが失われたり重複したりしないことを保証するために、正確に一度だけの配信セマンティクスをサポートします。
Routine Load はデータロード時のデータ変換をサポートし、データロード時に UPSERT および DELETE 操作によるデータ変更をサポートします。詳細については、Transform data at loading および Change data through loading を参照してください。
StarRocks テーブルにデータを ロード できるのは、これらの StarRocks テーブルに対して INSERT 権限を持つユーザーのみです。INSERT 権限を持っていない場合は、 GRANT に記載されている手順に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。
サポートされているデータ形式
Routine Load は現在、Kafka クラスターからの CSV、JSON、および Avro(v3.0.1 以降でサポート)形式のデータの消 費をサポートしています。
注意
CSV データについては、以下の点に注意してください:
- テキスト区切り文字として、長さが 50 バイトを超えない UTF-8 文字列(カンマ(,)、タブ、パイプ(|)など)を使用できます。
- Null 値は
\Nを使用して示されます。たとえば、データファイルが 3 列で構成されており、そのデータファイルのレコードが最初と第三の列にデータを持ち、第二の列にデータがない場合、この状況では第二の列に\Nを使用して Null 値を示す必要があります。つまり、レコードはa,\N,bとしてコンパイルされる必要があり、a,,bではありません。a,,bはレコードの第二の列が空の文字列を持っていることを示します。
基本概念

用語
-
Load job
Routine Load ジョブは長時間実行されるジョブです。そのステータスが RUNNING である限り、ロードジョブは Kafka クラスターのトピック内のメッセージを消費し、StarRocks にデータをロードするための 1 つまたは複数の同時ロードタスクを継続的に生成します。
-
Load task
ロードジョブは特定のルールに基づいて複数のロードタスクに分割されます。ロードタスクはデータロードの基本単位です。個々のイベントとして、ロードタスクは Stream Load に基づくロードメカニズムを実装します。複数のロードタスクがトピックの異なるパーティションからメッセージを同時に消費し、StarRocks にデータをロードします。
ワークフロー
-
Routine Load ジョブを作成します。 Kafka からデータをロードするには、CREATE ROUTINE LOAD ステートメントを実行して Routine Load ジョブを作成する必要があります。FE はステートメントを解析し、指定されたプロパティに従ってジョブを作成します。
-
FE がジョブを複数のロードタスクに分割します。
FE は特定のルールに基づいてジョブを複数のロードタスクに分割します。各ロードタスクは個別のトランザクションです。 分割ルールは次のとおりです:
- FE は、希望する同時数
desired_concurrent_number、Kafka トピックのパーティション数、および生存している BE ノードの数に基づいて、ロードタスクの実際の同時数を計算します。 - FE は計算された実際の同時数に基づいてジョブをロードタスクに分割し 、タスクをタスクキューに配置します。
各 Kafka トピックは複数のパーティションで構成されています。トピックパーティションとロードタスクの関係は次のとおりです:
- パーティションはロードタスクに一意に割り当てられ、そのパーティションからのすべてのメッセージはロードタスクによって消費されます。
- ロードタスクは 1 つ以上のパーティションからメッセージを消費できます。
- すべてのパーティションはロードタスク間で均等に分散されます。
- FE は、希望する同時数
-
複数のロードタスクが同時に複数の Kafka トピックパーティションからメッセージを消費し、StarRocks にデータをロードします。
-
FE がロードタスクをスケジュールして送信します: FE はキュー内のロードタスクをタイムリーにスケジュールし、選択された Coordinator BE ノードに割り当てます。ロードタスク間の間隔は、構成項目
max_batch_intervalによって定義されます。FE はロードタスクをすべての BE ノードに均等に分配します。max_batch_intervalに関する詳細は、CREATE ROUTINE LOAD を参照してください。 -
Coordinator BE はロードタスクを開始し、パーティション内のメッセージを消費し、データを解析してフィルタリングします。ロードタスクは、事前に定義されたメッセージの量が消費されるか、事前に定義された時間制限に達するまで続きます。メッセージバッチサイズと時間制限は、FE 構成
max_routine_load_batch_sizeおよびroutine_load_task_consume_secondに定義されています。詳細については、FE Configuration を参照してください。その後、Coordinator BE はメッセージを Executor BEs に分配します。Executor BEs はメッセージをディスクに書き込みます。注意
StarRocks は、SASL_SSL、SAS_PLAINTEXT、SSL、および PLAINTEXT を含むセキュリティプロトコルを介した Kafka へのアクセスをサポートしています。このトピックでは、PLAINTEXT を介して Kafka に接続する例を使用しています。他のセキュリティプロトコルを介して Kafka に接続する必要がある場合は、CREATE ROUTINE LOAD を参照してください。
-
-
FE がデータを継続的にロードするための新しいロードタスクを生成します。 Executor BEs がデータをディスクに書き込んだ後、Coordinator BE はロードタスクの結果を FE に報告します。その結果に基づいて、FE はデータを継続的にロードするための新しいロードタスクを生成します。または、FE は失敗したタスクを再試行して、StarRocks にロードされたデータが失われたり重複したりしないようにします。
Routine Load ジョブを作成する
次の 3 つの例では、Kafka で CSV 形式、JSON 形式、および Avro 形式のデータを消費し、Routine Load ジョブを作成して StarRocks にデータをロードする方法を説明します。詳細な構文とパラメータの説明については、CREATE ROUTINE LOAD を参照してください。
CSV 形式のデータをロードする
このセクションでは、Kafka クラスターで CSV 形式のデータを消費し、StarRocks にデータをロードするための Routine Load ジョブを作成する方法を説明します。
データセットを準備する
Kafka クラスターのトピック ordertest1 に CSV 形式のデータセットがあるとします。データセット内の各メッセージには、注文 ID、支払い日、顧客名、国籍、性別、価格の 6 つのフィールドが含まれています。
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina",Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924
テーブルを作成する
CSV 形式のデータのフィールドに基づいて、データベース example_db にテーブル example_tbl1 を作成します。以下の例では、CSV 形式のデータの顧客性別フィールドを除く 5 つのフィールドを持つテーブルを作成します。
CREATE TABLE example_db.example_tbl1 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`price`double NULL COMMENT "Price"
)
ENGINE=OLAP
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(`order_id`);
注意
v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数(BUCKETS)を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、determine the number of buckets を参照してください。
Routine Load ジョブを送信する
以下のステートメントを実行して、トピック ordertest1 のメッセージを消費し、テーブル example_tbl1 にデータをロードする Routine Load ジョブ example_tbl1_ordertest1 を送信します。ロードタスクは、トピックの指定されたパーティションの初期オフセットからメッセージを消費します。
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" = "0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
ロードジョブを送信した後、SHOW ROUTINE LOAD ステートメントを実行して、ロードジョブのステータスを確認できます。
-
ロードジョブ名
テーブルには複数のロードジョブが存在する可能性があります。そのため、ロードジョブには対応する Kafka トピックとロードジョブが送信された時間を名前として付けることをお勧めします。これにより、各テーブルのロードジョブを区別するのに役立ちます。
-
カラムセパレータ
プロパ ティ
COLUMN TERMINATED BYは、CSV 形式のデータのカラムセパレータを定義します。デフォルトは\tです。 -
Kafka トピックパーティションとオフセット
プロパティ
kafka_partitionsとkafka_offsetsを指定して、メッセージを消費するパーティションとオフセットを指定できます。たとえば、ロードジョブがトピックordertest1の Kafka パーティション"0,1,2,3,4"からすべて初期オフセットでメッセージを消費するようにしたい場合、プロパティを次のように指定できます。Kafka パーティション"0,1,2,3,4"からメッセージを消費し、各パーティションの開始オフセットを個別に指定する必要がある場合は、次のように設定できます。"kafka_partitions" ="0,1,2,3,4",
"kafka_offsets" = "OFFSET_BEGINNING, OFFSET_END, 1000, 2000, 3000"また、すべてのパーティションのデフォルトオフセットをプロパティ
property.kafka_default_offsetsで設定することもできます。"kafka_partitions" ="0,1,2,3,4",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"詳細については、CREATE ROUTINE LOAD を参照してください。
-
データマッピングと変換
CSV 形式のデータと StarRocks テーブルの間のマッピングと変換の関係を指定するには、
COLUMNSパラメータを使用する必要があります。データマッピング:
-
StarRocks は CSV 形式のデータのカラムを抽出し、それらを
COLUMNSパラメータで宣言されたフィールドに順番にマッピングします。 -
StarRocks は
COLUMNSパラメータで宣言されたフィールドを抽出し、それらを StarRocks テーブルのカラムに名前でマッピングします。
データ変換:
例では、CSV 形式のデータから顧客性別のカラムを除外しているため、
COLUMNSパラメータのフィールドtemp_genderはこのフィールドのプレースホルダーとして使用されます。他のフィールドは直接 StarRocks テーブルexample_tbl1のカラムにマッピングされます。データ変換の詳細については、Transform data at loading を参照してください。
注意
CSV 形式のデータのカラムの名前、数、および順序が StarRocks テーブルのそれらと完全に一致する場合、
COLUMNSパラメータを指定する必要はありません。 -
-
タスクの同時実行
Kafka トピックのパーティションが多く、BE ノードが十分にある場合、タスクの同時実行を増やすことでロードを加速できます。
実際のロードタスクの同時実行を増やすには、Routine Load ジョブを作成する際に希望するロードタスクの同時実行数
desired_concurrent_numberを増やすことができます。また、FE の動的構成項目max_routine_load_task_concurrent_num(デフォルトの最大ロードタスクの同時実行数)を大きな値に設定することもできます。max_routine_load_task_concurrent_numの詳細については、FE configuration items を参照してください。実際のタスクの同時実行は、生存している BE ノードの数、事前に指定された Kafka トピックパーティションの数、および
desired_concurrent_numberとmax_routine_load_task_concurrent_numの値の最小値によって定義されます。例では、生存している BE ノードの数は
5、事前に指定された Kafka トピックパーティションの数は5、max_routine_load_task_concurrent_numの値は5です。実際のロードタスクの同時実行を増やすには、デフォルト値3から5にdesired_concurrent_numberを増やすことができます。プロパティの詳細については、CREATE ROUTINE LOAD を参照してください。
JSON 形式のデータをロードする
このセクションでは、Kafka クラスターで JSON 形式のデータを消費し、StarRocks にデータをロードするための Routine Load ジョブを作成する方法を説明します。