メインコンテンツまでスキップ
バージョン: 2.5

MySQL からのリアルタイム同期

StarRocks は、MySQL から StarRocks へのデータをリアルタイムで同期するための複数の方法をサポートしており、大量データの低遅延リアルタイム分析を実現します。

このトピックでは、Apache Flink® を使用して MySQL から StarRocks へリアルタイム(数秒以内)でデータを同期する方法を説明します。

仕組み

ヒント

Flink CDC は MySQL から Flink への同期に使用されます。このトピックでは、バージョン 3.0 未満の Flink CDC を使用しており、SMT を使用してテーブルスキーマを同期します。ただし、Flink CDC 3.0 を使用する場合、StarRocks へのテーブルスキーマの同期に SMT を使用する必要はありません。Flink CDC 3.0 は、MySQL データベース全体のスキーマ、シャード化されたデータベースとテーブルのスキーマを同期することもでき、スキーマ変更の同期もサポートしています。詳細な使用方法については、Streaming ELT from MySQL to StarRocks を参照してください。

以下の図は、同期プロセス全体を示しています。

img

MySQL から Flink を経由して StarRocks へのリアルタイム同期は、データベース & テーブルスキーマの同期とデータの同期の 2 段階で実装されます。まず、SMT が MySQL データベース & テーブルスキーマを StarRocks 用のテーブル作成文に変換します。その後、Flink クラスターが Flink ジョブを実行して、MySQL の完全および増分データを StarRocks に同期します。

備考

同期プロセスは、厳密な一度だけのセマンティクスを保証します。

同期プロセス:

  1. データベース & テーブルスキーマの同期。

    SMT は、同期対象の MySQL データベース & テーブルのスキーマを読み取り、StarRocks でのデスティネーションデータベース & テーブルを作成するための SQL ファイルを生成します。この操作は、SMT の設定ファイルに基づく MySQL および StarRocks の情報に基づいています。

  2. データの同期。

    a. Flink SQL クライアントがデータロード文 INSERT INTO SELECT を実行して、1 つまたは複数の Flink ジョブを Flink クラスターに送信します。

    b. Flink クラスターが Flink ジョブを実行してデータを取得します。Flink CDC connector は、最初にソースデータベースから完全な履歴データを読み取り、その後シームレスに増分読み取りに切り替え、データを flink-connector-starrocks に送信します。

    c. flink-connector-starrocks はデータをミニバッチで蓄積し、各バッチのデータを StarRocks に同期します。

    備考

    MySQL のデータ操作言語 (DML) 操作のみが StarRocks に同期できます。データ定義言語 (DDL) 操作は同期できません。

シナリオ

MySQL からのリアルタイム同期は、データが常に変更される幅広いユースケースに対応しています。実際のユースケースとして「商品の売上のリアルタイムランキング」を例にとります。

Flink は、MySQL の元の注文テーブルに基づいて商品の売上のリアルタイムランキングを計算し、そのランキングを StarRocks の主キーテーブルにリアルタイムで同期します。ユーザーは、StarRocks に接続されたビジュアライゼーションツールを使用して、リアルタイムでランキングを表示し、オンデマンドの運用インサイトを得ることができます。

準備

同期ツールのダウンロードとインストール

