Skip to main content
Version: Stable-3.1

Realtime synchronization from MySQL

StarRocks supports multiple methods to synchronize data from MySQL to StarRocks in real time, delivering low latency real-time analytics of massive data.

This topic describes how to synchronize data from MySQL to StarRocks in real-time (within seconds) through Apache Flink®.

You can load data into StarRocks tables only as a user who has the INSERT privilege on those StarRocks tables. If you do not have the INSERT privilege, follow the instructions provided in GRANT to grant the INSERT privilege to the user that you use to connect to your StarRocks cluster.

How it works

tip

Flink CDC is used in the synchronization from MySQL to Flink. This topic uses Flink CDC whose version is less than 3.0, so SMT is used to synchronize table schemas. However, if Flink CDC 3.0 is used, it is not necessary to use SMT to synchronize table schemas to StarRocks. Flink CDC 3.0 can even synchronize the schemas of the entire MySQL database, the sharded databases and tables, and also supports schema changes synchronization. For detailed usage, see Streaming ELT from MySQL to StarRocks.

The following figure illustrates the entire synchronization process.

img

Real-time synchronization from MySQL through Flink to StarRocks is implemented in two stages: synchronizing database & table schema and synchronizing data. First, the SMT converts MySQL database & table schema into table creation statements for StarRocks. Then, the Flink cluster runs Flink jobs to synchronize full and incremental MySQL data to StarRocks.

info

The synchronization process guarantees exactly-once semantics.

Synchronization process:

  1. Synchronize database & table schema.

    The SMT reads the schema of the MySQL database & table to be synchronized and generates SQL files for creating a destination database & table in StarRocks. This operation is based on the MySQL and StarRocks information in SMT's configuration file.

  2. Synchronize data.

    a. The Flink SQL client executes the data loading statement INSERT INTO SELECT to submit one or more Flink jobs to the Flink cluster.

    b. The Flink cluster runs the Flink jobs to obtain data. The Flink CDC connector first reads full historical data from the source database, then seamlessly switches to incremental reading, and sends the data to flink-connector-starrocks.

    c. flink-connector-starrocks accumulates data in mini-batches, and synchronizes each batch of data to StarRocks.

    info

    Only data manipulation language (DML) operations in MySQL can be synchronized to StarRocks. Data definition language (DDL) operations cannot be synchronized.

Scenarios

Real-time synchronization from MySQL has a broad range of use cases where data is constantly changed. Take a real-world use case "real-time ranking of commodity sales" as an example.

Flink calculates the real-time ranking of commodity sales based on the original order table in MySQL and synchronizes the ranking to StarRocks' Primary Key table in real time. Users can connect a visualization tool to StarRocks to view the ranking in real time to gain on-demand operational insights.

Preparations

Download and install synchronization tools

