メインコンテンツまでスキップ
バージョン: 3.2

StarRocks Migration Tool (SMT)

StarRocks Migration Tool (SMT) は、StarRocks が提供するデータ移行ツールで、Flink を通じてソースデータベースから StarRocks にデータをロードします。SMT の主な機能は以下の通りです。

  • ソースデータベースとターゲットの StarRocks クラスターの情報に基づいて、StarRocks にテーブルを作成するためのステートメントを生成します。
  • Flink の SQL クライアントで実行可能な SQL ステートメントを生成して、データ同期のための Flink ジョブを提出します。これにより、パイプラインでのフルまたはインクリメンタルデータ同期が簡素化されます。現在、SMT は以下のソースデータベースをサポートしています。
ソースデータベースStarRocks にテーブルを作成するためのステートメントを生成フルデータ同期インクリメンタルデータ同期
MySQLサポート済みサポート済みサポート済み
PostgreSQLサポート済みサポート済みサポート済み
Oracleサポート済みサポート済みサポート済み
Hiveサポート済みサポート済みサポートされていない
ClickHouseサポート済みサポート済みサポートされていない
SQL Serverサポート済みサポート済みサポート済み
TiDBサポート済みサポート済みサポート済み

ダウンロードリンク: https://cdn-thirdparty.starrocks.com/smt.tar.gz?r=2

SMT の使用手順

一般的な手順は以下の通りです。

  1. conf/config_prod.conf ファイルを設定します。

  2. starrocks-migration-tool を実行します。

  3. 実行後、SQL スクリプトがデフォルトで result ディレクトリに生成されます。

    その後、result ディレクトリ内の SQL スクリプトを使用してメタデータまたはデータ同期を行うことができます。

SMT の設定

  • [db]: データソースに接続するための情報。type パラメータで指定されたデータベースタイプに対応するデータソースに接続するための情報を設定します。

  • [other]: 追加設定。be_num パラメータに実際の BE ノード数を指定することをお勧めします。

  • flink.starrocks.sink.*: flink-connector-starrocks の設定。詳細な設定と説明については、configuration description を参照してください。

  • [table-rule.1]: データソース内のテーブルをマッチングするためのルール。このルールに設定された正規表現に基づいて、データソース内のデータベース名とテーブル名をマッチングし、CREATE TABLE ステートメントが生成されます。複数のルールを設定でき、各ルールは対応する結果ファイルを生成します。例えば:

    • [table-rule.1] -> result/starrocks-create.1.sql
    • [table-rule.2] -> result/starrocks-create.2.sql

    各ルールには、データベース、テーブル、および flink-connector-starrocks の設定が含まれている必要があります。

    [table-rule.1]
    # プロパティを設定するためのデータベースをマッチングするパターン
    database = ^database1.*$
    # プロパティを設定するためのテーブルをマッチングするパターン
    table = ^.*$
    schema = ^.*$
    ############################################
    ### flink sink 設定
    ### `connector`、`table-name`、`database-name` は自動生成されるため設定しないでください
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url=192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true

    [table-rule.2]
    # プロパティを設定するためのデータベースをマッチングするパターン
    database = ^database2.*$
    # プロパティを設定するためのテーブルをマッチングするパターン
    table = ^.*$
    schema = ^.*$
    ############################################
    ### flink sink 設定
    ### `connector`、`table-name`、`database-name` は自動生成されるため設定しないでください
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url=192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  • データベース内でシャードに分割された大きなテーブルには、別のルールを設定できます。例えば、edu_db_1edu_db_2 の 2 つのデータベースにそれぞれ course_1course_2 というテーブルが含まれており、これらのテーブルが同じ構造を持っている場合、以下のルールを使用して、これらの 2 つのテーブルから 1 つの StarRocks テーブルにデータをロードして分析することができます。

    [table-rule.3]
    # プロパティを設定するためのデータベースをマッチングするパターン
    database = ^edu_db_[0-9]*$
    # プロパティを設定するためのテーブルをマッチングするパターン
    table = ^course_[0-9]*$
    schema = ^.*$

    このルールは自動的に多対一のロード関係を形成します。StarRocks に生成されるテーブルのデフォルト名は course__auto_shard であり、関連する SOL スクリプトでテーブル名を変更することもできます。例えば、result/starrocks-create.3.sql です。

MySQL から StarRocks への同期

概要

