跳到主要内容
版本:Stable-3.5

Kafka routine load StarRocks using shared-data storage

关于 Routine Load

Routine load 是一种使用 Apache Kafka 或在本实验中使用 Redpanda,将数据持续流式传输到 StarRocks 的方法。数据被流式传输到 Kafka 主题中,然后由 Routine Load 作业将数据导入到 StarRocks。更多关于 Routine Load 的详细信息将在实验结束时提供。

关于 shared-data

在存储与计算分离的系统中,数据存储在低成本可靠的远端存储系统中,如 Amazon S3、Google Cloud Storage、Azure Blob Storage 和其他兼容 S3 的存储如 MinIO。热数据会被本地缓存,当缓存命中时,查询性能与存储计算耦合架构相当。计算节点(CN)可以在几秒钟内按需添加或移除。这种架构降低了存储成本,确保了更好的资源隔离,并提供了弹性和可扩展性。

本教程涵盖:

  • 使用 Docker Compose 运行 StarRocks、Redpanda 和 MinIO
  • 使用 MinIO 作为 StarRocks 的存储层
  • 配置 StarRocks 以使用 shared-data
  • 添加一个 Routine Load 作业以从 Redpanda 消费数据

所使用的数据是合成数据。

本文档中包含大量信息,内容以步骤形式呈现于开头,技术细节在结尾。这是为了按以下顺序服务于这些目的:

  1. 配置 Routine Load。
  2. 允许读者在 shared-data 部署中导入数据并分析这些数据。
  3. 提供 shared-data 部署的配置细节。

前提条件

Docker

  • Docker
  • 为 Docker 分配 4 GB RAM
  • 为 Docker 分配 10 GB 可用磁盘空间

SQL 客户端

您可以使用 Docker 环境中提供的 SQL 客户端,或者使用系统上的客户端。许多 MySQL 兼容的客户端都可以使用,本指南涵盖了 DBeaver 和 MySQL Workbench 的配置。

curl

curl 用于下载 Compose 文件和生成数据的脚本。通过在操作系统提示符下运行 curlcurl.exe 检查是否已安装 curl。如果未安装 curl,在此获取 curl

Python

需要 Python 3 和 Apache Kafka 的 Python 客户端 kafka-python


术语

FE

前端节点负责元数据管理、客户端连接管理、查询计划和查询调度。每个 FE 在其内存中存储和维护完整的元数据副本,确保 FEs 之间的服务无差异。

CN

计算节点负责在 shared-data 部署中执行查询计划。

BE

后端节点负责在 shared-nothing 部署中进行数据存储和执行查询计划。

备注

本指南不使用 BEs,此信息仅为帮助您理解 BEs 和 CNs 之间的区别。


启动 StarRocks

要使用对象存储运行 StarRocks 的 shared-data,您需要:

  • 一个前端引擎(FE)
  • 一个计算节点(CN)
  • 对象存储

本指南使用 MinIO,它是兼容 S3 的对象存储提供商。MinIO 根据 GNU Affero 通用公共许可证提供。

下载实验文件

docker-compose.yml

mkdir routineload
cd routineload
curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/routine-load-shared-data/docker-compose.yml

gen.py

gen.py 是一个使用 Apache Kafka 的 Python 客户端将数据发布(生产)到 Kafka 主题的脚本。该脚本已写入 Redpanda 容器的地址和端口。

curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/routine-load-shared-data/gen.py

启动 StarRocks、MinIO 和 Redpanda

docker compose up --detach --wait --wait-timeout 120

检查服务的进度。容器变为健康状态可能需要 30 秒或更长时间。routineload-minio_mc-1 容器不会显示健康指示器,并将在配置 MinIO 以供 StarRocks 使用访问密钥后退出。等待 routineload-minio_mc-10 代码退出,其余服务为 Healthy

运行 docker compose ps 直到服务健康:

docker compose ps
WARN[0000] /Users/droscign/routineload/docker-compose.yml: `version` is obsolete
[+] Running 6/7
✔ Network routineload_default Crea... 0.0s
✔ Container minio Healthy 5.6s
✔ Container redpanda Healthy 3.6s
✔ Container redpanda-console Healt... 1.1s
⠧ Container routineload-minio_mc-1 Waiting 23.1s
✔ Container starrocks-fe Healthy 11.1s
✔ Container starrocks-cn Healthy 23.0s
container routineload-minio_mc-1 exited (0)

检查 MinIO 凭证