To synchronize data from MySQL, you need to install the following tools: SMT, Flink, Flink CDC connector, and flink-connector-starrocks.

  1. Download and install Flink, and start the Flink cluster. You can also perform this step by following the instructions in Flink official documentation.

    a. Install Java 8 or Java 11 in your operating system before you run Flink. You can run the following command to check the installed Java version.

        # View the Java version.
    java -version

    # Java 8 is installed if the following output is returned.
    java version "1.8.0_301"
    Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
    Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)

    b. Download the Flink installation package and decompress it. We recommend that you use Flink 1.14 or later. The minimum allowed version is Flink 1.11. This topic uses Flink 1.14.5.

       # Download Flink.
    wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
    # Decompress Flink.
    tar -xzf flink-1.14.5-bin-scala_2.11.tgz
    # Go to the Flink directory.
    cd flink-1.14.5

    c. Start the Flink cluster.

       # Start the Flink cluster.
    ./bin/start-cluster.sh

    # The Flink cluster is started if the following output is returned.
    Starting cluster.
    Starting standalonesession daemon on host.
    Starting taskexecutor daemon on host.
  2. Download Flink CDC connector. This topic uses MySQL as the data source and therefore, flink-sql-connector-mysql-cdc-x.x.x.jar is downloaded. The connector version must match the Flink version. This topic uses Flink 1.14.5 and you can download flink-sql-connector-mysql-cdc-2.2.0.jar.

    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.2.0.jar
  3. Download flink-connector-starrocks. The version must match the Flink version.

    The flink-connector-starrocks package x.x.x_flink-y.yy _ z.zz.jar contains three version numbers:

    • x.x.x is the version number of flink-connector-starrocks.
    • y.yy is the supported Flink version.
    • z.zz is the Scala version supported by Flink. If the Flink version is 1.14.x or earlier, you must download a package that has the Scala version.

    This topic uses Flink 1.14.5 and Scala 2.11. Therefore, you can download the following package: 1.2.3_flink-14_2.11.jar.

  4. Move the JAR packages of Flink CDC connector (flink-sql-connector-mysql-cdc-2.2.0.jar) and flink-connector-starrocks (1.2.3_flink-1.14_2.11.jar) to the lib directory of Flink.

    Note

    If a Flink cluster is already running in your system, you must stop the Flink cluster and restart it to load and validate the JAR packages.

    $ ./bin/stop-cluster.sh
    $ ./bin/start-cluster.sh
  5. Download and decompress the SMT package and place it in the flink-1.14.5 directory. StarRocks provides SMT packages for Linux x86 and macos ARM64. You can choose one based on your operating system and CPU.

    # for Linux x86
    wget https://releases.starrocks.io/resources/smt.tar.gz
    # for macOS ARM64
    wget https://releases.starrocks.io/resources/smt_darwin_arm64.tar.gz

Enable MySQL binary log

To synchronize data from MySQL in real time, the system needs to read data from MySQL binary log (binlog), parse the data, and then synchronize the data to StarRocks. Make sure that MySQL binary log is enabled.

  1. Edit the MySQL configuration file my.cnf (default path: /etc/my.cnf) to enable MySQL binary log.

    # Enable MySQL Binlog.
    log_bin = ON
    # Configure the save path for the Binlog.
    log_bin =/var/lib/mysql/mysql-bin
    # Configure server_id.
    # If server_id is not configured for MySQL 5.7.3 or later, the MySQL service cannot be used.
    server_id = 1
    # Set the Binlog format to ROW.
    binlog_format = ROW
    # The base name of the Binlog file. An identifier is appended to identify each Binlog file.
    log_bin_basename =/var/lib/mysql/mysql-bin
    # The index file of Binlog files, which manages the directory of all Binlog files.
    log_bin_index =/var/lib/mysql/mysql-bin.index
  2. Run one of the following commands to restart MySQL for the modified configuration file to take effect.

    # Use service to restart MySQL.
    service mysqld restart
    # Use mysqld script to restart MySQL.
    /etc/init.d/mysqld restart
  3. Connect to MySQL and check whether MySQL binary log is enabled.

    -- Connect to MySQL.
    mysql -h xxx.xx.xxx.xx -P 3306 -u root -pxxxxxx

    -- Check whether MySQL binary log is enabled.
    mysql> SHOW VARIABLES LIKE 'log_bin';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin | ON |
    +---------------+-------+
    1 row in set (0.00 sec)

