Flink コネクタを使用して StarRocks からデータを読み取る
StarRocks は、Flink を使用して StarRocks クラスターから大量のデータを読み取るのを支援するために、StarRocks Connector for Apache Flink®(Flink コネクタと略称)という独自開発のコネクタを提供しています。
Flink コネクタは、Flink SQL と Flink DataStream の2つの読み取り方法をサポートしています。Flink SQL が推奨されます。
注意
Flink コネクタは、Flink によって読み取られたデータを別の StarRocks クラスターまたはストレージシステムに書き込むこともサポートしています。Apache Flink® からデータを継続的にロードする を 参照してください。
背景情報
Flink が提供する JDBC コネクタとは異なり、StarRocks の Flink コネクタは、StarRocks クラスターの複数の BE からデータを並行して読み取ることをサポートしており、読み取りタスクを大幅に高速化します。以下の比較は、2つのコネクタの実装の違いを示しています。
-
StarRocks の Flink コネクタ
StarRocks の Flink コネクタを使用すると、Flink はまず責任を持つ FE からクエリプランを取得し、次に取得したクエリプランをパラメータとして関与するすべての BE に配布し、最終的に BE から返されたデータを取得できます。

-
Flink の JDBC コネクタ
Flink の JDBC コネクタを使用すると、Flink は個々の FE からのみデータを1つずつ読み取ることができます。データの読み取りは遅いです。

バージョン要件
| コネクタ | Flink | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 and later | 8 | 2.11,2.12 |
| 1.2.9 | 1.15,1.16,1.17,1.18 | 2.1 and later | 8 | 2.11,2.12 |
| 1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1 and later | 8 | 2.11,2.12 |
| 1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 and later | 8 | 2.11,2.12 |
前提条件
Flink がデプロイされています。Flink がデプロイされていない場合は、以下の手順に従ってデプロイしてください。
-
Java 8 または Java 11 をオペレーティングシステムにインストールして、Flink が正常に動作するようにします。Java のインストールバージョンを確認するには、次のコマンドを使用できます。
java -version例えば、次の情報が返された場合、Java 8 がインストールされています。
openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode) -
Flink パッケージ を選択してダウンロードし、解凍します。
注意
Flink v1.14 以降の使用を推奨します。サポートされる最小の Flink バージョンは v1.11 です。
# Flink パッケージをダウンロードします。
wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# Flink パッケージを解凍します。
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# Flink ディレクトリに移動します。
cd flink-1.14.5 -
Flink クラスターを開始します。
# Flink クラスターを開始します。
./bin/start-cluster.sh
# 次の情報が表示された場合、Flink クラスターが正常に開始されています。
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
Flink のデプロイについては、Flink ドキュメント に記載された手順に従うこともできます。
始める前に
Flink コネクタのデプロイ
Flink コネクタをデプロイするには、次の手順に従います。
-
使用している Flink バージョンに一致する flink-connector-starrocks JAR パッケージを選択してダウンロードします。コードデバッグが必要な場合は、ビジネス要件に合わせて Flink コネクタパッケージをコンパイルします。
注意
Flink コネクタパッケージのバージョンが 1.2.x 以降であり、使用している Flink バージョンと同じ最初の2桁を持つ Flink バージョンに一致するものをダウンロードすることをお勧めします。例えば、Flink v1.14.x を使用している場合、
flink-connector-starrocks-1.2.4_flink-1.14_x.yy.jarをダウンロードできます。 -
ダウンロードまたはコンパイルした Flink コネクタパッケージを Flink の
libディレクトリに配置します。 -
Flink クラスターを再起動します。
ネットワーク構成
Flink が配置されているマシンが、StarRocks クラスターの FE ノードに http_port(デフォルト: 8030)および query_port(デフォルト: 9030)を介してアクセスでき、BE ノードに be_port(デフォルト: 9060)を介してアクセスできることを確認してください。
パラメータ
共通パラメータ
以下のパラメータは、Flink SQL と Flink DataStream の両方の読み取り方法に適用されます。
| パラメータ | 必須 | データ型 | 説明 |
|---|---|---|---|
| connector | Yes | STRING | データを読み取るために使用するコネクタのタイプ。値を starrocks に設定します。 |
| scan-url | Yes | STRING | Web サーバーから FE に接続するために使用されるアドレス。形式: <fe_host>:<fe_http_port>。デフォルトポートは 8030 です。複数のアドレスを指定することができ、カンマ (,) で区切る必要があります。例: 192.168.xxx.xxx:8030,192.168.xxx.xxx:8030。 |
| jdbc-url | Yes | STRING | FE の MySQL クライアントに接続するために使用されるアドレス。形式: jdbc:mysql://<fe_host>:<fe_query_port>。デフォル トのポート番号は 9030 です。 |
| username | Yes | STRING | StarRocks クラスターアカウントのユーザー名。読み取りたい StarRocks テーブルに対する読み取り権限を持つアカウントである必要があります。ユーザー権限 を参照してください。 |
| password | Yes | STRING | StarRocks クラスターアカウントのパスワード。 |
| database-name | Yes | STRING | 読み取りたい StarRocks テーブルが属する StarRocks データベースの名前。 |
| table-name | Yes | STRING | 読み取りたい StarRocks テーブルの名前。 |
| scan.connect.timeout-ms | No | STRING | Flink コネクタから StarRocks クラスターへの接続がタイムアウトするまでの最大時間。単位: ミリ秒。デフォルト値: 1000。接続の確立にかかる時間がこの制限を超える場合、読み取りタスクは失敗します。 |
| scan.params.keep-alive-min | No | STRING | 読み取りタスクが存続する最大時間。存続時間はポーリングメカニズムを使用して定期的にチェックされます。単位: 分。デフォルト値: 10。このパラメータを 5 以上に設定することをお勧めします。 |
| scan.params.query-timeout-s | No | STRING | 読み取りタスクがタイムアウトするまでの最大時間。タスク実行中にタイムアウト期間がチェックされます。単位: 秒。デフォルト値: 600。時間が経過しても読み取り結果が返されない場合、読み取りタスクは停止します。 |
| scan.params.mem-limit-byte | No | STRING | 各 BE でク エリごとに許可される最大メモリ量。単位: バイト。デフォルト値: 1073741824、1 GB に相当します。 |
| scan.max-retries | No | STRING | 失敗時に読み取りタスクが再試行できる最大回数。デフォルト値: 1。再試行回数がこの制限を超える場合、読み取りタスクはエラーを返します。 |
Flink DataStream 用パラメータ
以下のパラメータは、Flink DataStream の読み取り方法にのみ適用されます。
| パラメータ | 必須 | データ型 | 説明 |
|---|---|---|---|
| scan.columns | No | STRING | 読み取りたい列。複数の列を指定することができ、カンマ (,) で区切る必要があります。 |
| scan.filter | No | STRING | データをフィルタリングするためのフィルタ条件。 |
Flink で c1、c2、c3 の3つの列からなるテーブルを作成したと仮定します。この Flink テーブルの c1 列の値が 100 に等しい行を読み取るには、2つのフィルタ条件 "scan.columns, "c1" と "scan.filter, "c1 = 100" を指定できます。