メインコンテンツまでスキップ
バージョン: 3.2

Apache Hudi Lakehouse

概要

  • Docker compose を使用して Object Storage、Apache Spark、Hudi、StarRocks をデプロイ
  • Apache Spark を使用して Hudi に小さなデータセットをロード
  • StarRocks を設定して、external catalog を使用して Hive Metastore にアクセス
  • データが存在する場所で StarRocks を使用してデータをクエリ

DLA

In addition to efficient analytics of local data, StarRocks can work as the compute engine to analyze data stored in data lakes such as Apache Hudi, Apache Iceberg, and Delta Lake. One of the key features of StarRocks is its external catalog, which acts as the linkage to an externally maintained metastore. This functionality provides users with the capability to query external data sources seamlessly, eliminating the need for data migration. As such, users can analyze data from different systems such as HDFS and Amazon S3, in various file formats such as Parquet, ORC, and CSV, etc.

The preceding figure shows a data lake analytics scenario where StarRocks is responsible for data computing and analysis, and the data lake is responsible for data storage, organization, and maintenance. Data lakes allow users to store data in open storage formats and use flexible schemas to produce reports on "single source of truth" for various BI, AI, ad-hoc, and reporting use cases. StarRocks fully leverages the advantages of its vectorization engine and CBO, significantly improving the performance of data lake analytics.

前提条件

StarRocks demo リポジトリ

StarRocks demo リポジトリ をローカルマシンにクローンします。

このガイドのすべての手順は、クローンした demo GitHub リポジトリの demo/documentation-samples/hudi/ ディレクトリから実行されます。

Docker

  • Docker セットアップ: Mac の場合、Install Docker Desktop on Mac に定義されている手順に従ってください。Spark-SQL クエリを実行するには、少なくとも 5 GB のメモリと 4 つの CPU が Docker に割り当てられていることを確認してください (Docker → Preferences → Advanced を参照)。そうしないと、メモリの問題で Spark-SQL クエリが終了する可能性があります。
  • Docker に割り当てられた 20 GB の空きディスクスペース

SQL クライアント

Docker 環境で提供される SQL クライアントを使用するか、システム上のものを使用できます。多くの MySQL 互換クライアントが動作します。

設定

demo/documentation-samples/hudi ディレクトリに移動し、ファイルを確認します。これは Hudi のチュートリアルではないため、すべての設定ファイルが説明されるわけではありませんが、設定がどのように行われているかを確認するためにどこを見ればよいかを知っておくことが重要です。hudi/ ディレクトリには、Docker でサービスを起動および設定するために使用される docker-compose.yml ファイルがあります。以下はそのサービスの一覧と簡単な説明です。

Docker サービス

サービス責任
starrocks-feメタデータ管理、クライアント接続、クエリプランとスケジューリング
starrocks-beクエリプランの実行
metastore_dbHive メタデータを保存するために使用される Postgres DB
hive_metastoreApache Hive メタストアを提供
miniomcMinIO Object Storage と MinIO コマンドラインクライアント
spark-hudiMinIO Object Storage

設定ファイル

hudi/conf/ ディレクトリには、spark-hudi コンテナにマウントされる設定ファイルがあります。

core-site.xml

このファイルには、オブジェクトストレージに関連する設定が含まれています。詳細については、このドキュメントの最後にリンクがあります。

spark-defaults.conf

Hive、MinIO、および Spark SQL の設定。

hudi-defaults.conf

spark-shell の警告を抑制するために使用されるデフォルトファイル。

hadoop-metrics2-hbase.properties

spark-shell の警告を抑制するために使用される空のファイル。

hadoop-metrics2-s3a-file-system.properties

spark-shell の警告を抑制するために使用される空のファイル。

デモクラスターの起動

このデモシステムは、StarRocks、Hudi、MinIO、および Spark サービスで構成されています。Docker compose を実行してクラスターを起動します。

docker compose up --detach --wait --wait-timeout 60
[+] Running 8/8
✔ Network hudi Created 0.0s
✔ Container hudi-starrocks-fe-1 Healthy 0.1s
✔ Container hudi-minio-1 Healthy 0.1s
✔ Container hudi-metastore_db-1 Healthy 0.1s
✔ Container hudi-starrocks-be-1 Healthy 0.0s
✔ Container hudi-mc-1 Healthy 0.0s
✔ Container hudi-hive-metastore-1 Healthy 0.0s
✔ Container hudi-spark-hudi-1 Healthy 0.1s
ヒント

