メインコンテンツまでスキップ
バージョン: 3.2

Apache Flink® からのデータの継続的なロード

StarRocks は、Apache Flink® 用に独自開発したコネクタである StarRocks Connector for Apache Flink®(以下、Flink コネクタ)を提供しています。これを使用して、Flink を介して StarRocks テーブルにデータをロードできます。基本的な原理は、データを蓄積し、STREAM LOAD を通じて一度に StarRocks にロードすることです。

Flink コネクタは、DataStream API、Table API & SQL、Python API をサポートしています。Apache Flink® が提供する flink-connector-jdbc よりも高い安定したパフォーマンスを持っています。

注意

Flink コネクタを使用して StarRocks テーブルにデータをロードするには、対象の StarRocks テーブルに対する SELECT および INSERT 権限が必要です。これらの権限がない場合は、GRANT に従って、StarRocks クラスターに接続するために使用するユーザーにこれらの権限を付与してください。

バージョン要件

コネクタFlinkStarRocksJavaScala
1.2.101.15,1.16,1.17,1.18,1.192.1 and later82.11,2.12
1.2.91.15,1.16,1.17,1.182.1 and later82.11,2.12
1.2.81.13,1.14,1.15,1.16,1.172.1 and later82.11,2.12
1.2.71.11,1.12,1.13,1.14,1.152.1 and later82.11,2.12

Flink コネクタの JAR ファイルは以下の方法で取得できます。

  • コンパイル済みの Flink コネクタ JAR ファイルを直接ダウンロードする。
  • Flink コネクタを Maven プロジェクトの依存関係として追加し、JAR ファイルをダウンロードする。
  • Flink コネクタのソースコードを自分でコンパイルして JAR ファイルを作成する。

Flink コネクタ JAR ファイルの命名形式は以下の通りです。

  • Flink 1.15 以降では、flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar です。例えば、Flink 1.15 をインストールし、Flink コネクタ 1.2.7 を使用したい場合、flink-connector-starrocks-1.2.7_flink-1.15.jar を使用できます。

  • Flink 1.15 より前では、flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar です。例えば、Flink 1.14 と Scala 2.12 を環境にインストールし、Flink コネクタ 1.2.7 を使用したい場合、flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar を使用できます。

注意

一般に、Flink コネクタの最新バージョンは、Flink の最新の 3 つのバージョンとのみ互換性を維持します。

コンパイル済みの Jar ファイルをダウンロード

Maven Central Repository から対応するバージョンの Flink コネクタ Jar ファイルを直接ダウンロードします。

Maven 依存関係

Maven プロジェクトの pom.xml ファイルに、以下の形式で Flink コネクタを依存関係として追加します。flink_versionscala_versionconnector_version をそれぞれのバージョンに置き換えてください。

  • Flink 1.15 以降

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>${connector_version}_flink-${flink_version}</version>
    </dependency>
  • Flink 1.15 より前のバージョン

    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
    </dependency>

自分でコンパイル

  1. Flink コネクタのソースコード をダウンロードします。

  2. Flink コネクタのソースコードを JAR ファイルにコンパイルするために、以下のコマンドを実行します。flink_version は対応する Flink バージョンに置き換えてください。

    sh build.sh <flink_version>

    例えば、環境の Flink バージョンが 1.15 の場合、以下のコマンドを実行する必要があります。

    sh build.sh 1.15
  3. target/ ディレクトリに移動し、コンパイル時に生成された Flink コネクタ JAR ファイル(例: flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar)を見つけます。

注意

正式にリリースされていない Flink コネクタの名前には SNAPSHOT サフィックスが含まれています。

オプション

connector

必須: はい
デフォルト値: NONE
説明: 使用したいコネクタ。値は "starrocks" でなければなりません。

jdbc-url

必須: はい
デフォルト値: NONE
説明: FE の MySQL サーバーに接続するために使用されるアドレス。複数のアドレスを指定でき、カンマ (,) で区切る必要があります。形式: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>

load-url

必須: はい
デフォルト値: NONE
説明: FE の HTTP サーバーに接続するために使用されるアドレス。複数のアドレスを指定でき、セミコロン (;) で区切る必要があります。形式: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>

