メインコンテンツまでスキップ
バージョン: Stable-3.3

Iceberg Lakehouse tutorial

Apache Iceberg Lakehouse

概要

  • Docker compose を使用して Object Storage、Apache Spark、Iceberg catalog、StarRocks をデプロイ
  • 2023 年 5 月のニューヨーク市グリーンタクシーデータを Iceberg データレイクにロード
  • StarRocks を Iceberg catalog にアクセスできるように設定
  • データが存在する場所で StarRocks を使用してデータをクエリ

DLA

StarRocks は、ローカルデータの効率的な分析に加えて、データレイクに保存されたデータを分析するためのコンピュートエンジンとしても機能します。Apache Hudi、Apache Iceberg、Delta Lake などのデータレイクに対応しています。StarRocks の主要な機能の一つは、その external catalog であり、外部で管理されているメタストアへのリンクとして機能します。この機能により、データ移行の必要なく外部データソースをシームレスにクエリすることが可能です。そのため、HDFS や Amazon S3 などの異なるシステムから、Parquet、ORC、CSV などのさまざまなファイル形式でデータを分析できます。

前述の図は、StarRocks がデータの計算と分析を担当し、データレイクがデータの保存、組織化、およびメンテナンスを担当するデータレイク分析のシナリオを示しています。データレイクは、ユーザーがオープンストレージ形式でデータを保存し、柔軟なスキーマを使用して、さまざまな BI、AI、アドホック、およびレポート用途の「単一の真実の源」に基づくレポートを作成することを可能にします。StarRocks は、そのベクトル化エンジンと CBO の利点を十分に活用し、データレイク分析のパフォーマンスを大幅に向上させます。

前提条件

Docker

  • Docker
  • Docker に割り当てられた 5 GB の RAM
  • Docker に割り当てられた 20 GB の空きディスクスペース

SQL クライアント

Docker 環境で提供される SQL クライアントを使用するか、システム上のものを使用できます。多くの MySQL 互換クライアントが動作し、このガイドでは DBeaver と MySQL Workbench の設定をカバーしています。

curl

curl はデータセットをダウンロードするために使用されます。OS のプロンプトで curl または curl.exe を実行してインストールされているか確認してください。curl がインストールされていない場合は、こちらから curl を取得してください


StarRocks 用語

FE

フロントエンドノードは、メタデータ管理、クライアント接続管理、クエリプランニング、クエリスケジューリングを担当します。各 FE はメモリ内にメタデータの完全なコピーを保存および維持し、FEs 間での無差別なサービスを保証します。

BE

バックエンド (BE) ノードは、データストレージと共有なしデプロイメントでのクエリプランの実行の両方を担当します。外部 catalog (このガイドで使用される Iceberg catalog など) が使用される場合、BE ノードは外部 catalog からデータをキャッシュしてクエリを高速化できます。


環境

このガイドでは 6 つのコンテナ (サービス) を使用し、すべて Docker compose でデプロイされます。サービスとその責任は次のとおりです。

サービス責任
starrocks-feメタデータ管理、クライアント接続、クエリプランとスケジューリング
starrocks-beクエリプランの実行
restIceberg catalog (メタデータサービス) の提供
spark-icebergPySpark を実行するための Apache Spark 環境
mcMinIO 設定 (MinIO コマンドラインクライアント)
minioMinIO Object Storage

Docker 構成と NYC グリーンタクシーデータのダウンロード

3 つの必要なコンテナを備えた環境を提供するために、StarRocks は Docker compose ファイルを提供します。curl を使用して compose ファイルとデータセットをダウンロードします。

Docker compose ファイル:

mkdir iceberg
cd iceberg
curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/iceberg/docker-compose.yml

データセット:

curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/iceberg/datasets/green_tripdata_2023-05.parquet

Docker で環境を開始

ヒント

このコマンドや他の docker compose コマンドは、docker-compose.yml ファイルを含むディレクトリから実行してください。

docker compose up -d
[+] Building 0.0s (0/0)                     docker:desktop-linux
[+] Running 6/6
✔ Container iceberg-rest Started 0.0s
✔ Container minio Started 0.0s
✔ Container starrocks-fe Started 0.0s
✔ Container mc Started 0.0s
✔ Container spark-iceberg Started 0.0s
✔ Container starrocks-be Started

環境のステータスを確認

サービスの進行状況を確認します。FE と BE が正常になるまで約 30 秒かかります。

docker compose ps を実行して、FE と BE が healthy のステータスを示すまで待ちます。残りのサービスにはヘルスチェック構成がありませんが、それらと対話することで動作しているかどうかがわかります。

ヒント

jq がインストールされていて、docker compose ps の短いリストを好む場合は、次を試してください。

