メインコンテンツまでスキップ
バージョン: 3.1

Flink コネクタを使用して StarRocks からデータを読み取る

StarRocks は、Flink を使用して StarRocks クラスターから大量のデータを読み取るのを支援するために、StarRocks Connector for Apache Flink®(Flink コネクタと略称)という独自開発のコネクタを提供しています。

Flink コネクタは、Flink SQL と Flink DataStream の2つの読み取り方法をサポートしています。Flink SQL が推奨されます。

注意

Flink コネクタは、Flink によって読み取られたデータを別の StarRocks クラスターまたはストレージシステムに書き込むこともサポートしています。Apache Flink® からデータを継続的にロードする を参照してください。

背景情報

Flink が提供する JDBC コネクタとは異なり、StarRocks の Flink コネクタは、StarRocks クラスターの複数の BE からデータを並行して読み取ることをサポートしており、読み取りタスクを大幅に高速化します。以下の比較は、2つのコネクタの実装の違いを示しています。

  • StarRocks の Flink コネクタ

    StarRocks の Flink コネクタを使用すると、Flink はまず責任を持つ FE からクエリプランを取得し、次に取得したクエリプランをパラメータとして関与するすべての BE に配布し、最終的に BE から返されたデータを取得できます。

    - StarRocks の Flink コネクタ

  • Flink の JDBC コネクタ

    Flink の JDBC コネクタを使用すると、Flink は個々の FE からのみデータを1つずつ読み取ることができます。データの読み取りは遅いです。

    Flink の JDBC コネクタ

バージョン要件

コネクタFlinkStarRocksJavaScala
1.2.101.15,1.16,1.17,1.18,1.192.1 and later82.11,2.12
1.2.91.15,1.16,1.17,1.182.1 and later82.11,2.12
1.2.81.13,1.14,1.15,1.16,1.172.1 and later82.11,2.12
1.2.71.11,1.12,1.13,1.14,1.152.1 and later82.11,2.12

前提条件

Flink がデプロイされています。Flink がデプロイされていない場合は、以下の手順に従ってデプロイしてください。

  1. Java 8 または Java 11 をオペレーティングシステムにインストールして、Flink が正常に動作するようにします。Java のインストールバージョンを確認するには、次のコマンドを使用できます。

    java -version

    例えば、次の情報が返された場合、Java 8 がインストールされています。

    openjdk version "1.8.0_322"
    OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
    OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode)
  2. Flink パッケージ を選択してダウンロードし、解凍します。

    注意

    Flink v1.14 以降の使用を推奨します。サポートされる最小の Flink バージョンは v1.11 です。

    # Flink パッケージをダウンロードします。
    wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
    # Flink パッケージを解凍します。
    tar -xzf flink-1.14.5-bin-scala_2.11.tgz
    # Flink ディレクトリに移動します。
    cd flink-1.14.5
  3. Flink クラスターを開始します。

    # Flink クラスターを開始します。
    ./bin/start-cluster.sh

    # 次の情報が表示された場合、Flink クラスターが正常に開始されています。
    Starting cluster.
    Starting standalonesession daemon on host.
    Starting taskexecutor daemon on host.

Flink のデプロイについては、Flink ドキュメント に記載された手順に従うこともできます。

始める前に

Flink コネクタをデプロイするには、次の手順に従います。

  1. 使用している Flink バージョンに一致する flink-connector-starrocks JAR パッケージを選択してダウンロードします。コードデバッグが必要な場合は、ビジネス要件に合わせて Flink コネクタパッケージをコンパイルします。

    注意

    Flink コネクタパッケージのバージョンが 1.2.x 以降であり、使用している Flink バージョンと同じ最初の2桁を持つ Flink バージョンに一致するものをダウンロードすることをお勧めします。例えば、Flink v1.14.x を使用している場合、flink-connector-starrocks-1.2.4_flink-1.14_x.yy.jar をダウンロードできます。

  2. ダウンロードまたはコンパイルした Flink コネクタパッケージを Flink の lib ディレクトリに配置します。

  3. Flink クラスターを再起動します。

ネットワーク構成

Flink が配置されているマシンが、StarRocks クラスターの FE ノードに http_port(デフォルト: 8030)および query_port(デフォルト: 9030)を介してアクセスでき、BE ノードに be_port(デフォルト: 9060)を介してアクセスできることを確認してください。

パラメータ

共通パラメータ

以下のパラメータは、Flink SQL と Flink DataStream の両方の読み取り方法に適用されます。

