Kafka connector for StarRocks
使用 Kafka connector 导入数据
StarRocks 提供 Apache Kafka® 连接器 (StarRocks Connector for Apache Kafka®,简称 Kafka connector),作为 sink connector,持续消费 Kafka 的消息并导入至 StarRocks 中。
使用 Kafka connector 可以更好的融入 Kafka 生态,StarRocks 可以与 Kafka Connect 无缝对接。为 StarRocks 准实时接入链路提供了更多的选择。相比于 Routine Load,您可以在以下场景中优先考虑使用 Kafka connector 导入数据:
- 相比于 Routine Load 仅支持导入 CSV、JSON、Avro 格式的数据,Kafka connector 支持导入更丰富的数据格式。只要数据能通过 Kafka Connect 的 converters 转换成 JSON 和 CSV 格式,就可以通过 Kafka connector 导入,例如 Protobuf 格式的数据。
- 需要对数据做自定义的 transform 操作,例如 Debezium CDC 格式的数据。
- 从多个 Kafka Topic 导入数据。
- 从 Confluent Cloud 导入数据。
- 需要更精细化的控制导入的批次大小,并行度等参数,以求达到导入速率和资源使用之间的平衡。
环境准备
版本要求
Connector | Kafka | StarRocks | Java |
---|---|---|---|
1.0.4 | 3.4 | 2.5 及以上 | 8 |
1.0.3 | 3.4 | 2.5 及以上 | 8 |
准备 Kafka 环境
支持自建 Apache Kafka 集群和 Confluent Cloud:
- 如果使用自建 Apache Kafka 集群,您可以参考 Apache Kafka quickstart 快速部署 Kafka 集群。Kafka Connect 已集成在 Kafka 中。
- 如果使用 Confluent Cloud,请确保已拥有 Confluent 账号并已经创建集群。
下载 Kafka connector
安装 Kafka connector 至 Kafka connect。
-
自建 Kafka 集群
-
Confluent Cloud
Kafka connector 目前尚未上传到 Confluent Hub,您需要下载并解压 starrocks-kafka-connector-xxx.tar.gz ,打包成 ZIP 文件并上传到 Confluent Cloud。
网络配置
确保 Kafka 所在机器能够访问 StarRocks 集群中 FE 节点的 http_port
(默认 8030
) 和 query_port
端口(默认 9030
),以及 BE 节点的 be_http_port
端口(默认 8040
)。
使用示例
本文以自建 Kafka 集群为例,介绍如何配置 Kafka connector 和 Kafka connect,然后启动 Kafka Connect 导入数据至 StarRocks。
数据集
假设 Kafka 集群的 Topic test
中存在如下 JSON 格式的数据。
{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}