database-name

必須: はい
デフォルト値: NONE
説明: データをロードしたい StarRocks データベースの名前。

table-name

必須: はい
デフォルト値: NONE
説明: StarRocks にデータをロードするために使用したいテーブルの名前。

username

必須: はい
デフォルト値: NONE
説明: StarRocks にデータをロードするために使用したいアカウントのユーザー名。このアカウントには、対象の StarRocks テーブルに対する SELECT および INSERT 権限 が必要です。

password

必須: はい
デフォルト値: NONE
説明: 前述のアカウントのパスワード。

sink.version

必須: いいえ
デフォルト値: AUTO
説明: データをロードするために使用されるインターフェース。このパラメータは Flink コネクタバージョン 1.2.4 以降でサポートされています。

  • V1: Stream Load インターフェースを使用してデータをロードします。1.2.4 より前のコネクタはこのモードのみをサポートしています。
  • V2: Stream Load トランザクション インターフェースを使用してデータをロードします。StarRocks のバージョンが少なくとも 2.4 である必要があります。V2 を推奨します。これはメモリ使用量を最適化し、より安定した exactly-once 実装を提供します。
  • AUTO: StarRocks のバージョンがトランザクション Stream Load をサポートしている場合、自動的に V2 を選択し、そうでない場合は V1 を選択します。

sink.label-prefix

必須: いいえ
デフォルト値: NONE
説明: Stream Load で使用されるラベルプレフィックス。コネクタ 1.2.8 以降で exactly-once を使用する場合、設定を推奨します。exactly-once 使用メモ を参照してください。

sink.semantic

必須: いいえ
デフォルト値: at-least-once
説明: sink によって保証されるセマンティクス。有効な値: at-least-onceexactly-once

sink.buffer-flush.max-bytes

必須: いいえ
デフォルト値: 94371840(90M)
説明: 一度に StarRocks に送信される前にメモリに蓄積できるデータの最大サイズ。最大値は 64 MB から 10 GB の範囲です。このパラメータを大きな値に設定すると、ロードパフォーマンスが向上しますが、ロードの遅延が増加する可能性があります。このパラメータは sink.semanticat-least-once に設定されている場合にのみ有効です。sink.semanticexactly-once に設定されている場合、Flink チェックポイントがトリガーされたときにメモリ内のデータがフラッシュされます。この場合、このパラメータは効果を発揮しません。

sink.buffer-flush.max-rows

必須: いいえ
デフォルト値: 500000
説明: 一度に StarRocks に送信される前にメモリに蓄積できる行の最大数。このパラメータは sink.versionV1 であり、sink.semanticat-least-once の場合にのみ利用可能です。有効な値: 64000 から 5000000。

sink.buffer-flush.interval-ms

必須: いいえ
デフォルト値: 300000
説明: データがフラッシュされる間隔。このパラメータは sink.semanticat-least-once の場合にのみ利用可能です。有効な値: 1000 から 3600000。単位: ms。

sink.max-retries

必須: いいえ
デフォルト値: 3
説明: Stream Load ジョブを実行するためにシステムが再試行する回数。このパラメータは sink.versionV1 に設定した場合にのみ利用可能です。有効な値: 0 から 10。

sink.connect.timeout-ms

必須: いいえ
デフォルト値: 30000
説明: HTTP 接続を確立するためのタイムアウト。有効な値: 100 から 60000。単位: ms。Flink コネクタ v1.2.9 より前では、デフォルト値は 1000 です。

sink.socket.timeout-ms

必須: いいえ
デフォルト値: -1
説明: 1.2.10 以降でサポートされています。HTTP クライアントがデータを待機する時間。単位: ms。デフォルト値 -1 はタイムアウトがないことを意味します。

sink.wait-for-continue.timeout-ms

必須: いいえ
デフォルト値: 10000
説明: 1.2.7 以降でサポートされています。FE からの HTTP 100-continue 応答を待つタイムアウト。有効な値: 3000 から 60000。単位: ms

sink.ignore.update-before