MySQL からデータを同期するには、以下のツールをインストールする必要があります: SMT、Flink、Flink CDC connector、flink-connector-starrocks。

  1. Flink をダウンロードしてインストールし、Flink クラスターを開始します。この手順は Flink 公式ドキュメント に従って実行することもできます。

    a. Flink を実行する前に、オペレーティングシステムに Java 8 または Java 11 をインストールします。以下のコマンドを実行して、インストールされている Java のバージョンを確認できます。

        # Java のバージョンを表示します。
    java -version

    # 以下の出力が返された場合、Java 8 がインストールされています。
    java version "1.8.0_301"
    Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
    Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)

    b. Flink インストールパッケージ をダウンロードして解凍します。Flink 1.14 以降の使用を推奨します。最小許容バージョンは Flink 1.11 です。このトピックでは Flink 1.14.5 を使用します。

       # Flink をダウンロードします。
    wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
    # Flink を解凍します。
    tar -xzf flink-1.14.5-bin-scala_2.11.tgz
    # Flink ディレクトリに移動します。
    cd flink-1.14.5

    c. Flink クラスターを開始します。

       # Flink クラスターを開始します。
    ./bin/start-cluster.sh

    # 以下の出力が返された場合、Flink クラスターが開始されています。
    Starting cluster.
    Starting standalonesession daemon on host.
    Starting taskexecutor daemon on host.
  2. Flink CDC connector をダウンロードします。このトピックではデータソースとして MySQL を使用しているため、flink-sql-connector-mysql-cdc-x.x.x.jar をダウンロードします。コネクタのバージョンは Flink のバージョンと一致している必要があります。詳細なバージョンマッピングについては、Supported Flink Versions を参照してください。このトピックでは Flink 1.14.5 を使用し、flink-sql-connector-mysql-cdc-2.2.0.jar をダウンロードできます。

    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.2.0.jar
  3. flink-connector-starrocks をダウンロードします。バージョンは Flink のバージョンと一致している必要があります。

    flink-connector-starrocks パッケージ x.x.x_flink-y.yy _ z.zz.jar には 3 つのバージョン番号が含まれています:

    • x.x.x は flink-connector-starrocks のバージョン番号です。
    • y.yy はサポートされている Flink のバージョンです。
    • z.zz は Flink がサポートする Scala のバージョンです。Flink のバージョンが 1.14.x 以下の場合、Scala バージョンを持つパッケージをダウンロードする必要があります。

    このトピックでは Flink 1.14.5 と Scala 2.11 を使用します。したがって、以下のパッケージをダウンロードできます: 1.2.3_flink-14_2.11.jar

  4. Flink CDC connector (flink-sql-connector-mysql-cdc-2.2.0.jar) と flink-connector-starrocks (1.2.3_flink-1.14_2.11.jar) の JAR パッケージを Flink の lib ディレクトリに移動します。

    Note

    システムに既に Flink クラスターが実行されている場合、Flink クラスターを停止し、JAR パッケージをロードして検証するために再起動する必要があります。

    $ ./bin/stop-cluster.sh
    $ ./bin/start-cluster.sh
  5. SMT パッケージ をダウンロードして解凍し、flink-1.14.5 ディレクトリに配置します。StarRocks は Linux x86 および macos ARM64 用の SMT パッケージを提供しています。オペレーティングシステムと CPU に基づいて選択できます。以下のコマンドを実行して SMT パッケージを取得することもできます。

    # for Linux x86
    wget https://releases.starrocks.io/resources/smt.tar.gz
    # for macOS ARM64
    wget https://releases.starrocks.io/resources/smt_darwin_arm64.tar.gz

MySQL バイナリログの有効化

MySQL からリアルタイムでデータを同期するには、システムが MySQL バイナリログ (binlog) からデータを読み取り、データを解析してから StarRocks に同期する必要があります。MySQL バイナリログが有効になっていることを確認してください。

  1. MySQL バイナリログを有効にするために、MySQL 設定ファイル my.cnf(デフォルトパス: /etc/my.cnf)を編集します。

    # MySQL Binlog を有効にします。
    log_bin = ON
    # Binlog の保存パスを設定します。
    log_bin =/var/lib/mysql/mysql-bin
    # server_id を設定します。
    # MySQL 5.7.3 以降では server_id が設定されていない場合、MySQL サービスを使用できません。
    server_id = 1
    # Binlog フォーマットを ROW に設定します。
    binlog_format = ROW
    # Binlog ファイルの基本名。各 Binlog ファイルを識別するための識別子が追加されます。
    log_bin_basename =/var/lib/mysql/mysql-bin
    # Binlog ファイルのインデックスファイル。すべての Binlog ファイルのディレクトリを管理します。
    log_bin_index =/var/lib/mysql/mysql-bin.index
  2. 修正した設定ファイルを有効にするために、次のいずれかのコマンドを実行して MySQL を再起動します。

    # service を使用して MySQL を再起動します。
    service mysqld restart
    # mysqld スクリプトを使用して MySQL を再起動します。
    /etc/init.d/mysqld restart
  3. MySQL に接続し、MySQL バイナリログが有効になっているかどうかを確認します。

    -- MySQL に接続します。
    mysql -h xxx.xx.xxx.xx -P 3306 -u root -pxxxxxx

    -- MySQL バイナリログが有効になっているかどうかを確認します。
    mysql> SHOW VARIABLES LIKE 'log_bin';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin | ON |
    +---------------+-------+
    1 row in set (0.00 sec)