Flink CDC コネクタと SMT は、MySQL からサブセカンドでデータを同期できます。

img

画像に示されているように、SMT は MySQL と StarRocks のクラスター情報とテーブル構造に基づいて、Flink のソースおよびシンクテーブルの CREATE TABLE ステートメントを自動生成します。Flink CDC コネクタは MySQL Binlog を読み取り、Flink-connector-starrocks はデータを StarRocks に書き込みます。

手順

  1. Flink をダウンロードします。Flink のバージョンは 1.11 以降がサポートされています。

  2. Flink CDC コネクタ をダウンロードします。Flink のバージョンに対応する flink-sql-connector-mysql-cdc-xxx.jar をダウンロードしてください。

  3. Flink-connector-starrocks をダウンロードします。

  4. flink-sql-connector-mysql-cdc-xxx.jarflink-connector-starrocks-xxx.jarflink-xxx/lib/ にコピーします。

  5. smt.tar.gz をダウンロードします。

  6. SMT の設定ファイルを抽出して修正します。

    [db]
    host = 192.168.1.1
    port = 3306
    user = root
    password =
    type = mysql

    [other]
    # StarRocks のバックエンド数
    be_num = 3
    # `decimal_v3` は StarRocks-1.18.1 以降でサポートされています
    use_decimal_v3 = false
    # 変換された DDL SQL を保存するディレクトリ
    output_dir = ./result

    [table-rule.1]
    # プロパティを設定するためのデータベースをマッチングするパターン
    database = ^db$
    # プロパティを設定するためのテーブルをマッチングするパターン
    table = ^table$
    schema = ^.*$

    ############################################
    ### flink sink 設定
    ### `connector`、`table-name`、`database-name` は自動生成されるため設定しないでください
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url=192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. starrocks-migrate-tool を実行します。すべての SQL スクリプトが result ディレクトリに生成されます。

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql
  8. プレフィックスが starrocks-create の SQL スクリプトを使用して、StarRocks にテーブルを生成します。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. プレフィックスが flink-create の SQL スクリプトを使用して、Flink のソースおよびシンクテーブルを生成し、Flink ジョブを開始してデータを同期します。

    bin/sql-client.sh embedded < flink-create.all.sql

    上記のコマンドが正常に実行されると、データを同期するための Flink ジョブが継続して実行されます。

  10. Flink ジョブのステータスを確認します。

    bin/flink list 

    ジョブの実行中にエラーが発生した場合、Flink ログで詳細なエラー情報を確認できます。また、conf/flink-conf.yaml ファイルで Flink の設定を変更することもできます。例えば、メモリやスロットなどです。

注意事項

  • MySQL binlog を有効にする方法

    1. /etc/my.cnf を修正します。

      # binlog を有効にする
      log-bin=/var/lib/mysql/mysql-bin

      #log_bin=ON
      ## binlog ファイルのベース名
      #log_bin_basename=/var/lib/mysql/mysql-bin
      ## すべての binlog ファイルを管理するインデックスファイル
      #log_bin_index=/var/lib/mysql/mysql-bin.index
      # サーバー ID を設定
      server-id=1
      binlog_format = row
    2. mysqld を再起動します。MySQL binlog が有効になっているかどうかを確認するには、SHOW VARIABLES LIKE 'log_bin'; を実行します。

PostgreSQL から StarRocks への同期

概要

Flink CDC コネクタと SMT は、PostgreSQL からサブセカンドでデータを同期できます。

SMT は PostgreSQL と StarRocks のクラスター情報とテーブル構造に基づいて、Flink のソースおよびシンクテーブルの CREATE TABLE ステートメントを自動生成します。

Flink CDC コネクタは PostgreSQL の WAL を読み取り、Flink-connector-starrocks はデータを StarRocks に書き込みます。

