跳到主要内容
版本:2.5

使用 Kafka connector 导入数据

StarRocks 提供 Apache Kafka® 连接器 (StarRocks Connector for Apache Kafka®,简称 Kafka connector),作为 sink connector,持续消费 Kafka 的消息并导入至 StarRocks 中。

使用 Kafka connector 可以更好的融入 Kafka 生态,StarRocks 可以与 Kafka Connect 无缝对接。为 StarRocks 准实时接入链路提供了更多的选择。相比于 Routine Load,您可以在以下场景中优先考虑使用 Kafka connector 导入数据:

  • 相比于 Routine Load 仅支持导入 CSV、JSON、Avro 格式的数据,Kafka connector 支持导入更丰富的数据格式。只要数据能通过 Kafka Connect 的 converters 转换成 JSON 和 CSV 格式,就可以通过 Kafka connector 导入,例如 Protobuf 格式的数据。
  • 需要对数据做自定义的 transform 操作,例如 Debezium CDC 格式的数据。
  • 从多个 Kafka Topic 导入数据。
  • 从 Confluent Cloud 导入数据。
  • 需要更精细化的控制导入的批次大小,并行度等参数,以求达到导入速率和资源使用之间的平衡。

环境准备

准备 Kafka 环境

支持自建 Apache Kafka 集群和 Confluent Cloud:

  • 如果使用自建 Apache Kafka 集群,您可以参考 Apache Kafka quickstart 快速部署 Kafka 集群。Kafka Connect 已集成在 Kafka 中。
  • 如果使用 Confluent Cloud,请确保已拥有 Confluent 账号并已经创建集群。

下载 Kafka connector

安装 Kafka connector 至 Kafka connect。

使用示例

本文以自建 Kafka 集群为例,介绍如何配置 Kafka connector 和 Kafka connect,然后启动 Kafka Connect 导入数据至 StarRocks。

数据集

假设 Kafka 集群的 Topic test 中存在如下 JSON 格式的数据。

{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}

目标数据库和表

根据 JSON 数据中需要导入的 key,在 StarRocks 集群的目标数据库 example_db 中创建表 test_tbl

CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);

配置 Kafka connector 和 Kafka Connect,然后启动 Kafka Connect 导入数据

通过 Standalone 模式启动 Kafka Connect

  1. 配置 Kafka connector。在 Kafka 安装目录下的 config 目录,创建 Kafka connector 的配置文件 connect-StarRocks-sink.properties,并配置对应参数。参数和相关说明,参见参数说明

    信息
    • 在本示例中,StarRocks 提供的 Kafka connector 是 sink connector,能够持续消费 Kafka 的数据并导入 StarRocks。
    • 如果源端数据为 CDC 数据,例如 Debezium CDC 格式的数据,并且 StarRocks 表为主键表,为了将源端的数据变更同步至主键表,则您还需要在 StarRocks 提供的 Kafka connector 的配置文件 connect-StarRocks-sink.properties配置 transforms 以及相关参数
    name=starrocks-kafka-connector
    connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
    topics=test
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=false
    # StarRocks FE 的 HTTP Server 地址,默认端口 8030
    starrocks.http.url=192.168.xxx.xxx:8030
    # 当 Kafka Topic 的名称与 StarRocks 表名不一致时,需要配置两者的映射关系
    starrocks.topic2table.map=test:test_tbl
    # StarRocks 用户名
    starrocks.username=user1
    # StarRocks 用户密码。您必须输入用户密码。
    starrocks.password=123456
    starrocks.database.name=example_db
    sink.properties.strip_outer_array=true
  2. 配置并启动 Kafka Connect。

    1. 配置 Kafka Connect。在 config 目录中的 config/connect-standalone.properties 配置文件中配置如下参数。参数解释,参见 Running Kafka Connect

      # kafka broker 的地址,多个 Broker 之间以英文逗号 (,) 分隔。
      # 注意本示例使用 PLAINTEXT 安全协议访问 Kafka 集群,如果使用其他安全协议访问 Kafka 集群,则您需要在本文件中配置相关信息。
      bootstrap.servers=<kafka_broker_ip>:9092
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      # Kafka connector 解压后所在的绝对路径,例如:
      plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3
    2. 启动 Kafka Connect。

      CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-starrocks-sink.properties

通过 Distributed 模式启动 Kafka Connect

  1. 配置并启动 Kafka Connect。

    1. 配置 Kafka Connect。在 config 目录中的 config/connect-distributed.properties 配置文件中配置如下参数。参数解释,参见 Running Kafka Connect

      # kafka broker 的地址,多个 Broker 之间以英文逗号 (,) 分隔。
      # 注意本示例使用 PLAINTEXT 安全协议访问 Kafka 集群,如果使用其他安全协议访问 Kafka 集群,则您需要在本文件中配置相关信息。 bootstrap.servers=<kafka_broker_ip>:9092
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      # Kafka connector 解压后所在的绝对路径,例如:
      plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3
    2. 启动 Kafka Connect。

      CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-distributed.sh config/connect-distributed.properties
  2. 配置并创建 Kafka connector。注意,在 Distributed 模式下您需要通过 REST API 来配置并创建 Kafka connector。参数和相关说明,参见参数说明

    信息
    • 在本示例中,StarRocks 提供的 Kafka connector 是 sink connector,能够持续消费 Kafka 的数据并导入 StarRocks。
    • 如果源端数据为 CDC 数据,例如 Debezium CDC 格式的数据,并且 StarRocks 表为主键表,为了将源端的数据变更同步至主键表,则您还需要在 StarRocks 提供的 Kafka connector 的配置文件 connect-StarRocks-sink.properties配置 transforms 以及相关参数
    curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
    "name":"starrocks-kafka-connector",
    "config":{
    "connector.class":"com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics":"test",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"true",
    "value.converter.schemas.enable":"false",
    "starrocks.http.url":"192.168.xxx.xxx:8030",
    "starrocks.topic2table.map":"test:test_tbl",
    "starrocks.username":"user1",
    "starrocks.password":"123456",
    "starrocks.database.name":"example_db",
    "sink.properties.strip_outer_array":"true"
    }
    }'