パラメータ必須データ型説明
connectorYesSTRINGデータを読み取るために使用するコネクタのタイプ。値を starrocks に設定します。
scan-urlYesSTRINGWeb サーバーから FE に接続するために使用されるアドレス。形式: <fe_host>:<fe_http_port>。デフォルトポートは 8030 です。複数のアドレスを指定することができ、カンマ (,) で区切る必要があります。例: 192.168.xxx.xxx:8030,192.168.xxx.xxx:8030
jdbc-urlYesSTRINGFE の MySQL クライアントに接続するために使用されるアドレス。形式: jdbc:mysql://<fe_host>:<fe_query_port>。デフォルトのポート番号は 9030 です。
usernameYesSTRINGStarRocks クラスターアカウントのユーザー名。読み取りたい StarRocks テーブルに対する読み取り権限を持つアカウントである必要があります。ユーザー権限 を参照してください。
passwordYesSTRINGStarRocks クラスターアカウントのパスワード。
database-nameYesSTRING読み取りたい StarRocks テーブルが属する StarRocks データベースの名前。
table-nameYesSTRING読み取りたい StarRocks テーブルの名前。
scan.connect.timeout-msNoSTRINGFlink コネクタから StarRocks クラスターへの接続がタイムアウトするまでの最大時間。単位: ミリ秒。デフォルト値: 1000。接続の確立にかかる時間がこの制限を超える場合、読み取りタスクは失敗します。
scan.params.keep-alive-minNoSTRING読み取りタスクが存続する最大時間。存続時間はポーリングメカニズムを使用して定期的にチェックされます。単位: 分。デフォルト値: 10。このパラメータを 5 以上に設定することをお勧めします。
scan.params.query-timeout-sNoSTRING読み取りタスクがタイムアウトするまでの最大時間。タスク実行中にタイムアウト期間がチェックされます。単位: 秒。デフォルト値: 600。時間が経過しても読み取り結果が返されない場合、読み取りタスクは停止します。
scan.params.mem-limit-byteNoSTRING各 BE でクエリごとに許可される最大メモリ量。単位: バイト。デフォルト値: 1073741824、1 GB に相当します。
scan.max-retriesNoSTRING失敗時に読み取りタスクが再試行できる最大回数。デフォルト値: 1。再試行回数がこの制限を超える場合、読み取りタスクはエラーを返します。

以下のパラメータは、Flink DataStream の読み取り方法にのみ適用されます。

パラメータ必須データ型説明
scan.columnsNoSTRING読み取りたい列。複数の列を指定することができ、カンマ (,) で区切る必要があります。
scan.filterNoSTRINGデータをフィルタリングするためのフィルタ条件。

Flink で c1c2c3 の3つの列からなるテーブルを作成したと仮定します。この Flink テーブルの c1 列の値が 100 に等しい行を読み取るには、2つのフィルタ条件 "scan.columns, "c1""scan.filter, "c1 = 100" を指定できます。

以下のデータ型マッピングは、Flink が StarRocks からデータを読み取る場合にのみ有効です。Flink が StarRocks にデータを書き込む際に使用されるデータ型マッピングについては、Apache Flink® からデータを継続的にロードする を参照してください。

StarRocksFlink
NULLNULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
LARGEINTSTRING
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
DECIMALV2DECIMAL
DECIMAL32DECIMAL
DECIMAL64DECIMAL
DECIMAL128DECIMAL
CHARCHAR
VARCHARSTRING
JSONSTRING
注意:
バージョン 1.2.10 以降でサポートされています
ARRAYARRAY
注意:
バージョン 1.2.10 以降でサポートされており、StarRocks v3.1.12/v3.2.5 以降が必要です。
STRUCTROW
注意:
バージョン 1.2.10 以降でサポートされており、StarRocks v3.1.12/v3.2.5 以降が必要です。
MAPMAP
注意:
バージョン 1.2.10 以降でサポートされており、StarRocks v3.1.12/v3.2.5 以降が必要です。

以下の例では、StarRocks クラスターに test という名前のデータベースを作成し、ユーザー root の権限を持っていると仮定します。

注意

読み取りタスクが失敗した場合、再作成する必要があります。

