Routine Load
Routine Load を使用したデータロード
この クイックスタート で Routine Load を試してみてください。
このトピックでは、Kafka メッセージ(イベント)を StarRocks にストリームするための Routine Load ジョブの作成方法を紹介し、Routine Load に関する基本的な概念を説明します。
ストリームのメッセージを StarRocks に継続的にロードするには、Kafka トピックにメッセージストリームを保存し、Routine Load ジョブを作成してメッセージを消費します。Routine Load ジョブは StarRocks に保持され、トピック内のすべてまたは一部のパーティションのメッセージを消費する一連のロードタスクを生成し、メッセージを StarRocks にロードします。
Routine Load ジョブは、StarRocks にロードされるデータが失われたり重複したりしないことを保証するために、厳密な一度だけの配信セマンティクスをサポートしています。
Routine Load はデータロード時のデータ変換をサポートし、データロード中に UPSERT および DELETE 操作によるデータ変更をサポートします。詳細については、Transform data at loading および Change data through loading を参照してください。
StarRocks テーブルにデータを ロード するには、その StarRocks テーブルに対して INSERT 権限を持つユーザーである必要があります。INSERT 権限を持っていない場合は、 GRANT に記載されている手順に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。構文は GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>} です。
サポートされているデータ形式
Routine Load は現在、Kafka クラスターからの CSV、JSON、および Avro(v3.0.1 以降でサポート)形式のデータの消費をサポートしています。
注意
CSV データについては、以下の点に注意してください:
- テキスト区切り文字として、長さが 50 バイトを超えない UTF-8 文字列(カンマ(,)、タブ、パイプ(|)など)を使用できます。
- Null 値は
\Nを使用して示されます。たとえば、データファイルが 3 列で構成されており、そのデータファイルのレコードが第 1 列と第 3 列にデータを持ち、第 2 列にデータがない場合、この状況では第 2 列に\Nを使用して Null 値を示す必要があります。つまり、レコードはa,\N,bとしてコンパイルされる必要があり、a,,bではありません。a,,bは、レコードの第 2 列が空の文字列を持っていることを示します。
基本概念

用語
-
Load job
Routine Load ジョブは長時間実行されるジョブです。そのステータスが RUNNING である限り、ロードジョブは Kafka クラスターのトピック内のメッセージを消費し、データを StarRocks にロードするための 1 つまたは複数の同時ロードタスクを継続的に生成します。
-
Load task
ロードジョブは特定のルールに基づいて複数のロードタスクに分割されます。ロードタスクはデータロードの基本単位です。個々のイベントとして、ロードタスクは Stream Load に基づいてロードメカニズムを実装します。複数のロードタスクが異なるトピックのパーティションからメッセージを同時に消費し、データを StarRocks にロードします。
ワークフロー
-
Routine Load ジョブを作成します。 Kafka からデータをロードするには、CREATE ROUTINE LOAD ステートメントを実行して Routine Load ジョブを作成する必要があります。FE はステートメントを解析し、指定したプロパティに基づいてジョブを作成します。
-
FE はジョブを複数のロードタスクに分割します。
FE は特定のルールに基づいてジョブを複数のロードタスクに分割します。各ロードタスクは個別のトランザクションです。 分割ルールは以下の通りです:
- FE は、Kafka トピックのパーティション数と生存している BE ノードの数に基づいて、ロードタスクの実際の同時数を計算します。
- FE は計算された実際の同時数に基づいてジョブをロードタスクに分割し、タスクをタスクキューに配置します。
各 Kafka トピックは複数のパーティションで構成されています。トピックパーティションとロードタスクの関係は以下の通りです:
- パーティションはロードタスクに一意に割り当てられ、そのパーティションからのすべてのメッセージはロードタスクによって消費されます。
- ロードタスクは 1 つ以上のパーティションからメッセージを消費できます。
- すべてのパーティションはロードタスク間で均等に分配されます。
-
複数のロードタスクが同時に複数の Kafka トピックパーティションからメッセージを消費し、データを StarRocks にロードします。
-
FE はロードタスクをスケジュールして提出します:FE はキュー内のロードタスクをタイムリーにスケジュールし、選択された Coordinator BE ノードに割り当てます。ロードタスク間の間隔は、構成項目
max_batch_intervalによって定義されます。FE はロードタスクをすべ ての BE ノードに均等に分配します。max_batch_intervalに関する詳細は CREATE ROUTINE LOAD を参照してください。 -
Coordinator BE はロードタスクを開始し、パーティション内のメッセージを消費し、データを解析およびフィルタリングします。ロードタスクは、事前に定義されたメッセージ量が消費されるか、事前に定義された時間制限に達するまで続きます。メッセージバッチサイズと時間制限は、FE 構成の
max_routine_load_batch_sizeおよびroutine_load_task_consume_secondに定義されています。詳細情報は FE Configuration を参照してください。Coordinator BE はその後、メッセージを Executor BEs に分配します。Executor BEs はメッセージをディスクに書き込みます。注意
StarRocks は、SASL_SSL、SAS_PLAINTEXT、SSL、および PLAINTEXT を含むセキュリティプロトコルを介した Kafka へのアクセスをサポートしています。このトピックでは、PLAINTEXT を介して Kafka に接続する例を使用しています。他のセキュリティプロトコルを介して Kafka に接続する必要がある場合は、CREATE ROUTINE LOAD を参照してください。
-
-
FE はデータを継続的にロードするための新しいロードタスクを生成します。 Executor BEs がデータをディスクに書き込んだ後、Coordinator BE はロードタスクの結果を FE に報告します。その結果に基づいて、FE はデータを継続的にロードするための新しいロードタスクを生成します。または、FE は失敗したタスクを再試行して、StarRocks にロードされたデータが失われたり重複したりしないようにします。
Routine Load ジョブを作成する
以下の 3 つの例では、Kafka で CSV 形式、JSON 形式、および Avro 形式のデータを消費し、Routine Load ジョブを作成して StarRocks にデータをロードする方法を説明します。詳細な構文とパラメータの説明については、CREATE ROUTINE LOAD を参照してください。
CSV 形式のデータをロードする
このセクションでは、Kafka クラスターで CSV 形式のデータを消費し、StarRocks にデータをロードするための Routine Load ジョブを作成する方法を説明します。