为了使用 MinIO 作为 StarRocks 的对象存储,StarRocks 需要一个 MinIO 访问密钥。访问密钥是在 Docker 服务启动期间生成的。为了帮助您更好地理解 StarRocks 如何连接到 MinIO,您应该验证密钥是否存在。

打开 MinIO Web UI

浏览到 http://localhost:9001/access-keys 用户名和密码在 Docker compose 文件中指定,分别是 miniouserminiopassword。您应该看到有一个访问密钥。密钥是 AAAAAAAAAAAAAAAAAAAA,您无法在 MinIO 控制台中看到密钥,但它在 Docker compose 文件中,是 BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB

查看 MinIO 访问密钥


为您的数据创建一个 bucket

当您在 StarRocks 中创建一个存储卷时,您将指定数据的 LOCATION

    LOCATIONS = ("s3://my-starrocks-bucket/")

打开 http://localhost:9001/buckets 并为存储卷添加一个 bucket。将 bucket 命名为 my-starrocks-bucket。接受列出的三个选项的默认值。


SQL 客户端

当前教程可以使用以下三个客户端进行测试,您只需选择其中一个:

  • MySQL CLI:您可以从 Docker 环境或您的本机运行此客户端。
  • DBeaver(社区版或专业版)
  • MySQL Workbench

配置客户端

您可以从 StarRocks FE 节点容器 starrocks-fe 中直接运行 MySQL Client:

docker compose exec starrocks-fe \
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
提示

所有 docker compose 命令必须从包含 docker-compose.yml 文件的目录中运行。

如果您需要安装 MySQL Client,请点击展开以下 安装 MySQL 客户端 部分:

安装 MySQL 客户端
  • macOS:如果您使用 Homebrew 并且不需要安装 MySQL 服务器,请运行 brew install mysql-client@8.0 安装 MySQL Client。
  • Linux:请检查您的 mysql 客户端的 Repository。例如,运行 yum install mariadb
  • Microsoft Windows:安装 MySQL Community Server 后,运行提供的客户端,或在 WSL 中运行 mysql

StarRocks 的 shared-data 配置

此时您已经运行了 StarRocks,并且 MinIO 也在运行。MinIO 访问密钥用于连接 StarRocks 和 MinIO。

这是 FE 配置的一部分,指定了 StarRocks 部署将使用 shared data。这是在 Docker Compose 创建部署时添加到文件 fe.conf 中的。

# enable the shared data run mode
run_mode = shared_data
cloud_native_storage_type = S3
信息

您可以通过从 quickstart 目录运行此命令并查看文件末尾来验证这些设置:

docker compose exec starrocks-fe \
cat /opt/starrocks/fe/conf/fe.conf

:::

使用 SQL 客户端连接到 StarRocks

提示

从包含 docker-compose.yml 文件的目录运行此命令。

如果您使用的是 mysql CLI 以外的客户端,请立即打开它。

docker compose exec starrocks-fe \
mysql -P9030 -h127.0.0.1 -uroot --prompt="StarRocks > "

检查存储卷

SHOW STORAGE VOLUMES;
提示

应该没有存储卷,您将接下来创建一个。

Empty set (0.04 sec)

创建一个 shared-data 存储卷

早些时候,您在 MinIO 中创建了一个名为 my-starrocks-volume 的 bucket,并验证了 MinIO 有一个名为 AAAAAAAAAAAAAAAAAAAA 的访问密钥。以下 SQL 将使用访问密钥和密钥在 MionIO bucket 中创建一个存储卷。

CREATE STORAGE VOLUME s3_volume
TYPE = S3
LOCATIONS = ("s3://my-starrocks-bucket/")
PROPERTIES
(
"enabled" = "true",
"aws.s3.endpoint" = "minio:9000",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
"aws.s3.use_instance_profile" = "false",
"aws.s3.use_aws_sdk_default_behavior" = "false"
);

现在您应该看到一个存储卷列出,之前它是一个空集:

SHOW STORAGE VOLUMES;
+----------------+
| Storage Volume |
+----------------+
| s3_volume |
+----------------+
1 row in set (0.02 sec)

查看存储卷的详细信息,并注意这还不是默认卷,并且它被配置为使用您的 bucket:

DESC STORAGE VOLUME s3_volume\G
提示

本文档中的一些 SQL,以及 StarRocks 文档中的许多其他文档,以 \G 结尾而不是分号。\G 使 mysql CLI 垂直呈现查询结果。

许多 SQL 客户端不解释垂直格式输出,因此您应该将 \G 替换为 ;

