AWS S3 からデータをロードする
StarRocks は、AWS S3 からデータをロードするために以下のオプションを提供しています。
- INSERT+
FILES()を使用した同期ロード - Broker Load を使用した非同期ロード
- Pipe を使用した継続的な非同期ロード
これらの各オプションには、それぞれの利点があり、詳細は以下のセクションで説明されています。
ほとんどの場合、使用が非常に簡単な INSERT+FILES() メソッドをお勧めします。
ただし、INSERT+FILES() メソッドは現在、Parquet、ORC、および CSV ファイル形式のみをサポートしています。したがって、JSON などの他のファイル形式のデータをロードする必要がある場合や、データロード中に DELETE などのデータ変更を行う必要がある場合は、Broker Load を利用できます。
大量のデータファイルを合計で大きなデータ量(例えば、100 GB 以上または 1 TB 以上)でロードする必要がある場合は、Pipe メソッドを使用することをお勧めします。Pipe はファイルの数やサイズに基づいてファイルを分割し、ロードジョブをより小さく順次のタスクに分解します。このアプローチにより、1 つのファイルのエラーが全体のロードジョブに影響を与えず、データエラーによる再試行の必要性を最小限に抑えます。
始める前に
ソースデータの準備
StarRocks にロードしたいソースデータが S3 バケットに適切に保存されていることを確認してください。また、データとデータベースの場所を考慮することもお勧めします。バケットと StarRocks クラスターが同じリージョンにある場合、データ転送コストは大幅に低くなります。
このトピックでは、S3 バケットにあるサンプルデータセット s3://starrocks-examples/user-behavior-10-million-rows.parquet を提供します。このオブジェクトは、AWS 認証済みユーザーであれば誰でも読み取れるため、有効な資格情報を使用してデータセットにアクセスできます。
権限の確認
StarRocks のテーブルにデータを ロード するには、その StarRocks テーブルに対して INSERT 権限を持つユーザーである必要があります。INSERT 権限がない場合は、GRANT に記載されている手順に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。構文は GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>} です。
認証情報の収集
このトピックの例では、IAM ユーザーに基づく認証を使用しています。AWS S3 からデータを読み取る権限を持っていることを確認するために、IAM ユーザーに基づく認証の準備 を読み、適切な IAM ポリシー を設定した IAM ユーザーを作成する手順に従うことをお勧めします。
要するに、IAM ユーザーに基づく認証を実践する場合、次の AWS リソースに関する情報を収集する必要があります。
- データを保存する S3 バケット
- バケット内の特定のオブジェクトにアクセスする場合の S3 オブジェクトキー(オブジェクト名)。S3 オブジェクトがサブフォルダーに保存されている場合、オブジェクトキーにはプレフィックスを含めることができます。
- S3 バケットが属する AWS リージョン
- アクセス資格情報として使用されるアクセスキーとシークレットキー
利用可能なすべての認証方法については、AWS リソースへの認証 を参照してください。
INSERT+FILES() の使用
この方法は v3.1 以降で利用可能で、現在は Parquet、ORC、および CSV(v3.3.0 以降)ファイル形式のみをサポートしています。
INSERT+FILES() の利点
FILES() は、指定したパス関連のプロパティに基づいてクラウドストレージに保存されたファイルを読み取り、ファイル内のデータのテーブルスキーマを推測し、ファイルからデータをデータ行として返します。
FILES() を使用すると、次のことができます。
- SELECT を使用して S3 から直接データをクエリする。
- CREATE TABLE AS SELECT (CTAS) を使用してテーブルを作成し、ロードする。
- INSERT を使用して既存のテーブルにデータをロードする。
典型的な例
SELECT を使用して S3 から直接クエリする
SELECT+FILES() を使用して S3 から直接クエリすることで、テーブルを作成する前にデータセットの内容をプレビューできます。例えば:
- データを保存せずにデータセットをプレビューする。
- 最小値と最大値をクエリし、使用するデータ型を決定する。
NULL値を確認する。
次の例は、サンプルデータセット s3://starrocks-examples/user-behavior-10-million-rows.parquet をクエリします。
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
LIMIT 3;
NOTE
上記のコマンドで
AAAとBBBをあなたの資格情報に置き換えてください。オブジェクトは AWS 認証済みユーザーであれば誰でも読み取れるため、有効なaws.s3.access_keyとaws.s3.secret_keyを使用できます。
システムは次のクエリ結果を返します。
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 1 | 2576651 | 149192 | pv | 2017-11-25 01:21:25 |
| 1 | 3830808 | 4181361 | pv | 2017-11-25 07:04:53 |
| 1 | 4365585 | 2520377 | pv | 2017-11-25 07:49:06 |
+--------+---------+------------+--------------+---------------------+
NOTE
上記のように返される列名は Parquet ファイルによって提供されます。
CTAS を使用してテーブルを作成しロードする
これは前の例の続きです。前のクエリは CREATE TABLE AS SELECT (CTAS) にラップされ、スキーマ推測を使用してテーブル作成を自動化します。これは、StarRocks がテーブルスキーマを推測し、作成したいテーブルを作成し、データをテーブルにロードすることを意味します。Parquet ファイルを使用する場合、Parquet 形式には列名が含まれているため、FILES() テーブル関数を使用する際にテーブルを作成するための列名と型は必要ありません。
NOTE
スキーマ推測を使用する場合の CREATE TABLE の構文では、レプリカの数を設定することはできませんので、テーブルを作成する前に設定してください。以下の例は、1 つのレプリカを持つシステムの場合です:
ADMIN SET FRONTEND CONFIG ('default_replication_num' = "1");
データベースを作成し、それに切り替えます。
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
CTAS を使用してテーブルを作成し、サンプルデータセット s3://starrocks-examples/user-behavior-10-million-rows.parquet のデータをテーブルにロードします。
CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);
NOTE
上記のコマンドで
AAAとBBBをあなたの資格情報に置き換えてください。オブジェクトは AWS 認証済みユーザーであれば誰でも読み取れるため、有効なaws.s3.access_keyとaws.s3.secret_keyを使用できます。
テーブルを作成した後、DESCRIBE を使用してそのスキーマを表示できます。
DESCRIBE user_behavior_inferred;
システムは次のクエリ結果を返します。
+--------------+------------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+------------------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varchar(1048576) | YES | false | NULL | |
| Timestamp | varchar(1048576) | YES | false | NULL | |
+--------------+------------------+------+-------+---------+-------+
テーブルにデータがロードされたことを確認するためにテーブルをクエリします。例:
SELECT * from user_behavior_inferred LIMIT 3;
次のクエリ結果が返され、データが正常にロードされたことを示しています。
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 225586 | 3694958 | 1040727 | pv | 2017-12-01 00:58:40 |
| 225586 | 3726324 | 965809 | pv | 2017-12-01 02:16:02 |
| 225586 | 3732495 | 1488813 | pv | 2017-12-01 00:59:46 |
+--------+---------+------------+--------------+---------------------+
INSERT を使用して既存のテーブルにロードする
挿入するテーブルをカスタマイズしたい場合があります。例えば:
- 列のデータ型、NULL 許可設定、またはデフォルト値
- キータイプと列
- データのパーティショニングとバケッティング
NOTE
最も効率的なテーブル構造を作成するには、データの使用方法と列の内容に関する知識が必要です。このトピックではテーブル設計については扱いません。テーブル設計については、テーブルタイプ を参照してください。
この例では、テーブルがどのようにクエリされるか、および Parquet ファイル内のデータに関する知識に基づいてテーブルを作成しています。Parquet ファイル内のデータに関する知識は、S3 内でファイルを直接クエリすることで得られます。
- S3 内のデータセットのクエリにより、
Timestamp列が VARCHAR データ型に一致するデータを含んでいることが示され、StarRocks は VARCHAR から DATETIME へのキャストが可能であるため、以下の DDL ではデータ型が DATETIME に変更されています。 - S3 内のデータをクエリすることで、データセットに
NULL値がないことがわかるため、DDL ではすべての列を非 NULL 許可として設定することもできます。 - 予想されるクエリタイプに基づいて、ソートキーとバケッティング列が
UserID列に設定されています。このデータに対するユースケースが異なる場合は、ソートキーとしてItemIDをUserIDと一緒に、または代わりに使用することを決定す るかもしれません。
データベースを作成し、それに切り替えます。
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
手動でテーブルを作成します。
CREATE TABLE user_behavior_declared
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);
スキーマを表示して、FILES() テーブル関数によって生成された推測スキーマと比較できるようにします。
DESCRIBE user_behavior_declared;
+--------------+----------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+----------------+------+-------+---------+-------+
| UserID | int | YES | true | NULL | |
| ItemID | int | YES | false | NULL | |
| CategoryID | int | YES | false | NULL | |
| BehaviorType | varchar(65533) | YES | false | NULL | |
| Timestamp | datetime | YES | false | NULL | |
+--------------+----------------+------+-------+---------+-------+
先ほど作成したスキーマと、FILES() テーブル関数を使用して推測されたスキーマを比較してください。以下の点に注目してください:
- データ型
- NULL 許可
- キーフィールド
本番環境では、宛先テーブルのスキーマを手動で指定することで、より良いクエリパフォーマンスを得ることができます。
テーブルを作成した後、INSERT INTO SELECT FROM FILES() を使用してロードできます。
INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user-behavior-10-million-rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);
NOTE
上記のコマンドで
AAAとBBBをあなたの資格情報に置き換えてください。オブジェクトは AWS 認証済みユーザーであれば誰でも読み取れるため、有効なaws.s3.access_keyとaws.s3.secret_keyを使用できます。
ロードが完了した後、テーブルをクエリしてデータがロードされたことを確認できます。例:
SELECT * from user_behavior_declared LIMIT 3;
次のクエリ結果が返され、データが正常にロードされたことを示しています。
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 393529 | 3715112 | 883960 | pv | 2017-12-02 02:45:44 |
| 393529 | 2650583 | 883960 | pv | 2017-12-02 02:45:59 |
| 393529 | 3715112 | 883960 | pv | 2017-12-02 03:00:56 |
+--------+---------+------------+--------------+---------------------+
ロード進捗の確認
StarRocks Information Schema の loads ビューから INSERT ジョブの進捗をクエリできます。この機能は v3.1 以降でサポートされています。例:
SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;
loads ビューで提供されるフィールドに関する情報は、loads を参照してください。
複数のロードジョブを送信した場合は、ジョブに関連付けられた LABEL でフィルタリングできます。例:
SELECT * FROM information_schema.loads WHERE LABEL = 'insert_e3b882f5-7eb3-11ee-ae77-00163e267b60' \G
*************************** 1. row ***************************
JOB_ID: 10243
LABEL: insert_e3b882f5-7eb3-11ee-ae77-00163e267b60
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-11-09 11:56:01
ETL_START_TIME: 2023-11-09 11:56:01
ETL_FINISH_TIME: 2023-11-09 11:56:01
LOAD_START_TIME: 2023-11-09 11:56:01
LOAD_FINISH_TIME: 2023-11-09 11:56:44
JOB_DETAILS: {"All backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[10142]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
NOTE
INSERT は同期コマンドです。INSERT ジョブがまだ実行中の場合、その実行ステータスを確認するには別のセッションを開く必要があります。
Broker Load の使用
非同期の Broker Load プロセスは、S3 への接続を処理し、データを取得し、StarRocks にデータを保存します。
この方法は次のファイル形式をサポートしています:
- Parquet
- ORC
- CSV
- JSON(v3.2.3 以降でサポート)
Broker Load の利点
- Broker Load はバックグラウンドで実行され、ジョブが続行するためにクライアントが接続を維持する必要はありません。
- Broker Load は長時間実行されるジョブに推奨され、デフォルトのタイムアウトは 4 時間です。
- Parquet および ORC ファイル形式に加えて、Broker Load は CSV ファイル形式および JSON ファイル形式(JSON ファイル形式は v3.2.3 以降でサポート)をサポートしています。
データフロー

- ユーザーがロードジョブを作成します。
- フロントエンド (FE) がクエリプランを作成し、バックエンドノード (BEs) またはコンピュートノード (CNs) にプランを配布します。
- BEs または CNs がソースからデータを取得し、StarRocks にデータをロードします。
典型的な例
テーブルを作成し、S3 からサンプルデータセット s3://starrocks-examples/user-behavior-10-million-rows.parquet を取得するロードプロセスを開始し、データロードの進捗と成功を確認します。
データベースとテーブルの作成
データベースを作成し、それに切り替えます。
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
手動でテーブルを作成します(AWS S3 からロードしたい Parquet ファイルと同じスキーマを持つことをお勧めします)。
CREATE TABLE user_behavior
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);
Broker Load の開始
次のコマンドを実行して、サンプルデータセット s3://starrocks-examples/user-behavior-10-million-rows.parquet から user_behavior テーブルにデータをロードする Broker Load ジョブを開始します。
LOAD LABEL user_behavior
(
DATA INFILE("s3://starrocks-examples/user-behavior-10-million-rows.parquet")
INTO TABLE user_behavior
FORMAT AS "parquet"
)
WITH BROKER
(
"aws.s3.enable_ssl" = "true",
"aws.s3.use_instance_profile" = "false",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
PROPERTIES
(
"timeout" = "72000"
);
NOTE
上記のコマンドで
AAAとBBBをあなたの資格情報に置き換えてください。オブジェクトは AWS 認証済みユーザーであれば誰でも読み取れるため、有効なaws.s3.access_keyとaws.s3.secret_keyを使用できます。
このジョブには 4 つの主要なセクションがあります:
LABEL: ロードジョブの状態をクエリする際に使用される文字列。LOAD宣言: ソース URI、ソースデータ形式、および宛先テーブル名。BROKER: ソースの接続詳細。PROPERTIES: タイムアウト値およびロードジョブに適用するその他のプロパテ ィ。
詳細な構文とパラメータの説明については、BROKER LOAD を参照してください。
ロード進捗の確認
StarRocks Information Schema の loads ビューから Broker Load ジョブの進捗をクエリできます。この機能は v3.1 以降でサポートされています。
SELECT * FROM information_schema.loads WHERE LABEL = 'user_behavior';
loads ビューで提供されるフィールドに関する情報は、loads を参照してください。
このレコードは LOADING の状態を示し、進捗は 39% です。類似の状態が表示された場合、FINISHED の状態が表示されるまでコマンドを再実行してください。
JOB_ID: 10466
LABEL: user_behavior
DATABASE_NAME: mydatabase
STATE: LOADING
PROGRESS: ETL:100%; LOAD:39%
TYPE: BROKER
PRIORITY: NORMAL
SCAN_ROWS: 4620288
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 4620288
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):72000; max_filter_ratio:0.0
CREATE_TIME: 2024-02-28 22:11:36
ETL_START_TIME: 2024-02-28 22:11:41
ETL_FINISH_TIME: 2024-02-28 22:11:41
LOAD_START_TIME: 2024-02-28 22:11:41
LOAD_FINISH_TIME: NULL
JOB_DETAILS: {"All backends":{"2fb97223-b14c-404b-9be1-83aa9b3a7715":[10004]},"FileNumber":1,"FileSize":136901706,"InternalTableLoadBytes":144032784,"InternalTableLoadRows":4620288,"ScanBytes":143969616,"ScanRows":4620288,"TaskNumber":1,"Unfinished backends":{"2fb97223-b14c-404b-9be1-83aa9b3a7715":[10004]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
ロードジ ョブが完了したことを確認した後、宛先テーブルのサブセットを確認してデータが正常にロードされたかどうかを確認できます。例:
SELECT * from user_behavior LIMIT 3;
次のクエリ結果が返され、データが正常にロードされたことを示しています。
+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 34 | 856384 | 1029459 | pv | 2017-11-27 14:43:27 |
| 34 | 5079705 | 1029459 | pv | 2017-11-27 14:44:13 |
| 34 | 4451615 | 1029459 | pv | 2017-11-27 14:45:52 |
+--------+---------+------------+--------------+---------------------+
Pipe の使用
v3.2 以降、StarRocks は Pipe ロード方法を提供しており、現在は Parquet および ORC ファイル形式のみをサポートしています。
Pipe の利点
Pipe は、継続的なデータロードと大規模なデータロードに最適です。
-
マイクロバッチによる大規模なデータロードは、データエラーによるリトライのコストを削減します。
Pipe を利用することで、StarRocks は大量のデータファイルを効率的にロードできます。Pipe はファイルの数やサイズに基づいて自動的にファイルを分割し、ロードジョブを小さな連続タスクに分解します。このアプローチにより、1 つのファイルのエラーが全体のロードジョブに影響を与えま せん。各ファイルのロードステータスは Pipe によって記録され、エラーを含むファイルを簡単に特定して修正できます。データエラーによるリトライの必要性を最小限に抑えることで、コスト削減に寄与します。
-
継続的なデータロードは、人手を削減します。
Pipe は、新しいまたは更新されたデータファイルを特定の場所に書き込み、これらのファイルから新しいデータを継続的に StarRocks にロードします。
"AUTO_INGEST" = "TRUE"を指定して Pipe ジョブを作成すると、指定されたパスに保存されたデータファイルの変更を常に監視し、データファイルから新しいまたは更新されたデータを自動的に目的の StarRocks テーブルにロードします。
さらに、Pipe はファイルの一意性チェックを行い、重複したデータロードを防ぎます。ロードプロセス中、Pipe はファイル名とダイジェストに基づいて各データファイルの一意性をチェックします。特定のファイル名とダイジェストを持つファイルがすでに Pipe ジョブによって処理されている場合、Pipe ジョブは同じファイル名とダイジェストを持つ後続のすべてのファイルをスキップします。 object storage like AWS S3 uses ETag をファイルダイジェストとして注意してください。
各データファイルのロードステータスは information_schema.pipe_files ビューに記録され保存されます。このビューに関連付けられた Pipe ジョブが削除されると、そのジョブでロードされたファイルに関する記録も削除されます。