必須: いいえ
デフォルト値: true
説明: バージョン 1.2.8 以降でサポートされています。Primary Key テーブルにデータをロードする際に Flink からの UPDATE_BEFORE レコードを無視するかどうか。このパラメータが false に設定されている場合、レコードは StarRocks テーブルへの削除操作として扱われます。

sink.parallelism

必須: いいえ
デフォルト値: NONE
説明: ロードの並行性。Flink SQL のみで利用可能です。このパラメータが指定されていない場合、Flink プランナーが並行性を決定します。マルチ並行性のシナリオでは、ユーザーはデータが正しい順序で書き込まれることを保証する必要があります。

sink.properties.*

必須: いいえ
デフォルト値: NONE
説明: Stream Load の動作を制御するために使用されるパラメータ。例えば、パラメータ sink.properties.format は Stream Load に使用される形式を指定します。サポートされているパラメータとその説明のリストについては、STREAM LOAD を参照してください。

sink.properties.format

必須: いいえ
デフォルト値: csv
説明: Stream Load に使用される形式。Flink コネクタは、各バッチのデータを StarRocks に送信する前にその形式に変換します。有効な値: csvjson

sink.properties.column_separator

必須: いいえ
デフォルト値: \t
説明: CSV 形式のデータのカラムセパレータ。

sink.properties.row_delimiter

必須: いいえ
デフォルト値: \n
説明: CSV 形式のデータの行区切り文字。

sink.properties.max_filter_ratio

必須: いいえ
デフォルト値: 0
説明: Stream Load の最大エラー許容率。データ品質が不十分なためにフィルタリングされるデータレコードの最大割合です。有効な値: 0 から 1。デフォルト値: 0Stream Load を参照してください。

sink.properties.partial_update

必須: いいえ
デフォルト値: FALSE
説明: 部分更新を使用するかどうか。有効な値: TRUEFALSE。デフォルト値: FALSE、この機能を無効にすることを示します。

sink.properties.partial_update_mode

必須: いいえ
デフォルト値: row
説明: 部分更新のモードを指定します。有効な値: rowcolumn

  • row(デフォルト)は行モードでの部分更新を意味し、多くのカラムと小さなバッチでのリアルタイム更新に適しています。
  • column はカラムモードでの部分更新を意味し、少ないカラムと多くの行でのバッチ更新に適しています。このようなシナリオでは、カラムモードを有効にすると更新速度が速くなります。例えば、100 カラムのテーブルで、すべての行に対して 10 カラム(全体の 10%)のみが更新される場合、カラムモードの更新速度は 10 倍速くなります。

sink.properties.strict_mode

必須: いいえ
デフォルト値: false
説明: Stream Load の厳密モードを有効にするかどうかを指定します。これは、カラム値が一致しないなどの不適格な行がある場合のロード動作に影響します。有効な値: truefalse。デフォルト値: falseStream Load を参照してください。

sink.properties.compression

必須: いいえ
デフォルト値: NONE
説明: 1.2.10 以降でサポートされています。Stream Load に使用される圧縮アルゴリズム。現在、圧縮は JSON 形式に対してのみサポートされています。有効な値: lz4_frame。JSON 形式の圧縮は StarRocks v3.2.7 以降でのみサポートされています。

Flink データ型StarRocks データ型
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTEGERINTEGER
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BINARYINT
CHARSTRING
VARCHARSTRING
STRINGSTRING
DATEDATE
TIMESTAMP_WITHOUT_TIME_ZONE(N)DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)DATETIME
ARRAY<T>ARRAY<T>
MAP<KT,VT>JSON STRING
ROW<arg T...>JSON STRING

使用メモ