データベース & テーブルスキーマの同期

  1. SMT 設定ファイルを編集します。 SMT の conf ディレクトリに移動し、設定ファイル config_prod.conf を編集します。MySQL 接続情報、同期対象のデータベース & テーブルのマッチングルール、flink-connector-starrocks の設定情報などを設定します。

    [db]
    type = mysql
    host = xxx.xx.xxx.xx
    port = 3306
    user = user1
    password = xxxxxx

    [other]
    # StarRocks の BE の数
    be_num = 3
    # `decimal_v3` は StarRocks-1.18.1 以降でサポートされています。
    use_decimal_v3 = true
    # 変換された DDL SQL を保存するファイル
    output_dir = ./result

    [table-rule.1]
    # プロパティを設定するためのデータベースをマッチングするパターン
    database = ^demo.*$
    # プロパティを設定するためのテーブルをマッチングするパターン
    table = ^.*$

    ############################################
    ### Flink sink 設定
    ### `connector`、`table-name`、`database-name` を設定しないでください。これらは自動生成されます。
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
    flink.starrocks.load-url= <fe_host>:<fe_http_port>
    flink.starrocks.username=user2
    flink.starrocks.password=xxxxxx
    flink.starrocks.sink.properties.format=csv
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    • [db]: ソースデータベースにアクセスするための情報。

      • type: ソースデータベースの種類。このトピックでは、ソースデータベースは mysql です。
      • host: MySQL サーバーの IP アドレス。
      • port: MySQL データベースのポート番号。デフォルトは 3306
      • user: MySQL データベースにアクセスするためのユーザー名。
      • password: ユーザー名のパスワード。
    • [table-rule]: データベース & テーブルのマッチングルールと対応する flink-connector-starrocks の設定。

      • Database, table: MySQL のデータベース & テーブルの名前。正規表現がサポートされています。
      • flink.starrocks.*: flink-connector-starrocks の設定情報。詳細な設定と情報については、flink-connector-starrocks を参照してください。

      異なるテーブルに対して異なる flink-connector-starrocks 設定を使用する必要がある場合。たとえば、一部のテーブルが頻繁に更新され、データロードを加速する必要がある場合は、異なるテーブルに対して異なる flink-connector-starrocks 設定を使用する を参照してください。MySQL シャーディングから取得した複数のテーブルを同じ StarRocks テーブルにロードする必要がある場合は、MySQL シャーディング後の複数のテーブルを StarRocks の 1 つのテーブルに同期する を参照してください。

    • [other]: その他の情報

      • be_num: StarRocks クラスター内の BE の数(このパラメータは、後続の StarRocks テーブル作成で合理的なタブレット数を設定するために使用されます)。
      • use_decimal_v3: Decimal V3 を有効にするかどうか。Decimal V3 が有効になると、MySQL の decimal データは StarRocks に同期される際に Decimal V3 データに変換されます。
      • output_dir: 生成される SQL ファイルを保存するパス。SQL ファイルは、StarRocks でデータベース & テーブルを作成し、Flink クラスターに Flink ジョブを送信するために使用されます。デフォルトパスは ./result であり、デフォルト設定を保持することをお勧めします。
  2. SMT を実行して MySQL のデータベース & テーブルスキーマを読み取り、設定ファイルに基づいて ./result ディレクトリに SQL ファイルを生成します。starrocks-create.all.sql ファイルは StarRocks でデータベース & テーブルを作成するために使用され、flink-create.all.sql ファイルは Flink クラスターに Flink ジョブを送信するために使用されます。

    # SMT を実行します。
    ./starrocks-migrate-tool

    # 結果ディレクトリに移動し、このディレクトリ内のファイルを確認します。
    cd result
    ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql
  3. 次のコマンドを実行して StarRocks に接続し、starrocks-create.all.sql ファイルを実行して StarRocks にデータベースとテーブルを作成します。SQL ファイルのデフォルトのテーブル作成文を使用して 主キーテーブル を作成することをお勧めします。

    Note

    ビジネスニーズに基づいてテーブル作成文を変更し、主キーテーブルを使用しないテーブルを作成することもできます。ただし、ソース MySQL データベースの DELETE 操作は非主キーテーブルに同期できません。このようなテーブルを作成する際は注意が必要です。

    mysql -h <fe_host> -P <fe_query_port> -u user2 -pxxxxxx < starrocks-create.all.sql

    データがデスティネーションの StarRocks テーブルに書き込まれる前に Flink によって処理される必要がある場合、ソースとデスティネーションのテーブル間でテーブルスキーマが異なります。この場合、テーブル作成文を変更する必要があります。この例では、デスティネーションテーブルには product_idproduct_name 列および商品の売上のリアルタイムランキングのみが必要です。以下のテーブル作成文を使用できます。

    CREATE DATABASE IF NOT EXISTS `demo`;

    CREATE TABLE IF NOT EXISTS `demo`.`orders` (
    `product_id` INT(11) NOT NULL COMMENT "",
    `product_name` STRING NOT NULL COMMENT "",
    `sales_cnt` BIGINT NOT NULL COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`product_id`)
    DISTRIBUTED BY HASH(`product_id`) BUCKETS 1
    PROPERTIES (
    "replication_num" = "3"
    );

