Kafka コネクタを使用してデータをロードする
StarRocks は、Apache Kafka® コネクタ (StarRocks Connector for Apache Kafka®) という独自開発のコネクタを提供しており、Kafka からメッセージを継続的に消費し、それを StarRocks にロードします。Kafka コネクタは、少なくとも一度のセマンティクスを保証します。
Kafka コネクタは Kafka Connect とシームレスに統合でき、StarRocks が Kafka エコシステムとより良く統合されます。リアルタイムデータを StarRocks にロードしたい場合には賢明な選択です。Routine Load と比較して、以下のシナリオでは Kafka コネクタの使用が推奨されます。
- Routine Load は CSV、JSON、Avro フォーマットでのデータロードのみをサポートしていますが、Kafka コネクタは Protobuf など、より多くのフォーマットでのデータロードが可能です。Kafka Connect のコンバータを使用してデータを JSON や CSV フォーマットに変換できれば、Kafka コネクタを介して StarRocks にデータをロードできます。
- Debezium フォーマットの CDC データなど、データ変換をカスタマイズします。
- 複数の Kafka トピックからデータをロードします。
- Confluent Cloud からデータをロードします。
- ロードバッチサイズ、並行性、その他のパラメータを細かく制御して、ロード速度とリソース利用のバランスを取る必要があります。
準備
バージョン要件
| コネクタ | Kafka | StarRocks | Java |
|---|---|---|---|
| 1.0.4 | 3.4 | 2.5 and later | 8 |
| 1.0.3 | 3.4 | 2.5 and later | 8 |
Kafka 環境のセットアップ
自己管理の Apache Kafka クラスターと Confluent Cloud の両方がサポートされています。
- 自己管理の Apache Kafka クラスターの場合、Apache Kafka クイックスタート を参照して、Kafka クラスターを迅速にデプロイできます。Kafka Connect はすでに Kafka に統合されています。
- Confluent Cloud の場合、Confluent アカウントを持ち、クラスターを作成していることを確認してください。
Kafka コネクタのダウンロード
Kafka コネクタを Kafka Connect に提出します。
-
自己管理の Kafka クラスター:
starrocks-kafka-connector-xxx.tar.gz をダウンロードして解凍します。
-
Confluent Cloud:
現在、Kafka コネクタは Confluent Hub にアップロードされていません。starrocks-kafka-connector-xxx.tar.gz をダウンロードして解凍し、ZIP ファイルにパッケージして Confluent Cloud にアップロードする必要があります。
ネットワーク構成
Kafka が配置されているマシンが StarRocks クラスターの FE ノードに http_port (デフォルト: 8030) および query_port (デフォルト: 9030) を介してアクセスでき、BE ノードに be_http_port (デフォルト: 8040) を介してアクセスできることを確認してください。
使用方法
このセクションでは、自己管理の Kafka クラスターを例にとり、Kafka コネクタと Kafka Connect を設定し、Kafka Connect を実行して StarRocks にデータをロードする方法を説明します。
データセットの準備
Kafka クラスターのトピック test に JSON フォーマットのデータが存在すると仮定します。
{"id":1,"city":"New York"}
{"id":2,"city":"Los Angeles"}
{"id":3,"city":"Chicago"}
テーブルの作成
StarRocks クラスターのデータベース example_db に JSON フォーマットデータのキーに基づいてテーブル test_tbl を作成します。
CREATE DATABASE example_db;
USE example_db;
CREATE TABLE test_tbl (id INT, city STRING);