Exactly Once

  • sink に exactly-once セマンティクスを保証させたい場合は、StarRocks を 2.5 以降に、Flink コネクタを 1.2.4 以降にアップグレードすることをお勧めします。

    • Flink コネクタ 1.2.4 以降では、exactly-once は StarRocks 2.4 以降で提供される Stream Load トランザクションインターフェース に基づいて再設計されています。以前の非トランザクション Stream Load インターフェースに基づく実装と比較して、新しい実装はメモリ使用量とチェックポイントのオーバーヘッドを削減し、ロードのリアルタイムパフォーマンスと安定性を向上させます。

    • StarRocks のバージョンが 2.4 より前、または Flink コネクタのバージョンが 1.2.4 より前の場合、sink は自動的に非トランザクション Stream Load インターフェースに基づく実装を選択します。

  • exactly-once を保証するための設定

    • sink.semantic の値は exactly-once である必要があります。

    • Flink コネクタのバージョンが 1.2.8 以降の場合、sink.label-prefix の値を指定することをお勧めします。ラベルプレフィックスは、Flink ジョブ、Routine Load、Broker Load など、StarRocks のすべてのロードタイプ間で一意でなければなりません。

      • ラベルプレフィックスが指定されている場合、Flink コネクタはラベルプレフィックスを使用して、Flink の失敗シナリオ(例: チェックポイントが進行中のときに Flink ジョブが失敗する)で生成される可能性のある残存トランザクションをクリーンアップします。これらの残存トランザクションは、SHOW PROC '/transactions/<db_id>/running'; を使用して StarRocks で表示すると、通常 PREPARED ステータスになります。Flink ジョブがチェックポイントから復元されると、Flink コネクタはラベルプレフィックスとチェックポイント内の情報に基づいてこれらの残存トランザクションを見つけ、アボートします。Flink ジョブが終了するときに Flink コネクタはそれらをアボートできません。これは、exactly-once を実装するための二段階コミットメカニズムのためです。Flink ジョブが終了すると、Flink コネクタはトランザクションが成功したチェックポイントに含まれるべきかどうかについて Flink チェックポイントコーディネーターから通知を受け取っていないため、これらのトランザクションをアボートするとデータが失われる可能性があります。Flink でエンドツーエンドの exactly-once を達成する方法については、この ブログ記事 を参照してください。

      • ラベルプレフィックスが指定されていない場合、残存トランザクションはタイムアウト後にのみ StarRocks によってクリーンアップされます。ただし、Flink ジョブがトランザクションのタイムアウト前に頻繁に失敗すると、実行中のトランザクションの数が StarRocks の max_running_txn_num_per_db の制限に達する可能性があります。タイムアウトの長さは StarRocks FE 設定 prepared_transaction_default_timeout_second によって制御され、デフォルト値は 86400(1 日)です。ラベルプレフィックスが指定されていない場合、トランザクションがより速く期限切れになるように、これに小さい値を設定することができます。

  • Flink ジョブが停止または継続的なフェイルオーバーのために長時間のダウンタイムの後にチェックポイントまたはセーブポイントから最終的に復旧することが確実である場合、データ損失を避けるために次の StarRocks 設定を調整してください。

    • prepared_transaction_default_timeout_second: StarRocks FE 設定、デフォルト値は 86400。この設定の値は Flink ジョブのダウンタイムよりも大きくする必要があります。そうしないと、Flink ジョブを再起動する前にタイムアウトのために成功したチェックポイントに含まれる残存トランザクションがアボートされ、データ損失が発生する可能性があります。

      この設定に大きな値を設定する場合、ラベルプレフィックスの値を指定することをお勧めします。これにより、残存トランザクションはタイムアウトによるものではなく、ラベルプレフィックスとチェックポイント内の情報に基づいてクリーンアップされます(これによりデータ損失が発生する可能性があります)。

    • label_keep_max_secondlabel_keep_max_num: StarRocks FE 設定、デフォルト値はそれぞれ 2592001000 です。詳細については、FE 設定 を参照してください。label_keep_max_second の値は Flink ジョブのダウンタイムよりも大きくする必要があります。そうしないと、Flink コネクタは Flink のセーブポイントまたはチェックポイントに保存されたトランザクションラベルを使用して StarRocks 内のトランザクションの状態を確認し、これらのトランザクションがコミットされているかどうかを確認できず、最終的にデータ損失が発生する可能性があります。

    これらの設定は可変であり、ADMIN SET FRONTEND CONFIG を使用して変更できます。

      ADMIN SET FRONTEND CONFIG ("prepared_transaction_default_timeout_second" = "3600");
    ADMIN SET FRONTEND CONFIG ("label_keep_max_second" = "259200");
    ADMIN SET FRONTEND CONFIG ("label_keep_max_num" = "1000");

