Kafka コネクタを使用したデータのロード
StarRocks は、Apache Kafka® コネクタ(StarRocks Connector for Apache Kafka®、以下 Kafka コネクタと略します)という独自開発のコネクタを提供しています。このコネクタはシンクコネクタとして、Kafka からメッセージを継続的に消費し、StarRocks にロードします。Kafka コネクタは少なくとも一度のセマンティクスを保証します。
Kafka コネクタは Kafka Connect とシームレスに統合でき、StarRocks が Kafka エコシステムとより良く統合されることを可能にします。リアルタイムデータを StarRocks にロードしたい場合には賢明な選択です。Routine Load と比較して、以下のシナリオでは Kafka コネクタの使用が推奨されます:
- Routine Load は CSV、JSON、Avro フォーマットでのデータロードのみをサポートしていますが、Kafka コネクタは Protobuf など、より多くのフォーマットでデータをロードできます。Kafka Connect のコンバータを使用してデータを JSON や CSV フォーマットに変換できる限り、Kafka コネクタを介して StarRocks にデータをロードできます。
- Debezium フォーマットの CDC データなど、データ変換をカスタマイズします。
- 複数の Kafka トピックからデータをロードします。
- Confluent Cloud からデータをロードします。
- ロードバッチサイズ、並行性、その他のパラメータを細かく制御して、ロード速度とリソース使用率のバランスを取る必要があります。
準備
バージョン要件
| コネクタ | Kafka | StarRocks | Java |
|---|---|---|---|
| 1.0.4 | 3.4 | 2.5 and later | 8 |
| 1.0.3 | 3.4 | 2.5 and later | 8 |
Kafka 環境のセットアップ
自己管理の Apache Kafka クラスターと Confluent Cloud の両方がサポートされています。
- 自己管理の Apache Kafka クラスターの場合、Apache Kafka クイックスタートを参照して、Kafka クラスターを迅速にデプロイできます。Kafka Connect はすでに Kafka に統合されています。
- Confluent Cloud の場合、Confluent アカウントを持ち、クラスターを作成していることを確認してください。
Kafka コネクタのダウンロード
Kafka コネクタを Kafka Connect に提出します:
-
自己管理の Kafka クラスター:
starrocks-kafka-connector-xxx.tar.gz をダウンロードして解凍します。
-
Confluent Cloud:
現在、Kafka コネクタは Confluent Hub にアップロードされていません。starrocks-kafka-connector-xxx.tar.gz をダウンロードして解凍し、ZIP ファイルにパッケージして Confluent Cloud にアップロードする必要があります。
ネットワーク構成
Kafka が配置されているマシンが StarRocks クラスターの FE ノードに http_port(デフォルト:8030)および query_port(デフォルト:9030)を介してアクセスでき、BE ノードに be_http_port(デフォルト:8040)を介してアクセスできることを確認してください。
使用方法
このセクションでは、自己管理の Kafka クラスターを例にとり、Kafka コネクタと Kafka Connect を設定し、Kafka Connect を実行して StarRocks にデータをロードする方法を説明します。
データセットの準備
Kafka クラスターのトピック test に JSON フォーマットのデータが存在すると仮定します。
{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}
テーブルの作成
StarRocks クラスターのデータベース example_db に JSON フォーマットデータのキーに基づいてテーブル test_tbl を作成します。
CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);
Kafka コネクタと Kafka Connect の設定、そして Kafka Connect を実行してデータをロード
スタンドアロンモードで Kafka Connect を実行
-
Kafka コネクタを設定します。Kafka インストールディレクトリの config ディレクトリに、Kafka コネクタ用の設定ファイル connect-StarRocks-sink.properties を作成し、以下のパラメータを設定します。詳細なパラメータと説明については、Parameters を参照してください。
備考- この例では、StarRocks が提供する Kafka コネクタは、Kafka からデータを継続的に消費し、StarRocks にデータをロードできるシンクコネクタです。
- ソースデータが CDC データ(例えば、Debezium フォーマットのデータ)であり、StarRocks テーブルが主キーテーブルである場合、StarRocks が提供する Kafka コネクタ用の設定ファイル connect-StarRocks-sink.properties で
transformを設定し、ソースデータの変更を主キーテーブルに同期する必要があります。
name=starrocks-kafka-connector
connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
topics=test
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# StarRocks クラスター内の FE の HTTP URL。デフォルトポートは 8030。
starrocks.http.url=192.168.xxx.xxx:8030
# Kafka トピック名が StarRocks テーブル名と異なる場合、それらの間のマッピング関係を設定する必要があります。
starrocks.topic2table.map=test:test_tbl
# StarRocks のユーザー名を入力します。
starrocks.username=user1
# StarRocks のパスワードを入力します。
starrocks.password=123456
starrocks.database.name=example_db
sink.properties.strip_outer_array=true -
Kafka Connect を設定して実行します。
-
Kafka Connect を設定します。config ディレクトリの設定ファイル config/connect-standalone.properties で、以下のパラメータを設定します。詳細なパラメータと説明については、Running Kafka Connect を参照してください。
# Kafka ブローカーのアドレス。複数の Kafka ブローカーのアドレスはカンマ(,)で区切る必要があります。
# この例では、Kafka クラスターにアクセスするためのセキュリティプロトコルとして PLAINTEXT を使用しています。他のセキュリティプロトコルを使用して Kafka クラスターにアクセスする場合は、このファイルに関連情報を設定する必要があります。
bootstrap.servers=<kafka_broker_ip>:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# 解凍後の Kafka コネクタの絶対パス。例:
plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3 -
Kafka Connect を実行します。
CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-standalone.sh config/connect-standalone.properties config/connect-starrocks-sink.properties
-
分散モードで Kafka Connect を実行
-
Kafka Connect を設定して実行します。
-
Kafka Connect を設定します。config ディレクトリの設定ファイル
config/connect-distributed.propertiesで、以下のパラメータを設定します。詳細なパラメータと説明については、Running Kafka Connect を参照してください。# Kafka ブローカーのアドレス。複数の Kafka ブローカーのアドレスはカンマ(,)で区切る必要があります。
# この例では、Kafka クラスターにアクセスするためのセキュリティプロトコルとして PLAINTEXT を使用しています。他のセキュリティプロトコルを使用して Kafka クラスターにアクセスする場合は、このファイルに関連情報を設定する必要があります。
bootstrap.servers=<kafka_broker_ip>:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# 解凍後の Kafka コネクタの絶対パス。例:
plugin.path=/home/kafka-connect/starrocks-kafka-connector-1.0.3 -
Kafka Connect を実行します。
CLASSPATH=/home/kafka-connect/starrocks-kafka-connector-1.0.3/* bin/connect-distributed.sh config/connect-distributed.properties
-
-
Kafka コネクタを設定して作成します。分散モードでは、REST API を通じて Kafka コネクタを設定して作成する必要があります。パラメータと説明については、Parameters を参照してください。
備考- この例では、StarRocks が提供する Kafka コネクタは、Kafka からデータを継続的に消費し、StarRocks にデータをロードできるシンクコネクタです。
- ソースデータが CDC データ(例えば、Debezium フォーマットのデータ)であり、StarRocks テーブルが主キーテーブルである場合、StarRocks が提供する Kafka コネクタ用の設定ファイル connect-StarRocks-sink.properties で
transformを設定し、ソースデータの変更を主キーテーブルに同期する必要があります。
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"starrocks-kafka-connector",
"config":{
"connector.class":"com.starrocks.connector.kafka.StarRocksSinkConnector",
"topics":"test",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter.schemas.enable":"false",
"starrocks.http.url":"192.168.xxx.xxx:8030",
"starrocks.topic2table.map":"test:test_tbl",
"starrocks.username":"user1",
"starrocks.password":"123456",
"starrocks.database.name":"example_db",
"sink.properties.strip_outer_array":"true"
}
}'
StarRocks テーブルをクエリ
ターゲット StarRocks テーブル test_tbl をクエリします。
MySQL [example_db]> select * from test_tbl;
+------+-------------+
| id | city |
+------+-------------+
| 1 | New York |
| 2 | Los Angeles |
| 3 | Chicago |
+------+-------------+
3 rows in set (0.01 sec)
上記の結果が返された場合、データは正常にロードされています。
パラメータ
name
必須: YES
デフォルト値:
説明: この Kafka コネクタの名前。Kafka Connect クラスター内のすべての Kafka コネクタの中でグローバルにユニークである必要があります。例:starrocks-kafka-connector。
connector.class
必須: YES
デフォルト値:
説明: この Kafka コネクタのシンクで使用されるクラス。値を com.starrocks.connector.kafka.StarRocksSinkConnector に設定します。
topics
必須:
デフォルト値:
説明: 購読する1つ以上のトピックで、各トピックは StarRocks テーブルに対応します。デフォルトでは、StarRocks はトピック名が StarRocks テーブル名と一致すると仮定します。そのため、StarRocks はトピック名を使用してターゲットの StarRocks テーブルを決定します。topics または topics.regex(下記)のいずれかを記入してください。ただし、StarRocks テーブル名がトピック名と異なる場合は、オプションの starrocks.topic2table.map パラメータ(下記)を使用してトピック名からテーブル名へのマッピングを指定します。
topics.regex
必須:
デフォルト値:
説明: 購読する1つ以上のトピックを一致させる正規表現。詳細については topics を参照してください。topics.regex または topics(上記)のいずれかを記入してください。
starrocks.topic2table.map
必須: NO
デフォルト値:
説明: トピック名 が StarRocks テーブル名と異なる場合の StarRocks テーブル名とトピック名のマッピング。フォーマットは <topic-1>:<table-1>,<topic-2>:<table-2>,... です。
starrocks.http.url
必須: YES
デフォルト値:
説明: StarRocks クラスター内の FE の HTTP URL。フォーマットは <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>,... です。複数のアドレスはカンマ(,)で区切ります。例:192.168.xxx.xxx:8030,192.168.xxx.xxx:8030。
starrocks.database.name
必須: YES
デフォルト値:
説明: StarRocks データベースの名前。
starrocks.username
必須: YES
デフォルト値:
説明: StarRocks クラスターアカウントのユーザー名。ユーザーは StarRocks テーブルに対する INSERT 権限を持つ必要があります。
starrocks.password
必須: YES
デフォルト値:
説明: StarRocks クラスターアカウントのパスワード。
key.converter
必須: NO
デフォルト値: Kafka Connect クラスターで使用されるキーコンバータ
説明: このパラメータはシンクコネクタ(Kafka-connector-starrocks)用のキーコンバータを指定し、Kafka データのキーをデシリアライズするために使用されます。デフォルトのキーコンバータは Kafka Connect クラスターで使用されるものです。
value.converter
必須: NO
デフォルト値: Kafka Connect クラスターで使用される値コンバータ
説明: このパラメータはシンクコネクタ(Kafka-connector-starrocks)用の値コンバータを指定し、Kafka データの値をデシリアライズするために使用されます。デフォルトの値コンバータは Kafka Connect クラスターで使用されるものです。