Synchronize database & table schema

  1. Edit the SMT configuration file. Go to the SMT conf directory and edit the configuration file config_prod.conf, such as MySQL connection information, the matching rules of the database & table to be synchronized, and configuration information of flink-connector-starrocks.

    [db]
    type = mysql
    host = xxx.xx.xxx.xx
    port = 3306
    user = user1
    password = xxxxxx

    [other]
    # Number of BEs in StarRocks
    be_num = 3
    # `decimal_v3` is supported since StarRocks-1.18.1.
    use_decimal_v3 = true
    # File to save the converted DDL SQL
    output_dir = ./result

    [table-rule.1]
    # Pattern to match databases for setting properties
    database = ^demo.*$
    # Pattern to match tables for setting properties
    table = ^.*$

    ############################################
    ### Flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`. They are auto-generated.
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
    flink.starrocks.load-url= <fe_host>:<fe_http_port>
    flink.starrocks.username=user2
    flink.starrocks.password=xxxxxx
    flink.starrocks.sink.properties.format=csv
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    • [db]: information used to access the source database.

      • type: type of the source database. In this topic, the source database is mysql.
      • host: IP address of the MySQL server.
      • port: port number of the MySQL database, defaults to 3306
      • user: username for accessing the MySQL database
      • password: password of the username
    • [table-rule]: database & table matching rules and the corresponding flink-connector-starrocks configuration.

      • Database, table: the names of the database & table in MySQL. Regular expressions are supported.

      • flink.starrocks.*: configuration information of flink-connector-starrocks. For more configurations and information, see flink-connector-starrocks.

      If you need to use different flink-connector-starrocks configurations for different tables. For example, if some tables are frequently updated and you need to accelerate data loading, see Use different flink-connector-starrocks configurations for different tables. If you need to load multiple tables obtained from MySQL sharding into the same StarRocks table, see Synchronize multiple tables after MySQL sharding to one table in StarRocks.

    • [other]: other information

      • be_num: The number of BEs in your StarRocks cluster (This parameter will be used for setting a reasonable number of tablets in subsequent StarRocks table creation).
      • use_decimal_v3: Whether to enable Decimal V3. After Decimal V3 is enabled, MySQL decimal data will be converted into Decimal V3 data when data is synchronized to StarRocks.
      • output_dir: The path to save the SQL files to be generated. The SQL files will be used to create a database & table in StarRocks and submit a Flink job to the Flink cluster. The default path is ./result and we recommend that you retain the default settings.
  2. Run the SMT to read the database & table schema in MySQL and generate SQL files in the ./result directory based on the configuration file. The starrocks-create.all.sql file is used to create a database & table in StarRocks and the flink-create.all.sql file is used to submit a Flink job to the Flink cluster.

    # Run the SMT.
    ./starrocks-migrate-tool

    # Go to the result directory and check the files in this directory.
    cd result
    ls result
    flink-create.1.sql smt.tar.gz starrocks-create.all.sql
    flink-create.all.sql starrocks-create.1.sql
  3. Run the following command to connect to StarRocks and execute the starrocks-create.all.sql file to create a database and table in StarRocks. We recommend that you use the default table creation statement in the SQL file to create a table of the Primary Key table.

    Note

    You can also modify the table creation statement based on your business needs and create a table that does not use the Primary Key table. However, the DELETE operation in the source MySQL database cannot be synchronized to the non-primary key table. Exercise caution when you create such a table.

    mysql -h <fe_host> -P <fe_query_port> -u user2 -pxxxxxx < starrocks-create.all.sql

    If the data needs to be processed by Flink before it is written to the destination StarRocks table, the table schema will be different between the source and destination tables. In this case, you must modify the table creation statement. In this example, the destination table requires only the product_id and product_name columns and real-time ranking of commodity sales. You can use the following table creation statement.

    CREATE DATABASE IF NOT EXISTS `demo`;

    CREATE TABLE IF NOT EXISTS `demo`.`orders` (
    `product_id` INT(11) NOT NULL COMMENT "",
    `product_name` STRING NOT NULL COMMENT "",
    `sales_cnt` BIGINT NOT NULL COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`product_id`)
    DISTRIBUTED BY HASH(`product_id`)
    PROPERTIES (
    "replication_num" = "3"
    );

    NOTICE

    Since v2.5.7, StarRocks can automatically set the number of buckets (BUCKETS) when you create a table or add a partition. You no longer need to manually set the number of buckets. For detailed information, see determine the number of buckets.

Synchronize data