查询 StarRocks 表中的数据

查询 StarRocks 目标表 test_tbl,返回如下结果则表示数据已经成功导入。

MySQL [example_db]> select * from test_tbl;
+------+-------------+
| id | city |
+------+-------------+
| 1 | New York |
| 2 | Los Angeles |
| 3 | Chicago |
+------+-------------+
3 rows in set (0.01 sec)

参数说明

参数是否必填默认值描述
name表示当前的 Kafka connector,在 Kafka Connect 集群中必须为全局唯一,例如 starrocks-kafka-connector。
connector.classKafka connector 的 sink 使用的类:com.starrocks.connector.kafka.StarRocksSinkConnector。
topics一个或多个待订阅 Topic,每个 Topic 对应一个 StarRocks 表,默认情况下 Topic 的名称与 StarRocks 表名一致,导入时根据 Topic 名称确定目标 StarRocks 表。topicstopics.regex(如下) 两者二选一填写。如果两者不一致,则还需要配置 starrocks.topic2table.map
topics.regex与待订阅 Topic 匹配的正则表达式。更多解释,同 topicstopicstopics.regextopics(如上)两者二选一填写。
starrocks.topic2table.map当 Topic 的名称与 StarRocks 表名不一致时,该配置项可以说明映射关系,格式为 <topic-1>:<table-1>,<topic-2>:<table-2>,...
starrocks.http.urlFE 的 HTTP Server 地址。格式为 <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>,...。多个地址,使用英文逗号 (,) 分隔。例如 192.168.xxx.xxx:8030,192.168.xxx.xxx:8030
starrocks.database.nameStarRocks 目标库名。
starrocks.usernameStarRocks 用户名。用户需要具有目标表的 INSERT 权限。
starrocks.passwordStarRocks 用户密码。
key.converterKafka Connect 集群的 key convertersink connector (在本场景中即 Kafka-connector-starrocks) 的 key converter,用于反序列化 Kafka 数据的 key。默认为 Kafka Connect 集群的 key converter, 您也可以自定义配置。
value.converterKafka Connect 集群的 value convertersink connector 的 value converter,用于反序列化 Kafka 数据的 value。默认为 Kafka Connect 集群的 value converter, 您也可以自定义配置。
key.converter.schema.registry.urlkey converter 对应的 schema registry 地址。
value.converter.schema.registry.urlvalue converter 对应的 schema registry 地址。
tasks.max1Kafka connector 要创建的 task 线程数量上限,通常与 Kafka Connect 集群中的 worker 节点上的 CPU 核数量相同。如果需要增加导入性能的时候可以调整该参数。
bufferflush.maxbytes94371840(90M)数据攒批的大小,达到该阈值后将数据通过 Stream Load 批量写入 StarRocks。取值范围:[64MB, 10GB]。 Stream Load SDK buffer可能会启动多个 Stream Load 来缓冲数据,因此这里的阈值是指总数据量大小。
bufferflush.intervalms300000数据攒批发送的间隔,用于控制数据写入 StarRocks 的延迟,取值范围:[1000, 3600000]。
connect.timeoutms1000连接 http-url 的超时时间。取值范围:[100, 60000]。
sink.properties.*指定 Stream Load 的参数,用于控制导入行为,例如使用 sink.properties.format 指定导入数据的格式为 CSV 或者 JSON。更多参数和说明,请参见 Stream Load
sink.properties.formatjsonStream Load 导入时的数据格式。取值为 CSV 或者 JSON。默认为JSON。更多参数说明,参见 CSV 适用参数JSON 适用参数

使用限制

  • 不支持将 Kafka topic 里的一条消息展开成多条导入到 StarRocks。
  • StarRocks 提供的 Kafka connector 的 Sink 保证 at-least-once 语义。

最佳实践

导入 Debezium CDC 格式数据

如果 Kafka 数据为 Debezium CDC 格式,并且 StarRocks 表为主键表,则在 StarRocks 提供的 Kafka connector 的配置文件 connect-StarRocks-sink.properties 中除了配置基础参数外,还需要配置 transforms 以及相关参数。

信息

在本示例中,StarRocks 提供的 Kafka connector 是 sink connector,能够持续消费 Kafka 的数据并导入 StarRocks。

transforms=addfield,unwrap
transforms.addfield.type=com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=rewrite

在上述配置中,我们指定 transforms=addfield,unwrap

  • Debezium CDC 格式数据中 op 字段记录了来自上游数据库的数据对应的 SQL 操作,cud 分别代表 create,update 和 delete。如果 StarRocks 表是主键表,则需要指定 addfield transform。addfield transform 会为每行数据增加一个 __op 字段,来标记数据对应的 SQL 操作,并且会根据 Debezium CDC 格式数据的 op 字段的值去 before 或者 after 字段中里取其它列的值,以拼成一个完整的一行数据。最终这些数据会转成 JSON 或 CSV 格式,写入 StarRocks 中。addfield transform 的类是 com.Starrocks.Kafka.Transforms.AddOpFieldForDebeziumRecord,已经包含在 Kafka connector JAR 文件中,您无需手动安装。

    如果 StarRocks 表不是主键表,则无需指定 addfield transform。

  • unwrap transform 是指由 Debezium 提供的 unwrap,可以根据操作类型 unwrap Debezium 复杂的数据结构。更多信息,参见 New Record State Extraction