データの同期

Flink クラスターを実行し、Flink ジョブを送信して MySQL から StarRocks への完全および増分データを継続的に同期します。

  1. Flink ディレクトリに移動し、次のコマンドを実行して Flink SQL クライアントで flink-create.all.sql ファイルを実行します。

    ./bin/sql-client.sh -f flink-create.all.sql

    この SQL ファイルは、動的テーブル source tablesink table、クエリ文 INSERT INTO SELECT を定義し、コネクタ、ソースデータベース、およびデスティネーションデータベースを指定します。このファイルが実行されると、Flink ジョブが Flink クラスターに送信され、データ同期が開始されます。

    Note

    • Flink クラスターが開始されていることを確認してください。flink/bin/start-cluster.sh を実行して Flink クラスターを開始できます。
    • Flink のバージョンが 1.13 より前の場合、SQL ファイル flink-create.all.sql を直接実行できない場合があります。このファイル内の SQL 文を SQL クライアントのコマンドラインインターフェース (CLI) で 1 つずつ実行する必要があります。また、\ 文字をエスケープする必要があります。
    'sink.properties.column_separator' = '\\x01'
    'sink.properties.row_delimiter' = '\\x02'

    同期中のデータ処理:

    同期中にデータを処理する必要がある場合、たとえばデータに対して GROUP BY や JOIN を実行する場合、flink-create.all.sql ファイルを変更できます。以下の例では、COUNT (*) と GROUP BY を実行して商品の売上のリアルタイムランキングを計算します。

        $ ./bin/sql-client.sh -f flink-create.all.sql
    No default environment is specified.
    Searching for '/home/disk1/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
    [INFO] Executing SQL from file.

    Flink SQL> CREATE DATABASE IF NOT EXISTS `default_catalog`.`demo`;
    [INFO] Execute statement succeed.

    -- MySQL の注文テーブルに基づいて動的テーブル `source table` を作成します。
    Flink SQL>
    CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_src` (`order_id` BIGINT NOT NULL,
    `product_id` INT NULL,
    `order_date` TIMESTAMP NOT NULL,
    `customer_name` STRING NOT NULL,
    `product_name` STRING NOT NULL,
    `price` DECIMAL(10, 5) NULL,
    PRIMARY KEY(`order_id`)
    NOT ENFORCED
    ) with ('connector' = 'mysql-cdc',
    'hostname' = 'xxx.xx.xxx.xxx',
    'port' = '3306',
    'username' = 'root',
    'password' = '',
    'database-name' = 'demo',
    'table-name' = 'orders'
    );
    [INFO] Execute statement succeed.

    -- 動的テーブル `sink table` を作成します。
    Flink SQL>
    CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_sink` (`product_id` INT NOT NULL,
    `product_name` STRING NOT NULL,
    `sales_cnt` BIGINT NOT NULL,
    PRIMARY KEY(`product_id`)
    NOT ENFORCED
    ) with ('sink.max-retries' = '10',
    'jdbc-url' = 'jdbc:mysql://<fe_host>:<fe_query_port>',
    'password' = '',
    'sink.properties.strip_outer_array' = 'true',
    'sink.properties.format' = 'json',
    'load-url' = '<fe_host>:<fe_http_port>',
    'username' = 'root',
    'sink.buffer-flush.interval-ms' = '15000',
    'connector' = 'starrocks',
    'database-name' = 'demo',
    'table-name' = 'orders'
    );
    [INFO] Execute statement succeed.

    -- 商品の売上のリアルタイムランキングを実装し、`sink table``source table` のデータ変更を反映するように動的に更新されます。
    Flink SQL>
    INSERT INTO `default_catalog`.`demo`.`orders_sink` select product_id,product_name, count(*) as cnt from `default_catalog`.`demo`.`orders_src` group by product_id,product_name;
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 5ae005c4b3425d8bb13fe660260a35da

    データの一部のみを同期する必要がある場合、たとえば支払い時間が 2021 年 12 月 21 日以降のデータのみを同期する場合、INSERT INTO SELECTWHERE 句を使用してフィルター条件を設定できます。たとえば、WHERE pay_dt > '2021-12-21' のように設定します。この条件を満たさないデータは StarRocks に同期されません。

    以下の結果が返された場合、Flink ジョブが完全および増分同期のために送信されました。

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 5ae005c4b3425d8bb13fe660260a35da
  2. Flink WebUI を使用するか、Flink SQL クライアントで bin/flink list -running コマンドを実行して、Flink クラスターで実行中の Flink ジョブとジョブ ID を確認できます。

    • Flink WebUI img

    • bin/flink list -running

        $ bin/flink list -running
    Waiting for response...
    ------------------ Running/Restarting Jobs -------------------
    13.10.2022 15:03:54 : 040a846f8b58e82eb99c8663424294d5 : insert-into_default_catalog.lily.example_tbl1_sink (RUNNING)
    --------------------------------------------------------------

    Note

    ジョブが異常な場合、Flink WebUI を使用してトラブルシューティングを行うか、Flink 1.14.5 の /log ディレクトリ内のログファイルを確認してトラブルシューティングを行うことができます。