Run the Flink cluster and submit a Flink job to continuously synchronize full and incremental data from MySQL to StarRocks.

  1. Go to the Flink directory and run the following command to run the flink-create.all.sql file on your Flink SQL client.

    ./bin/sql-client.sh -f flink-create.all.sql

    This SQL file defines dynamic tables source table and sink table, query statement INSERT INTO SELECT, and specifies the connector, source database, and destination database. After this file is executed, a Flink job is submitted to the Flink cluster to start data synchronization.

    Note

    • Make sure that the Flink cluster has been started. You can start the Flink cluster by running flink/bin/start-cluster.sh.
    • If your Flink version is earlier than 1.13, you may not be able to directly run the SQL file flink-create.all.sql. You need to execute SQL statements one by one in this file in the command line interface (CLI) of the SQL client. You also need to escape the \ character.
    'sink.properties.column_separator' = '\\x01'
    'sink.properties.row_delimiter' = '\\x02'

    Process data during synchronization:

    If you need to process data during synchronization, such as performing GROUP BY or JOIN on the data, you can modify the flink-create.all.sql file. The following example calculates real-time ranking of commodity sales by executing COUNT (*) and GROUP BY.

        $ ./bin/sql-client.sh -f flink-create.all.sql
    No default environment is specified.
    Searching for '/home/disk1/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
    [INFO] Executing SQL from file.

    Flink SQL> CREATE DATABASE IF NOT EXISTS `default_catalog`.`demo`;
    [INFO] Execute statement succeed.

    -- Create a dynamic table `source table` based on the order table in MySQL.
    Flink SQL>
    CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_src` (`order_id` BIGINT NOT NULL,
    `product_id` INT NULL,
    `order_date` TIMESTAMP NOT NULL,
    `customer_name` STRING NOT NULL,
    `product_name` STRING NOT NULL,
    `price` DECIMAL(10, 5) NULL,
    PRIMARY KEY(`order_id`)
    NOT ENFORCED
    ) with ('connector' = 'mysql-cdc',
    'hostname' = 'xxx.xx.xxx.xxx',
    'port' = '3306',
    'username' = 'root',
    'password' = '',
    'database-name' = 'demo',
    'table-name' = 'orders'
    );
    [INFO] Execute statement succeed.

    -- Create a dynamic table `sink table`.
    Flink SQL>
    CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_sink` (`product_id` INT NOT NULL,
    `product_name` STRING NOT NULL,
    `sales_cnt` BIGINT NOT NULL,
    PRIMARY KEY(`product_id`)
    NOT ENFORCED
    ) with ('sink.max-retries' = '10',
    'jdbc-url' = 'jdbc:mysql://<fe_host>:<fe_query_port>',
    'password' = '',
    'sink.properties.strip_outer_array' = 'true',
    'sink.properties.format' = 'json',
    'load-url' = '<fe_host>:<fe_http_port>',
    'username' = 'root',
    'sink.buffer-flush.interval-ms' = '15000',
    'connector' = 'starrocks',
    'database-name' = 'demo',
    'table-name' = 'orders'
    );
    [INFO] Execute statement succeed.

    -- Implement real-time ranking of commodity sales, where `sink table` is dynamically updated to reflect data changes in `source table`.
    Flink SQL>
    INSERT INTO `default_catalog`.`demo`.`orders_sink` select product_id,product_name, count(*) as cnt from `default_catalog`.`demo`.`orders_src` group by product_id,product_name;
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 5ae005c4b3425d8bb13fe660260a35da

    If you only need to synchronize only a portion of the data, such as data whose payment time is later than December 21, 2021, you can use the WHERE clause in INSERT INTO SELECT to set a filter condition, such as WHERE pay_dt > '2021-12-21'. Data that does not meet this condition will not be synchronized to StarRocks.

    If the following result is returned, the Flink job has been submitted for full and incremental synchronization.

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 5ae005c4b3425d8bb13fe660260a35da
  2. You can use the Flink WebUI or run the bin/flink list -running command on your Flink SQL client to view Flink jobs that are running in the Flink cluster and the job IDs.

    • Flink WebUI img

    • bin/flink list -running

        $ bin/flink list -running
    Waiting for response...
    ------------------ Running/Restarting Jobs -------------------
    13.10.2022 15:03:54 : 040a846f8b58e82eb99c8663424294d5 : insert-into_default_catalog.lily.example_tbl1_sink (RUNNING)
    --------------------------------------------------------------

    Note

    If the job is abnormal, you can perform troubleshooting by using Flink WebUI or by viewing the log file in the /log directory of Flink 1.14.5.