多くのコンテナが実行されている場合、docker compose ps の出力は jq にパイプすると読みやすくなります。

docker compose ps --format json | \
jq '{Service: .Service, State: .State, Status: .Status}'
{
"Service": "hive-metastore",
"State": "running",
"Status": "Up About a minute (healthy)"
}
{
"Service": "mc",
"State": "running",
"Status": "Up About a minute"
}
{
"Service": "metastore_db",
"State": "running",
"Status": "Up About a minute"
}
{
"Service": "minio",
"State": "running",
"Status": "Up About a minute"
}
{
"Service": "spark-hudi",
"State": "running",
"Status": "Up 33 seconds (healthy)"
}
{
"Service": "starrocks-be",
"State": "running",
"Status": "Up About a minute (healthy)"
}
{
"Service": "starrocks-fe",
"State": "running",
"Status": "Up About a minute (healthy)"
}

MinIO の設定

Spark コマンドを実行するときに、作成されるテーブルの basepath を s3a URI に設定します。

val basePath = "s3a://huditest/hudi_coders"

このステップでは、MinIO にバケット huditest を作成します。MinIO コンソールはポート 9000 で実行されています。

MinIO に認証

ブラウザを開いて http://localhost:9000/ にアクセスし、認証します。ユーザー名とパスワードは docker-compose.yml に指定されており、adminpassword です。

バケットを作成

左側のナビゲーションで Buckets を選択し、Create Bucket + を選択します。バケット名を huditest とし、Create Bucket を選択します。

Create bucket huditest

テーブルを作成してデータを投入し、Hive に同期

ヒント

このコマンドや他の docker compose コマンドは、docker-compose.yml ファイルがあるディレクトリから実行してください。

spark-hudi サービスで spark-shell を開きます。

docker compose exec spark-hudi spark-shell
注記

spark-shell の起動時に不正なリフレクティブアクセスに関する警告が表示されますが、これらの警告は無視して構いません。

これらのコマンドを scala> プロンプトで実行して以下を行います。

  • この Spark セッションをデータのロード、処理、および書き込みに設定
  • データフレームを作成し、それを Hudi テーブルに書き込み
  • Hive Metastore に同期
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import scala.collection.JavaConversions._

val schema = StructType( Array(
StructField("language", StringType, true),
StructField("users", StringType, true),
StructField("id", StringType, true)
))

val rowData= Seq(Row("Java", "20000", "a"),
Row("Python", "100000", "b"),
Row("Scala", "3000", "c"))


val df = spark.createDataFrame(rowData,schema)

val databaseName = "hudi_sample"
val tableName = "hudi_coders_hive"
val basePath = "s3a://huditest/hudi_coders"

df.write.format("hudi").
option(org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME, tableName).
option(RECORDKEY_FIELD_OPT_KEY, "id").
option(PARTITIONPATH_FIELD_OPT_KEY, "language").
option(PRECOMBINE_FIELD_OPT_KEY, "users").
option("hoodie.datasource.write.hive_style_partitioning", "true").
option("hoodie.datasource.hive_sync.enable", "true").
option("hoodie.datasource.hive_sync.mode", "hms").
option("hoodie.datasource.hive_sync.database", databaseName).
option("hoodie.datasource.hive_sync.table", tableName).
option("hoodie.datasource.hive_sync.partition_fields", "language").
option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hive-metastore:9083").
mode(Overwrite).
save(basePath)
System.exit(0)
注記

次の警告が表示されます。

WARN
org.apache.hudi.metadata.HoodieBackedTableMetadata -
Metadata table was not found at path
s3a://huditest/hudi_coders/.hoodie/metadata

これは無視して構いません。このファイルはこの spark-shell セッション中に自動的に作成されます。

また、次の警告も表示されます。

78184 [main] WARN  org.apache.hadoop.fs.s3a.S3ABlockOutputStream  - 
Application invoked the Syncable API against stream writing to
hudi_coders/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0.
This is unsupported

この警告は、オブジェクトストレージを使用している場合、書き込み中のログファイルの同期がサポートされていないことを示しています。ファイルは閉じられたときにのみ同期されます。詳細は Stack Overflow を参照してください。

上記の spark-shell セッションの最後のコマンドはコンテナを終了するはずですが、終了しない場合は Enter キーを押すと終了します。