フラッシュポリシー

Flink コネクタはデータをメモリにバッファし、Stream Load を介してバッチで StarRocks にフラッシュします。フラッシュがトリガーされる方法は、at-least-once と exactly-once で異なります。

at-least-once の場合、次の条件のいずれかが満たされるとフラッシュがトリガーされます。

  • バッファされた行のバイト数が sink.buffer-flush.max-bytes の制限に達する
  • バッファされた行の数が sink.buffer-flush.max-rows の制限に達する(sink バージョン V1 のみ有効)
  • 最後のフラッシュからの経過時間が sink.buffer-flush.interval-ms の制限に達する
  • チェックポイントがトリガーされる

exactly-once の場合、フラッシュはチェックポイントがトリガーされたときにのみ発生します。

ロードメトリクスの監視

Flink コネクタは、ロードを監視するための次のメトリクスを提供します。

名前タイプ説明
totalFlushBytesカウンター成功したフラッシュバイト数。
totalFlushRowsカウンター成功したフラッシュ行数。
totalFlushSucceededTimesカウンターデータバッチが成功したフラッシュ回数。
totalFlushFailedTimesカウンターフラッシュが失敗した回数。
totalFilteredRowsカウンターフィルタリングされた行数(totalFlushRows にも含まれます)。

以下の例は、Flink SQL または Flink DataStream を使用して Flink コネクタで StarRocks テーブルにデータをロードする方法を示しています。

準備

StarRocks テーブルの作成

データベース test を作成し、主キーテーブル score_board を作成します。

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
  • Flink バイナリ Flink 1.15.2 をダウンロードし、flink-1.15.2 ディレクトリに解凍します。

  • Flink コネクタ 1.2.7 をダウンロードし、flink-1.15.2/lib ディレクトリに配置します。

  • 次のコマンドを実行して Flink クラスターを起動します。

    cd flink-1.15.2
    ./bin/start-cluster.sh

ネットワーク設定

Flink が配置されているマシンが、StarRocks クラスターの FE ノードに http_port(デフォルト: 8030)および query_port(デフォルト: 9030)を介してアクセスでき、BE ノードに be_http_port(デフォルト: 8040)を介してアクセスできることを確認してください。

  • 次のコマンドを実行して Flink SQL クライアントを起動します。

    ./bin/sql-client.sh
  • Flink テーブル score_board を作成し、Flink SQL クライアントを介してテーブルに値を挿入します。 StarRocks の Primary Key テーブルにデータをロードしたい場合は、Flink DDL で主キーを定義する必要があります。他のタイプの StarRocks テーブルではオプションです。

    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',

    'table-name' = 'score_board',
    'username' = 'root',
    'password' = ''
    );

    INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

入力レコードのタイプに応じて、Flink DataStream ジョブを実装する方法はいくつかあります。例えば、CSV Java String、JSON Java String、またはカスタム Java オブジェクトです。

  • 入力レコードが CSV 形式の String の場合。LoadCsvRecords を参照してください。

    /**
    * CSV 形式のレコードを生成します。各レコードには "\t" で区切られた 3 つの値があります。
    * これらの値は StarRocks テーブルのカラム `id`、`name`、`score` にロードされます。
    */
    String[] records = new String[]{
    "1\tstarrocks-csv\t100",
    "2\tflink-csv\t100"
    };
    DataStream<String> source = env.fromElements(records);

    /**
    * 必要なプロパティでコネクタを設定します。
    * 入力レコードが CSV 形式であり、カラムセパレータが "\t" であることをコネクタに伝えるために、
    * "sink.properties.format" と "sink.properties.column_separator" プロパティを追加する必要があります。
    * CSV 形式のレコードで他のカラムセパレータを使用することもできますが、
    * "sink.properties.column_separator" を対応するように変更することを忘れないでください。
    */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", jdbcUrl)
    .withProperty("load-url", loadUrl)
    .withProperty("database-name", "test")
    .withProperty("table-name", "score_board")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("sink.properties.format", "csv")
    .withProperty("sink.properties.column_separator", "\t")
    .build();
    // オプションを使用してシンクを作成します。
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • 入力レコードが JSON 形式の String の場合。LoadJsonRecords を参照してください。

