Apache Flink® からデータを継続的にロードする
StarRocks は、Apache Flink® 用の StarRocks Connector(以下、Flink コネクタ)という独自開発のコネクタを提供しており、Flink を使用して StarRocks テーブルにデータをロードするのに役立ちます。基本的な原理は、データを蓄積し、それを一度に StarRocks に STREAM LOAD を通じてロードすることです。
Flink コネクタは DataStream API、Table API & SQL、Python API をサポートしています。これは、Apache Flink® が提供する flink-connector-jdbc よりも高い安定したパフォーマンスを持っています。
注意
Flink コネクタを使用して StarRocks テーブルにデータをロードするには、対象の StarRocks テーブルに対する SELECT および INSERT 権限が必要です。これらの権限がない場合は、GRANT に従って、StarRocks クラスターに接続するために使用するユーザーにこれらの権限を付与してください。
バージョン要件
| コネクタ | Flink | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.2.14 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 |
| 1.2.12 | 1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 |
| 1.2.11 | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 以降 | 8 | 2.11,2.12 |
| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 以降 | 8 | 2.11,2.12 |
Flink コネクタの取得
Flink コネクタの JAR ファイルは以下の方法で取得できます。
- コンパイル済みの Flink コネクタ JAR ファイルを直接ダウンロードする。
- Flink コネクタを Maven プロジェクトの依存関係として追加し、JAR ファイルをダウンロードする。
- Flink コネクタのソースコードを自分でコンパイルして JAR ファイルを作成する。
Flink コネクタ JAR ファイルの命名形式は以下の通りです。
-
Flink 1.15 以降では、
flink-connector-starrocks-${connector_version}_flink-${flink_version}.jarです。例えば、Flink 1.15 をインストールし、Flink コネクタ 1.2.7 を使用したい場合、flink-connector-starrocks-1.2.7_flink-1.15.jarを使用できます。 -
Flink 1.15 より前では、
flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jarです。例えば、Flink 1.14 と Scala 2.12 を環境にインストールし、Flink コネクタ 1.2.7 を使用したい場合、flink-connector-starrocks-1.2.7_flink-1.14_2.12.jarを使用できます。
注意
一般的に、Flink コネクタの最新バージョンは、Flink の最新の 3 つのバージョンとのみ互換性を維持します。
コンパイル済みの Jar ファイルをダウンロード
Maven Central Repository から対応するバージョンの Flink コネクタ Jar ファイルを直接ダウンロードします。
Maven 依存関係
Maven プロジェクトの pom.xml ファイルに、以下の形式で Flink コネクタを依存関係として追加します。flink_version、scala_version、connector_version をそれぞれのバージョンに置き換えてください。
-
Flink 1.15 以降
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency> -
Flink 1.15 より前のバージョン
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
自分でコンパイル
-
Flink コネクタのソースコード をダウンロードします。
-
以下のコマンドを実行して、Flink コネクタのソースコードを JAR ファイルにコンパイルします。
flink_versionは対応する Flink バージョンに置き換えてください。sh build.sh <flink_version>例えば、環境の Flink バージョンが 1.15 の場合、以下のコマンドを実行する必要があります。
sh build.sh 1.15 -
target/ディレクトリに移動し、コンパイルによって生成された Flink コネクタ JAR ファイル(例:flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar)を見つけます。
注意
正式にリリースされていない Flink コネクタの名前には
SNAPSHOTサフィックスが含まれています。
オプション
一般的なオプション
connector
- Required: Yes
- Default value: NONE
- Description: 使用するコネクタ。値は "starrocks" である必要があります。
jdbc-url
- 必須: はい
- デフォルト値: NONE
- 説明: FE の MySQL サーバーへの接続に使用されるアドレスです。複数のアドレスを指定できます。その場合、カンマ (,) で区切る必要があります。形式:
jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>。
load-url
- 必須: はい
- デフォルト値: NONE
- 説明: FE の HTTP サーバーへの接続に使用されるアドレス。 複数のアドレスを指定できます。複数のアドレスはセミコ ロン (;) で区切る必要があります。 形式:
<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>。
database-name
- 必須: はい
- デフォルト値: NONE
- 説明: データをロードする StarRocks データベースの名前です。
table-name
- 必須: はい
- デフォルト値: NONE
- 説明: データを StarRocks にロードするために使用するテーブルの名前。
username
- 必須: はい
- デフォルト値: NONE
- 説明: データを StarRocks にロードするために使用するアカウントのユーザー名。アカウントには、ターゲットの StarRocks テーブルに対する SELECT および INSERT 権限 が必要です。
password
- 必須: はい
- デフォルト値: NONE
- 説明: 前述のアカウントのパスワード。
sink.version
- 必須: いいえ
- デフォルト値: AUTO
- 説明: データのロードに使用されるインターフェース。このパラメータは、Flink connector バージョン 1.2.4 以降でサポートされています。有効な値:
V1: Stream Load インターフェースを使用してデータをロードします。1.2.4 より前のコネクタは、このモードのみをサポートしています。V2: Stream Load transaction インターフェースを使用してデータをロードします。StarRocks のバージョンが 2.4 以上である必要があります。メモリ使用量を最適化し、より安定した exactly-once の実装を提供するため、V2を推奨します。AUTO: StarRocks のバージョンがトランザクション Stream Load をサポートしている場合、自動的にV2を選択し、そうでない場合はV1を選択します。
sink.label-prefix
- 必須: いいえ
- デフォルト値: NONE
- 説明: Stream Load が使用するラベルのプレフィックス。connector 1.2.8 以降で exactly-once を使用している場合は、設定することを推奨します。exactly-once の使用に関する注意事項 を参照してください。
sink.semantic
- 必須: いいえ
- デフォルト値: at-least-once
- 説明: sink によって保証されるセマンティクス。有効な値: at-least-once および exactly-once。
sink.buffer-flush.max-bytes
- 必須: いいえ
- デフォルト値: 94371840(90M)
- 説明: 一度に StarRocks に送信する前にメモリに蓄積できるデータの最大サイズ。最大値の範囲は 64 MB から 10 GB です。このパラメータをより大きな値に設定すると、データロードのパフォーマンスが向上しますが、データロードのレイテンシーが増加する可能性があります。このパラメータは、
sink.semanticがat-least-onceに設定されている場合にのみ有効です。sink.semanticがexactly-onceに設定されている 場合、Flink チェックポイントがトリガーされると、メモリ内のデータはフラッシュされます。この場合、このパラメータは有効になりません。
sink.buffer-flush.max-rows
- 必須: いいえ
- デフォルト値: 500000
- 説明: 一度に StarRocks に送信する前にメモリに蓄積できる最大行数。このパラメータは、
sink.versionがV1で、sink.semanticがat-least-onceの場合にのみ使用できます。有効な値: 64000 ~ 5000000。
sink.buffer-flush.interval-ms
- 必須: いいえ
- デフォルト値: 300000
- 説明: データをフラッシュする間隔。このパラメータは、
sink.semanticがat-least-onceの場合にのみ使用できます。単位: ms。有効な値の範囲:- v1.2.14 より前のバージョン: [1000, 3600000]
- v1.2.14 以降: (0, 3600000]
sink.max-retries
- 必須: いいえ
- デフォルト値: 3
- 説明: システムが Stream Load ジョブの実行をリトライする回数。このパラメータは、
sink.versionをV1に設定した場合にのみ使用できます。有効な値: 0~10。
sink.connect.timeout-ms
- 必須: いいえ
- デフォルト値: 30000
- 説明: HTTP接続を確立するためのタイムアウト。有効な値:100~60000。単位:ms。Flink connector v1.2.9より前のバージョンでは、デフォルト値は
1000です。
sink.socket.timeout-ms
- 必須: いいえ
- デフォルト値: -1
- 説明: 1.2.10 からサポートされています。 HTTP クライアントがデータを待機する時間。単位: ミリ秒。デフォルト値の
-1は、タイムアウトがないことを意味します。
sink.sanitize-error-log
- 必須: いいえ
- デフォルト値: false
- 説明: 1.2.12 からサポートされています。 本番環境のセキュリティのために、エラーログ内の機密データをサニタイズするかどうかを指定します。 この項目が
trueに設定されている場合、Stream Load のエラーログ内の機密性の高い行データと列の値は、コネクタと SDK のログの両方で編集されます。 互換性を保つため、デフォルト値はfalseです。
sink.wait-for-continue.timeout-ms
- 必須: いいえ
- デフォルト値: 10000
- 説明: 1.2.7 以降でサポートされています。 FE からの HTTP 100-continue の応答を待機するタイムアウト。有効な値:
3000~60000。単位: ミリ秒
sink.ignore.update-before
- 必須: いいえ
- デフォルト値: true
- 説明: バージョン 1.2.8 以降でサポートされています。Primary Key テーブルにデータをロードする際に、Flink からの
UPDATE_BEFOREレコードを無視するかどうかを指定します。このパラメ ータを false に設定すると、レコードは StarRocks テーブルに対する削除操作として扱われます。
sink.parallelism
- 必須: いいえ
- デフォルト値: NONE
- 説明: ロードの並行性。Flink SQL でのみ利用可能です。このパラメータが指定されていない場合、Flink プランナーが並行性を決定します。マルチ並行性のシナリオでは、ユーザーはデータが正しい順序で書き込まれることを保証する必要があります。
sink.properties.*
- 必須: いいえ
- デフォルト値: NONE
- 説明: Stream Load の動作を制御するために使用されるパラメータです。たとえば、パラメータ
sink.properties.formatは、CSV や JSON など、Stream Load に使用される形式を指定します。サポートされているパラメータとその説明のリストについては、STREAM LOAD を参照してください。