Kafka connector for StarRocks
使用 Kafka connector 导入数据
StarRocks 提供 Apache Kafka® 连接器 (StarRocks Connector for Apache Kafka®,简称 Kafka connector),作为 sink connector,持续消费 Kafka 的消息并导入至 StarRocks 中。
使用 Kafka connector 可以更好的融入 Kafka 生态,StarRocks 可以与 Kafka Connect 无缝对接。为 StarRocks 准实时接入链路提供了更多的选择。相比于 Routine Load,您可以在以下场景中优先考虑使用 Kafka connector 导入数据:
- 相比于 Routine Load 仅支持导入 CSV、JSON、Avro 格式的数据,Kafka connector 支持导入更丰富的数据格式。只要数据能通过 Kafka Connect 的 converters 转换成 JSON 和 CSV 格式,就可以通过 Kafka connector 导入,例如 Protobuf 格式的数据。
- 需要对数据做自定义的 transform 操作,例如 Debezium CDC 格式的数据。
- 从多个 Kafka Topic 导入数据。
- 从 Confluent Cloud 导入数据。
- 需要更精细化的控制导入的批次大小,并行度等参数,以求达到导入速率和资源使用之间的平衡。
环境准备
版本要求
Connector | Kafka | StarRocks | Java |
---|---|---|---|
1.0.4 | 3.4 | 2.5 及以上 | 8 |
1.0.3 | 3.4 | 2.5 及以上 | 8 |
准备 Kafka 环境
支持自建 Apache Kafka 集群和 Confluent Cloud:
- 如果使用自建 Apache Kafka 集群,您可以参考 Apache Kafka quickstart 快速部署 Kafka 集群。Kafka Connect 已集成在 Kafka 中。
- 如果使用 Confluent Cloud,请确保已拥有 Confluent 账号并已经创建集群。
下载 Kafka connector
安装 Kafka connector 至 Kafka connect。
-
自建 Kafka 集群
-
Confluent Cloud
Kafka connector 目前尚未上传到 Confluent Hub,您需要下载并解压 starrocks-kafka-connector-xxx.tar.gz ,打包成 ZIP 文件并上传到 Confluent Cloud。
网络配置
确保 Kafka 所在机器能够访问 StarRocks 集群中 FE 节点的 http_port
(默认 8030
) 和 query_port
端口(默认 9030
),以及 BE 节点的 be_http_port
端口(默认 8040
)。
使用示例
本文以自建 Kafka 集群为例,介绍如何配置 Kafka connector 和 Kafka connect,然后启动 Kafka Connect 导入数据至 StarRocks。
数据集
假设 Kafka 集群的 Topic test
中存在如下 JSON 格式的数据。
{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}
目标数据库和表
根据 JSON 数据中需要导入的 key,在 StarRocks 集 群的目标数据库 example_db
中创建表 test_tbl
。
CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);
配置 Kafka connector 和 Kafka Connect,然后启动 Kafka Connect 导入数据
通过 Standalone 模式启动 Kafka Connect
-
配置 Kafka connector。在 Kafka 安装目录下的 config 目录,创建 Kafka connector 的配置文件 connect-StarRocks-sink.properties,并配置对应参数。参数和相关说明,参见参数说明。
信息- 在本示例中,StarRocks 提供的 Kafka connector 是 sink connector,能够持续消费 Kafka 的数据并导入 StarRocks。
- 如果源端数据为 CDC 数据,例如 Debezium CDC 格式的数据,并且 StarRocks 表为主键表,为了将源端的数据变更同步至主键表,则您还需要在 StarRocks 提供的 Kafka connector 的配置文件 connect-StarRocks-sink.properties 中配置
transforms
以及相关参数。
name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=test
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# StarRocks FE 的 HTTP Server 地址,默认端口 8030
starrocks.http.url=192.168.xxx.xxx:8030
# 当 Kafka Topic 的名称与 StarRocks 表名不一致时,需要配置两者的映射关系
starrocks.topic2table.map=test:test_tbl
# StarRocks 用户名
starrocks.username=user1
# StarRocks 用户密码。您必须输入用户密码。
starrocks.password=123456
starrocks.database.name=example_db
sink.properties.strip_outer_array=true -
配置并启动 Kafka Connect。
-
配置 Kafka Connect。在 config 目录中的
config/connect-standalone.properties
配置文件中配置如下参数。参数解释,参见 Running Kafka Connect。# kafka broker 的地址,多个 Broker 之间以英文逗号 (,) 分隔。
# 注意本示例使用 PLAINTEXT 安全协议访问 Kafka 集群,如果使用其他安全协议访问 Kafka 集群,则您需要在本文件中配置相关信息。
bootstrap.servers=<kafka_broker_ip>:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# Kafka connector 解压后所在的绝对路径,例如:
plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3 -
启动 Kafka Connect。
CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-starrocks-sink.properties
-