/**
* JSON 形式のレコードを生成します。
* 各レコードには、StarRocks テーブルのカラム `id`、`name`、`score` に対応する 3 つのキーと値のペアがあります。
*/
String[] records = new String[]{
"{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}",
"{\"id\":2, \"name\":\"flink-json\", \"score\":100}",
};
DataStream<String> source = env.fromElements(records);

/**
* 必要なプロパティでコネクタを設定します。
* 入力レコードが JSON 形式であり、最外部の配列構造を削除することをコネクタに伝えるために、
* "sink.properties.format" と "sink.properties.strip_outer_array" プロパティを追加する必要があります。
*/
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.build();
// オプションを使用してシンクを作成します。
SinkFunction<String> starRockSink = StarRocksSink.sink(options);
source.addSink(starRockSink);
  • 入力レコードがカスタム Java オブジェクトの場合。LoadCustomJavaRecords を参照してください。

    • この例では、入力レコードは単純な POJO RowData です。

      public static class RowData {
      public int id;
      public String name;
      public int score;

      public RowData() {}

      public RowData(int id, String name, int score) {
      this.id = id;
      this.name = name;
      this.score = score;
      }
      }
    • メインプログラムは次のとおりです。

      // RowData をコンテナとして使用するレコードを生成します。
      RowData[] records = new RowData[]{
      new RowData(1, "starrocks-rowdata", 100),
      new RowData(2, "flink-rowdata", 100),
      };
      DataStream<RowData> source = env.fromElements(records);

      // 必要なプロパティでコネクタを設定します。
      StarRocksSinkOptions options = StarRocksSinkOptions.builder()
      .withProperty("jdbc-url", jdbcUrl)
      .withProperty("load-url", loadUrl)
      .withProperty("database-name", "test")
      .withProperty("table-name", "score_board")
      .withProperty("username", "root")
      .withProperty("password", "")
      .build();

      /**
      * Flink コネクタは、StarRocks テーブルにロードされる行を表すために Java オブジェクト配列 (Object[]) を使用し、
      * 各要素はカラムの値です。
      * StarRocks テーブルのスキーマに一致する Object[] のスキーマを定義する必要があります。
      */
      TableSchema schema = TableSchema.builder()
      .field("id", DataTypes.INT().notNull())
      .field("name", DataTypes.STRING())
      .field("score", DataTypes.INT())
      // StarRocks テーブルが Primary Key テーブルの場合、主キー `id` に対して notNull() を指定する必要があります。例: DataTypes.INT().notNull()
      .primaryKey("id")
      .build();
      // RowData をスキーマに従って Object[] に変換します。
      RowDataTransformer transformer = new RowDataTransformer();
      // スキーマ、オプション、およびトランスフォーマーを使用してシンクを作成します。
      SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer);
      source.addSink(starRockSink);
    • メインプログラム内の RowDataTransformer は次のように定義されています。

      private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {

      /**
      * 入力 RowData に従ってオブジェクト配列の各要素を設定します。
      * 配列のスキーマは StarRocks テーブルのスキーマに一致します。
      */
      @Override
      public void accept(Object[] internalRow, RowData rowData) {
      internalRow[0] = rowData.id;
      internalRow[1] = rowData.name;
      internalRow[2] = rowData.score;
      // StarRocks テーブルが Primary Key テーブルの場合、データロードが UPSERT または DELETE 操作であるかどうかを示すために最後の要素を設定する必要があります。
      internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
      }
      }

Flink CDC 3.0 フレームワークを使用して、CDC ソース(MySQL や Kafka など)から StarRocks へのストリーミング ELT パイプラインを簡単に構築できます。このパイプラインは、ソースから StarRocks へのデータベース全体の同期、シャーディングテーブルのマージ、スキーマ変更を同期できます。

v1.2.9 以降、StarRocks 用 Flink コネクタは StarRocks Pipeline Connector としてこのフレームワークに統合されています。StarRocks Pipeline Connector は以下をサポートしています。

  • データベースとテーブルの自動作成
  • スキーマ変更の同期
  • フルおよびインクリメンタルデータの同期

