Apache Hudi Lakehouse
概要
- Docker compose を使用して Object Storage、Apache Spark、Hudi、StarRocks をデプロイ
- Apache Spark を使用して Hudi に小さなデータセットをロード
- StarRocks を設定して、external catalog を使用して Hive Metastore にアクセス
- データが存在する場所で StarRocks を使用してデータをクエリ
In addition to efficient analytics of local data, StarRocks can work as the compute engine to analyze data stored in data lakes such as Apache Hudi, Apache Iceberg, and Delta Lake. One of the key features of StarRocks is its external catalog, which acts as the linkage to an externally maintained metastore. This functionality provides users with the capability to query external data sources seamlessly, eliminating the need for data migration. As such, users can analyze data from different systems such as HDFS and Amazon S3, in various file formats such as Parquet, ORC, and CSV, etc.
The preceding figure shows a data lake analytics scenario where StarRocks is responsible for data computing and analysis, and the data lake is responsible for data storage, organization, and maintenance. Data lakes allow users to store data in open storage formats and use flexible schemas to produce reports on "single source of truth" for various BI, AI, ad-hoc, and reporting use cases. StarRocks fully leverages the advantages of its vectorization engine and CBO, significantly improving the performance of data lake analytics.
前提条件
StarRocks demo
リポジトリ
StarRocks demo リポジトリ をローカルマシンにクローンします。
このガイドのすべての手順は、クローンした demo
GitHub リポジトリの demo/documentation-samples/hudi/
ディレクトリから実行されます。
Docker
- Docker セットアップ: Mac の場合、Install Docker Desktop on Mac に定義されている手順に従ってください。Spark-SQL クエリを実行するには、少なくとも 5 GB のメモリと 4 つの CPU が Docker に割り当てられていることを確認してください (Docker → Preferences → Advanced を参照)。そうしないと、メモリの問題で Spark-SQL クエリが終了する可能性があります。
- Docker に割り当てられた 20 GB の空きディスクスペース
SQL クライアント
Docker 環境で提供される SQL クライアントを使用するか、システム上のものを使用できます。多くの MySQL 互換クライアントが動作します。
設定
demo/documentation-samples/hudi
ディレクトリに移動し、ファイルを確認します。これは Hudi のチュートリアルではないため、すべての設定ファイルが説明されるわけではありませんが、設定がどのように行われているかを確認するためにどこを見ればよいかを知っておくことが重要です。hudi/
ディレクトリには、Docker でサービスを起動および設定するために使用される docker-compose.yml
ファイルがあります。以下はそのサービスの一覧と簡単な説明です。
Docker サービス
サービス | 責任 |
---|---|
starrocks-fe | メタデータ管理、クライアント接続、クエリプランとスケジューリング |
starrocks-be | クエリプランの実行 |
metastore_db | Hive メタデータを保存するために使用される Postgres DB |
hive_metastore | Apache Hive メタストアを提供 |
minio と mc | MinIO Object Storage と MinIO コマンドラインクライアント |
spark-hudi | MinIO Object Storage |
設定ファイル
hudi/conf/
ディレクトリには、spark-hudi
コンテナにマウントされる設定ファイルがあります。
core-site.xml
このファイルには、オブジェクトストレージに関連する設定が含まれています。詳細については、このドキュメントの最後にリンクがあります。
spark-defaults.conf
Hive、MinIO、および Spark SQL の設定。
hudi-defaults.conf
spark-shell
の警告を抑制するために使用されるデフォルトファイル。
hadoop-metrics2-hbase.properties
spark-shell
の警告を抑制するために使用される空のファイル。
hadoop-metrics2-s3a-file-system.properties
spark-shell
の警告を抑制するために使用される空のファイル。
デモクラスターの起動
このデモシステムは、StarRocks、Hudi、MinIO、および Spark サービスで構成されています。Docker compose を実行してクラスターを起動します。
docker compose up --detach --wait --wait-timeout 60
[+] Running 8/8
✔ Network hudi Created 0.0s
✔ Container hudi-starrocks-fe-1 Healthy 0.1s
✔ Container hudi-minio-1 Healthy 0.1s
✔ Container hudi-metastore_db-1 Healthy 0.1s
✔ Container hudi-starrocks-be-1 Healthy 0.0s
✔ Container hudi-mc-1 Healthy 0.0s
✔ Container hudi-hive-metastore-1 Healthy 0.0s
✔ Container hudi-spark-hudi-1 Healthy 0.1s
多くのコンテナが実行されている場合、docker compose ps
の出力は jq
にパイプすると読みやすくなります。
docker compose ps --format json | \
jq '{Service: .Service, State: .State, Status: .Status}'
{
"Service": "hive-metastore",
"State": "running",
"Status": "Up About a minute (healthy)"
}
{
"Service": "mc",
"State": "running",
"Status": "Up About a minute"
}
{
"Service": "metastore_db",
"State": "running",
"Status": "Up About a minute"
}
{
"Service": "minio",
"State": "running",
"Status": "Up About a minute"
}
{
"Service": "spark-hudi",
"State": "running",
"Status": "Up 33 seconds (healthy)"
}
{
"Service": "starrocks-be",
"State": "running",
"Status": "Up About a minute (healthy)"
}
{
"Service": "starrocks-fe",
"State": "running",
"Status": "Up About a minute (healthy)"
}
MinIO の設定
Spark コマンドを実行するときに、作成されるテーブルの basepath を s3a
URI に設定します。
val basePath = "s3a://huditest/hudi_coders"
このステップでは、MinIO にバケット huditest
を作成します。MinIO コンソールはポート 9000
で実行されています。
MinIO に認証
ブラウザを開いて http://localhost:9000/ にアクセスし、認証します。ユーザー名とパスワードは docker-compose.yml
に指定されており、admin
と password
です。
バケットを作成
左側のナビゲーションで Buckets を選択し、Create Bucket + を選択します。バケット名を huditest
とし、Create Bucket を選択します。
テーブルを作成してデータを投入し、Hive に同期
このコマンドや他の docker compose
コマンドは、docker-compose.yml
ファイルがあるディレクトリから実行してください。
spark-hudi
サービスで spark-shell
を開きます。
docker compose exec spark-hudi spark-shell
spark-shell
の起動時に不正なリフレクティブアクセスに関する警告が表示されますが、これらの警告は無視して構いません。
これらのコマンドを scala>
プロンプトで実行して以下を行います。
- この Spark セッションをデータのロード、処理、および書き込みに設定
- データフレームを作成し、それを Hudi テーブルに書き込み
- Hive Metastore に同期
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import scala.collection.JavaConversions._
val schema = StructType( Array(
StructField("language", StringType, true),
StructField("users", StringType, true),
StructField("id", StringType, true)
))
val rowData= Seq(Row("Java", "20000", "a"),
Row("Python", "100000", "b"),
Row("Scala", "3000", "c"))
val df = spark.createDataFrame(rowData,schema)
val databaseName = "hudi_sample"
val tableName = "hudi_coders_hive"
val basePath = "s3a://huditest/hudi_coders"
df.write.format("hudi").
option(org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME, tableName).
option(RECORDKEY_FIELD_OPT_KEY, "id").
option(PARTITIONPATH_FIELD_OPT_KEY, "language").
option(PRECOMBINE_FIELD_OPT_KEY, "users").
option("hoodie.datasource.write.hive_style_partitioning", "true").
option("hoodie.datasource.hive_sync.enable", "true").
option("hoodie.datasource.hive_sync.mode", "hms").
option("hoodie.datasource.hive_sync.database", databaseName).
option("hoodie.datasource.hive_sync.table", tableName).
option("hoodie.datasource.hive_sync.partition_fields", "language").
option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hive-metastore:9083").
mode(Overwrite).
save(basePath)
System.exit(0)
次の警告が表示されます。
WARN
org.apache.hudi.metadata.HoodieBackedTableMetadata -
Metadata table was not found at path
s3a://huditest/hudi_coders/.hoodie/metadata
これは無視して構いません。このファイルはこの spark-shell
セッション中に自動的に作成されます。
また、次の警告も表示されます。
78184 [main] WARN org.apache.hadoop.fs.s3a.S3ABlockOutputStream -
Application invoked the Syncable API against stream writing to
hudi_coders/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0.
This is unsupported
この警告は、オブジェクトストレージを使用している場合、書き込み中のログファイルの同期がサポートされていないことを示しています。ファイルは閉じられたときにのみ同期されます。詳細は Stack Overflow を参照してください。
上記の spark-shell セッションの最後のコマンドはコンテナを終了するはずですが、終了しない場合は Enter キーを押すと終了します。
StarRocks の設定
StarRocks に接続
starrocks-fe
サービスが提供する MySQL クライアントを使用して StarRocks に接続するか、お気に入りの SQL クライアントを使用して、MySQL プロトコルを使用して localhost:9030
に接続するように設定します。
docker compose exec starrocks-fe \
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
StarRocks と Hudi のリンクを作成
このガイドの最後に external catalog に関する詳細情報へのリンクがあります。このステップで作成される external catalog は、Docker で実行されている Hive Metastore (HMS) へのリンクとして機能します。
CREATE EXTERNAL CATALOG hudi_catalog_hms
PROPERTIES
(
"type" = "hudi",
"hive.metastore.type" = "hive",
"hive.metastore.uris" = "thrift://hive-metastore:9083",
"aws.s3.use_instance_profile" = "false",
"aws.s3.access_key" = "admin",
"aws.s3.secret_key" = "password",
"aws.s3.enable_ssl" = "false",
"aws.s3.enable_path_style_access" = "true",
"aws.s3.endpoint" = "http://minio:9000"
);
Query OK, 0 rows affected (0.59 sec)
新しいカタログを使用
SET CATALOG hudi_catalog_hms;
Query OK, 0 rows affected (0.01 sec)
Spark で挿入されたデータに移動
SHOW DATABASES;
+--------------------+
| Database |
+--------------------+
| default |
| hudi_sample |
| information_schema |
+--------------------+
2 rows in set (0.40 sec)
USE hudi_sample;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
SHOW TABLES;
+-----------------------+
| Tables_in_hudi_sample |
+-----------------------+
| hudi_coders_hive |
+-----------------------+
1 row in set (0.07 sec)
StarRocks で Hudi のデータをクエリ
このクエリを 2 回実行します。最初のクエリは、StarRocks にデータがまだキャッシュされていないため、完了するのに約 5 秒かかる場合があります。2 回目のクエリは非常に迅速に完了します。
SELECT * from hudi_coders_hive\G
StarRocks ドキュメントの一部の SQL クエリは、セミコロンの代わりに \G
で終了します。mysql
CLI では、 \G
はクエリ結果を縦に表示させます。
多くの SQL クライアントは縦のフォーマット出力を解釈しないため、mysql
CLI を使用していない場合は \G
を ;
に置き換える必要があります。
*************************** 1. row ***************************
_hoodie_commit_time: 20240208165522561
_hoodie_commit_seqno: 20240208165522561_0_0
_hoodie_record_key: c
_hoodie_partition_path: language=Scala
_hoodie_file_name: bb29249a-b69d-4c32-843b-b7142d8dc51c-0_0-27-1221_20240208165522561.parquet
language: Scala
users: 3000
id: c
*************************** 2. row ***************************
_hoodie_commit_time: 20240208165522561
_hoodie_commit_seqno: 20240208165522561_2_0
_hoodie_record_key: a
_hoodie_partition_path: language=Java
_hoodie_file_name: 12fc14aa-7dc4-454c-b710-1ad0556c9386-0_2-27-1223_20240208165522561.parquet
language: Java
users: 20000
id: a
*************************** 3. row ***************************
_hoodie_commit_time: 20240208165522561
_hoodie_commit_seqno: 20240208165522561_1_0
_hoodie_record_key: b
_hoodie_partition_path: language=Python
_hoodie_file_name: 51977039-d71e-4dd6-90d4-0c93656dafcf-0_1-27-1222_20240208165522561.parquet
language: Python
users: 100000
id: b
3 rows in set (0.15 sec)
まとめ
このチュートリアルでは、StarRocks external catalog を使用して、Hudi external catalog を使用してデータをそのままクエリできることを示しました。他にも Iceberg、Delta Lake、JDBC カタログを使用した多くの統合が利用可能です。
このチュートリアルで行ったこと:
- Docker で StarRocks と Hudi/Spark/MinIO 環境をデプロイ
- Apache Spark を使用して Hudi に小さなデータセットをロード
- Hudi カタログへのアクセスを提供するために StarRocks external catalog を設定
- データレイクからデータをコピーせずに StarRocks で SQL を使用してデータをクエリ
詳細情報
Apache Hudi quickstart (Spark を含む)