docker compose ps --format json | jq '{Service: .Service, State: .State, Status: .Status}'
docker compose ps
SERVICE         CREATED         STATUS                   PORTS
rest 4 minutes ago Up 4 minutes 0.0.0.0:8181->8181/tcp
mc 4 minutes ago Up 4 minutes
minio 4 minutes ago Up 4 minutes 0.0.0.0:9000-9001->9000-9001/tcp
spark-iceberg 4 minutes ago Up 4 minutes 0.0.0.0:8080->8080/tcp, 0.0.0.0:8888->8888/tcp, 0.0.0.0:10000-10001->10000-10001/tcp
starrocks-be 4 minutes ago Up 4 minutes (healthy) 0.0.0.0:8040->8040/tcp
starrocks-fe 4 minutes ago Up 4 minutes (healthy) 0.0.0.0:8030->8030/tcp, 0.0.0.0:9020->9020/tcp, 0.0.0.0:9030->9030/tcp

PySpark

Iceberg と対話する方法はいくつかありますが、このガイドでは PySpark を使用します。PySpark に慣れていない場合は、詳細情報セクションにリンクされているドキュメントがありますが、実行する必要があるすべてのコマンドが以下に提供されています。

グリーンタクシーデータセット

データを spark-iceberg コンテナにコピーします。このコマンドは、データセットファイルを spark-iceberg サービスの /opt/spark/ ディレクトリにコピーします。

docker compose \
cp green_tripdata_2023-05.parquet spark-iceberg:/opt/spark/

PySpark の起動

このコマンドは spark-iceberg サービスに接続し、pyspark コマンドを実行します。

docker compose exec -it spark-iceberg pyspark
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.5.0
/_/

Using Python version 3.9.18 (main, Nov 1 2023 11:04:44)
Spark context Web UI available at http://6ad5cb0e6335:4041
Spark context available as 'sc' (master = local[*], app id = local-1701967093057).
SparkSession available as 'spark'.
>>>

データセットをデータフレームに読み込む

データフレームは Spark SQL の一部であり、データベーステーブルやスプレッドシートに似たデータ構造を提供します。

グリーンタクシーデータは NYC Taxi and Limousine Commission によって Parquet 形式で提供されます。/opt/spark ディレクトリからファイルをロードし、最初の数レコードを最初の 3 行のデータの最初の数列を SELECT して確認します。これらのコマンドは pyspark セッションで実行する必要があります。コマンド:

  • ディスクからデータセットファイルを df という名前のデータフレームに読み込む
  • Parquet ファイルのスキーマを表示
df = spark.read.parquet("/opt/spark/green_tripdata_2023-05.parquet")
df.printSchema()
root
|-- VendorID: integer (nullable = true)
|-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
|-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
|-- store_and_fwd_flag: string (nullable = true)
|-- RatecodeID: long (nullable = true)
|-- PULocationID: integer (nullable = true)
|-- DOLocationID: integer (nullable = true)
|-- passenger_count: long (nullable = true)
|-- trip_distance: double (nullable = true)
|-- fare_amount: double (nullable = true)
|-- extra: double (nullable = true)
|-- mta_tax: double (nullable = true)
|-- tip_amount: double (nullable = true)
|-- tolls_amount: double (nullable = true)
|-- ehail_fee: double (nullable = true)
|-- improvement_surcharge: double (nullable = true)
|-- total_amount: double (nullable = true)
|-- payment_type: long (nullable = true)
|-- trip_type: long (nullable = true)
|-- congestion_surcharge: double (nullable = true)

>>>

最初の数 (7) 列の最初の数 (3) 行のデータを確認します。

df.select(df.columns[:7]).show(3)
+--------+--------------------+---------------------+------------------+----------+------------+------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|
+--------+--------------------+---------------------+------------------+----------+------------+------------+
| 2| 2023-05-01 00:52:10| 2023-05-01 01:05:26| N| 1| 244| 213|
| 2| 2023-05-01 00:29:49| 2023-05-01 00:50:11| N| 1| 33| 100|
| 2| 2023-05-01 00:25:19| 2023-05-01 00:32:12| N| 1| 244| 244|
+--------+--------------------+---------------------+------------------+----------+------------+------------+
only showing top 3 rows

テーブルに書き込む

このステップで作成されたテーブルは、次のステップで StarRocks に提供される catalog に含まれます。

  • Catalog: demo
  • Database: nyc
  • Table: greentaxis
df.writeTo("demo.nyc.greentaxis").create()

StarRocks を Iceberg Catalog にアクセスできるように設定

PySpark からは今すぐ退出するか、SQL コマンドを実行するために新しいターミナルを開くことができます。新しいターミナルを開く場合は、続行する前に docker-compose.yml ファイルを含む quickstart ディレクトリにディレクトリを変更してください。