クイックスタートについては、Flink CDC 3.0 を使用した MySQL から StarRocks へのストリーミング ELT を参照してください。

fast_schema_evolution を有効にするために、StarRocks v3.2.1 以降のバージョンを使用することをお勧めします。これにより、カラムの追加や削除の速度が向上し、リソース使用量が削減されます。

ベストプラクティス

主キーテーブルへのデータロード

このセクションでは、StarRocks 主キーテーブルにデータをロードして部分更新や条件付き更新を実現する方法を示します。 これらの機能の紹介については、ロードによるデータの変更 を参照してください。 これらの例では Flink SQL を使用します。

準備

StarRocks でデータベース test を作成し、主キーテーブル score_board を作成します。

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

部分更新

この例では、カラム idname のみをロードする方法を示します。

  1. MySQL クライアントで StarRocks テーブル score_board に 2 行のデータを挿入します。

    mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | starrocks | 100 |
    | 2 | flink | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. Flink SQL クライアントで Flink テーブル score_board を作成します。

    • カラム idname のみを含む DDL を定義します。
    • Flink コネクタに部分更新を実行させるために、オプション sink.properties.partial_updatetrue に設定します。
    • Flink コネクタバージョン <= 1.2.7 の場合、オプション sink.properties.columnsid,name,__op に設定して、Flink コネクタにどのカラムを更新する必要があるかを伝える必要があります。フィールド __op を末尾に追加する必要があります。フィールド __op はデータロードが UPSERT または DELETE 操作であることを示し、その値はコネクタによって自動的に設定されます。
    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.properties.partial_update' = 'true',
    -- Flink コネクタバージョン <= 1.2.7 のみ
    'sink.properties.columns' = 'id,name,__op'
    );
  3. Flink テーブルに 2 行のデータを挿入します。データ行の主キーは StarRocks テーブルの行と同じですが、カラム name の値が変更されています。

    INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'flink-update');
  4. MySQL クライアントで StarRocks テーブルをクエリします。

    mysql> select * from score_board;
    +------+------------------+-------+
    | id | name | score |
    +------+------------------+-------+
    | 1 | starrocks-update | 100 |
    | 2 | flink-update | 100 |
    +------+------------------+-------+
    2 rows in set (0.02 sec)

    name の値のみが変更され、score の値は変更されていないことがわかります。

条件付き更新

この例では、カラム score の値に基づいて条件付き更新を行う方法を示します。id の更新は、新しい score の値が古い値以上の場合にのみ有効です。

  1. MySQL クライアントで StarRocks テーブルに 2 行のデータを挿入します。

    mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

    mysql> select * from score_board;
    +------+-----------+-------+
    | id | name | score |
    +------+-----------+-------+
    | 1 | starrocks | 100 |
    | 2 | flink | 100 |
    +------+-----------+-------+
    2 rows in set (0.02 sec)
  2. Flink テーブル score_board を次のように作成します。

    • すべてのカラムを含む DDL を定義します。
    • コネクタにカラム score を条件として使用するように伝えるために、オプション sink.properties.merge_conditionscore に設定します。
    • コネクタに Stream Load を使用するように伝えるために、オプション sink.versionV1 に設定します。
    CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.properties.merge_condition' = 'score',
    'sink.version' = 'V1'
    );
  3. Flink テーブルに 2 行のデータを挿入します。データ行の主キーは StarRocks テーブルの行と同じです。最初のデータ行はカラム score の値が小さく、2 番目のデータ行はカラム score の値が大きいです。

    INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);
  4. MySQL クライアントで StarRocks テーブルをクエリします。

    mysql> select * from score_board;
    +------+--------------+-------+
    | id | name | score |
    +------+--------------+-------+
    | 1 | starrocks | 100 |
    | 2 | flink-update | 101 |
    +------+--------------+-------+
    2 rows in set (0.03 sec)

    2 番目のデータ行の値のみが変更され、最初のデータ行の値は変更されていないことがわかります。

BITMAP 型のカラムにデータをロード