StarRocks の設定

StarRocks に接続

starrocks-fe サービスが提供する MySQL クライアントを使用して StarRocks に接続するか、お気に入りの SQL クライアントを使用して、MySQL プロトコルを使用して localhost:9030 に接続するように設定します。

docker compose exec starrocks-fe \
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "

StarRocks と Hudi のリンクを作成

このガイドの最後に external catalog に関する詳細情報へのリンクがあります。このステップで作成される external catalog は、Docker で実行されている Hive Metastore (HMS) へのリンクとして機能します。

CREATE EXTERNAL CATALOG hudi_catalog_hms
PROPERTIES
(
"type" = "hudi",
"hive.metastore.type" = "hive",
"hive.metastore.uris" = "thrift://hive-metastore:9083",
"aws.s3.use_instance_profile" = "false",
"aws.s3.access_key" = "admin",
"aws.s3.secret_key" = "password",
"aws.s3.enable_ssl" = "false",
"aws.s3.enable_path_style_access" = "true",
"aws.s3.endpoint" = "http://minio:9000"
);
Query OK, 0 rows affected (0.59 sec)

新しいカタログを使用

SET CATALOG hudi_catalog_hms;
Query OK, 0 rows affected (0.01 sec)

Spark で挿入されたデータに移動

SHOW DATABASES;
+--------------------+
| Database |
+--------------------+
| default |
| hudi_sample |
| information_schema |
+--------------------+
2 rows in set (0.40 sec)
USE hudi_sample;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
SHOW TABLES;
+-----------------------+
| Tables_in_hudi_sample |
+-----------------------+
| hudi_coders_hive |
+-----------------------+
1 row in set (0.07 sec)

StarRocks で Hudi のデータをクエリ

このクエリを 2 回実行します。最初のクエリは、StarRocks にデータがまだキャッシュされていないため、完了するのに約 5 秒かかる場合があります。2 回目のクエリは非常に迅速に完了します。

SELECT * from hudi_coders_hive\G
ヒント

StarRocks ドキュメントの一部の SQL クエリは、セミコロンの代わりに \G で終了します。mysql CLI では、 \G はクエリ結果を縦に表示させます。

多くの SQL クライアントは縦のフォーマット出力を解釈しないため、mysql CLI を使用していない場合は \G; に置き換える必要があります。

*************************** 1. row ***************************
_hoodie_commit_time: 20240208165522561
_hoodie_commit_seqno: 20240208165522561_0_0
_hoodie_record_key: c
_hoodie_partition_path: language=Scala
_hoodie_file_name: bb29249a-b69d-4c32-843b-b7142d8dc51c-0_0-27-1221_20240208165522561.parquet
language: Scala
users: 3000
id: c
*************************** 2. row ***************************
_hoodie_commit_time: 20240208165522561
_hoodie_commit_seqno: 20240208165522561_2_0
_hoodie_record_key: a
_hoodie_partition_path: language=Java
_hoodie_file_name: 12fc14aa-7dc4-454c-b710-1ad0556c9386-0_2-27-1223_20240208165522561.parquet
language: Java
users: 20000
id: a
*************************** 3. row ***************************
_hoodie_commit_time: 20240208165522561
_hoodie_commit_seqno: 20240208165522561_1_0
_hoodie_record_key: b
_hoodie_partition_path: language=Python
_hoodie_file_name: 51977039-d71e-4dd6-90d4-0c93656dafcf-0_1-27-1222_20240208165522561.parquet
language: Python
users: 100000
id: b
3 rows in set (0.15 sec)

まとめ

このチュートリアルでは、StarRocks external catalog を使用して、Hudi external catalog を使用してデータをそのままクエリできることを示しました。他にも Iceberg、Delta Lake、JDBC カタログを使用した多くの統合が利用可能です。

このチュートリアルで行ったこと:

  • Docker で StarRocks と Hudi/Spark/MinIO 環境をデプロイ
  • Apache Spark を使用して Hudi に小さなデータセットをロード
  • Hudi カタログへのアクセスを提供するために StarRocks external catalog を設定
  • データレイクからデータをコピーせずに StarRocks で SQL を使用してデータをクエリ

詳細情報

StarRocks Catalogs

Apache Hudi quickstart (Spark を含む)

Apache Hudi S3 configuration

Apache Spark configuration docs