*************************** 1. row ***************************
Name: s3_volume
Type: S3
IsDefault: false
Location: s3://my-starrocks-bucket/
Params: {"aws.s3.access_key":"******","aws.s3.secret_key":"******","aws.s3.endpoint":"minio:9000","aws.s3.region":"us-east-1","aws.s3.use_instance_profile":"false","aws.s3.use_web_identity_token_file":"false","aws.s3.use_aws_sdk_default_behavior":"false"}
Enabled: true
Comment:
1 row in set (0.02 sec)

设置默认存储卷

SET s3_volume AS DEFAULT STORAGE VOLUME;
DESC STORAGE VOLUME s3_volume\G
*************************** 1. row ***************************
Name: s3_volume
Type: S3
IsDefault: true
Location: s3://my-starrocks-bucket/
Params: {"aws.s3.access_key":"******","aws.s3.secret_key":"******","aws.s3.endpoint":"minio:9000","aws.s3.region":"us-east-1","aws.s3.use_instance_profile":"false","aws.s3.use_web_identity_token_file":"false","aws.s3.use_aws_sdk_default_behavior":"false"}
Enabled: true
Comment:
1 row in set (0.02 sec)

创建一个表

这些 SQL 命令在您的 SQL 客户端中运行。

CREATE DATABASE IF NOT EXISTS quickstart;

验证数据库 quickstart 是否使用存储卷 s3_volume

SHOW CREATE DATABASE quickstart \G
*************************** 1. row ***************************
Database: quickstart
Create Database: CREATE DATABASE `quickstart`
PROPERTIES ("storage_volume" = "s3_volume")
USE quickstart;
CREATE TABLE site_clicks (
`uid` bigint NOT NULL COMMENT "uid",
`site` string NOT NULL COMMENT "site url",
`vtime` bigint NOT NULL COMMENT "vtime"
)
DISTRIBUTED BY HASH(`uid`)
PROPERTIES("replication_num"="1");

打开 Redpanda 控制台

目前还没有主题,下一步将创建一个主题。

http://localhost:8080/overview

将数据发布到 Redpanda 主题

routineload/ 文件夹中的命令行中运行此命令以生成数据:

python gen.py 5
提示

在您的系统上,您可能需要在命令中使用 python3 代替 python

如果缺少 kafka-python,请尝试:

pip install kafka-python

pip3 install kafka-python
b'{ "uid": 6926, "site": "https://docs.starrocks.io/", "vtime": 1718034793 } '
b'{ "uid": 3303, "site": "https://www.starrocks.io/product/community", "vtime": 1718034793 } '
b'{ "uid": 227, "site": "https://docs.starrocks.io/", "vtime": 1718034243 } '
b'{ "uid": 7273, "site": "https://docs.starrocks.io/", "vtime": 1718034794 } '
b'{ "uid": 4666, "site": "https://www.starrocks.io/", "vtime": 1718034794 } '

在 Redpanda 控制台中验证

导航到 http://localhost:8080/topics 在 Redpanda 控制台中,您将看到一个名为 test2 的主题。选择该主题,然后选择 Messages 选项卡,您将看到五条与 gen.py 输出匹配的消息。

消费消息

在 StarRocks 中,您将创建一个 Routine Load 作业来:

  1. 从 Redpanda 主题 test2 消费消息
  2. 将这些消息加载到表 site_clicks

StarRocks 被配置为使用 MinIO 进行存储,因此插入到 site_clicks 表中的数据将存储在 MinIO 中。

创建一个 Routine Load 作业

在 SQL 客户端中运行此命令以创建 Routine Load 作业,命令将在实验结束时详细解释。

CREATE ROUTINE LOAD quickstart.clicks ON site_clicks
PROPERTIES
(
"format" = "JSON",
"jsonpaths" ="[\"$.uid\",\"$.site\",\"$.vtime\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "redpanda:29092",
"kafka_topic" = "test2",
"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING"
);

验证 Routine Load 作业

SHOW ROUTINE LOAD\G

验证三个高亮行:

  1. 状态应为 RUNNING
  2. 主题应为 test2,代理应为 redpanda:2092
  3. 统计信息应显示 0 或 5 行已加载,具体取决于您运行 SHOW ROUTINE LOAD 命令的时间。如果有 0 行已加载,请再次运行。
