CREATE ROUTINE LOAD
この クイックスタート で Routine Load を試してみてください。
Routine Load は Apache Kafka® からメッセージを継続的に消費し、StarRocks にデータをロードできます。Routine Load は Kafka クラスターから CSV、JSON、Avro(v3.0.1 以降でサポート)データを消費し、plaintext、ssl、sasl_plaintext、sasl_ssl などの複数のセキュリティプロトコルを介して Kafka にアクセスできます。
このトピックでは、CREATE ROUTINE LOAD ステートメントの構文、パラメーター、および例について説明します。
- Routine Load の適用シナリオ、原則、および基本操作については、 Load data using Routine Load を参照してください。
- StarRocks テーブルにデータをロードするには、その StarRocks テーブルに対して INSERT 権限を持つユーザーとしてのみ可能です。INSERT 権限を持っていない場合は、 GRANT の指示に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。
構文
CREATE ROUTINE LOAD <database_name>.<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
パラメーター
database_name, job_name, table_name
database_name
任意。StarRocks データベースの名前。
job_name
必須。Routine Load ジョブの名前。1 つのテーブルは複数の Routine Load ジョブからデータを受け取ることができます。識別可能な情報(例: Kafka トピック名やジョブ作成時刻)を使用して、意味のある Routine Load ジョブ名を設定することをお勧めします。Routine Load ジョブの名前は同じデータベース内で一意である必要があります。
table_name
必須。データがロードされる StarRocks テーブルの名前。
load_properties
任意。データのプロパティ。構文:
[COLUMNS TERMINATED BY '<column_separator>'],
[ROWS TERMINATED BY '<row_separator>'],
[COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ... ])],
[WHERE <expr>],
[PARTITION (<partition1_name>[, <partition2_name>, ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name>, ...])]
COLUMNS TERMINATED BY
CSV 形式データのカラム区切り文字。デフォルトのカラム区切り文字は \t(タブ)です。例えば、カラム区切り文字をカンマに指定するには COLUMNS TERMINATED BY "," を使用します。
- ここで指定したカラム区切り文字が、取り込むデータのカラム区切り文字と同じであることを確認してください。
- UTF-8 文字列(カンマ(,)、タブ、パイプ(|)など)をテキストデリミタとして使用できますが、長さは 50 バイトを超えないようにしてください。
- Null 値は
\Nを使用して示されます。例えば、データレコードが 3 つのカラムで構成されており、データレコードが最初と 3 番目のカラムにデータを保持しているが、2 番目のカラムにはデータを保持していない場合、この状況では 2 番目のカラムに\Nを使用して Null 値を示す必要があります。つまり、レコードはa,\N,bとしてコンパイルされる必要があり、a,,bではありません。a,,bはレコードの 2 番目のカラムが空の文字列を保持していることを示します。
ROWS TERMINATED BY
CSV 形式データの行区切り文字。デフォルトの行区切り文字は \n です。
COLUMNS
ソースデータのカラムと StarRocks テーブルのカラム間のマッピング。詳細については、このトピックの Column mapping を参照してください。
column_name: ソースデータのカラムが計算なしで StarRocks テーブルのカラムにマッピングできる場合、カラム名を指定するだけで済みます。これらのカラムはマップされたカラムと呼ばれます。column_assignment: ソースデータのカラムが直接 StarRocks テーブルのカラムにマッピングできない場合、データロード前に関数を使用してカラムの値を計算する必要があります。この場合、exprに計算関数を指定する必要があります。これらのカラムは派生カラムと呼ばれます。StarRocks は最初にマップされたカラムを解析するため、派生カラムはマップされたカラムの後に配置することをお勧めします。
WHERE
フィルター条件。フィルター条件を満たすデータのみが StarRocks にロードされます。例えば、col1 の値が 100 より大きく、col2 の値が 1000 と等しい行のみを取り込みたい場合、WHERE col1 > 100 and col2 = 1000 を使用できます。
フィルター条件で指定されたカラムは、ソースカラムまたは派生カラムであることができます。
PARTITION
StarRocks テーブルがパーティション p0、p1、p2、p3 に分散されており、StarRocks にデータをロードする際に p1、p2、p3 のみにデータをロードし、p0 に保存されるデータをフィルタリングしたい場合、フィルター条件として PARTITION(p1, p2, p3) を指定できます。デフォルトでは、このパラメーターを指定しない場合、データはすべてのパーティションにロードされます。例:
PARTITION (p1, p2, p3)
TEMPORARY PARTITION
データをロードしたい temporary partition の名前。複数の一時パーティションを指定することができ、カンマ(,)で区切る必要があります。
job_properties
必須。ロードジョブのプロパティ。構文:
PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
desired_concurrent_number
必須: いいえ
説明: 単一の Routine Load ジョブの期待されるタスク並行性。デフォルト値: 3。実際のタスク並行性は、複数のパ ラメーターの最小値によって決定されます: min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)。
alive_be_number: 生存している BE ノードの数。partition_number: 消費されるパーティションの数。desired_concurrent_number: 単一の Routine Load ジョブの期待されるタスク並行性。デフォルト値:3。max_routine_load_task_concurrent_num: Routine Load ジョブのデフォルトの最大タスク並行性で、5です。 FE dynamic parameter を参照してください。
max_batch_interval
必須: いいえ
説明: タスクのスケジューリング間隔、つまりタスクが実行される頻度。単位: 秒。値の範囲: 5 ~ 60。デフォルト値: 10。10 秒以上の値を設定することをお勧めします。スケジューリングが 10 秒未満の場合、ロード頻度が高すぎるために多くのタブレットバージョンが生成されます。
max_batch_rows
必須: いいえ
説明: このプロパティは、エラーデータ検出ウィンドウを定義するためにのみ使用されます。ウィンドウは、単一の Routine Load タスクによって消費されるデータ行数です。値は 10 * max_batch_rows です。デフォルト値は 10 * 200000 = 2000000 です。Routine Load タスクは、エラーデータ検出ウィンドウ内でエラーデータを検出します。エラーデータとは、StarRocks が解析できないデータを指します。たとえば、無効な JSON 形式のデータなどです。
max_error_number
必須: いいえ
説明: エラーデータ検出ウィンドウ内で許可されるエラーデータ行の最大数。この値を超えると、ロードジョブは一時停止します。 SHOW ROUTINE LOAD を実行し、ErrorLogUrls を使用してエラーログを表示できます。その後、エラーログに従って Kafka のエラーを修正できます。デフォルト値は 0 で、エラーデータ行は許可されません。
注意
- エラーデータ行が多すぎる場合、ロードジョブが一時停止する前に最後のバッチタスクは 成功 します。つまり、適格なデータはロードされ、不適格なデータはフィルタリングされます。あまりにも多くの不適格なデータ行をフィルタリングしたくない場合は、パラメーター
max_filter_ratioを設定してくださ い。 - エラーデータ行には、WHERE 句によってフィルタリングされたデータ行は含まれません。
- このパラメーターは、次のパラメーター
max_filter_ratioと共に、エラーデータレコードの最大数を制御します。max_filter_ratioが設定されていない場合、このパラメーターの値が有効になります。max_filter_ratioが設定されている場合、エラーデータレコードの数がこのパラメーターまたはmax_filter_ratioパラメーターで設定されたしきい値に達すると、ロードジョブは一時停止します。
max_filter_ratio
必須: いいえ
説明: ロードジョブの最大エラー許容度。エラー許容度は、ロードジョブによって要求されたすべてのデータレコードの中で、不適格なデータ品質のためにフィルタリングされる可能性のあるデータレコードの最大割合です。有効な値: 0 から 1。デフォルト値: 1(実際には効果を発揮しません)。0 に設定することをお勧めします。これにより、不適格なデータレコードが検出された場合、ロードジョブが一時停止し、データの正確性が確保されます。
不適格なデータレコードを無視したい場合は、このパラメーターを 0 より大きい値に設定できます。このようにすると、データファイルに不適格なデータレコードが含まれていても、ロードジョブは成功します。
注意
- エラーデータ行が
max_filter_ratioを超える場合、最後のバッチタスクは 失敗 します。これはmax_error_numberの効果とは 異なります。 - 不適格なデータレコードには、WHERE 句によってフィルタリングされたデータレコードは含まれません。
- このパラメーターは、前のパラメーター
max_error_numberと共に、エラーデータレコードの最大数を制御します。このパラメーターが設定されていない場合(max_filter_ratio = 1と同じように動作します)、max_error_numberパラメーターの値が有効になります。このパラメーターが設定されている場合、エラーデータレコードの数がこのパラメーターまたはmax_error_numberパラメーターで設定されたしきい値に達すると、ロードジョブは一時停止します。
strict_mode
必須: いいえ
説明: strict mode を有効にするかどうかを指定します。有効な値: true と false。デフォルト値: false。strict mode が有効な場合、ロードされたデータのカラムの値が NULL であり、ターゲットテーブルがこのカラムに NULL 値を許可しない場合、データ行はフィルタリングされます。
log_rejected_record_num
必須: いいえ
説明: ログに記録できる不適格なデータ行の最大数を指定します。このパラメーターは v3.1 以降でサポートされています。有効な値: 0、-1、および任意の非ゼロの正の整数。デフォルト値: 0。
- 値
0は、フィルタリングされたデータ行がログに記録されないことを指定します。 - 値
-1は、フィルタリングされたすべてのデータ行がログに記録されることを指定します。 - 非ゼロの正の整数
nは、各 BE でフィルタリングされた最大n行のデータ行がログに記録されることを指定します。
information_schema.loads ビューに対するクエリから返された REJECTED_RECORD_PATH フィールドのパスに移動します。
timezone
必須: いいえ
説明: ロードジョブで使用されるタイムゾーン。デフォルト値: Asia/Shanghai。このパラメーターの値は、strftime()、alignment_timestamp()、from_unixtime() などの関数によって返される結果に影響を与えます。このパラメーターで指定されたタイムゾーンは、セッションレベルのタイムゾーンです。詳細については、 Configure a time zone を参照してください。
partial_update
必須: いいえ
説明: 部分更新を使用するかどうか。 有効な値: TRUE と FALSE。デフォルト値: FALSE、この機能を無効にすることを示します。
merge_condition
必須: いいえ
説明: データを更新するかどうかを判断する条件として使用するカラムの名前を指定します。このカラムにロードされるデータの値がこのカラムの現在の値以上の場合にのみデータが更新されます。 注意
条件付き更新をサポートするのは主キーテーブルのみです。指定するカラムは主キーのカラムであってはなりません。
format
必須: いいえ
説明: ロードするデータの形式。 有効な値: CSV、JSON、および Avro(v3.0.1 以降でサポート)。デフォルト値: CSV。