データ例

  1. test データベースに移動し、score_board という名前のテーブルを作成します。

    MySQL [test]> CREATE TABLE `score_board`
    (
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`)
    PROPERTIES
    (
    "replication_num" = "3"
    );
  2. score_board テーブルにデータを挿入します。

    MySQL [test]> INSERT INTO score_board
    VALUES
    (1, 'Bob', 21),
    (2, 'Stan', 21),
    (3, 'Sam', 22),
    (4, 'Tony', 22),
    (5, 'Alice', 22),
    (6, 'Lucy', 23),
    (7, 'Polly', 23),
    (8, 'Tom', 23),
    (9, 'Rose', 24),
    (10, 'Jerry', 24),
    (11, 'Jason', 24),
    (12, 'Lily', 25),
    (13, 'Stephen', 25),
    (14, 'David', 25),
    (15, 'Eddie', 26),
    (16, 'Kate', 27),
    (17, 'Cathy', 27),
    (18, 'Judy', 27),
    (19, 'Julia', 28),
    (20, 'Robert', 28),
    (21, 'Jack', 29);
  3. score_board テーブルをクエリします。

   MySQL [test]> SELECT * FROM score_board;
+------+---------+-------+
| id | name | score |
+------+---------+-------+
| 1 | Bob | 21 |
| 2 | Stan | 21 |
| 3 | Sam | 22 |
| 4 | Tony | 22 |
| 5 | Alice | 22 |
| 6 | Lucy | 23 |
| 7 | Polly | 23 |
| 8 | Tom | 23 |
| 9 | Rose | 24 |
| 10 | Jerry | 24 |
| 11 | Jason | 24 |
| 12 | Lily | 25 |
| 13 | Stephen | 25 |
| 14 | David | 25 |
| 15 | Eddie | 26 |
| 16 | Kate | 27 |
| 17 | Cathy | 27 |
| 18 | Judy | 27 |
| 19 | Julia | 28 |
| 20 | Robert | 28 |
| 21 | Jack | 29 |
+------+---------+-------+
21 rows in set (0.00 sec)
  1. Flink クラスターで、ソース StarRocks テーブル(この例では score_board)のスキーマに基づいて flink_test という名前のテーブルを作成します。テーブル作成コマンドでは、Flink コネクタ、ソース StarRocks データベース、およびソース StarRocks テーブルに関する情報を含む読み取りタスクプロパティを設定する必要があります。

    CREATE TABLE flink_test
    (
    `id` INT,
    `name` STRING,
    `score` INT
    )
    WITH
    (
    'connector'='starrocks',
    'scan-url'='192.168.xxx.xxx:8030',
    'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
    'username'='xxxxxx',
    'password'='xxxxxx',
    'database-name'='test',
    'table-name'='score_board'
    );
  2. SELECT を使用して StarRocks からデータを読み取ります。

    SELECT id, name FROM flink_test WHERE score > 20;

Flink SQL を使用してデータを読み取る際には、次の点に注意してください。

  • StarRocks からデータを読み取るために SELECT ... FROM <table_name> WHERE ... のような SQL 文のみを使用できます。集計関数の中で、count のみがサポートされています。
  • プレディケートプッシュダウンがサポートされています。例えば、クエリにフィルタ条件 char_1 <> 'A' and int_1 = -126 が含まれている場合、フィルタ条件は Flink コネクタにプッシュダウンされ、クエリが実行される前に StarRocks によって実行可能な文に変換されます。追加の設定を行う必要はありません。
  • LIMIT 文はサポートされていません。
  • StarRocks はチェックポイントメカニズムをサポートしていません。その結果、読み取りタスクが失敗した場合、データの一貫性は保証されません。
  1. pom.xml ファイルに次の依存関係を追加します。

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>x.x.x_flink-1.15</version>
    <version>x.x.x_flink-1.14_2.11</version>
    <version>x.x.x_flink-1.14_2.12</version>
    <version>x.x.x_flink-1.13_2.11</version>
    <version>x.x.x_flink-1.13_2.12</version>
    <version>x.x.x_flink-1.12_2.11</version>
    <version>x.x.x_flink-1.12_2.12</version>
    <version>x.x.x_flink-1.11_2.11</version>
    <version>x.x.x_flink-1.11_2.12</version>
    </dependency>

    上記のコード例で x.x.x を使用している最新の Flink コネクタバージョンに置き換える必要があります。バージョン情報 を参照してください。

  2. Flink コネクタを呼び出して StarRocks からデータを読み取ります。

    import com.starrocks.connector.flink.StarRocksSource;
    import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;

    public class StarRocksSourceApp {
    public static void main(String[] args) throws Exception {
    StarRocksSourceOptions options = StarRocksSourceOptions.builder()
    .withProperty("scan-url", "192.168.xxx.xxx:8030")
    .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("table-name", "score_board")
    .withProperty("database-name", "test")
    .build();
    TableSchema tableSchema = TableSchema.builder()
    .field("id", DataTypes.INT())
    .field("name", DataTypes.STRING())
    .field("score", DataTypes.INT())
    .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
    env.execute("StarRocks flink source");
    }

    }

次のステップ

Flink が StarRocks からデータを正常に読み取った後、Flink WebUI を使用して読み取りタスクを監視できます。例えば、WebUI の Metrics ページで totalScannedRows メトリックを表示して、正常に読み取られた行数を取得できます。また、Flink SQL を使用して、読み取ったデータに対してジョインなどの計算を行うこともできます。