HDFS Load
HDFS からのデータロード
StarRocks は、HDFS からデータをロードするために次のオプションを提供します。
- INSERT+
FILES()を使用した同期ロード - Broker Load を使用した非同期ロード
- Pipe を使用した継続的な非同期ロード
これらの各オプションには、それぞれの利点があり、詳細は以下のセクションで説明されています。
ほとんどの場合、使用が非常に簡単な INSERT+FILES() メソッドをお勧めします。
ただし、INSERT+FILES() メソッドは現在、Parquet、ORC、および CSV ファイル形式のみをサポートしています。したがって、JSON などの他のファイル形式のデータをロードする必要がある場合や、データロード中に DELETE などのデータ変更を行う必要がある場合は、Broker Load を利用できます。
大量のデータファイルを合計で大きなデータ量(例えば、100 GB 以上または 1 TB 以上)でロードする必要がある場合は、Pipe メソッドを使用することをお勧めします。Pipe はファイルの数やサイズに基づいてファイルを分割し、ロードジョブをより小さく順次のタスクに分解します。このアプローチにより、1 つのファイルのエラーが全体のロードジョブに影響を与えず、データエラーによる再試行の必要性を最小限に抑えます。
始める前に
ソースデータの準備
StarRocks にロードしたいソースデータが、HDFS クラスターに適切に保存されていることを確認してください。こ のトピックでは、HDFS から StarRocks に /user/amber/user_behavior_ten_million_rows.parquet をロードすることを前提としています。
権限の確認
StarRocks のテーブルにデータを ロード するには、その StarRocks テーブルに対して INSERT 権限を持つユーザーである必要があります。INSERT 権限がない場合は、GRANT に記載されている手順に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。構文は GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>} です。
認証情報の収集
HDFS クラスターとの接続を確立するために、シンプルな認証方法を使用できます。シンプルな認証を使用するには、HDFS クラスターの NameNode にアクセスするためのアカウントのユーザー名とパスワードを収集する必要があります。
INSERT+FILES() の使用
この方法は v3.1 以降で利用可能で、現在は Parquet、ORC、お よび CSV (v3.3.0 以降) ファイル形式のみをサポートしています。
INSERT+FILES() の利点
FILES() は、指定したパス関連のプロパティに基づいてクラウドストレージに保存されたファイルを読み取り、ファイル内のデータのテーブルスキーマを推測し、ファイルからデータをデータ行として返すことができます。
FILES() を使用すると、次のことができます:
- SELECT を使用して HDFS から直接データをクエリする。
- CREATE TABLE AS SELECT (CTAS) を使用してテーブルを作成し、ロードする。
- INSERT を使用して既存のテーブルにデータをロードする。
典型的な例
SELECT を使用して HDFS から直接クエリする
SELECT+FILES() を使用して HDFS から直接クエリすることで、テーブルを作成する前にデータセットの内容をプレビューできます。例えば:
- データを保存せずにデータセットのプレビューを取得する。
- 最小値と最大値をクエリして、使用するデータ型を決定する。
NULL値をチェックする。
次の例は、HDFS クラスターに保存されているデータファイル /user/amber/user_behavior_ten_million_rows.parquet をクエリします:
SELECT * FROM FILES
(
"path" = "hdfs://<hdfs_ip>:<hdfs_port>/user/amber/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"hadoop.security.authentication" = "simple",
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
)
LIMIT 3;
システムは次のクエリ結果を返します:
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 543711 | 829192 | 2355072 | pv | 2017-11-27 08:22:37 |
| 543711 | 2056618 | 3645362 | pv | 2017-11-27 10:16:46 |
| 543711 | 1165492 | 3645362 | pv | 2017-11-27 10:17:00 |
+--------+---------+------------+--------------+---------------------+
NOTE
上記のように返される列名は、Parquet ファイルによって提供されます。
CTAS を使用してテーブルを作成しロードする
これは前の例の続きです。前のクエリは CREATE TABLE AS SELECT (CTAS) でラップされ、スキーマ推測を使用してテーブル作成を自動化します。これは、StarRocks がテーブルスキーマを推測し、作成したいテーブルを作成し、データをテーブルにロードすることを意味します。Parquet ファイルを使用する場合、Parquet 形式には列名が含まれているため、FILES() テーブル関数を使用する場合、テーブルを作成するために列名と型を指定する必要はありません。
NOTE
スキーマ推測を使用する場合の CREATE TABLE の構文では、レプリカの数を設定することはできませんので、テーブルを作成する前に設定してください。以下の例は、3 つのレプリカを持つシステムの例です:
ADMIN SET FRONTEND CONFIG ('default_replication_num' = "3");
データベースを作成し、それに切り替えます:
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
CTAS を使用してテーブルを作成し、データファイル /user/amber/user_behavior_ten_million_rows.parquet のデータをテーブルにロードします:
CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"path" = "hdfs://<hdfs_ip>:<hdfs_port>/user/amber/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"hadoop.security.authentication" = "simple",
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
テーブルを作成した後、DESCRIBE を使用してそのスキーマを表示できます:
DESCRIBE user_behavior_inferred;
システムは次のクエリ結果を返します:
+--------------+-----------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+-----------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varbinary | YES | false | NULL | |
| Timestamp | varbinary | YES | false | NULL | |
+--------------+-----------+------+-------+---------+-------+
テーブルにデータがロードされたことを確認するために、テーブルをクエリします。例:
SELECT * from user_behavior_inferred LIMIT 3;
次のクエリ結果が返され、データが正常にロードされたことを示しています:
+--------+--------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+--------+------------+--------------+---------------------+
| 84 | 56257 | 1879194 | pv | 2017-11-26 05:56:23 |
| 84 | 108021 | 2982027 | pv | 2017-12-02 05:43:00 |
| 84 | 390657 | 1879194 | pv | 2017-11-28 11:20:30 |
+--------+--------+------------+--------------+---------------------+
INSERT を使用して既存のテーブルにロードする
挿入するテーブルをカスタマイズしたい場合があります。例えば、次のような場合です:
- 列のデータ型、NULL 設定、またはデフォルト値
- キーの種類と列
- データのパーティショニングとバケット化
NOTE
最も効率的なテーブル構造を作成 するには、データの使用方法と列の内容に関する知識が必要です。このトピックではテーブル設計については扱いません。テーブル設計についての情報は、Table types を参照してください。
この例では、テーブルがどのようにクエリされるか、Parquet ファイル内のデータに関する知識に基づいてテーブルを作成します。Parquet ファイル内のデータに関する知識は、HDFS でファイルを直接クエリすることで得られます。
- HDFS のデータセットのクエリにより、
Timestamp列が VARBINARY データ型に一致するデータを含んでいることが示されたため、次の DDL で列の型が指定されています。 - HDFS のデータをクエリすることで、データセットに
NULL値がないことがわかるため、DDL ではどの列も NULL 許可として設定されていません。 - 予想されるクエリタイプに基づいて、ソートキーとバケット化列は
UserID列に設定されています。このデータに対するユースケースは異なるかもしれないので、ソートキーとしてItemIDを追加または代わりに使用することを決定するかもしれません。
データベースを作成し、それに切り替えます:
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
手動でテーブルを作成します (HDFS からロードしたい Parquet ファイルと同じスキーマを持つことをお勧めします):
CREATE TABLE user_behavior_declared
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp varbinary
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);
スキーマを表示して、FILES() テーブル関数によって生成された推測スキーマと比較できるようにします:
DESCRIBE user_behavior_declared;
+--------------+----------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+----------------+------+-------+---------+-------+
| UserID | int | NO | true | NULL | |
| ItemID | int | NO | false | NULL | |
| CategoryID | int | NO | false | NULL | |
| BehaviorType | varchar(65533) | NO | false | NULL | |
| Timestamp | varbinary | NO | false | NULL | |
+--------------+----------------+------+-------+---------+-------+
5 rows in set (0.00 sec)
先ほど作成したスキーマを、FILES() テーブル関数を使用して以前に推測されたスキーマと比較してください。以下を確認してください:
- データ型
- NULL 許可
- キーフィールド
ターゲットテーブルのスキーマをより良く制御し、クエリパフォーマンスを向上させるために、本番環境では手動でテーブルスキーマを指定することをお勧めします。
テーブルを作成した後、INSERT INTO SELECT FROM FILES() を使用してロードできます:
INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"path" = "hdfs://<hdfs_ip>:<hdfs_port>/user/amber/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"hadoop.security.authentication" = "simple",
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);
ロードが完了した後、テーブルをクエリしてデータがロードされたことを確認できます。例:
SELECT * from user_behavior_declared LIMIT 3;
次のクエリ 結果が返され、データが正常にロードされたことを示しています:
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 107 | 1568743 | 4476428 | pv | 2017-11-25 14:29:53 |
| 107 | 470767 | 1020087 | pv | 2017-11-25 14:32:31 |
| 107 | 358238 | 1817004 | pv | 2017-11-25 14:43:23 |
+--------+---------+------------+--------------+---------------------+
ロード進捗の確認
StarRocks Information Schema の loads ビュ ーから INSERT ジョブの進捗をクエリできます。この機能は v3.1 以降でサポートされています。例:
SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;
loads ビューで提供されるフィールドについての情報は、loads を参照してください。
複数のロードジョブを送信した場合は、ジョブに関連付けられた LABEL でフィルタリングできます。例:
SELECT * FROM information_schema.loads WHERE LABEL = 'insert_0d86c3f9-851f-11ee-9c3e-00163e044958' \G
*************************** 1. row ***************************
JOB_ID: 10214
LABEL: insert_0d86c3f9-851f-11ee-9c3e-00163e044958
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-11-17 15:58:14
ETL_START_TIME: 2023-11-17 15:58:14
ETL_FINISH_TIME: 2023-11-17 15:58:14
LOAD_START_TIME: 2023-11-17 15:58:14
LOAD_FINISH_TIME: 2023-11-17 15:58:18
JOB_DETAILS: {"All backends":{"0d86c3f9-851f-11ee-9c3e-00163e044958":[10120]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"0d86c3f9-851f-11ee-9c3e-00163e044958":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
NOTE
INSERT は同期コマンドです。INSERT ジョブがまだ実行中の場合、その実行ステータスを確認するには別のセッションを開く必要があります。
Broker Load の使用
非同期の Broker Load プロセスは、HDFS への接続を確立し、データを取得し、StarRocks にデータを保存する処理を行います。
この方法は次のファイル形式をサポートしています:
- Parquet
- ORC
- CSV
- JSON (v3.2.3 以降でサポート)