*************************** 1. row ***************************
Id: 10078
Name: clicks
CreateTime: 2024-06-12 15:51:12
PauseTime: NULL
EndTime: NULL
DbName: quickstart
TableName: site_clicks
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","partial_update_mode":"null","whereExpr":"*","dataFormat":"json","timezone":"Etc/UTC","format":"json","log_rejected_record_num":"0","taskTimeoutSecond":"60","json_root":"","maxFilterRatio":"1.0","strict_mode":"false","jsonpaths":"[\"$.uid\",\"$.site\",\"$.vtime\"]","taskConsumeSecond":"15","desireTaskConcurrentNum":"5","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"test2","currentKafkaPartitions":"0","brokerList":"redpanda:29092"}
CustomProperties: {"group.id":"clicks_ea38a713-5a0f-4abe-9b11-ff4a241ccbbd"}
Statistic: {"receivedBytes":0,"errorRows":0,"committedTaskNum":0,"loadedRows":0,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":0,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":1}
Progress: {"0":"OFFSET_ZERO"}
TimestampProgress: {}
ReasonOfStateChanged:
ErrorLogUrls:
TrackingSQL:
OtherMsg:
LatestSourcePosition: {}
1 row in set (0.00 sec)
SHOW ROUTINE LOAD\G
*************************** 1. row ***************************
Id: 10076
Name: clicks
CreateTime: 2024-06-12 18:40:53
PauseTime: NULL
EndTime: NULL
DbName: quickstart
TableName: site_clicks
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","partial_update_mode":"null","whereExpr":"*","dataFormat":"json","timezone":"Etc/UTC","format":"json","log_rejected_record_num":"0","taskTimeoutSecond":"60","json_root":"","maxFilterRatio":"1.0","strict_mode":"false","jsonpaths":"[\"$.uid\",\"$.site\",\"$.vtime\"]","taskConsumeSecond":"15","desireTaskConcurrentNum":"5","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"test2","currentKafkaPartitions":"0","brokerList":"redpanda:29092"}
CustomProperties: {"group.id":"clicks_a9426fee-45bb-403a-a1a3-b3bc6c7aa685"}
Statistic: {"receivedBytes":372,"errorRows":0,"committedTaskNum":1,"loadedRows":5,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":5,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":519}
Progress: {"0":"4"}
TimestampProgress: {"0":"1718217035111"}
ReasonOfStateChanged:
ErrorLogUrls:
TrackingSQL:
OtherMsg:
LatestSourcePosition: {"0":"5"}
1 row in set (0.00 sec)

验证数据是否存储在 MinIO 中

打开 MinIO http://localhost:9001/browser/ 并验证 my-starrocks-bucket 下是否存储了对象。


从 StarRocks 查询数据

USE quickstart;
SELECT * FROM site_clicks;
+------+--------------------------------------------+------------+
| uid | site | vtime |
+------+--------------------------------------------+------------+
| 4607 | https://www.starrocks.io/blog | 1718031441 |
| 1575 | https://www.starrocks.io/ | 1718031523 |
| 2398 | https://docs.starrocks.io/ | 1718033630 |
| 3741 | https://www.starrocks.io/product/community | 1718030845 |
| 4792 | https://www.starrocks.io/ | 1718033413 |
+------+--------------------------------------------+------------+
5 rows in set (0.07 sec)

发布更多数据

再次运行 gen.py 将向 Redpanda 发布另外五条记录。

python gen.py 5

验证数据是否已添加

由于 Routine Load 作业按计划运行(默认每 10 秒),数据将在几秒钟内加载。

SELECT * FROM site_clicks;
+------+--------------------------------------------+------------+
| uid | site | vtime |
+------+--------------------------------------------+------------+
| 6648 | https://www.starrocks.io/blog | 1718205970 |
| 7914 | https://www.starrocks.io/ | 1718206760 |
| 9854 | https://www.starrocks.io/blog | 1718205676 |
| 1186 | https://www.starrocks.io/ | 1718209083 |
| 3305 | https://docs.starrocks.io/ | 1718209083 |
| 2288 | https://www.starrocks.io/blog | 1718206759 |
| 7879 | https://www.starrocks.io/product/community | 1718204280 |
| 2666 | https://www.starrocks.io/ | 1718208842 |
| 5801 | https://www.starrocks.io/ | 1718208783 |
| 8409 | https://www.starrocks.io/ | 1718206889 |
+------+--------------------------------------------+------------+
10 rows in set (0.02 sec)

配置详情

现在您已经体验了使用 StarRocks 的 shared-data,了解配置非常重要。

CN 配置

此处使用的 CN 配置是默认配置,因为 CN 设计用于 shared-data 使用。默认配置如下所示。您无需进行任何更改。

sys_log_level = INFO

# ports for admin, web, heartbeat service
be_port = 9060
be_http_port = 8040
heartbeat_service_port = 9050
brpc_port = 8060
starlet_port = 9070

FE 配置

FE 配置与默认配置略有不同,因为 FE 必须配置为期望数据存储在对象存储中,而不是存储在 BE 节点的本地磁盘上。

docker-compose.yml 文件在 command 中生成 FE 配置。

# enable shared data, set storage type, set endpoint
run_mode = shared_data
cloud_native_storage_type = S3
备注

此配置文件不包含 FE 的默认条目,仅显示 shared-data 配置。

非默认 FE 配置设置:

备注

许多配置参数以 s3_ 为前缀。此前缀用于所有 Amazon S3 兼容的存储类型(例如:S3、GCS 和 MinIO)。使用 Azure Blob Storage 时,前缀为 azure_

run_mode=shared_data

这启用 shared-data 使用。

cloud_native_storage_type=S3

这指定使用 S3 兼容存储或 Azure Blob Storage。对于 MinIO,这始终是 S3。

CREATE storage volume 的详细信息

CREATE STORAGE VOLUME s3_volume
TYPE = S3
LOCATIONS = ("s3://my-starrocks-bucket/")
PROPERTIES
(
"enabled" = "true",
"aws.s3.endpoint" = "minio:9000",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB",
"aws.s3.use_instance_profile" = "false",
"aws.s3.use_aws_sdk_default_behavior" = "false"
);

aws_s3_endpoint=minio:9000

MinIO 端点,包括端口号。

aws_s3_path=starrocks

bucket 名称。

aws_s3_access_key=AAAAAAAAAAAAAAAAAAAA

MinIO 访问密钥。

aws_s3_secret_key=BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB

MinIO 访问密钥密钥。

aws_s3_use_instance_profile=false

使用 MinIO 时使用访问密钥,因此 MinIO 不使用实例配置文件。

aws_s3_use_aws_sdk_default_behavior=false

使用 MinIO 时,此参数始终设置为 false。


关于 Routine Load 命令的说明

StarRocks Routine Load 接受许多参数。这里只描述了本教程中使用的参数,其余将在更多信息部分中链接。

CREATE ROUTINE LOAD quickstart.clicks ON site_clicks
PROPERTIES
(
"format" = "JSON",
"jsonpaths" ="[\"$.uid\",\"$.site\",\"$.vtime\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "redpanda:29092",
"kafka_topic" = "test2",
"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING"
);

参数

CREATE ROUTINE LOAD quickstart.clicks ON site_clicks

CREATE ROUTINE LOAD ON 的参数为:

  • database_name.job_name
  • table_name

database_name 是可选的。在本实验中,它是 quickstart 并被指定。

job_name 是必需的,为 clicks

table_name 是必需的,为 site_clicks

作业属性

属性 format

"format" = "JSON",

在这种情况下,数据是 JSON 格式的,因此属性设置为 JSON。其他有效格式为:CSVJSONAvroCSV 是默认格式。

属性 jsonpaths

"jsonpaths" ="[\"$.uid\",\"$.site\",\"$.vtime\"]"

您希望从 JSON 格式数据中加载的字段名称。此参数的值是一个有效的 JsonPath 表达式。更多信息可在页面末尾获得。

数据源属性

kafka_broker_list

"kafka_broker_list" = "redpanda:29092",

Kafka 的代理连接信息。格式为 <kafka_broker_name_or_ip>:<broker_ port>。多个代理用逗号分隔。

kafka_topic

"kafka_topic" = "test2",

要消费的 Kafka 主题。

kafka_partitionskafka_offsets

"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING"

这些属性一起呈现,因为每个 kafka_partitions 条目需要一个 kafka_offset

kafka_partitions 是要消费的一个或多个分区的列表。如果未设置此属性,则消费所有分区。

kafka_offsets 是一个偏移量列表,每个分区在 kafka_partitions 中列出。在这种情况下,值为 OFFSET_BEGINNING,这会导致消费所有数据。默认情况下仅消费新数据。


总结

在本教程中,您:

  • 在 Docker 中部署了 StarRocks、Reedpanda 和 Minio
  • 创建了一个 Routine Load 作业以从 Kafka 主题消费数据
  • 学习了如何配置使用 MinIO 的 StarRocks 存储卷

更多信息

StarRocks 架构

本实验使用的示例非常简单。Routine Load 具有更多选项和功能。了解更多

JSONPath

Rocky the happy otterStarRocks Assistant

AI generated answers are based on docs and other sources. Please test answers in non-production environments.