MySQL からのリアルタイム同期
StarRocks は、MySQL から StarRocks へのデータをリアルタイムで同期するための複数の方法をサポートしており、大量データの低遅延リアルタ イム分析を実現します。
このトピックでは、Apache Flink® を通じて MySQL から StarRocks へのデータをリアルタイム(数秒以内)で同期する方法について説明します。
StarRocks テーブルにデータを ロード するには、その StarRocks テーブルに対して INSERT 権限を持つユーザーである必要があります。INSERT 権限を持っていない場合は、 GRANT に記載されている手順に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。構文は GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>} です。
仕組み
Flink CDC は MySQL から Flink への同期に使用されます。このトピックでは、Flink CDC のバージョンが 3.0 未満のものを使用しているため、SMT を使用してテ ーブルスキーマを同期します。ただし、Flink CDC 3.0 を使用する場合、StarRocks へのテーブルスキーマの同期に SMT を使用する必要はありません。Flink CDC 3.0 は、MySQL データベース全体のスキーマ、シャード化されたデータベースとテーブルのスキーマを同期することもでき、スキーマ変更の同期もサポートしています。詳細な使用方法については、Streaming ELT from MySQL to StarRocks を参照してください。
以下の図は、同期プロセス全体を示しています。

MySQL から Flink を通じて StarRocks へのリアルタイム同期は、データベース & テーブルスキーマの同期とデータの同期の 2 段階で実装されます。まず、SMT が MySQL のデータベース & テーブルスキーマを StarRocks のテーブル作成文に変換します。次に、Flink クラスターが Flink ジョブを実行して、MySQL のフルデータと増分データを StarRocks に同期します。
同期プロセスは、正確に一度だけのセマンティクスを保証します。
同期プロセス:
-
データベース & テーブルスキーマの同期。
SMT は、同期対象の MySQL データベース & テーブルのスキーマを読み取り、StarRocks にデスティネーションデータベース & テーブルを作成するための SQL ファイルを生成します。この操作は、SMT の設定ファイルに基づいて MySQL と StarRocks の情報を使用して行われます。
-
データの同期。
a. Flink SQL クライアントがデータロード文
INSERT INTO SELECTを実行して、1 つ以上の Flink ジョブを Flink クラスターに送信します。b. Flink クラスターが Flink ジョブを実行してデータを取得します。Flink CDC コネクタは、最初にソースデータベースからフルの履歴データを読み取り、その後シームレスに増分読み取りに切り替え、データを flink-connector-starrocks に送信します。
c. flink-connector-starrocks はデータをミニバッチで蓄積し、各バッチのデータを StarRocks に同期します。
備考MySQL のデータ操作言語 (DML) 操作のみが StarRocks に同期できます。データ定義言語 (DDL) 操作は同期できません。
シナリオ
MySQL からのリアルタイム同期は、データが常に変更される幅広いユースケースに対応しています。実際のユースケース「商品の売上ランキングのリアルタイム化」を例にとります。
Flink は、MySQL の元の注文テーブルに基づいて商品の売上ランキングをリアルタイムで計算し、そのランキングを StarRocks の主キーテーブルにリアルタイムで同期します。ユーザーは StarRocks に接続された可視化ツールを使用して、リアルタイムでランキングを確認し、オンデマンドで運用の洞察を得ることができます。
準備
同期ツールのダウンロードとインストール
MySQL からデータを同期するには、以下のツールをインストールする必要があります: SMT、Flink、Flink CDC コネクタ、および flink-connector-starrocks。
-
Flink をダウンロードしてインストールし、Flink クラスターを起動します。この手順は Flink 公式ドキュメント に従って実行することもできます。
a. Flink を実行する前に、オペレーティングシステムに Java 8 または Java 11 をインストールします。以下のコマンドを実行して、インストールされている Java のバージョンを確認できます。
# Java のバージョンを表示します。
java -version
# 次の出力が返された場合、Java 8 がインストールされています。
java version "1.8.0_301"
Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)b. Flink インストールパッケージ をダウンロードして解凍します。Flink 1.14 以降を使用することをお勧めします。最小許容バージョンは Flink 1.11 です。このトピックでは Flink 1.14.5 を使用します。
# Flink をダウンロードします。
wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# Flink を解凍します。
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# Flink ディレクトリに移動します。
cd flink-1.14.5c. Flink クラスターを起動します。
# Flink クラスターを起動します。
./bin/start-cluster.sh
# 次の出力が返された場合、Flink クラスターが起動しています。
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host. -
Flink CDC コネクタ をダウンロードします。このトピックでは MySQL をデータソースとして使用するため、
flink-sql-connector-mysql-cdc-x.x.x.jarをダウンロードします。コネクタのバージョンは Flink のバージョンと一致している必要があります。このトピックでは Flink 1.14.5 を使用し、flink-sql-connector-mysql-cdc-2.2.0.jarをダウンロードできます。wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.2.0.jar -
flink-connector-starrocks をダウンロードします。バージョンは Flink のバージョンと一致している必要があります。
flink-connector-starrocks パッケージ
x.x.x_flink-y.yy _ z.zz.jarには 3 つのバージョン番号が含まれています:x.x.xは flink-connector-starrocks のバージョン番号です。y.yyはサポートされている Flink のバージョンです。z.zzは Flink がサポートする Scala のバージョンです。Flink のバージョンが 1.14.x 以前の場合、Scala のバージョンを持つパッケージをダウンロードする必要があります。
このトピックでは Flink 1.14.5 と Scala 2.11 を使用します。したがって、次のパッケージをダウンロードできます:
1.2.3_flink-14_2.11.jar。 -
Flink CDC コネクタ (
flink-sql-connector-mysql-cdc-2.2.0.jar) と flink-connector-starrocks (1.2.3_flink-1.14_2.11.jar) の JAR パッケージを Flink のlibディレクトリに移動します。Note
システムに Flink クラスターが既に実行されている場合、Flink クラスターを停止し、JAR パッケージをロードして検証するために再起動する必要があります。
$ ./bin/stop-cluster.sh
$ ./bin/start-cluster.sh -
SMT パッケージ をダウンロードして解凍し、
flink-1.14.5ディレクトリに配置します。StarRocks は Linux x86 および macOS ARM64 用の SMT パッケージを提供しています。オペレーティングシステムと CPU に基づいて選択できます。# Linux x86 用
wget https://releases.starrocks.io/resources/smt.tar.gz
# macOS ARM64 用
wget https://releases.starrocks.io/resources/smt_darwin_arm64.tar.gz
MySQL バイナリログを有効にする
MySQL からデータをリアルタイムで同期するには、システムが MySQL バイナリログ (binlog) からデータを読み取り、データを解析してから StarRocks に同期する必要があります。MySQL バイナリログが有効になっていることを確認してください。
-
MySQL バイナリログを有効にするために、MySQL 設定ファイル
my.cnf(デフォルトパス:/etc/my.cnf) を編集します。# MySQL Binlog を有効にします。
log_bin = ON
# Binlog の保存パスを設定します。
log_bin =/var/lib/mysql/mysql-bin
# server_id を設定します。
# MySQL 5.7.3 以降で server_id が設定されていない場合、MySQL サービスを使用できません。
server_id = 1
# Binlog の形式を ROW に設定します。
binlog_format = ROW
# Binlog ファイルの基本名。各 Binlog ファイルを識別するための識別子が追加されます。
log_bin_basename =/var/lib/mysql/mysql-bin
# Binlog ファイルのインデッ クスファイル。すべての Binlog ファイルのディレクトリを管理します。
log_bin_index =/var/lib/mysql/mysql-bin.index -
修正された設定ファイルを有効にするために、次のいずれかのコマンドを実行して MySQL を再起動します。
# サービスを使用して MySQL を再起動します。
service mysqld restart
# mysqld スクリプトを使用して MySQL を再起動します。
/etc/init.d/mysqld restart -
MySQL に接続し、MySQL バイナリログが有効になっているかどうかを確認します。
-- MySQL に接続します。
mysql -h xxx.xx.xxx.xx -P 3306 -u root -pxxxxxx
-- MySQL バイナリログが有効になっているかどうかを確認します。
mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.00 sec)