Apache Hudi Lakehouse
概要
- Docker compose を使用して Object Storage、Apache Spark、Hudi、StarRocks をデプロイ
 - Apache Spark を使用して Hudi に小さなデータセットをロード
 - StarRocks を設定して、external catalog を使用して Hive Metastore にアクセス
 - データが存在する場所で StarRocks を使用してデータをクエリ
 

In addition to efficient analytics of local data, StarRocks can work as the compute engine to analyze data stored in data lakes such as Apache Hudi, Apache Iceberg, and Delta Lake. One of the key features of StarRocks is its external catalog, which acts as the linkage to an externally maintained metastore. This functionality provides users with the capability to query external data sources seamlessly, eliminating the need for data migration. As such, users can analyze data from different systems such as HDFS and Amazon S3, in various file formats such as Parquet, ORC, and CSV, etc.
The preceding figure shows a data lake analytics scenario where StarRocks is responsible for data computing and analysis, and the data lake is responsible for data storage, organization, and maintenance. Data lakes allow users to store data in open storage formats and use flexible schemas to produce reports on "single source of truth" for various BI, AI, ad-hoc, and reporting use cases. StarRocks fully leverages the advantages of its vectorization engine and CBO, significantly improving the performance of data lake analytics.
前提条件
StarRocks demo リポジトリ
StarRocks demo リポジトリ をローカルマシンにクローンします。
このガイドのすべての手順は、クローンした demo GitHub リポジトリの demo/documentation-samples/hudi/ ディレクトリから実行されます。
Docker
- Docker セットアップ: Mac の場合、Install Docker Desktop on Mac に定義されている手順に従ってください。Spark-SQL クエリを実行するには、少なくとも 5 GB のメモリと 4 つの CPU が Docker に割り当てられていることを確認してください (Docker → Preferences → Advanced を参照)。そうしないと、メモリの問題で Spark-SQL クエリが終了する可能性があります。
 - Docker に割り当てられた 20 GB の空きディスクスペース
 
SQL クライアント
Docker 環境で提供される SQL クライアントを使用するか、システム上のものを使用できます。多くの MySQL 互換クライアントが動作します。
設定
demo/documentation-samples/hudi ディレクトリに移動し、ファイルを確認します。これは Hudi のチュートリアルではないため、すべての設定ファイルが説明されるわけではありませんが、設定がどのように行われているかを確認するためにどこを見ればよいかを知っておくことが重要です。hudi/ ディレクトリには、Docker でサービスを起動および設定するために使用される docker-compose.yml ファイルがあります。以下はそのサービスの一覧と簡単な説明です。
Docker サービス
| サービス | 責任 | 
|---|---|
starrocks-fe | メタデータ管理、クライアント接続、クエリプランとスケジューリング | 
starrocks-be | クエリプランの実行 | 
metastore_db | Hive メタデータを保存するために使用される Postgres DB | 
hive_metastore | Apache Hive メタストアを提供 | 
minio と mc | MinIO Object Storage と MinIO コマンドラインクライアント | 
spark-hudi | MinIO Object Storage | 
設定ファイル
hudi/conf/ ディレクトリには、spark-hudi コンテナにマウントされる設定ファイルがあります。
core-site.xml
このファイルには、オブジェクトストレージに関連する設定が含まれています。詳細については、このドキュメントの最後にリンクがあります。
spark-defaults.conf
Hive、MinIO、および Spark SQL の設定。
hudi-defaults.conf
spark-shell の警告を抑制するために使用されるデフォルトファイル。
hadoop-metrics2-hbase.properties
spark-shell の警告を抑制するために使用される空のファイル。
hadoop-metrics2-s3a-file-system.properties
spark-shell の警告を抑制するために使用される空のファイル。
デモクラスターの起動
このデモシステムは、StarRocks、Hudi、MinIO、および Spark サービスで構成されています。Docker compose を実行してクラスターを起動します。
docker compose up --detach --wait --wait-timeout 60
[+] Running 8/8
 ✔ Network hudi                     Created                                                   0.0s
 ✔ Container hudi-starrocks-fe-1    Healthy                                                   0.1s
 ✔ Container hudi-minio-1           Healthy                                                   0.1s
 ✔ Container hudi-metastore_db-1    Healthy                                                   0.1s
 ✔ Container hudi-starrocks-be-1    Healthy                                                   0.0s
 ✔ Container hudi-mc-1              Healthy                                                   0.0s
 ✔ Container hudi-hive-metastore-1  Healthy                                                   0.0s
 ✔ Container hudi-spark-hudi-1      Healthy                                                   0.1s