FAQ

データソース内の一部のテーブルが頻繁に更新され、flink-connector-starrocks のロード速度を加速したい場合、SMT 設定ファイル config_prod.conf 内で各テーブルに対して個別の flink-connector-starrocks 設定を設定する必要があります。

[table-rule.1]
# プロパティを設定するためのデータベースをマッチングするパターン
database = ^order.*$
# プロパティを設定するためのテーブルをマッチングするパターン
table = ^.*$

############################################
### Flink sink 設定
### `connector`、`table-name`、`database-name` を設定しないでください。これらは自動生成されます。
############################################
flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
flink.starrocks.load-url= <fe_host>:<fe_http_port>
flink.starrocks.username=user2
flink.starrocks.password=xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000[table-rule.2]
# プロパティを設定するためのデータベースをマッチングするパターン
database = ^order2.*$
# プロパティを設定するためのテーブルをマッチングするパターン
table = ^.*$

############################################
### Flink sink 設定
### `connector`、`table-name`、`database-name` を設定しないでください。これらは自動生成されます。
############################################
flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
flink.starrocks.load-url= <fe_host>:<fe_http_port>
flink.starrocks.username=user2
flink.starrocks.password=xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=10000

MySQL シャーディング後の複数のテーブルを StarRocks の 1 つのテーブルに同期する