BITMAP は、UV のカウントなどの正確なカウントディスティンクトを高速化するためによく使用されます。正確なカウントディスティンクトのための Bitmap の使用 を参照してください。 ここでは、UV のカウントを例にとり、BITMAP 型のカラムにデータをロードする方法を示します。

  1. MySQL クライアントで StarRocks 集計テーブルを作成します。

    データベース test に、カラム visit_usersBITMAP 型として定義され、集計関数 BITMAP_UNION が設定された集計テーブル page_uv を作成します。

    CREATE TABLE `test`.`page_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Flink SQL クライアントで Flink テーブルを作成します。

    Flink テーブルのカラム visit_user_idBIGINT 型であり、このカラムを StarRocks テーブルの BITMAP 型のカラム visit_users にロードしたいと考えています。そのため、Flink テーブルの DDL を定義する際に注意すべき点は次のとおりです。

    • Flink は BITMAP をサポートしていないため、StarRocks テーブルの BITMAP 型のカラム visit_users を表すために、BIGINT 型としてカラム visit_user_id を定義する必要があります。
    • Flink テーブルと StarRocks テーブルのカラムマッピングをコネクタに伝えるために、オプション sink.properties.columnspage_id,visit_date,user_id,visit_users=to_bitmap(visit_user_id) に設定する必要があります。また、to_bitmap 関数を使用して、BIGINT 型のデータを BITMAP 型に変換することをコネクタに伝える必要があります。
    CREATE TABLE `page_uv` (
    `page_id` INT,
    `visit_date` TIMESTAMP,
    `visit_user_id` BIGINT
    ) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'page_uv',
    'username' = 'root',
    'password' = '',
    'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)'
    );
  3. Flink SQL クライアントで Flink テーブルにデータをロードします。

INSERT INTO `page_uv` VALUES
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
(1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
(2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  1. MySQL クライアントで StarRocks テーブルからページ UV を計算します。
MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
| 2 | 1 |
| 1 | 3 |
+---------+-----------------------------+
2 rows in set (0.05 sec)

HLL 型のカラムにデータをロード

HLL は、近似カウントディスティンクトに使用できます。近似カウントディスティンクトのための HLL の使用 を参照してください。

ここでは、UV のカウントを例にとり、HLL 型のカラムにデータをロードする方法を示します。

  1. StarRocks 集計テーブルを作成

    データベース test に、カラム visit_usersHLL 型として定義され、集計関数 HLL_UNION が設定された集計テーブル hll_uv を作成します。

CREATE TABLE `hll_uv` (
`page_id` INT NOT NULL COMMENT 'page ID',
`visit_date` datetime NOT NULL COMMENT 'access time',
`visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`);
  1. Flink SQL クライアントで Flink テーブルを作成します。

    Flink テーブルのカラム visit_user_idBIGINT 型であり、このカラムを StarRocks テーブルの HLL 型のカラム visit_users にロードしたいと考えています。そのため、Flink テーブルの DDL を定義する際に注意すべき点は次のとおりです。

    • Flink は BITMAP をサポートしていないため、StarRocks テーブルの HLL 型のカラム visit_users を表すために、BIGINT 型としてカラム visit_user_id を定義する必要があります。
    • Flink テーブルと StarRocks テーブルのカラムマッピングをコネクタに伝えるために、オプション sink.properties.columnspage_id,visit_date,user_id,visit_users=hll_hash(visit_user_id) に設定する必要があります。また、hll_hash 関数を使用して、BIGINT 型のデータを HLL 型に変換することをコネクタに伝える必要があります。
CREATE TABLE `hll_uv` (
`page_id` INT,
`visit_date` TIMESTAMP,
`visit_user_id` BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'load-url' = '127.0.0.1:8030',
'database-name' = 'test',
'table-name' = 'hll_uv',
'username' = 'root',
'password' = '',
'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)'
);
  1. Flink SQL クライアントで Flink テーブルにデータをロードします。
INSERT INTO `hll_uv` VALUES
(3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
(4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
(3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  1. MySQL クライアントで StarRocks テーブルからページ UV を計算します。
mysql> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
**+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
| 3 | 2 |
| 4 | 1 |
+---------+-----------------------------+
2 rows in set (0.04 sec)