多くのコンテナが実行されている場合、docker compose ps の出力は jq にパイプすると読みやすくなります。
docker compose ps --format json | \
jq '{Service: .Service, State: .State, Status: .Status}'
{
  "Service": "hive-metastore",
  "State": "running",
  "Status": "Up About a minute (healthy)"
}
{
  "Service": "mc",
  "State": "running",
  "Status": "Up About a minute"
}
{
  "Service": "metastore_db",
  "State": "running",
  "Status": "Up About a minute"
}
{
  "Service": "minio",
  "State": "running",
  "Status": "Up About a minute"
}
{
  "Service": "spark-hudi",
  "State": "running",
  "Status": "Up 33 seconds (healthy)"
}
{
  "Service": "starrocks-be",
  "State": "running",
  "Status": "Up About a minute (healthy)"
}
{
  "Service": "starrocks-fe",
  "State": "running",
  "Status": "Up About a minute (healthy)"
}
MinIO の設定
Spark コマンドを実行するときに、作成されるテーブルの basepath を s3a URI に設定します。
val basePath = "s3a://huditest/hudi_coders"
このステップでは、MinIO にバケット huditest を作成します。MinIO コンソールはポート 9000 で実行されています。
MinIO に認証
ブラウザを開いて http://localhost:9000/ にアクセスし、認証します。ユーザー名とパスワードは docker-compose.yml に指定されており、admin と password です。
バケットを作成
左側のナビゲーションで Buckets を選択し、Create Bucket + を選択します。バケット名を huditest とし、Create Bucket を選択します。

テーブルを作成してデータを投入し、Hive に同期
このコマンドや他の docker compose コマンドは、docker-compose.yml ファイルがあるディレクトリから実行してください。
spark-hudi サービスで spark-shell を開きます。
docker compose exec spark-hudi spark-shell
spark-shell の起動時に不正なリフレクティブアクセスに関する警告が表示されますが、これらの警告は無視して構いません。
これらのコマンドを scala> プロンプトで実行して以下を行います。
- この Spark セッションをデータのロード、処理、および書き込みに設定
 - データフレームを作成し、それを Hudi テーブルに書き込み
 - Hive Metastore に同期
 
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import scala.collection.JavaConversions._
val schema = StructType( Array(
                 StructField("language", StringType, true),
                 StructField("users", StringType, true),
                 StructField("id", StringType, true)
             ))
val rowData= Seq(Row("Java", "20000", "a"),
               Row("Python", "100000", "b"),
               Row("Scala", "3000", "c"))
val df = spark.createDataFrame(rowData,schema)
val databaseName = "hudi_sample"
val tableName = "hudi_coders_hive"
val basePath = "s3a://huditest/hudi_coders"
df.write.format("hudi").
  option(org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME, tableName).
  option(RECORDKEY_FIELD_OPT_KEY, "id").
  option(PARTITIONPATH_FIELD_OPT_KEY, "language").
  option(PRECOMBINE_FIELD_OPT_KEY, "users").
  option("hoodie.datasource.write.hive_style_partitioning", "true").
  option("hoodie.datasource.hive_sync.enable", "true").
  option("hoodie.datasource.hive_sync.mode", "hms").
  option("hoodie.datasource.hive_sync.database", databaseName).
  option("hoodie.datasource.hive_sync.table", tableName).
  option("hoodie.datasource.hive_sync.partition_fields", "language").
  option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
  option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hive-metastore:9083").
  mode(Overwrite).
  save(basePath)
System.exit(0)
次の警告が表示されます。
WARN
org.apache.hudi.metadata.HoodieBackedTableMetadata - 
Metadata table was not found at path 
s3a://huditest/hudi_coders/.hoodie/metadata
これは無視して構いません。このファイルはこの spark-shell セッション中に自動的に作成されます。
また、次の警告も表示されます。
78184 [main] WARN  org.apache.hadoop.fs.s3a.S3ABlockOutputStream  - 
Application invoked the Syncable API against stream writing to 
hudi_coders/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0. 
This is unsupported
この警告は、オブジェクトストレージを使用している場合、書き込み中のログファイルの同期がサポートされていないことを示しています。ファイルは閉じられたときにのみ同期されます。詳細は Stack Overflow を参照してください。
上記の spark-shell セッションの最後のコマンドはコンテナを終了するはずですが、終了しない場合は Enter キーを押すと終了します。
StarRocks の設定
StarRocks に接続
starrocks-fe サービスが提供する MySQL クライアントを使用して StarRocks に接続するか、お気に入りの SQL クライアントを使用して、MySQL プロトコルを使用して localhost:9030 に接続するように設定します。
docker compose exec starrocks-fe \
  mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