FAQ

If some tables in the data source are frequently updated and you want to accelerate the loading speed of flink-connector-starrocks, you must set a separate flink-connector-starrocks configuration for each table in the SMT configuration file config_prod.conf.

[table-rule.1]
# Pattern to match databases for setting properties
database = ^order.*$
# Pattern to match tables for setting properties
table = ^.*$

############################################
### Flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`. They are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
flink.starrocks.load-url= <fe_host>:<fe_http_port>
flink.starrocks.username=user2
flink.starrocks.password=xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000[table-rule.2]
# Pattern to match databases for setting properties
database = ^order2.*$
# Pattern to match tables for setting properties
table = ^.*$

############################################
### Flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`. They are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
flink.starrocks.load-url= <fe_host>:<fe_http_port>
flink.starrocks.username=user2
flink.starrocks.password=xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=10000

Synchronize multiple tables after MySQL sharding to one table in StarRocks

After sharding is performed, data in one MySQL table may be split into multiple tables or even distributed to multiple databases. All the tables have the same schema. In this case, you can set [table-rule] to synchronize these tables to one StarRocks table. For example, MySQL has two databases edu_db_1 and edu_db_2, each of which has two tables course_1 and course_2, and the schema of all tables is the same. You can use the following [table-rule] configuration to synchronize all the tables to one StarRocks table.

Note

The name of the StarRocks table defaults to course__auto_shard. If you need to use a different name, you can modify it in the SQL files starrocks-create.all.sql and flink-create.all.sql

[table-rule.1]
# Pattern to match databases for setting properties
database = ^edu_db_[0-9]*$
# Pattern to match tables for setting properties
table = ^course_[0-9]*$

############################################
### Flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`. They are auto-generated
############################################
flink.starrocks.jdbc-url = jdbc: mysql://xxx.xxx.x.x:xxxx
flink.starrocks.load-url = xxx.xxx.x.x:xxxx
flink.starrocks.username = user2
flink.starrocks.password = xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator =\x01
flink.starrocks.sink.properties.row_delimiter =\x02
flink.starrocks.sink.buffer-flush.interval-ms = 5000

Import data in JSON format

Data in the preceding example is imported in CSV format. If you are unable to choose a suitable delimiter, you need to replace the following parameters of flink.starrocks.* in [table-rule].

flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator =\x01
flink.starrocks.sink.properties.row_delimiter =\x02

Data is imported in JSON format after the following parameters are passed in.

flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true

Note

This method slightly slows down the loading speed.

You can use the STATEMENT SET syntax in the flink-create.all.sql file to execute multiple INSERT INTO statements as one Flink job, which prevents multiple statements from taking up too many Flink job resources and improves the efficiency of executing multiple queries.

Note

Flink supports the STATEMENT SET syntax from 1.13 onwards.

  1. Open the result/flink-create.all.sql file.

  2. Modify the SQL statements in the file. Move all the INSERT INTO statements to the end of the file. Place EXECUTE STATEMENT SET BEGIN before the first INSERT INTO statement and place END; after the last INSERT INTO statement.

Note

The positions of CREATE DATABASE and CREATE TABLE remain unchanged.

CREATE DATABASE IF NOT EXISTS db;
CREATE TABLE IF NOT EXISTS db.a1;
CREATE TABLE IF NOT EXISTS db.b1;
CREATE TABLE IF NOT EXISTS db.a2;
CREATE TABLE IF NOT EXISTS db.b2;
EXECUTE STATEMENT SET
BEGIN-- one or more INSERT INTO statements
INSERT INTO db.a1 SELECT * FROM db.b1;
INSERT INTO db.a2 SELECT * FROM db.b2;
END;