基于 Apache Iceberg 的数据湖分析
基于 Apache Iceberg 的数据湖分析
概述
当前教程包含以下内容:
- 使用 Docker Compose 部署对象存储、Apache Spark、Iceberg Catalog 和 StarRocks。
- 向 Iceberg 数据湖导入数据。
- 配置 StarRocks 以访问 Iceberg Catalog。
- 使用 StarRocks 查询数据湖中的数据。
StarRocks 不仅能高效的分析本地存储的数据,也可以作为计算引擎直接分析数据湖中的数据。用户可以通过 StarRocks 提供的 External Catalog,轻松查询存储在 Apache Hive、Apache Iceberg、Apache Hudi、Delta Lake 等数据湖上的数据,无需进行数据迁移。支持的存储系统包括 HDFS、S3、OSS,支持的文件格式包括 Parquet、ORC、CSV。
如上图所示,在数据湖分析场景中,StarRocks 主要负责数据的计算分析,而数据湖则主要负责数据的存储、组织和维护。使用数据湖的优势在于可以使用开放的存储格式和灵活多变的 schema 定义方式,可以让 BI/AI/Adhoc/报表等业务有统一的 single source of truth。而 StarRocks 作为数据湖的计算引擎,可以充分发挥向量化引擎和 CBO 的优势,大大提升了数据湖分析的性能。
前提条件
Docker
- 安装 Docker。
- 为 Docker 分配 5 GB RAM。
- 为 Docker 分配 20 GB 的空闲磁盘空间。
SQL 客户端
您可以使用 Docker 环境中提供的 MySQL Client,也可以使用其他兼容 MySQL 的客户端,包括本教程中涉及的 DBeaver 和 MySQL Workbench。
curl
curl
命令用于下载数据集。您可以通过在终端运行 curl
或 curl.exe
来检查您的操作系统是否已安装 curl。如果未安装 curl,请点击此处获取 curl。
术语
FE
FE 节点负责元数据管理、客户端连接管理、查询计划和查询调度。每个 FE 节点在内存中存储和维护完整的元数据副本,确保每个 FE 都能提供无差别的服务。
CN
CN 节点负责在存算分离或存算一体集群中执行查询。
BE
BE 节点在存算一体集群中负责数据存储和执行查询。使用 External Catalog(例如本教程中使用的 Iceberg Catalog)时,BE 可以用于缓存外部数据,从而达到加速查询的效果。
环境
本教程使用了六个 Docker 容器(服务),均使用 Docker Compose 部署。这些服务及其功能如下:
服务 | 功能 |
---|---|
starrocks-fe | 负责元数据管理、客户端连接、查询规划和调度。 |
starrocks-be | 负责执行查询计划。 |
rest | 提供 Iceberg Catalog(元数据服务)。 |
spark-iceberg | 用于运行 PySpark 的 Apache Spark 环境。 |
mc | MinIO Client 客户 端。 |
minio | MinIO 对象存储。 |
下载 Docker Compose 文件和数据集
StarRocks 提供了包含以上必要容器的环境的 Docker Compose 文件和教程中需要使用数据集。
本教程中使用的数据集为纽约市绿色出租车行程记录,为 Parquet 格式。
下载 Docker Compose 文件。
mkdir iceberg
cd iceberg
curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/iceberg/docker-compose.yml
下载数据集。
curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/iceberg/datasets/green_tripdata_2023-05.parquet
在 Docker 中启动环境
所有 docker compose
命令必须从包含 docker-compose.yml
文件的目录中运行。
docker compose up -d
返回:
[+] Building 0.0s (0/0) docker:desktop-linux
[+] Running 6/6
✔ Container iceberg-rest Started 0.0s
✔ Container minio Started 0.0s
✔ Container starrocks-fe Started 0.0s
✔ Container mc Started 0.0s
✔ Container spark-iceberg Started 0.0s
✔ Container starrocks-be Started
检查环境状态
成功启动后,FE 和 BE 节点大约需要 30 秒才能部署完成。您需要通过 docker compose ps
命令检查服务的运行状态,直到 starrocks-fe
和 starrocks-be
的状态变为 healthy
。
docker compose ps
返回:
SERVICE CREATED STATUS PORTS
rest 4 minutes ago Up 4 minutes 0.0.0.0:8181->8181/tcp
mc 4 minutes ago Up 4 minutes
minio 4 minutes ago Up 4 minutes 0.0.0.0:9000-9001->9000-9001/tcp
spark-iceberg 4 minutes ago Up 4 minutes 0.0.0.0:8080->8080/tcp, 0.0.0.0:8888->8888/tcp, 0.0.0.0:10000-10001->10000-10001/tcp
starrocks-be 4 minutes ago Up 4 minutes (healthy) 0.0.0.0:8040->8040/tcp
starrocks-fe 4 minutes ago Up 4 minutes (healthy) 0.0.0.0:8030->8030/tcp, 0.0.0.0:9020->9020/tcp, 0.0.0.0:9030->9030/tcp
PySpark
本教程使用 PySpark 与 Iceberg 交互。如果您不熟悉 PySpark,您可以参考更多信息部分。
拷贝数据集
在将数据导入至 Iceberg 之前,需要将其拷贝到 spark-iceberg
容器中。
运行以下命令将数据集文件复制到 spark-iceberg
容器中的 /opt/spark/
路径。
docker compose \
cp green_tripdata_2023-05.parquet spark-iceberg:/opt/spark/
启动 PySpark
运行以下命令连接 spark-iceberg
服务并启动 PySpark。
docker compose exec -it spark-iceberg pyspark
返回:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.5.0
/_/
Using Python version 3.9.18 (main, Nov 1 2023 11:04:44)
Spark context Web UI available at http://6ad5cb0e6335:4041
Spark context available as 'sc' (master = local[*], app id = local-1701967093057).
SparkSession available as 'spark'.
>>>
导入数据集至 DataFrame 中
DataFrame 是 Spark SQL 的一部分,提供类似于数据库表的数据结构。
您需要 从 /opt/spark
路径导入数据集文件至 DataFrame 中,并通过查询其中部分数据检查数据导入是否成功。
在 PySpark Session 运行以下命令:
# 读取数据集文件到名为 `df` 的 DataFrame 中。
df = spark.read.parquet("/opt/spark/green_tripdata_2023-05.parquet")
# 显示数据集文件的 Schema。
df.printSchema()