StarRocks と Hudi のリンクを作成
このガイドの最後に external catalog に関する詳細情報へのリンクがあります。このステップで作成される external catalog は、Docker で実行されている Hive Metastore (HMS) へのリンクとして機能します。
CREATE EXTERNAL CATALOG hudi_catalog_hms
PROPERTIES
(
    "type" = "hudi",
    "hive.metastore.type" = "hive",
    "hive.metastore.uris" = "thrift://hive-metastore:9083",
    "aws.s3.use_instance_profile" = "false",
    "aws.s3.access_key" = "admin",
    "aws.s3.secret_key" = "password",
    "aws.s3.enable_ssl" = "false",
    "aws.s3.enable_path_style_access" = "true",
    "aws.s3.endpoint" = "http://minio:9000"
);
Query OK, 0 rows affected (0.59 sec)
新しいカタログを使用
SET CATALOG hudi_catalog_hms;
Query OK, 0 rows affected (0.01 sec)
Spark で挿入されたデータに移動
SHOW DATABASES;
+--------------------+
| Database           |
+--------------------+
| default            |
| hudi_sample        |
| information_schema |
+--------------------+
2 rows in set (0.40 sec)
USE hudi_sample;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
SHOW TABLES;
+-----------------------+
| Tables_in_hudi_sample |
+-----------------------+
| hudi_coders_hive      |
+-----------------------+
1 row in set (0.07 sec)
StarRocks で Hudi のデータをクエリ
このクエリを 2 回実行します。最初のクエリは、StarRocks にデータがまだキャッシュされていないため、完了するのに約 5 秒かかる場合があります。2 回目のクエリは非常に迅速に完了します。
SELECT * from hudi_coders_hive\G
StarRocks ドキュメントの一部の SQL クエリは、セミコロンの代わりに \G で終了します。mysql CLI では、 \G はクエリ結果を縦に表示させます。
多くの SQL クライアントは縦のフォーマット出力を解釈しないため、mysql CLI を使用していない場合は \G を ; に置き換える必要があります。
*************************** 1. row ***************************
   _hoodie_commit_time: 20240208165522561
  _hoodie_commit_seqno: 20240208165522561_0_0
    _hoodie_record_key: c
_hoodie_partition_path: language=Scala
     _hoodie_file_name: bb29249a-b69d-4c32-843b-b7142d8dc51c-0_0-27-1221_20240208165522561.parquet
              language: Scala
                 users: 3000
                    id: c
*************************** 2. row ***************************
   _hoodie_commit_time: 20240208165522561
  _hoodie_commit_seqno: 20240208165522561_2_0
    _hoodie_record_key: a
_hoodie_partition_path: language=Java
     _hoodie_file_name: 12fc14aa-7dc4-454c-b710-1ad0556c9386-0_2-27-1223_20240208165522561.parquet
              language: Java
                 users: 20000
                    id: a
*************************** 3. row ***************************
   _hoodie_commit_time: 20240208165522561
  _hoodie_commit_seqno: 20240208165522561_1_0
    _hoodie_record_key: b
_hoodie_partition_path: language=Python
     _hoodie_file_name: 51977039-d71e-4dd6-90d4-0c93656dafcf-0_1-27-1222_20240208165522561.parquet
              language: Python
                 users: 100000
                    id: b
3 rows in set (0.15 sec)
まとめ
このチュートリアルでは、StarRocks external catalog を使用して、Hudi external catalog を使用してデータをそのままクエリできることを示しました。他にも Iceberg、Delta Lake、JDBC カタログを使用した多くの統合が利用可能です。
このチュートリアルで行ったこと:
- Docker で StarRocks と Hudi/Spark/MinIO 環境をデプロイ
 - Apache Spark を使用して Hudi に小さなデータセットをロード
 - Hudi カタログへのアクセスを提供するために StarRocks external catalog を設定
 - データレイクからデータをコピーせずに StarRocks で SQL を使用してデータをクエリ
 
詳細情報
Apache Hudi quickstart (Spark を含む)