Apache Flink® からのデータの継続的なロード
StarRocks は、Apache Flink® 用に独自開発したコネクタである StarRocks Connector for Apache Flink®(以下、Flink コネクタ)を提供しています。これを使用して、Flink を介して StarRocks テーブルにデータをロードできます。基本的な原理は、データを蓄積し、STREAM LOAD を通じて一度に StarRocks にロードすることです。
Flink コネクタは、DataStream API、Table API & SQL、Python API をサポートしています。Apache Flink® が提供する flink-connector-jdbc よりも高い安定したパフォーマンスを持っています。
注意
Flink コネクタを使用して StarRocks テーブルにデータをロードするには、対象の StarRocks テーブルに対する SELECT および INSERT 権限が必要です。これらの権限がない場合は、GRANT に従って、StarRocks クラスターに接続するために使用するユーザーにこれらの権限を付与してください。
バージョン要件
| コネクタ | Flink | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 and later | 8 | 2.11,2.12 |
| 1.2.9 | 1.15,1.16,1.17,1.18 | 2.1 and later | 8 | 2.11,2.12 |
| 1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1 and later | 8 | 2.11,2.12 |
| 1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 and later | 8 | 2.11,2.12 |
Flink コネクタの取得
Flink コネクタの JAR ファイルは以下の方法で取得できます。
- コンパイル済みの Flink コネクタ JAR ファイルを直接ダウンロードする。
- Flink コネクタを Maven プロジェクトの依存関係として追加し、JAR ファイルをダウンロードする。
- Flink コネクタのソースコードを自分でコンパイルして JAR ファイルを作成する。
Flink コネクタ JAR ファイルの命名形式は以下の通りです。
-
Flink 1.15 以降では、
flink-connector-starrocks-${connector_version}_flink-${flink_version}.jarです。例えば、Flink 1.15 をインストールし、Flink コネクタ 1.2.7 を使用したい場合、flink-connector-starrocks-1.2.7_flink-1.15.jarを使用できます。 -
Flink 1.15 より前では、
flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jarです。例えば、Flink 1.14 と Scala 2.12 を環境にインストールし、Flink コネクタ 1.2.7 を使用したい場合、flink-connector-starrocks-1.2.7_flink-1.14_2.12.jarを使用できます。
注意
一般に、Flink コネクタの最新バージョンは、Flink の最新の 3 つのバージョンとのみ互換性を維持します。
コンパイル済みの Jar ファイルをダウンロード
Maven Central Repository から対応するバージョンの Flink コネクタ Jar ファイルを直接ダウンロードします。
Maven 依存関係
Maven プロジェクトの pom.xml ファイルに、以下の形式で Flink コネクタを依存関係として追加します。flink_version、scala_version、connector_version をそれぞれのバージョンに置き換えてください。
-
Flink 1.15 以降
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency> -
Flink 1.15 より前のバージョン
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
自分でコンパイル
-
Flink コネクタのソースコード をダウンロードします。
-
Flink コネクタのソースコードを JAR ファイルにコンパイルするために、以下のコマンドを実行します。
flink_versionは対応する Flink バージョンに置き換えてください。sh build.sh <flink_version>例えば、環境の Flink バージョンが 1.15 の場合、以下のコマンドを実行する必要があります。
sh build.sh 1.15 -
target/ディレクトリに移動し、コンパイル時に生成された Flink コネクタ JAR ファイル(例:flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar)を見つけます。
注意
正式にリリースされていない Flink コネクタの名前には
SNAPSHOTサフィックスが含まれています。
オプション
connector
必須: はい
デフォルト値: NONE
説明: 使用したいコネクタ。値は "starrocks" でなければなりません。
jdbc-url
必須: はい
デフォルト値: NONE
説明: FE の MySQL サーバーに接続するために使用されるアドレス。複数のアドレスを指定でき、カンマ (,) で区切る必要があります。形式: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>。
load-url
必須: はい
デフォルト値: NONE
説明: FE の HTTP サーバーに接続するために使用されるアドレス。複数のアドレスを指定でき、セミコロン (;) で区切る必要があります。形式: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>。
database-name
必須: はい
デフォルト値: NONE
説明: データをロードしたい StarRocks データベースの名前。
table-name
必須: はい
デフォルト値: NONE
説明: StarRocks にデータをロードするために使用したいテーブルの名前。
username
必須: はい
デフォルト値: NONE
説明: StarRocks にデータをロードするために使用したいアカウントのユーザー名。このアカウントには、対象の StarRocks テーブルに対する SELECT および INSERT 権限 が必要です。
password
必須: はい
デフォルト値: NONE
説明: 前述のアカウントのパスワード。
sink.version
必須: いいえ
デフォルト値: AUTO
説明: データをロードするために使用されるインターフェース。このパラメータは Flink コネクタバージョン 1.2.4 以降でサポートされています。
V1: Stream Load インターフェースを使用してデータをロードします。1.2.4 より前のコネクタはこのモードのみをサポートしています。V2: Stream Load トランザクション インターフェースを使用してデータをロードします。StarRocks のバージョンが少なくとも 2.4 である必要があります。V2を推奨します。これはメモリ使用量を最適化し、より安定した exactly-once 実装を提供します。AUTO: StarRocks のバージョンがトランザクション Stream Load をサポートしている場合、自動的にV2を選択し、そうでない場合はV1を選択します。
sink.label-prefix
必須: いいえ
デフォルト値: NONE
説明: Stream Load で使用されるラベルプレフィックス。コネクタ 1.2.8 以降で exactly-once を使用する場合、設定を推奨します。exactly-once 使用メモ を参照してください。
sink.semantic
必須: いいえ
デフォルト値: at-least-once
説明: sink によって保証されるセマンティクス。有効な値: at-least-once と exactly-once。
sink.buffer-flush.max-bytes
必須: いいえ
デフォルト値: 94371840(90M)
説明: 一度に StarRocks に送信される前にメモリに蓄積できるデータの最大サイズ。最大値は 64 MB から 10 GB の範囲です。このパラメータを大きな値に設定すると、ロードパフォーマンスが向上しますが、ロードの遅延が増加する可能性があります。このパラメータは sink.semantic が at-least-once に設定されている場合にのみ有効です。sink.semantic が exactly-once に設定されている場合、Flink チェックポイントがトリガーされたときにメモリ内のデータがフラッシュされます。この場合、このパラメータは効果を発揮し ません。
sink.buffer-flush.max-rows
必須: いいえ
デフォルト値: 500000
説明: 一度に StarRocks に送信される前にメモリに蓄積できる行の最大数。このパラメータは sink.version が V1 であり、sink.semantic が at-least-once の場合にのみ利用可能です。有効な値: 64000 から 5000000。
sink.buffer-flush.interval-ms
必須: いいえ
デフォルト値: 300000
説明: データがフラッシュされる間隔。このパラメータは sink.semantic が at-least-once の場合にのみ利用可能です。有効な値: 1000 から 3600000。単位: ms。
sink.max-retries
必須: いいえ
デフォルト値: 3
説明: Stream Load ジョブを実行するためにシステムが再試行する回数。このパラメータは sink.version を V1 に設定した場合にのみ利用可能です。有効な値: 0 から 10。
sink.connect.timeout-ms
必須: いいえ
デフォルト値: 30000
説明: HTTP 接続を確立するためのタイムアウト。有効な値: 100 から 60000。単位: ms。Flink コネクタ v1.2.9 より前では、デフォルト値は 1000 です。
sink.socket.timeout-ms
必須: いいえ
デフォルト値: -1
説明: 1.2.10 以降でサポートされています。HTTP クライアントがデータを待機する時間。単位: ms。デフォルト値 -1 はタイムアウトがないことを意味します。
sink.wait-for-continue.timeout-ms
必須: いいえ
デフォルト値: 10000
説明: 1.2.7 以降でサポートされています。FE からの HTTP 100-continue 応答を待つタイムアウト。有効な値: 3000 から 60000。単位: ms
sink.ignore.update-before
必須: いいえ
デフォルト値: true
説明: バージョン 1.2.8 以降でサポートされています。Primary Key テーブルにデータをロードする際に Flink からの UPDATE_BEFORE レコードを無視するかどうか。このパラメータが false に設定されている場合、レコードは StarRocks テーブルへの削除操作として扱われます。
sink.parallelism
必須: いいえ
デフォルト値: NONE
説明: ロードの並行性。Flink SQL のみで利用可能です。このパラメータが指定されていない場合、Flink プランナーが並行性を決定します。マルチ並行性のシナリオでは、ユーザーはデータが正しい順序で書き込まれることを保証する必要があります。
sink.properties.*
必須: いいえ
デフォルト値: NONE
説明: Stream Load の動作を制御するために使用されるパラメータ。例えば、パラメータ sink.properties.format は Stream Load に使用される形式を指定します。サポートされているパラメータとその説明のリストについては、STREAM LOAD を参照してください。
sink.properties.format
必須: いいえ
デフォルト値: csv
説明: Stream Load に使用される形式。Flink コネクタは、各バッチのデータを StarRocks に送信する前にその形式に変換します。有効な値: csv と json。
sink.properties.column_separator
必須: いいえ
デフォルト値: \t
説明: CSV 形式のデータのカラムセパレータ。
sink.properties.row_delimiter
必須: いいえ
デフォルト値: \n
説明: CSV 形式のデータの行区切り文字。
sink.properties.max_filter_ratio
必須: いいえ
デ フォルト値: 0
説明: Stream Load の最大エラー許容率。データ品質が不十分なためにフィルタリングされるデータレコードの最大割合です。有効な値: 0 から 1。デフォルト値: 0。Stream Load を参照してください。
sink.properties.partial_update
必須: いいえ
デフォルト値: FALSE
説明: 部分更新を使用するかどうか。有効な値: TRUE と FALSE。デフォルト値: FALSE、この機能を無効にすることを示します。
sink.properties.partial_update_mode
必須: いいえ
デフォルト値: row
説明: 部分更新のモードを指定します。有効な値: row と column。
- 値
row(デフォルト)は行モードでの部分更新を意味し、多くのカラムと小さなバッチでのリアルタイム更新に適しています。 - 値
columnはカラムモードでの部分更新を意味し、少ないカラムと多くの行でのバッチ更新に適しています。このようなシナリオでは、カラムモードを有効にすると更新速度が速くなります。例えば、100 カラムのテーブルで、すべての行に対して 10 カラム(全体の 10%)のみが更新される場合、カラムモードの更新速度は 10 倍速くなります。
sink.properties.strict_mode
必須: いいえ
デフォルト値: false
説明: Stream Load の厳密モードを有効にするかどうかを指定します。これは、カラム値が一致しないなどの不適格な行がある場合のロード動作に影響します。有効な値: true と false。デフォルト値: false。Stream Load を参照してください。
sink.properties.compression
必須: いいえ
デフォルト値: NONE
説明: 1.2.10 以降でサポートされています。Stream Load に使用される圧縮アルゴリズム。現在、圧縮は JSON 形式に対してのみサポートされています。有効な値: lz4_frame。JSON 形式の圧縮は StarRocks v3.2.7 以降でのみサポートされています。
Flink と StarRocks 間のデータ型マッピング
| Flink データ型 | StarRocks データ型 |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY<T> | ARRAY<T> |
| MAP<KT,VT> | JSON STRING |
| ROW<arg T...> | JSON STRING |
使用メモ
Exactly Once
-
sink に exactly-once セマンティクスを保証させたい場合は、StarRocks を 2.5 以降に、Flink コネクタを 1.2.4 以降にアップグレードすることをお勧めします。
-
Flink コネクタ 1.2.4 以降では、exactly-once は StarRocks 2.4 以降で提供される Stream Load トランザクションインターフェース に基づいて再設計されています。以前の非トランザクション Stream Load インターフェースに基づく実装と比較して、新しい実装はメモリ使用量とチェックポイントのオーバーヘッドを削減し、ロードのリアルタイムパフォーマンスと安定性を向上させます。
-
StarRocks のバージョンが 2.4 より前、または Flink コネクタのバージョンが 1.2.4 より前の場合、sink は自動的に非トランザクション Stream Load インターフェースに基づく実装を選択します。
-
-
exactly-once を保証するための設定
-
sink.semanticの値はexactly-onceである必要があります。 -
Flink コネクタのバージョンが 1.2.8 以降の場合、
sink.label-prefixの値を指定することをお勧めします。ラベルプレフィックスは、Flink ジョブ、Routine Load、Broker Load など、StarRocks のすべてのロードタイプ間で一意でなければなりません。-
ラベルプレフィックスが指定されている場合、Flink コネクタはラベルプレフィックスを使用して、Flink の失敗シナリオ(例: チェックポイントが進行中のときに Flink ジョブが失敗する)で生成される可能性のある残存トランザクションをクリーンアップします。これらの残存トランザクションは、
SHOW PROC '/transactions/<db_id>/running';を使用して StarRocks で表示すると、通常PREPAREDステータスになります。Flink ジョブがチェックポイントから復元されると、Flink コネクタはラベ ルプレフィックスとチェックポイント内の情報に基づいてこれらの残存トランザクションを見つけ、アボートします。Flink ジョブが終了するときに Flink コネクタはそれらをアボートできません。これは、exactly-once を実装するための二段階コミットメカニズムのためです。Flink ジョブが終了すると、Flink コネクタはトランザクションが成功したチェックポイントに含まれるべきかどうかについて Flink チェックポイントコーディネーターから通知を受け取っていないため、これらのトランザクションをアボートするとデータが失われる可能性があります。Flink でエンドツーエンドの exactly-once を達成する方法については、この ブログ記事 を参照してください。 -
ラベルプレフィックスが指定されていない場合、残存トランザクションはタイムアウト後にのみ StarRocks によってクリーンアップされます。ただし、Flink ジョブがトランザクションのタイムアウト前に頻繁に失敗すると、実行中のトランザクションの数が StarRocks の
max_running_txn_num_per_dbの制限に達する可能性があります。タイムアウトの長さは StarRocks FE 設定prepared_transaction_default_timeout_secondによって制御され、デフォルト値は86400(1 日)です。ラベルプレフィックスが指定されていない場合、トランザクションがより速く期限切れになるように、これに小さい値を設定することができます。
-
-
-
Flink ジョブが停止または継続的なフェイ ルオーバーのために長時間のダウンタイムの後にチェックポイントまたはセーブポイントから最終的に復旧することが確実である場合、データ損失を避けるために次の StarRocks 設定を調整してください。
-
prepared_transaction_default_timeout_second: StarRocks FE 設定、デフォルト値は86400。この設定の値は Flink ジョブのダウンタイムよりも大きくする必要があります。そうしないと、Flink ジョブを再起動する前にタイムアウトのために成功したチェックポイントに含まれる残存トランザクションがアボートされ、データ損失が発生する可能性があります。この設定に大きな値を設定する場合、ラベルプレフィックスの値を指定することをお勧めします。これにより、残存トランザクションはタイムアウトによるものではなく、ラベルプレフィックスとチェックポイント内の情報に基づいてクリーンアップされます(これによりデータ損失が発生する可能性があります)。
-
label_keep_max_secondとlabel_keep_max_num: StarRocks FE 設定、デフォルト値はそれぞれ259200と1000です。詳細については、FE 設定 を参照してください。label_keep_max_secondの値は Flink ジョブのダウンタイムよりも大きくする必要があります。そうしないと、Flink コネクタは Flink のセーブポイントまたはチェックポイントに保存されたトランザクションラベルを使用して StarRocks 内のトランザクションの状態を確認し、これらのトランザクションがコミットされているかどうかを確認できず、最終的にデータ損失が発生する可能性があります。
これらの設定は可変であり、
ADMIN SET FRONTEND CONFIGを使用して変更できます。ADMIN SET FRONTEND CONFIG ("prepared_transaction_default_timeout_second" = "3600");
ADMIN SET FRONTEND CONFIG ("label_keep_max_second" = "259200");
ADMIN SET FRONTEND CONFIG ("label_keep_max_num" = "1000"); -
フラッシュポリシー
Flink コネクタはデータをメモリにバッファし、Stream Load を介してバッチで StarRocks にフラッシュします。フラッシュがトリガーされる方法は、at-least-once と exactly-once で異なります。
at-least-once の場合、次の条件のいずれかが満たされるとフラッシュがトリガーされます。
- バッファされた行のバイト数が
sink.buffer-flush.max-bytesの制限に達する - バッファされた行の数が
sink.buffer-flush.max-rowsの制限に達する(sink バージョン V1 のみ有効) - 最後のフラッシュからの経過時間が
sink.buffer-flush.interval-msの制限に達する - チェックポイントがトリガーされる
exactly-once の場合、フラッシュはチェックポイントがトリガーされたときにのみ発生します。
ロードメトリクスの監視
Flink コネクタは、ロードを監視するための次のメトリクスを提供します。
| 名前 | タイプ | 説明 |
|---|---|---|
| totalFlushBytes | カウンター | 成功したフラッシュバイト数。 |
| totalFlushRows | カウンター | 成功したフラッシュ行数。 |
| totalFlushSucceededTimes | カウンター | データバッチが成功したフラッシュ回数。 |
| totalFlushFailedTimes | カウンター | フラッシュが失敗した回数。 |
| totalFilteredRows | カウンター | フィルタリングされた行数(totalFlushRows にも含まれます)。 |
例
以下の例は、Flink SQL または Flink DataStream を使用して Flink コネクタで StarRocks テーブルにデータをロードする方法を示しています。
準備
StarRocks テーブルの作成
データベ ース test を作成し、主キーテーブル score_board を作成します。
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
Flink 環境のセットアップ
-
Flink バイナリ Flink 1.15.2 をダウンロードし、
flink-1.15.2ディレクトリに解凍します。 -
Flink コネクタ 1.2.7 をダウンロードし、
flink-1.15.2/libディレクトリに配置します。 -
次のコマンドを実行して Flink クラスターを起動します。
cd flink-1.15.2
./bin/start-cluster.sh
ネットワーク設定
Flink が配置されているマシンが、StarRocks クラスターの FE ノードに http_port(デフォルト: 8030)および query_port(デフォルト: 9030)を介してアクセスでき、BE ノードに be_http_port(デフォルト: 8040)を介してアクセスできることを確認してください。
Flink SQL での実行
-
次のコマンドを実行して Flink SQL クライアントを起動します。
./bin/sql-client.sh -
Flink テーブル
score_boardを作成し、Flink SQL クライアントを介してテーブルに値を挿入します。 StarRocks の Primary Key テーブルにデータをロードしたい場合は、Flink DDL で主キーを定義する必要があります。他のタイプの StarRocks テーブルではオプションです。CREATE TABLE `score_board` (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'load-url' = '127.0.0.1:8030',
'database-name' = 'test',
'table-name' = 'score_board',
'username' = 'root',
'password' = ''
);
INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);