# Load data using Kafka connector
StarRocks provides a self-developed connector named Apache Kafka® connector (StarRocks Connector for Apache Kafka®, Kafka connector for short). As a sink connector, it continuously consumes messages from Kafka and loads them into StarRocks. The Kafka connector guarantees at-least-once semantics.

The Kafka connector integrates seamlessly with Kafka Connect, which allows StarRocks to fit better into the Kafka ecosystem. It is a wise choice if you want to load real-time data into StarRocks. Compared with Routine Load, the Kafka connector is recommended in the following scenarios:

- You need to load data in formats that Routine Load does not support. Routine Load only supports the CSV, JSON, and Avro formats, whereas the Kafka connector can load data in more formats, such as Protobuf. As long as data can be converted into JSON or CSV format using Kafka Connect's converters, it can be loaded into StarRocks via the Kafka connector.
- You need to customize data transformation, for example, for Debezium-formatted CDC data.
- You need to load data from multiple Kafka topics.
- You need to load data from Confluent Cloud.
- You need finer control over load batch sizes, parallelism, and other parameters to achieve a balance between load speed and resource utilization.
## Preparations

### Version requirements

| Connector | Kafka | StarRocks     | Java |
|-----------|-------|---------------|------|
| 1.0.4     | 3.4   | 2.5 and later | 8    |
| 1.0.3     | 3.4   | 2.5 and later | 8    |
### Set up Kafka environment

Both self-managed Apache Kafka clusters and Confluent Cloud are supported.

- For a self-managed Apache Kafka cluster, you can refer to the Apache Kafka quickstart to quickly deploy a Kafka cluster. Kafka Connect is already integrated into Kafka.
- For Confluent Cloud, make sure that you have a Confluent account and have created a cluster.
### Download Kafka connector

Submit the Kafka connector to Kafka Connect:

- Self-managed Kafka cluster:

  Download and extract `starrocks-kafka-connector-xxx.tar.gz`.

- Confluent Cloud:

  Currently, the Kafka connector is not uploaded to Confluent Hub. You need to download and extract `starrocks-kafka-connector-xxx.tar.gz`, package it into a ZIP file, and upload the ZIP file to Confluent Cloud.
### Network configuration

Ensure that the machine where Kafka is located can access the FE nodes of the StarRocks cluster via the `http_port` (default: `8030`) and `query_port` (default: `9030`), and the BE nodes via the `be_http_port` (default: `8040`).
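Before running the connector, it can save time to verify that these ports are actually reachable from the Kafka machine. The following is a minimal Python sketch with placeholder addresses (replace `192.168.xxx.xxx` with your own FE and BE hosts):

```python
import socket

def check_port(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Placeholder addresses -- substitute your actual FE/BE hosts.
for host, port, role in [
    ("192.168.xxx.xxx", 8030, "FE http_port"),
    ("192.168.xxx.xxx", 9030, "FE query_port"),
    ("192.168.xxx.xxx", 8040, "BE be_http_port"),
]:
    print(f"{role} {host}:{port} reachable: {check_port(host, port)}")
```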
## Usage

This section uses a self-managed Kafka cluster as an example to explain how to configure the Kafka connector and Kafka Connect, and then run Kafka Connect to load data into StarRocks.
### Prepare a dataset

Suppose that JSON-format data exists in the topic `test` in a Kafka cluster.

```json
{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}
```
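For reference, the sketch below (plain Python, no broker interaction) shows the exact message values a producer would publish to the `test` topic: each value is one schemaless JSON object per message, which matches the `value.converter.schemas.enable=false` converter setting used in the configuration that follows.

```python
import json

# Sample rows matching the test_tbl schema (id INT, city STRING).
rows = [
    {"id": 1, "city": "New York"},
    {"id": 2, "city": "Los Angeles"},
    {"id": 3, "city": "Chicago"},
]

# Each Kafka message value is a single JSON object with no surrounding
# array and no embedded schema (i.e. schemaless JSON).
messages = [json.dumps(row, separators=(",", ":")) for row in rows]
for m in messages:
    print(m)
```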
### Create a table

Create the table `test_tbl` in the database `example_db` in the StarRocks cluster according to the keys of the JSON-format data.

```sql
CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);
```
### Configure Kafka connector and Kafka Connect, and then run Kafka Connect to load data

#### Run Kafka Connect in standalone mode

1. Configure the Kafka connector. In the `config` directory under the Kafka installation directory, create the configuration file `connect-StarRocks-sink.properties` for the Kafka connector, and configure the following parameters. For more parameters and descriptions, see Parameters.

   :::info

   - In this example, the Kafka connector provided by StarRocks is a sink connector that can continuously consume data from Kafka and load data into StarRocks.
   - If the source data is CDC data, such as data in Debezium format, and the StarRocks table is a Primary Key table, you also need to configure `transform` in the configuration file `connect-StarRocks-sink.properties` to synchronize the source data changes to the Primary Key table.

   :::

   ```properties
   name=starrocks-kafka-connector
   connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
   topics=test
   key.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter=org.apache.kafka.connect.json.JsonConverter
   key.converter.schemas.enable=true
   value.converter.schemas.enable=false
   # The HTTP URL of the FE in your StarRocks cluster. The default port is 8030.
   starrocks.http.url=192.168.xxx.xxx:8030
   # If the Kafka topic name is different from the StarRocks table name, you need to configure the mapping between them.
   starrocks.topic2table.map=test:test_tbl
   # Enter the StarRocks username.
   starrocks.username=user1
   # Enter the StarRocks password.
   starrocks.password=123456
   starrocks.database.name=example_db
   sink.properties.strip_outer_array=true
   ```
2. Configure and run Kafka Connect.

   1. Configure Kafka Connect. In the configuration file `config/connect-standalone.properties`, configure the following parameters. For more parameters and descriptions, see Running Kafka Connect.

      ```properties
      # The addresses of Kafka brokers. Multiple addresses of Kafka brokers need to be separated by commas (,).
      # Note that this example uses PLAINTEXT as the security protocol to access the Kafka cluster. If you are using another security protocol to access the Kafka cluster, you need to configure the relevant information in this file.
      bootstrap.servers=<kafka_broker_ip>:9092
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      # The absolute path of the Kafka connector after extraction. For example:
      plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3
      ```

   2. Run Kafka Connect.

      ```bash
      CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-StarRocks-sink.properties
      ```
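The `starrocks.topic2table.map` property above maps a single topic to a single table; when loading from multiple Kafka topics, the pairs are comma-separated (for example, `topic1:table1,topic2:table2`). The sketch below illustrates how such a mapping string resolves topics to target tables (the helper names are hypothetical, and the fallback to the topic name assumes the default behavior when no mapping is configured):

```python
def parse_topic2table_map(value: str) -> dict:
    """Parse a 'topic:table[,topic:table...]' mapping string into a dict."""
    mapping = {}
    for pair in value.split(","):
        topic, _, table = pair.strip().partition(":")
        mapping[topic] = table
    return mapping

def target_table(topic: str, mapping: dict) -> str:
    """Fall back to the topic name when no explicit mapping exists."""
    return mapping.get(topic, topic)
```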
#### Run Kafka Connect in distributed mode

1. Configure and run Kafka Connect.

   1. Configure Kafka Connect. In the configuration file `config/connect-distributed.properties`, configure the following parameters. For more parameters and descriptions, see Running Kafka Connect.

      ```properties
      # The addresses of Kafka brokers. Multiple addresses of Kafka brokers need to be separated by commas (,).
      # Note that this example uses PLAINTEXT as the security protocol to access the Kafka cluster. If you are using another security protocol to access the Kafka cluster, you need to configure the relevant information in this file.
      bootstrap.servers=<kafka_broker_ip>:9092
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=false
      # The absolute path of the Kafka connector after extraction. For example:
      plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3
      ```

   2. Run Kafka Connect.

      ```bash
      CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-distributed.sh config/connect-distributed.properties
      ```

2. Configure and create the Kafka connector. Note that in distributed mode, you need to configure and create the Kafka connector through the REST API. For parameters and descriptions, see Parameters.

   :::info

   - In this example, the Kafka connector provided by StarRocks is a sink connector that can continuously consume data from Kafka and load data into StarRocks.
   - If the source data is CDC data, such as data in Debezium format, and the StarRocks table is a Primary Key table, you also need to configure `transform` in the Kafka connector configuration to synchronize the source data changes to the Primary Key table.

   :::
   ```bash
   curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
     "name":"starrocks-kafka-connector",
     "config":{
       "connector.class":"com.starrocks.connector.kafka.StarRocksSinkConnector",
       "topics":"test",
       "key.converter":"org.apache.kafka.connect.json.JsonConverter",
       "value.converter":"org.apache.kafka.connect.json.JsonConverter",
       "key.converter.schemas.enable":"true",
       "value.converter.schemas.enable":"false",
       "starrocks.http.url":"192.168.xxx.xxx:8030",
       "starrocks.topic2table.map":"test:test_tbl",
       "starrocks.username":"user1",
       "starrocks.password":"123456",
       "starrocks.database.name":"example_db",
       "sink.properties.strip_outer_array":"true"
     }
   }'
   ```
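The same REST call can be issued programmatically. Below is a sketch using only the Python standard library, assuming the Kafka Connect REST endpoint listens at `127.0.0.1:8083` (the default `rest.port`); the `create_connector` helper is hypothetical and not part of any library.

```python
import json
import urllib.request

# Assumes a local Kafka Connect worker with its REST API on port 8083.
CONNECT_URL = "http://127.0.0.1:8083/connectors"

connector = {
    "name": "starrocks-kafka-connector",
    "config": {
        "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
        "topics": "test",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "false",
        "starrocks.http.url": "192.168.xxx.xxx:8030",
        "starrocks.topic2table.map": "test:test_tbl",
        "starrocks.username": "user1",
        "starrocks.password": "123456",
        "starrocks.database.name": "example_db",
        "sink.properties.strip_outer_array": "true",
    },
}

payload = json.dumps(connector).encode("utf-8")

def create_connector() -> int:
    """POST the connector definition; Kafka Connect returns 201 on success."""
    req = urllib.request.Request(
        CONNECT_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

After creation, you can check the connector and task state with a GET request to `http://127.0.0.1:8083/connectors/starrocks-kafka-connector/status`.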
### Query StarRocks table

Query the target StarRocks table `test_tbl`.

```sql
MySQL [example_db]> select * from test_tbl;
+------+-------------+
| id   | city        |
+------+-------------+
|    1 | New York    |
|    2 | Los Angeles |
|    3 | Chicago     |
+------+-------------+
3 rows in set (0.01 sec)
```

The data is successfully loaded when the above result is returned.
## Parameters

### name

- Required: YES
- Default value:
- Description: Name for this Kafka connector. It must be globally unique among all Kafka connectors within this Kafka Connect cluster. For example, `starrocks-kafka-connector`.

### connector.class

- Required: YES
- Default value:
- Description: Class used by this Kafka connector's sink. Set the value to `com.starrocks.connector.kafka.StarRocksSinkConnector`.