SQL クライアントで StarRocks に接続

SQL クライアント

これらの3つのクライアントはこのチュートリアルでテストされています。1つだけ必要です。

  • mysql CLI: Docker環境またはあなたのマシンから実行できます。
  • DBeaver はコミュニティバージョンとProバージョンがあります。
  • MySQL Workbench

クライアントの設定

mysql CLIを使用する最も簡単な方法は、StarRocksコンテナ starrocks-fe から実行することです。

docker compose exec starrocks-fe \
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
ヒント

すべての docker compose コマンドは、docker-compose.yml ファイルを含むディレクトリから実行する必要があります。

mysql CLIをインストールしたい場合は、以下の mysql client install を展開してください。

mysql client install
  • macOS: Homebrew を使用していて MySQL Server が不要な場合、brew install mysql-client@8.0 を実行して CLI をインストールします。
  • Linux: mysql クライアントをリポジトリシステムで確認します。例えば、yum install mariadb
  • Microsoft Windows: MySQL Community Server をインストールして提供されたクライアントを実行するか、WSL から mysql を実行します。

PySpark セッションを終了し、StarRocks に接続できます。

ヒント

このコマンドは、docker-compose.yml ファイルを含むディレクトリから実行してください。

mysql CLI 以外のクライアントを使用している場合は、今すぐ開いてください。

docker compose exec starrocks-fe \
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
StarRocks >

外部 catalog を作成

外部 catalog は、StarRocks が Iceberg データを StarRocks のデータベースやテーブルにあるかのように操作できるようにする構成です。個々の構成プロパティはコマンドの後に詳述されます。

CREATE EXTERNAL CATALOG 'iceberg'
COMMENT "External catalog to Apache Iceberg on MinIO"
PROPERTIES
(
"type"="iceberg",
"iceberg.catalog.type"="rest",
"iceberg.catalog.uri"="http://iceberg-rest:8181",
"iceberg.catalog.warehouse"="warehouse",
"aws.s3.access_key"="admin",
"aws.s3.secret_key"="password",
"aws.s3.endpoint"="http://minio:9000",
"aws.s3.enable_path_style_access"="true",
"client.factory"="com.starrocks.connector.iceberg.IcebergAwsClientFactory"
);

PROPERTIES

プロパティ説明
typeこの例ではタイプは iceberg です。他のオプションには Hive、Hudi、Delta Lake、JDBC があります。
iceberg.catalog.typeこの例では rest が使用されます。Tabular は使用される Docker イメージを提供し、Tabular は REST を使用します。
iceberg.catalog.uriREST サーバーのエンドポイント。
iceberg.catalog.warehouseIceberg catalog の識別子。この場合、compose ファイルで指定されたウェアハウス名は warehouse です。
aws.s3.access_keyMinIO キー。この場合、キーとパスワードは compose ファイルで admin に設定されています。
aws.s3.secret_keyパスワード。
aws.s3.endpointMinIO エンドポイント。
aws.s3.enable_path_style_accessMinIO を Object Storage として使用する場合、これが必要です。MinIO はこの形式 http://host:port/<bucket_name>/<key_name> を期待します。
client.factoryこのプロパティを iceberg.IcebergAwsClientFactory に設定することで、aws.s3.access_keyaws.s3.secret_key パラメータが認証に使用されます。
SHOW CATALOGS;
+-----------------+----------+------------------------------------------------------------------+
| Catalog | Type | Comment |
+-----------------+----------+------------------------------------------------------------------+
| default_catalog | Internal | An internal catalog contains this cluster's self-managed tables. |
| iceberg | Iceberg | External catalog to Apache Iceberg on MinIO |
+-----------------+----------+------------------------------------------------------------------+
2 rows in set (0.03 sec)
SET CATALOG iceberg;
SHOW DATABASES;
ヒント

表示されるデータベースは、PySpark セッションで作成されました。CATALOG iceberg を追加したときに、データベース nyc が StarRocks で表示可能になりました。

+--------------------+
| Database |
+--------------------+
| information_schema |
| nyc |
+--------------------+
2 rows in set (0.07 sec)
USE nyc;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
SHOW TABLES;
+---------------+
| Tables_in_nyc |
+---------------+
| greentaxis |
+---------------+
1 rows in set (0.05 sec)
DESCRIBE greentaxis;
ヒント

StarRocks が使用するスキーマを、以前の PySpark セッションからの df.printSchema() の出力と比較してください。Spark の timestamp_ntz データ型は StarRocks の DATETIME などとして表されます。