手順

  1. Flink をダウンロードします。Flink のバージョンは 1.11 以降がサポートされています。

  2. Flink CDC コネクタ をダウンロードします。Flink のバージョンに対応する flink-sql-connector-postgres-cdc-xxx.jar をダウンロードしてください。

  3. Flink StarRocks コネクタ をダウンロードします。

  4. flink-sql-connector-postgres-cdc-xxx.jarflink-connector-starrocks-xxx.jarflink-xxx/lib/ にコピーします。

  5. smt.tar.gz をダウンロードします。

  6. SMT の設定ファイルを抽出して修正します。

    [db]
    host = 192.168.1.1
    port = 5432
    user = xxx
    password = xxx
    type = pgsql

    [other]
    # StarRocks のバックエンド数
    be_num = 3
    # `decimal_v3` は StarRocks-1.18.1 以降でサポートされています
    use_decimal_v3 = false
    # 変換された DDL SQL を保存するディレクトリ
    output_dir = ./result

    [table-rule.1]
    # プロパティを設定するためのデータベースをマッチングするパターン
    database = ^db$
    # プロパティを設定するためのテーブルをマッチングするパターン
    table = ^table$
    # プロパティを設定するためのスキーマをマッチングするパターン
    schema = ^.*$

    ############################################
    ### flink sink 設定
    ### `connector`、`table-name`、`database-name` は自動生成されるため設定しないでください
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url=192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. starrocks-migrate-tool を実行します。すべての SQL スクリプトが result ディレクトリに生成されます。

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql
  8. プレフィックスが starrocks-create の SQL スクリプトを使用して、StarRocks にテーブルを生成します。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. プレフィックスが flink-create の SQL スクリプトを使用して、Flink のソースおよびシンクテーブルを生成し、Flink ジョブを開始してデータを同期します。

    bin/sql-client.sh embedded < flink-create.all.sql

    上記のコマンドが正常に実行されると、データを同期するための Flink ジョブが継続して実行されます。

  10. Flink ジョブのステータスを確認します。

    bin/flink list 

    ジョブの実行中にエラーが発生した場合、Flink ログで詳細なエラー情報を確認できます。また、conf/flink-conf.yaml ファイルで Flink の設定を変更することもできます。例えば、メモリやスロットなどです。

注意事項

  • PostgreSQL v9.* の場合、以下のような特別な flink-cdc 設定が必要です(PostgreSQL v10.* 以降の使用をお勧めします。それ以外の場合は、WAL デコードプラグインをインストールする必要があります)。

    ############################################
    ############################################
    ### flink-cdc plugin configuration for `postgresql`
    ############################################
    ### for `9.*` decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming
    ### refer to https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
    ### and https://debezium.io/documentation/reference/postgres-plugins.html
    ### flink.cdc.decoding.plugin.name = decoderbufs
  • PostgreSQL WAL を有効にする方法

    # 接続権限を開く
    echo "host all all 0.0.0.0/32 trust" >> pg_hba.conf
    echo "host replication all 0.0.0.0/32 trust" >> pg_hba.conf
    # WAL 論理レプリケーションを有効にする
    echo "wal_level = logical" >> postgresql.conf
    echo "max_wal_senders = 2" >> postgresql.conf
    echo "max_replication_slots = 8" >> postgresql.conf

    同期が必要なテーブルには、レプリカアイデンティティ FULL を指定します。

    ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL

    これらの変更を行った後、PostgreSQL を再起動します。

Oracle から StarRocks への同期

概要

Flink CDC コネクタと SMT は、Oracle からサブセカンドでデータを同期できます。

SMT は Oracle と StarRocks のクラスター情報とテーブル構造に基づいて、Flink のソースおよびシンクテーブルの CREATE TABLE ステートメントを自動生成します。

Flink CDC コネクタは Oracle の logminer を読み取り、Flink-connector-starrocks はデータを StarRocks に書き込みます。

