Kafka connector for StarRocks
Kafka コネクタを使用してデータをロードする
StarRocks は、Apache Kafka® コネクタ(StarRocks Connector for Apache Kafka®、以下 Kafka コネクタ)という自社開発のコネクタを提供しています。これはシンクコネクタとして、Kafka からメッセージを継続的に消費し、それらを StarRocks にロードします。Kafka コネクタは少なくとも一度のセマンティクスを保証します。
Kafka コネクタは Kafka Connect とシームレスに統合でき、StarRocks が Kafka エコシステムとより良く統合されることを可能にします。リアルタイムデータを StarRocks にロードしたい場合、これは賢明な選択です。Routine Load と比較して、以下のシナリオでは Kafka コネクタの使用が推奨されます。
- Routine Load は CSV、JSON、Avro フォーマットでのデータロードのみをサポートしていますが、Kafka コネクタは Protobuf など、より多くのフォーマットでデータをロードできます。Kafka Connect のコンバータを使用してデータを JSON や CSV フォーマットに変換できる限り、データは Kafka コネクタを通じて StarRocks にロードできます。
- Debezium フォーマットの CDC データなど、データ変換をカスタマイズ。
- 複数の Kafka トピックからデータをロード。
- Confluent Cloud からデータをロード。
- ロードバッチサイズ、並行性、その他のパラメータを細かく制御して、ロード速度とリソース利用のバランスを取る必要がある場合。
準備
バージョン要件
| コネクタ | Kafka | StarRocks | Java |
|---|---|---|---|
| 1.0.4 | 3.4 | 2.5 and later | 8 |
| 1.0.3 | 3.4 | 2.5 and later | 8 |
Kafka 環境のセットアップ
自己管理の Apache Kafka クラスターと Confluent Cloud の両方がサポートされています。
- 自己管理の Apache Kafka クラスターの場合、Apache Kafka クイックスタート を参照して、Kafka クラスターを迅速にデプロイできます。Kafka Connect はすでに Kafka に統合されています。
- Confluent Cloud の場合、Confluent アカウントを持ち、クラスターを作成していることを確認してください。
Kafka コネクタのダウンロード
Kafka コネクタを Kafka Connect に提出します。
-
自己管理の Kafka クラスター:
starrocks-kafka-connector-xxx.tar.gz をダウンロードして解凍します。
-
Confluent Cloud:
現在、Kafka コネクタは Confluent Hub にアップロードされていません。starrocks-kafka-connector-xxx.tar.gz をダウンロードして解凍し、ZIP ファイルにパッケージして Confluent Cloud にアップロードする必要があります。
ネットワーク構成
Kafka が配置されているマシンが StarRocks クラスターの FE ノードに http_port(デフォルト: 8030)および query_port(デフォルト: 9030)を介してアクセスでき、BE ノードに be_http_port(デフォルト: 8040)を介してアクセスできることを確認してください。
使用方法
このセクションでは、自己管理の Kafka クラスターを例にとり、Kafka コネクタと Kafka Connect を設定し、Kafka Connect を実行して StarRocks にデータをロードする方法を説明します。
データセットの準備
Kafka クラスターのトピック test に JSON フォーマットのデータが存在すると仮定します。
{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}
テーブルの作成
StarRocks クラスターのデータベース example_db に JSON フォーマットデータのキーに基づいてテーブル test_tbl を作成します。
CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);
Kafka コネクタと Kafka Connect の 設定、そして Kafka Connect を実行してデータをロード
スタンドアロンモードで Kafka Connect を実行
-
Kafka コネクタを設定します。Kafka のインストールディレクトリの config ディレクトリに、Kafka コネクタ用の設定ファイル connect-StarRocks-sink.properties を作成し、以下のパラメータを設定します。詳細なパラメータと説明については、Parameters を参照してください。
備考- この例では、StarRocks が提供する Kafka コネクタは、Kafka からデータを継続的に消費し、StarRocks にデータをロー ドできるシンクコネクタです。
- ソースデータが Debezium フォーマットの CDC データであり、StarRocks テーブルが主キーテーブルである場合、StarRocks が提供する Kafka コネクタ用の設定ファイル connect-StarRocks-sink.properties で
transformを設定し、ソースデータの変更を主キーテーブルに同期する必要があります。
name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=test
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# StarRocks クラスター内の FE の HTTP URL。デフォルトポートは 8030 です。
starrocks.http.url=192.168.xxx.xxx:8030
# Kafka トピック名が StarRocks テーブル名と異なる場合、それらの間のマッピング関係を設定する必要があります。
starrocks.topic2table.map=test:test_tbl
# StarRocks のユーザー名を入力します。
starrocks.username=user1
# StarRocks のパスワードを入力します。
starrocks.password=123456
starrocks.database.name=example_db
sink.properties.strip_outer_array=true -
Kafka Connect を設定して実行します。
-
Kafka Connect を設定します。config ディレクトリ内の設定ファイル config/connect-standalone.properties で、以下のパラメータを設定します。詳細なパラメータと説明については、Running Kafka Connect を参照してください。
# Kafka ブローカーのアドレス。複数の Kafka ブローカーのアドレスはカンマ (,) で区切る必要があります。
# この例では、Kafka クラスターにアクセスするためのセキュリティプロトコルとして PLAINTEXT を使用しています。Kafka クラスターにアクセスするために他のセキュリティプロトコルを使用している場合は、このファイルに関連情報を設定する必要があります。
bootstrap.servers=<kafka_broker_ip>:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# 解凍後の Kafka コネクタの絶対パス。例:
plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3 -
Kafka Connect を実行します。
CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-starrocks-sink.properties
-
分散モードで Kafka Connect を実行
-
Kafka Connect を設定して実行します。
-
Kafka Connect を設定します。config ディレクトリ内の設定ファイル
config/connect-distributed.propertiesで、以下のパラメータを設定します。詳細なパラメータと説明については、Running Kafka Connect を参照してください。# Kafka ブローカーのアドレス。複数の Kafka ブローカーのアドレスはカンマ (,) で区切る必要があります。
# この例では、Kafka クラスターにアクセスするためのセキュリティプロトコルとして PLAINTEXT を使用しています。Kafka クラスターにアクセスするために他のセキュリティプロトコルを使用している場合は、このファイルに関連情報を設定する必要があります。
bootstrap.servers=<kafka_broker_ip>:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# 解凍後の Kafka コネクタの絶対パス。例:
plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3 -
Kafka Connect を実行します。
CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-distributed.sh config/connect-distributed.properties
-
-
Kafka コネクタを設定して作成します。分散モードでは、REST API を通じて Kafka コネクタを設定して作成する必要があります。パラメータと説明については、Parameters を参照してください。
備考- この例では、StarRocks が提供する Kafka コネクタは、Kafka からデータを継続的に消費し、StarRocks にデータをロードできるシンクコネクタです。
- ソースデータが Debezium フォーマットの CDC データであり、StarRocks テーブルが主キーテーブルである場合、StarRocks が提供する Kafka コネクタ用の設定ファイル connect-StarRocks-sink.properties で
transformを設定し、ソースデータの変更を主キーテーブルに同期する必要があります。
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"starrocks-kafka-connector",
"config":{
"connector.class":"com.starrocks.connector.kafka.StarRocksSinkConnector",
"topics":"test",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter.schemas.enable":"false",
"starrocks.http.url":"192.168.xxx.xxx:8030",
"starrocks.topic2table.map":"test:test_tbl",
"starrocks.username":"user1",
"starrocks.password":"123456",
"starrocks.database.name":"example_db",
"sink.properties.strip_outer_array":"true"
}
}'
StarRocks テーブルをクエリ
ターゲット StarRocks テーブル test_tbl をクエリします。
MySQL [example_db]> select * from test_tbl;
+------+-------------+
| id | city |
+------+-------------+
| 1 | New York |
| 2 | Los Angeles |
| 3 | Chicago |
+------+-------------+
3 rows in set (0.01 sec)
上記の結果が返された場合、データは正常にロードされています。
パラメータ
name
必須: YES
デフォルト値:
説明: この Kafka コネクタの名前。Kafka Connect クラスター内のすべての Kafka コネクタの中でグローバルにユニークである必要があります。例: starrocks-kafka-connector。
connector.class
必須: YES
デフォルト値:
説明: この Kafka コネクタのシンクに使用されるクラス。値を com.starrocks.connector.kafka.StarRocksSinkConnector に設定します。