+-----------------------+---------------------+------+-------+---------+-------+---------+
| Field | Type | Null | Key | Default | Extra | Comment |
+-----------------------+---------------------+------+-------+---------+-------+---------+
| VendorID | INT | Yes | false | NULL | | NULL |
| lpep_pickup_datetime | DATETIME | Yes | false | NULL | | NULL |
| lpep_dropoff_datetime | DATETIME | Yes | false | NULL | | NULL |
| store_and_fwd_flag | VARCHAR(1073741824) | Yes | false | NULL | | NULL |
| RatecodeID | BIGINT | Yes | false | NULL | | NULL |
| PULocationID | INT | Yes | false | NULL | | NULL |
| DOLocationID | INT | Yes | false | NULL | | NULL |
| passenger_count | BIGINT | Yes | false | NULL | | NULL |
| trip_distance | DOUBLE | Yes | false | NULL | | NULL |
| fare_amount | DOUBLE | Yes | false | NULL | | NULL |
| extra | DOUBLE | Yes | false | NULL | | NULL |
| mta_tax | DOUBLE | Yes | false | NULL | | NULL |
| tip_amount | DOUBLE | Yes | false | NULL | | NULL |
| tolls_amount | DOUBLE | Yes | false | NULL | | NULL |
| ehail_fee | DOUBLE | Yes | false | NULL | | NULL |
| improvement_surcharge | DOUBLE | Yes | false | NULL | | NULL |
| total_amount | DOUBLE | Yes | false | NULL | | NULL |
| payment_type | BIGINT | Yes | false | NULL | | NULL |
| trip_type | BIGINT | Yes | false | NULL | | NULL |
| congestion_surcharge | DOUBLE | Yes | false | NULL | | NULL |
+-----------------------+---------------------+------+-------+---------+-------+---------+
20 rows in set (0.03 sec)
ヒント

StarRocks のドキュメントにある一部の SQL クエリは、セミコロンの代わりに \G で終わります。\G は mysql CLI にクエリ結果を縦に表示させます。

多くの SQL クライアントは縦書きのフォーマット出力を解釈しないため、mysql CLI を使用していない場合は \G; に置き換えるべきです。

StarRocks でクエリ

ピックアップ日時のフォーマットを確認

SELECT lpep_pickup_datetime FROM greentaxis LIMIT 10;
+----------------------+
| lpep_pickup_datetime |
+----------------------+
| 2023-05-01 00:52:10 |
| 2023-05-01 00:29:49 |
| 2023-05-01 00:25:19 |
| 2023-05-01 00:07:06 |
| 2023-05-01 00:43:31 |
| 2023-05-01 00:51:54 |
| 2023-05-01 00:27:46 |
| 2023-05-01 00:27:14 |
| 2023-05-01 00:24:14 |
| 2023-05-01 00:46:55 |
+----------------------+
10 rows in set (0.07 sec)

混雑する時間帯を見つける

このクエリは、1 日の時間ごとに旅行を集計し、1 日の最も混雑する時間が 18:00 であることを示します。

SELECT COUNT(*) AS trips,
hour(lpep_pickup_datetime) AS hour_of_day
FROM greentaxis
GROUP BY hour_of_day
ORDER BY trips DESC;
+-------+-------------+
| trips | hour_of_day |
+-------+-------------+
| 5381 | 18 |
| 5253 | 17 |
| 5091 | 16 |
| 4736 | 15 |
| 4393 | 14 |
| 4275 | 19 |
| 3893 | 12 |
| 3816 | 11 |
| 3685 | 13 |
| 3616 | 9 |
| 3530 | 10 |
| 3361 | 20 |
| 3315 | 8 |
| 2917 | 21 |
| 2680 | 7 |
| 2322 | 22 |
| 1735 | 23 |
| 1202 | 6 |
| 1189 | 0 |
| 806 | 1 |
| 606 | 2 |
| 513 | 3 |
| 451 | 5 |
| 408 | 4 |
+-------+-------------+
24 rows in set (0.08 sec)

まとめ

このチュートリアルでは、StarRocks の外部 catalog を使用して、Iceberg REST catalog を使用してデータをそのままクエリできることを示しました。Hive、Hudi、Delta Lake、JDBC catalog を使用した他の多くの統合が利用可能です。

このチュートリアルでは以下を行いました:

  • Docker で StarRocks と Iceberg/PySpark/MinIO 環境をデプロイ
  • Iceberg catalog へのアクセスを提供するために StarRocks の外部 catalog を設定
  • ニューヨーク市が提供するタクシーデータを Iceberg データレイクにロード
  • データレイクからデータをコピーせずに StarRocks で SQL を使用してデータをクエリ

詳細情報

StarRocks Catalogs

Apache Iceberg documentationQuickstart (includes PySpark)

Green Taxi Trip Records データセットは、ニューヨーク市によって提供され、これらの 利用規約 および プライバシーポリシー に従います。

StarRocks Assistant

AI generated answers are based on docs and other sources. Please test answers in non-production environments.