手順

  1. Flink をダウンロードします。Flink のバージョンは 1.11 以降がサポートされています。

  2. Flink CDC コネクタ をダウンロードします。Flink のバージョンに対応する flink-sql-connector-oracle-cdc-xxx.jar をダウンロードしてください。

  3. Flink StarRocks コネクタ をダウンロードします。

  4. flink-sql-connector-oracle-cdc-xxx.jarflink-connector-starrocks-xxx.jarflink-xxx/lib/ にコピーします。

  5. smt.tar.gz をダウンロードします。

  6. SMT の設定ファイルを抽出して修正します。

    [db]
    host = 192.168.1.1
    port = 1521
    user = xxx
    password = xxx
    type = oracle

    [other]
    # StarRocks のバックエンド数
    be_num = 3
    # `decimal_v3` は StarRocks-1.18.1 以降でサポートされています
    use_decimal_v3 = false
    # 変換された DDL SQL を保存するディレクトリ
    output_dir = ./result

    [table-rule.1]
    # プロパティを設定するためのデータベースをマッチングするパターン
    database = ^db$
    # プロパティを設定するためのテーブルをマッチングするパターン
    table = ^table$
    # プロパティを設定するためのスキーマをマッチングするパターン
    schema = ^.*$

    ############################################
    ### flink sink 設定
    ### `connector`、`table-name`、`database-name` は自動生成されるため設定しないでください
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url=192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. starrocks-migrate-tool を実行します。すべての SQL スクリプトが result ディレクトリに生成されます。

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql
  8. プレフィックスが starrocks-create の SQL スクリプトを使用して、StarRocks にテーブルを生成します。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. プレフィックスが flink-create の SQL スクリプトを使用して、Flink のソースおよびシンクテーブルを生成し、Flink ジョブを開始してデータを同期します。

    bin/sql-client.sh embedded < flink-create.all.sql

    上記のコマンドが正常に実行されると、データを同期するための Flink ジョブが継続して実行されます。

  10. Flink ジョブのステータスを確認します。

    bin/flink list 

    ジョブの実行中にエラーが発生した場合、Flink ログで詳細なエラー情報を確認できます。また、conf/flink-conf.yaml ファイルで Flink の設定を変更することもできます。例えば、メモリやスロットなどです。

注意事項

  • logminer を使用して Oracle を同期する方法:

    # ロギングを有効にする
    alter system set db_recovery_file_dest = '/home/oracle/data' scope=spfile;
    alter system set db_recovery_file_dest_size = 10G;
    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;

    ALTER TABLE schema_name.table_name ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

    # ユーザー作成と権限付与
    GRANT CREATE SESSION TO flinkuser;
    GRANT SET CONTAINER TO flinkuser;
    GRANT SELECT ON V_$DATABASE TO flinkuser;
    GRANT FLASHBACK ANY TABLE TO flinkuser;
    GRANT SELECT ANY TABLE TO flinkuser;
    GRANT SELECT_CATALOG_ROLE TO flinkuser;
    GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
    GRANT SELECT ANY TRANSACTION TO flinkuser;
    GRANT LOGMINING TO flinkuser;
    GRANT CREATE TABLE TO flinkuser;
    GRANT LOCK ANY TABLE TO flinkuser;
    GRANT ALTER ANY TABLE TO flinkuser;
    GRANT CREATE SEQUENCE TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    GRANT SELECT ON V_$LOG TO flinkuser;
    GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
    GRANT SELECT ON V_$LOGFILE TO flinkuser;
    GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
  • [table-rule.1] のデータベース設定は正規表現をサポートしていないため、完全なデータベース名を指定する必要があります。

  • Oracle12c は CDB モードをサポートしているため、SMT は内部的に CDB が有効かどうかを自動的に判断し、flink-cdc 設定を対応して変更します。ただし、ユーザーは [db].user の設定に c## プレフィックスを追加する必要があるかどうかに注意し、権限不足の問題を回避してください。

Hive から StarRocks への同期

概要

このガイドでは、SMT を使用して Hive データを StarRocks に同期する方法を説明します。同期中、StarRocks に Duplicate テーブルが作成され、Flink ジョブが継続してデータを同期します。

手順

準備

[db]
# hiveserver2 サービスの IP
host = 127.0.0.1
# hiveserver2 サービスのポート
port = 10000
user = hive/emr-header-1.cluster-49148
password =
type = hive
# `type = hive` の場合のみ有効です。
# 利用可能な値: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
authentication = kerberos

サポートされている認証方法は以下の通りです。

  • nosasl, zk: userpassword を指定する必要はありません。
  • none, none_http, ldap: userpassword を指定します。
  • kerberos, kerberos_http: 以下の手順を実行します。
    • Hive クラスターで kadmin.local を実行し、list_principals を確認して対応するプリンシパル名を見つけます。例えば、プリンシパル名が hive/emr-header-1.cluster-49148@EMR.49148.COM の場合、ユーザーは hive/emr-header-1.cluster-49148 に設定し、パスワードは空のままにします。
    • SMT を実行するマシンで kinit -kt /path/to/keytab principal を実行し、klist を実行して正しいトークンが生成されているか確認します。

