Apache Flink® からデータを継続的にロード
StarRocks は、Apache Flink® 用に StarRocks Connector for Apache Flink®(略して Flink connector)という独自開発のコネクタを提供しています。これを使用して、Flink を介して StarRocks テーブルにデータをロードできます。基本的な原理は、データを蓄積し、STREAM LOAD を通じて一度にすべてのデータを StarRocks にロードすることです。
Flink connector は DataStream API、Table API & SQL、Python API をサポートしています。これは、Apache Flink® が提供する flink-connector-jdbc よりも高い安定したパフォーマンスを持っています。
注意
Flink connector を使用して StarRocks テーブルにデータをロードするには、SELECT および INSERT 権限が必要です。これらの権限がない場合は、GRANT に従って、StarRocks クラスターに接続するために使用するユーザーにこれらの権限を付与してください。
バージョン要件
| Connector | Flink | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 and later | 8 | 2.11,2.12 |
| 1.2.9 | 1.15,1.16,1.17,1.18 | 2.1 and later | 8 | 2.11,2.12 |
| 1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1 and later | 8 | 2.11,2.12 |
| 1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 and later | 8 | 2.11,2.12 |
Flink connector の取得
Flink connector の JAR ファイルを取得する方法は次のとおりです:
- コンパイル済みの Flink connector JAR ファイルを直接ダウンロードします。
- Flink connector を Maven プロジェクトの依存関係として追加し、JAR ファイルをダウンロードします。
- Flink connector のソースコードを自分でコンパイルして JAR ファイルを作成します。
Flink connector JAR ファイルの命名形式は以下の通りです:
-
Flink 1.15 以降では、
flink-connector-starrocks-${connector_version}_flink-${flink_version}.jarです。例えば、Flink 1.15 をインストールし、Flink connector 1.2.7 を使用したい場合、flink-connector-starrocks-1.2.7_flink-1.15.jarを使用できます。 -
Flink 1.15 より前では、
flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jarです。例えば、Flink 1.14 と Scala 2.12 を環境にインストールし、Flink connector 1.2.7 を使用したい場合、flink-connector-starrocks-1.2.7_flink-1.14_2.12.jarを使用できます。
注意
一般に、Flink connector の最新バージョンは、Flink の直近3つのバージョンとのみ互換性を維持します。
コンパイル済み Jar ファイルのダウンロード
Maven Central Repository から対応するバージョンの Flink connector Jar ファイルを直接ダウンロードします。
Maven 依存関係
Maven プロジェクトの pom.xml ファイルに、以下の形式で Flink connector を依存関係として追加します。flink_version、scala_version、および connector_version をそれぞれのバージョンに置き換えてください。
-
Flink 1.15 以降
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency> -
Flink 1.15 より前のバージョン
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
自分でコンパイル
-
Flink connector package をダウンロードします。
-
以下のコマ ンドを実行して、Flink connector のソースコードを JAR ファイルにコンパイルします。
flink_versionは対応する Flink バージョンに置き換えてください。sh build.sh <flink_version>例えば、環境内の Flink バージョンが 1.15 の場合、以下のコマンドを実行する必要があります:
sh build.sh 1.15 -
target/ディレクトリに移動し、コンパイルによって生成された Flink connector JAR ファイル(例:flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar)を見つけます。
注意
正式にリリースされていない Flink connector の名前には
SNAPSHOTサフィックスが含まれます。
オプション
| オプション | 必須 | デフォルト値 | 説明 |
|---|---|---|---|
| connector | Yes | NONE | 使用したいコネクタ。値は "starrocks" でなければなりません。 |
| jdbc-url | Yes | NONE | FE の MySQL サーバーに接続するために使用されるアドレス。複数のアドレスを指定でき、カンマ(,)で区切る必要があります。形式:jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>。 |
| load-url | Yes | NONE | StarRocks クラスター内の FE の HTTP URL。複数の URL を指定でき、セミコロン(;)で区切る必要があります。形式:<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>。 |
| database-name | Yes | NONE | データをロードしたい StarRocks データベースの名前。 |
| table-name | Yes | NONE | StarRocks にデータをロードするために使用するテーブルの名前。 |
| username | Yes | NONE | StarRocks にデータをロードするために使用するアカウントのユーザー名。このアカウントには SELECT および INSERT 権限 が必要です。 |
| password | Yes | NONE | 前述のアカウントのパスワード。 |
| sink.semantic | No | at-least-once | sink によって保証されるセマンティクス。有効な値:at-least-once および exactly-once。 |
| sink.version | No | AUTO | データをロードするために使用されるインターフェース。このパラメータは Flink connector バージョン 1.2.4 以降でサポートされています。
|
| sink.label-prefix | No | NONE | Stream Load に使用されるラベルプレフィックス。connector 1.2.8 以降で exactly-once を使用している場合は設定を推奨します。exactly-once 使用上の注意 を参照してください。 |
| sink.buffer-flush.max-bytes | No | 94371840(90M) | 一度に StarRocks に送信される前にメモリ内に蓄積できるデータの最大サイズ。最大値は 64 MB から 10 GB の範囲です。このパラメータを大きな値に設定すると、ロードパフォーマンスが向上しますが、ロード 遅延が増加する可能性があります。 |
| sink.buffer-flush.max-rows | No | 500000 | 一度に StarRocks に送信される前にメモリ内に蓄積できる行の最大数。このパラメータは、sink.version を V1 に設定し、sink.semantic を at-least-once に設定した場合にのみ利用可能です。有効な値:64000 から 5000000。 |
| sink.buffer-flush.interval-ms | No | 300000 | データがフラッシュされる間隔。このパラメータは、sink.semantic を at-least-once に設定した場合にのみ利用可能です。有効な値:1000 から 3600000。単位:ms。 |
| sink.max-retries | No | 3 | Stream Load ジョブを実行するためにシステムが再試行する回数。このパラメータは、sink.version を V1 に設定した場合にのみ利用可能です。有効な値:0 から 10。 |
| sink.connect.timeout-ms | No | 30000 | HTTP 接続を確立するためのタイムアウト。有効な値:100 から 60000。単位:ms。Flink connector v1.2.9 より前では、デフォルト値は 1000 です。 |
| sink.socket.timeout-ms | No | -1 | 1.2.10 以降でサポートされています。HTTP クライアントがデータを待機する時間の長さ。単位:ms。デフォルト値 -1 はタイムアウトがないことを意味します。 |
| sink.wait-for-continue.timeout-ms | No | 10000 | 1.2.7 以降でサポートされています。FE からの HTTP 100-continue の応答を待機するためのタイムアウト。有効な値:3000 から 60000。単位:ms |
| sink.ignore.update-before | No | true | バージョン 1.2.8 以降でサポートされ ています。Primary Key テーブルにデータをロードする際に、Flink からの UPDATE_BEFORE レコードを無視するかどうか。このパラメータが false に設定されている場合、レコードは StarRocks テーブルへの削除操作として扱われます。 |
| sink.properties.* | No | NONE | Stream Load の動作を制御するために使用されるパラメータ。例えば、パラメータ sink.properties.format は Stream Load に使用される形式を指定します。サポートされているパラメータとその説明のリストについては、STREAM LOAD を参照してください。 |
| sink.properties.format | No | csv | Stream Load に使用される形式。Flink connector は各バッチのデータを StarRocks に送信する前にその形式に変換します。有効な値:csv および json。 |
| sink.properties.row_delimiter | No | \n | CSV 形式のデータの行区切り文字。 |
| sink.properties.column_separator | No | \t | CSV 形式のデータの列区切り文字。 |
| sink.properties.max_filter_ratio | No | 0 | Stream Load の最大エラー許容度。データ品質が不十分なためにフィルタリングされるデータレコードの最大割合です。有効な値:0 から 1。デフォルト値:0。詳細は Stream Load を参照してください。 |
| sink.parallelism | No | NONE | コネクタの並行性。Flink SQL のみで利用可能です。設定されていない場合、Flink プランナーが並行性を決定します。マルチ 並行性のシナリオでは、ユーザーはデータが正しい順序で書き込まれることを保証する必要があります。 |
| sink.properties.strict_mode | No | false | Stream Load の厳密モードを有効にするかどうかを指定します。不適格な行(列値が一致しないなど)がある場合のロード動作に影響します。有効な値:true および false。デフォルト値:false。詳細は Stream Load を参照してください。 |
| sink.properties.compression | No | NONE | 1.2.10 以降でサポートされています。Stream Load に使用される圧縮アルゴリズム。現在、圧縮は JSON 形式にのみサポートされています。有効な値:lz4_frame。JSON 形式の圧縮は StarRocks v3.2.7 以降でのみサポートされています。 |
Flink と StarRocks のデータ型マッピング
| Flink データ型 | StarRocks データ型 |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY<T> | ARRAY<T> |
| MAP<KT,VT> | JSON STRING |
| ROW<arg T...> | JSON STRING |
使用上の注意
Exactly Once
-
sink が exactly-once セマンティクスを保証することを望む場合、StarRocks を 2.5 以上に、Flink connector を 1.2.4 以上にアップグレードすることをお勧めします。
-
Flink connector 1.2.4 以降、exactly-once は StarRocks 2.4 以降で提供される Stream Load transaction interface に基づいて再設計されています。非トランザクション Stream Load インターフェースに基づく以前の実装と比較して、新しい実装はメモリ使用量とチェックポイントのオーバーヘッドを削減し、ロードのリアルタイム性能と安定性を向上させます。
-
StarRocks のバージョンが 2.4 より前、または Flink connector のバージョンが 1.2.4 より前の場合、sink は自動的に非トランザクション Stream Load インターフェースに基づく実装を選択します。
-
-
exactly-once を保証するための設定
-
sink.semanticの値はexactly-onceにする必要があります。 -
Flink connector のバージョンが 1.2.8 以降の場合、
sink.label-prefixの値を指定することをお勧めします。ラベルプレフィックスは、Flink ジョブ、Routine Load、Broker Load など、StarRocks のすべてのロードタイプ間で一意でなければなりません。-
ラベルプレフィックスが指定されている場合、Flink connector は、Flink の失敗シナリオ(例えば、チェックポイントが進行中のときに Flink ジョブが失敗する場合)で生成される可能性のある残存トランザクションをクリーンアップするためにラベルプレフィックスを使用します。これらの残存トランザクションは、
SHOW PROC '/transactions/<db_id>/running';を使用して StarRocks で表示すると、一般的にPREPAREDステータスになります。Flink ジョブがチェックポイントから復元されると、Flink connector はラベルプレフィックスとチェックポイント内の情報に基づいてこれらの残存トランザクションを見つけ、それらを中止します。Flink ジョブが終了するときに、exactly-once を実装するための二段階コミットメカニズムのため、Flink connector はそれらを中止できません。Flink ジョブが終了すると、Flink チェックポイントコーディネーターからトランザクションが成功したチェックポイントに含まれるべきかどうかの通知を受け取っていないため、これらのトランザクションが中止されるとデータが失われる可能性があります。Flink でエンドツーエンドの exactly-once を達成する方法についての概要は、この ブログ記事 を参照してください。 -
ラベルプレフィックスが指定されていない場合、残存トランザクションはタイムアウト後にのみ StarRocks によってクリーンアップされます。ただし、Flink ジョブがトランザクションのタイムアウト前に頻繁に失敗すると、実行中のトランザクションの数が StarRocks の
max_running_txn_num_per_dbの制限に達する可能性があります。タイムアウトの長さは StarRocks FE の設定prepared_transaction_default_timeout_secondによって制御され、デフォルト値は86400(1 日)です。ラベルプレフィックスが指定されていない場合は、トランザクションがより早く期限切れになるように、これを小さな値に設定できます。
-
-
-
Flink ジョブが停止または継続的なフェイルオーバーの後に長時間のダウンタイムから最終的にチェックポイントまたはセーブポイントから復旧することが確実である場合、データ損失を避けるために以下の StarRocks 設定を調整してください。
-
prepared_transaction_default_timeout_second: StarRocks FE の設定で、デフォルト値は86400です。この設定の値は Flink ジョブのダウンタイムよりも大きくする必要があります。そうしないと、Flink ジョブを再起動する前にタイムアウトのために成功したチェックポイントに含まれる残存トランザクションが中止され、データ損失につながる可能性があります。この設定に大きな値を設定する場合、ラベルプレフィックスの値を指定することをお勧めします。 これにより、タイムアウトによるクリーンアップ(データ損失を引き起こす可能性がある)ではなく、チェックポイント内の情報に基づいてラベルプレフィックスに従って残存トランザクションをクリーンアップできます。
-
label_keep_max_secondおよびlabel_keep_max_num: StarRocks FE の設定で、デフォルト値はそれぞれ259200と1000です。詳細は FE 設定 を参照してください。label_keep_max_secondの値は Flink ジョブのダウンタイムよりも大きくする必要があります。そうしないと、Flink connector は Flink のセーブポイントまたはチェックポイントに保存されたトランザクションラベルを使用して StarRocks 内のトランザクションの状態を確認し、それらがコミットされているかどうかを判断できず、最終的にデータ損失につながる可能性があります。
これらの設定は変更可能であり、
ADMIN SET FRONTEND CONFIGを使用して変更できます:ADMIN SET FRONTEND CONFIG ("prepared_transaction_default_timeout_second" = "3600");
ADMIN SET FRONTEND CONFIG ("label_keep_max_second" = "259200");
ADMIN SET FRONTEND CONFIG ("label_keep_max_num" = "1000"); -