Spark Load を使用した大量データのロード
このロードは、外部の Apache Spark™ リソースを使用してインポートデータを事前処理し、インポートのパフォーマンスを向上させ、計算リソースを節約します。主に 初期移行 や 大規模データのインポート に使用されます (データ量は TB レベルまで)。
Spark Load は 非同期 のインポート方法であり、ユーザーは MySQL プロトコルを介して Spark タイプのインポートジョブを作成し、SHOW LOAD を使用してインポート結果を確認します。
注意
- StarRocks テーブルに対して INSERT 権限を持つユーザーのみが、このテーブルにデータをロードできます。GRANT に従って必要な権限を付与できます。
- Spark Load は、主キーテーブルへのデータロードには使用できません。
用語の説明
- Spark ETL: インポートプロセスでのデータの ETL を主に担当し、グローバル辞書の構築(BITMAP 型)、パーティショニング、ソート、集約などを含みます。
- Broker: Broker は独立したステートレスプロセスです。ファイルシステムインターフェースをカプセル化し、StarRocks にリモートストレージシステムからファイルを読み取る能力を提供します。
- Global Dictionary: 元の値からエンコードされた値へのデータ構造を保存します。元の値は任意のデータ型であり、エンコードされた値は整数です。グローバル辞書は、正確なカウントディスティンクトが事前計算されるシナリオで主に使用されます。
背景情報
StarRocks v2.4 以前では、Spark Load は Broker プロセスに依存して、StarRocks クラスターとストレージシステム間の接続を設定していました。Spark Load ジョブを作成する際には、使用する Broker を指定するために WITH BROKER "<broker_name>" を入力する必要があります。Broker は独立したステートレスプロセスであり、ファイルシステムインターフェースと統合されています。Broker プロセスを使用することで、StarRocks はストレージシステムに保存されたデータファイルにアクセスして読み取り、独自の計算リソースを使用してこれらのデータファイルを事前処理およびロードできます。
StarRocks v2.5 以降では、Spark Load は Broker プロセスに依存せずに、StarRocks クラスターとストレージシステム間の接続を設定できます。Spark Load ジョブを作成する際には、Broker を指定する必要はありませんが、WITH BROKER キーワードは保持する必要があります。
注意
Broker プロセスを使用しないロードは、複数の HDFS クラスターや複数の Kerberos ユーザーがある場合など、特定の状況では機能しない場合があります。このような場合でも、Broker プロセスを使用してデータをロードできます。
基本原理
ユーザーは MySQL クライアントを通じて Spark タイプのインポートジョブを提出し、FE は メタデータを記録して提出結果を返します。
Spark Load タスクの実行は、以下の主要なフェーズに分かれています。
- ユーザーが Spark Load ジョブを FE に提出します。
- FE は ETL タスクを Apache Spark™ クラスターに提出して実行します。
- Apache Spark™ クラスターは、グローバル辞書の構築(BITMAP 型)、パーティショニング、ソート、集約などを含む ETL タスクを実行します。
- ETL タスクが完了すると、FE は各事前処理されたスライスのデータパスを取得し、関連する BE に Push タスクを実行させます。
- BE は Broker プロセスを通じて HDFS からデータを読み取り、StarRocks ストレージ形式に変換します。
Broker プロセスを使用しない場合、BE は HDFS から直接データを読み取ります。
- FE は有効なバージョンをスケジュールし、インポートジョブを完了させます。
以下の図は、Spark Load の主なフローを示しています。

グローバル辞書
適用シナリオ
現在、StarRocks の BITMAP 列は Roaringbitmap を 使用して実装されており、入力データ型は整数のみです。したがって、インポートプロセスで BITMAP 列の事前計算を実装する場合、入力データ型を整数に変換する必要があります。
StarRocks の既存のインポートプロセスでは、グローバル辞書のデータ構造は Hive テーブルに基づいて実装されており、元の値からエンコードされた値へのマッピングを保存します。
構築プロセス
- 上流のデータソースからデータを読み取り、一時的な Hive テーブル
hive-tableを生成します。 hive-tableの強調されていないフィールドの値を抽出し、新しい Hive テーブルdistinct-value-tableを生成します。- 元の値とエンコードされた値の列を持つ新しいグローバル辞書テーブル
dict-tableを作成します。 distinct-value-tableとdict-tableの間で左ジョインを行い、ウィンドウ関数を使用してこのセットをエンコードします。最終的に、重複排除された列の元の値とエンコードされた値をdict-tableに書き戻します。dict-tableとhive-tableの間でジョインを行い、hive-tableの元の値を整数のエンコード値に置き換える作業を完了します。hive-tableは次回のデータ事前処理で読み取られ、計算後に StarRocks にインポートされます。