データ同期

  1. starrocks-migrate-tool を実行します。

  2. プレフィックスが starrocks-create の SQL スクリプトを使用して、StarRocks にテーブルを生成します。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  3. flink/conf/sql-client-defaults.yaml ファイルを作成して編集します。

    execution:
    planner: blink
    type: batch
    current-catalog: hive-starrocks
    catalogs:
    - name: hive-starrocks
    type: hive
    hive-conf-dir: /path/to/apache-hive-xxxx-bin/conf
  4. Flink の対応するバージョンの Hive ページから 依存パッケージ (flink-sql-connector-hive-xxxx) をダウンロードし、flink/lib ディレクトリに配置します。

  5. Flink クラスターを起動し、flink/bin/sql-client.sh embedded < result/flink-create.all.sql を実行してデータ同期を開始します。

SQL Server から StarRocks への同期

概要

Flink CDC コネクタと SMT は、SQL Server からサブセカンドでデータを同期できます。

SMT は SQL Server と StarRocks のクラスター情報とテーブル構造に基づいて、Flink のソースおよびシンクテーブルの CREATE TABLE ステートメントを自動生成します。

Flink CDC コネクタは、SQL Server データベースサーバーで発生する行レベルの変更をキャプチャして記録します。原理は、SQL Server 自体が提供する CDC 機能を使用することです。SQL Server 自体の CDC 機能は、データベース内の指定された変更を指定された変更テーブルにアーカイブできます。SQL Server CDC コネクタは、まず JDBC を使用してテーブルから履歴データを読み取り、次に変更テーブルからインクリメンタルな変更を取得し、フルインクリメンタル同期を実現します。その後、Flink-connector-starrocks はデータを StarRocks に書き込みます。

手順

  1. Flink をダウンロードします。Flink のバージョンは 1.11 以降がサポートされています。

  2. Flink CDC コネクタ をダウンロードします。Flink のバージョンに対応する flink-sql-connector-sqlserver-cdc-xxx.jar をダウンロードしてください。

  3. Flink StarRocks コネクタ をダウンロードします。

  4. flink-sql-connector-sqlserver-cdc-xxx.jarflink-connector-starrocks-xxx.jarflink-xxx/lib/ にコピーします。

  5. smt.tar.gz をダウンロードします。

  6. SMT の設定ファイルを抽出して修正します。

    [db]
    host = 127.0.0.1
    port = 1433
    user = xxx
    password = xxx

    # 現在利用可能なタイプ: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`
    type = sqlserver

    [other]
    # StarRocks のバックエンド数
    be_num = 3
    # `decimal_v3` は StarRocks-1.18.1 以降でサポートされています
    use_decimal_v3 = false
    # 変換された DDL SQL を保存するディレクトリ
    output_dir = ./result

    [table-rule.1]
    # プロパティを設定するためのデータベースをマッチングするパターン
    database = ^db$
    # プロパティを設定するためのテーブルをマッチングするパターン
    table = ^table$
    schema = ^.*$

    ############################################
    ### flink sink 設定
    ### `connector`、`table-name`、`database-name` は自動生成されるため設定しないでください
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url=192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true
  7. starrocks-migrate-tool を実行します。すべての SQL スクリプトが result ディレクトリに生成されます。

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql
  8. プレフィックスが starrocks-create の SQL スクリプトを使用して、StarRocks にテーブルを生成します。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. プレフィックスが flink-create の SQL スクリプトを使用して、Flink のソースおよびシンクテーブルを生成し、Flink ジョブを開始してデータを同期します。

    bin/sql-client.sh embedded < flink-create.all.sql     

    上記のコマンドが正常に実行されると、データを同期するための Flink ジョブが継続して実行されます。

  10. Flink ジョブのステータスを確認します。

    bin/flink list 

    ジョブの実行中にエラーが発生した場合、Flink ログで詳細なエラー情報を確認できます。また、conf/flink-conf.yaml ファイルで Flink の設定を変更することもできます。例えば、メモリやスロットなどです。

注意事項

  1. Server Agent Service が有効になっていることを確認します。

    Server Agent Service が正常に動作しているか確認します。

    EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'
    GO

    Server Agent Service を有効にします。

    /opt/mssql/bin/mssql-conf set sqlagent.enabled true
  2. 対応するデータベースの CDC が有効になっていることを確認します。

    対応するデータベースの CDC が有効になっているか確認します。

    select is_cdc_enabled, name from sys.databases where name = 'XXX_databases'
    GO

    CDC を有効にします。

    注記

    このコマンドを実行する際は、ユーザー serverRolesysadmin であることを確認してください。

    USE XXX_databases
    GO
    EXEC sys.sp_cdc_enable_db
    GO
  3. 対応するテーブルの CDC が有効になっていることを確認します。

    EXEC sys.sp_cdc_enable_table 
    @source_schema = 'XXX_schema',
    @source_name = 'XXX_table',
    @role_name = NULL,
    @supports_net_changes = 0;
    GO

