从 AutoMQ Kafka 持续导入
AutoMQ for Kafka AutoMQ Kafka ) 是一款基于云重新设计的云原生 Kafka。 AutoMQ Kafka 内核开源并且100% 兼容 Kafka 协议,可以充分兑现云的红利。 相比自建 Apache Kafka,AutoMQ Kafka 在其云原生架构基础上实现的自动弹性、流量自平衡、秒级分区移动等特性可以为用户带来更低的总体拥有成本(TCO)。 本文将介绍如何通过 StarRocks Routine Load 将数据导入 AutoMQ Kafka。关于Routine Load的基本原理可以参考 Routine Load 基本原理。
环境准备
准备 StarRocks 以及测试数据
请确保自己已经准备好了可用的 StarRocks 集群。本文为了方便演示过程,参考 使用 Docker 部署 StarRocks 在一台 Linux 机器上安装了作为 Demo 的 StarRocks 集群。 创建库和主键表的测试表:
create database automq_db;
create table users (
id bigint NOT NULL,
name string NOT NULL,
timestamp string NULL,
status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
"enable_persistent_index" = "true"
);
如果测试环境中集群仅包含一个 BE,可以在 PROPERTIES
中将副本数设置为 1
,即 PROPERTIES( "replication_num" = "1" )
。默认副本数为 3
,也是生产集群推荐的副本数。如果您需要使用默认设置,也可以不配置 replication_num
参数。
准备 AutoMQ Kafka 环境和测试数据
参考 AutoMQ 快速入门部署好 AutoMQ Kafka 集群,确保 AutoMQ Kafka 与 StarRocks 之间保持网络连通性。 在AutoMQ Kafka中快速创建一个名为 example_topic 的主题并向其中写入一条测试JSON数据,可以通过以下步骤实现:
创建Topic
使用Kafka的命令行工具来创建主题。你需要有Kafka环境的访问权限,并且确保Kafka服务正在运行。以下是创建主题的命令:
./kafka-topics.sh --create --topic exampleto_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1
注意:将 topic 和 bootstarp-server 替换为你的Kafka服务器地址。
创建完topic可以用以下命令检查topic创建的结果
./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092
生成测试数据
我将为你生成一条简单的JSON格式的测试数据,和前文的表需要对应:
{
"id": 1,
"name": "testuser",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}
写入测试数据
使用Kafka的命令行工具或者编程方式将测试数据写入到example_topic。以下是使用命令行工具的一个示例:
echo '{"id": 1, "name": "testuser", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic
注意:将 topic 和 bootstarp-server 替换为你的Kafka服务器地址。
使用如下命令可以查看刚写入的topic数据:
sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning
创建 Routine Load 导入作业
在 StarRocks 命令行创建一个 Routine Load导入作业,可以对 AutoMQ Kafka topic内的数据进行持续导入:
CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.96.4:9092",
"kafka_topic" = "example_topic",
"kafka_partitions" = "0",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
注意:将 kafka_broker_list 替换为你的Kafka服务器地址。