シャーディングが行われた後、1 つの MySQL テーブルのデータが複数のテーブルに分割されるか、さらには複数のデータベースに分散されることがあります。すべてのテーブルは同じスキーマを持っています。この場合、[table-rule] を設定してこれらのテーブルを 1 つの StarRocks テーブルに同期できます。たとえば、MySQL には edu_db_1edu_db_2 という 2 つのデータベースがあり、それぞれに course_1course_2 という 2 つのテーブルがあり、すべてのテーブルのスキーマは同じです。以下の [table-rule] 設定を使用して、すべてのテーブルを 1 つの StarRocks テーブルに同期できます。

Note

StarRocks テーブルの名前はデフォルトで course__auto_shard です。異なる名前を使用する必要がある場合は、SQL ファイル starrocks-create.all.sql および flink-create.all.sql で変更できます。

[table-rule.1]
# プロパティを設定するためのデータベースをマッチングするパターン
database = ^edu_db_[0-9]*$
# プロパティを設定するためのテーブルをマッチングするパターン
table = ^course_[0-9]*$

############################################
### Flink sink 設定
### `connector`、`table-name`、`database-name` を設定しないでください。これらは自動生成されます。
############################################
flink.starrocks.jdbc-url = jdbc: mysql://xxx.xxx.x.x:xxxx
flink.starrocks.load-url = xxx.xxx.x.x:xxxx
flink.starrocks.username = user2
flink.starrocks.password = xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator =\x01
flink.starrocks.sink.properties.row_delimiter =\x02
flink.starrocks.sink.buffer-flush.interval-ms = 5000

JSON 形式でデータをインポートする

前述の例では、データは CSV 形式でインポートされています。適切な区切り文字を選択できない場合、[table-rule] 内の flink.starrocks.* の以下のパラメータを置き換える必要があります。

flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator =\x01
flink.starrocks.sink.properties.row_delimiter =\x02

次のパラメータを渡すと、データは JSON 形式でインポートされます。

flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true

Note

この方法はロード速度をわずかに低下させます。

flink-create.all.sql ファイル内で STATEMENT SET 構文を使用して、複数の INSERT INTO 文を 1 つの Flink ジョブとして実行できます。これにより、複数の文が Flink ジョブリソースを過剰に使用するのを防ぎ、複数のクエリを実行する効率が向上します。

Note

Flink は 1.13 以降で STATEMENT SET 構文をサポートしています。

  1. result/flink-create.all.sql ファイルを開きます。

  2. ファイル内の SQL 文を変更します。すべての INSERT INTO 文をファイルの末尾に移動します。最初の INSERT INTO 文の前に EXECUTE STATEMENT SET BEGIN を配置し、最後の INSERT INTO 文の後に END; を配置します。

Note

CREATE DATABASE および CREATE TABLE の位置は変更されません。

CREATE DATABASE IF NOT EXISTS db;
CREATE TABLE IF NOT EXISTS db.a1;
CREATE TABLE IF NOT EXISTS db.b1;
CREATE TABLE IF NOT EXISTS db.a2;
CREATE TABLE IF NOT EXISTS db.b2;
EXECUTE STATEMENT SET
BEGIN-- 1 つまたは複数の INSERT INTO 文
INSERT INTO db.a1 SELECT * FROM db.b1;
INSERT INTO db.a2 SELECT * FROM db.b2;
END;