TiDB から StarRocks への同期

概要

Flink CDC コネクタと SMT は、TiDB からサブセカンドでデータを同期できます。

SMT は TiDB と StarRocks のクラスター情報とテーブル構造に基づいて、Flink のソースおよびシンクテーブルの DDL ステートメントを自動生成します。

Flink CDC コネクタは、TiKV ストレージから直接フルおよびインクリメンタルデータを読み取ることでデータをキャプチャします。フルデータはキーに基づいてパーティション化された範囲から取得され、インクリメンタルデータは TiDB が提供する CDC クライアントを使用して取得されます。その後、Flink-connector-starrocks を通じてデータが StarRocks に書き込まれます。

手順

  1. Flink をダウンロードします。Flink のバージョンは 1.11 以降がサポートされています。

  2. Flink CDC コネクタ をダウンロードします。Flink のバージョンに対応する flink-sql-connector-tidb-cdc-xxx.jar をダウンロードしてください。

  3. Flink StarRocks コネクタ をダウンロードします。

  4. flink-sql-connector-tidb-cdc-xxx.jarflink-connector-starrocks-xxx.jarflink-xxx/lib/ にコピーします。

  5. smt.tar.gz をダウンロードします。

  6. SMT の設定ファイルを抽出して修正します。

    [db]
    host = 127.0.0.1
    port = 4000
    user = root
    password =
    # 現在利用可能なタイプ: `mysql`, `pgsql`, `oracle`, `hive`, `clickhouse`, `sqlserver`, `tidb`
    type = tidb
    # # `type == hive` の場合のみ有効です。
    # # 利用可能な値: kerberos, none, nosasl, kerberos_http, none_http, zk, ldap
    # authentication = kerberos

    [other]
    # StarRocks のバックエンド数
    be_num = 3
    # `decimal_v3` は StarRocks-1.18.1 以降でサポートされています
    use_decimal_v3 = false
    # 変換された DDL SQL を保存するディレクトリ
    output_dir = ./result

    [table-rule.1]
    # プロパティを設定するためのデータベースをマッチングするパターン
    database = ^db$
    # プロパティを設定するためのテーブルをマッチングするパターン
    table = ^table$
    schema = ^.*$

    ############################################
    ### flink sink 設定
    ### `connector`、`table-name`、`database-name` は自動生成されるため設定しないでください
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url=192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.max-retries=10
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    flink.starrocks.sink.properties.format=json
    flink.starrocks.sink.properties.strip_outer_array=true

    ############################################
    ### flink-cdc 設定 for `tidb`
    ############################################
    # # TiDB v4.0.0 より前のバージョンでのみ有効です。
    # # TiKV クラスターの PD アドレス。
    # flink.cdc.pd-addresses = 127.0.0.1:2379
  7. starrocks-migrate-tool を実行します。すべての SQL スクリプトが result ディレクトリに生成されます。

    $./starrocks-migrate-tool
    $ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql
  8. プレフィックスが starrocks-create の SQL スクリプトを使用して、StarRocks にテーブルを生成します。

    mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql
  9. プレフィックスが flink-create の SQL スクリプトを使用して、Flink のソースおよびシンクテーブルを生成し、Flink ジョブを開始してデータを同期します。

    bin/sql-client.sh embedded < flink-create.all.sql     

    上記のコマンドが正常に実行されると、データを同期するための Flink ジョブが継続して実行されます。

  10. Flink ジョブのステータスを確認します。

    bin/flink list 

    ジョブの実行中にエラーが発生した場合、Flink ログで詳細なエラー情報を確認できます。また、conf/flink-conf.yaml ファイルで Flink の設定を変更することもできます。例えば、メモリやスロットなどです。

注意事項

TiDB のバージョンが v4.0.0 より前の場合、flink.cdc.pd-addresses の追加設定が必要です。

```Bash
############################################
### flink-cdc 設定 for `tidb`
############################################
# # TiDB v4.0.0 より前のバージョンでのみ有効です。
# # TiKV クラスターの PD アドレス。
# flink.cdc.pd-addresses = 127.0.0.1:2379
```