Continuously load data from Apache Flink®
StarRocks provides a self-developed connector named StarRocks Connector for Apache Flink® (Flink connector for short) to help you load data into a StarRocks table by using Flink. The basic principle is to accumulate the data and then load it into StarRocks all at once through Stream Load.
The Flink connector supports the DataStream API, Table API & SQL, and Python API. It delivers higher and more stable performance than flink-connector-jdbc provided by Apache Flink®.
NOTICE
Loading data into StarRocks tables with Flink connector needs SELECT and INSERT privileges on the target StarRocks table. If you do not have these privileges, follow the instructions provided in GRANT to grant these privileges to the user that you use to connect to your StarRocks cluster.
Version requirements
| Connector | Flink | StarRocks | Java | Scala |
|-----------|-------|-----------|------|-------|
| 1.2.10 | 1.15, 1.16, 1.17, 1.18, 1.19 | 2.1 and later | 8 | 2.11, 2.12 |
| 1.2.9 | 1.15, 1.16, 1.17, 1.18 | 2.1 and later | 8 | 2.11, 2.12 |
| 1.2.8 | 1.13, 1.14, 1.15, 1.16, 1.17 | 2.1 and later | 8 | 2.11, 2.12 |
| 1.2.7 | 1.11, 1.12, 1.13, 1.14, 1.15 | 2.1 and later | 8 | 2.11, 2.12 |
Obtain Flink connector
You can obtain the Flink connector JAR file in the following ways:
- Directly download the compiled Flink connector JAR file.
- Add the Flink connector as a dependency in your Maven project and then download the JAR file.
- Compile the source code of the Flink connector into a JAR file by yourself.
The naming format of the Flink connector JAR file is as follows:
- Since Flink 1.15, the format is `flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar`. For example, if you install Flink 1.15 and you want to use Flink connector 1.2.7, you can use `flink-connector-starrocks-1.2.7_flink-1.15.jar`.
- Prior to Flink 1.15, the format is `flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar`. For example, if you install Flink 1.14 and Scala 2.12 in your environment, and you want to use Flink connector 1.2.7, you can use `flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar`.
NOTICE
In general, the latest version of the Flink connector only maintains compatibility with the three most recent versions of Flink.
Download the compiled JAR file
Directly download the corresponding version of the Flink connector JAR file from the Maven Central Repository.
Maven Dependency
In your Maven project's `pom.xml` file, add the Flink connector as a dependency in the following format. Replace `flink_version`, `scala_version`, and `connector_version` with the respective versions.
- In Flink 1.15 and later

  ```xml
  <dependency>
      <groupId>com.starrocks</groupId>
      <artifactId>flink-connector-starrocks</artifactId>
      <version>${connector_version}_flink-${flink_version}</version>
  </dependency>
  ```

- In versions earlier than Flink 1.15

  ```xml
  <dependency>
      <groupId>com.starrocks</groupId>
      <artifactId>flink-connector-starrocks</artifactId>
      <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
  </dependency>
  ```
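As a concrete illustration, suppose you pair connector 1.2.10 with Flink 1.18 (a combination listed in the version table above). Following the documented naming format, the dependency should resolve to:

```xml
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- Connector 1.2.10 built against Flink 1.18; since Flink 1.15
         the Scala suffix is dropped from the version string. -->
    <version>1.2.10_flink-1.18</version>
</dependency>
```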
Compile by yourself
1. Download the Flink connector source code.

2. Execute the following command to compile the source code of the Flink connector into a JAR file. Note that `flink_version` is replaced with the corresponding Flink version.

   ```bash
   sh build.sh <flink_version>
   ```

   For example, if the Flink version in your environment is 1.15, you need to execute the following command:

   ```bash
   sh build.sh 1.15
   ```

3. Go to the `target/` directory to find the Flink connector JAR file, such as `flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar`, generated upon compilation.

NOTE
The name of a Flink connector that is not formally released contains the `SNAPSHOT` suffix.
Options
connector
Required: Yes
Default value: NONE
Description: The connector that you want to use. The value must be "starrocks".
jdbc-url
Required: Yes
Default value: NONE
Description: The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: `jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>`.
load-url
Required: Yes
Default value: NONE
Description: The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: `<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>`.
database-name
Required: Yes
Default value: NONE
Description: The name of the StarRocks database into which you want to load data.
table-name
Required: Yes
Default value: NONE
Description: The name of the StarRocks table into which you want to load data.
username
Required: Yes
Default value: NONE
Description: The username of the account that you want to use to load data into StarRocks. The account needs SELECT and INSERT privileges on the target StarRocks table.
password
Required: Yes
Default value: NONE
Description: The password of the preceding account.
sink.version
Required: No
Default value: AUTO
Description: The interface used to load data. This parameter is supported from Flink connector version 1.2.4 onwards. Valid values:
- `V1`: Use the Stream Load interface to load data. Connectors before 1.2.4 only support this mode.
- `V2`: Use the Stream Load transaction interface to load data. It requires StarRocks 2.4 or later. `V2` is recommended because it optimizes memory usage and provides a more stable exactly-once implementation.
- `AUTO`: If the version of StarRocks supports the transaction Stream Load, the connector chooses `V2` automatically; otherwise it chooses `V1`.
sink.label-prefix
Required: No
Default value: NONE
Description: The label prefix used by Stream Load. It is recommended to configure this prefix if you are using exactly-once with connector 1.2.8 and later. See the exactly-once usage notes.
sink.semantic
Required: No
Default value: at-least-once
Description: The semantic guaranteed by the sink. Valid values: `at-least-once` and `exactly-once`.
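To make `sink.semantic` and `sink.label-prefix` concrete, here is a sketch of an exactly-once sink defined in Flink SQL. The hosts, ports, database, table, credentials, and label prefix are placeholders, and the target table is assumed to already exist in StarRocks; exactly-once also relies on Flink checkpointing being enabled.

```sql
-- Exactly-once relies on Flink checkpoints to trigger commits.
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE orders_sink (
    order_id BIGINT,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'orders',
    'username' = 'root',
    'password' = '',
    'sink.semantic' = 'exactly-once',
    -- Recommended with exactly-once on connector 1.2.8 and later; the prefix
    -- must not collide with labels of other loading jobs in the cluster.
    'sink.label-prefix' = 'flink_orders_sink'
);
```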
sink.buffer-flush.max-bytes
Required: No
Default value: 94371840 (90 MB)
Description: The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. Valid values: 64 MB to 10 GB. Setting this parameter to a larger value can improve loading performance but may increase loading latency. This parameter only takes effect when `sink.semantic` is set to `at-least-once`. If `sink.semantic` is set to `exactly-once`, the data in memory is flushed when a Flink checkpoint is triggered. In this circumstance, this parameter does not take effect.
sink.buffer-flush.max-rows
Required: No
Default value: 500000
Description: The maximum number of rows that can be accumulated in memory before being sent to StarRocks at a time. This parameter is available only when `sink.version` is `V1` and `sink.semantic` is `at-least-once`. Valid values: 64000 to 5000000.
sink.buffer-flush.interval-ms
Required: No
Default value: 300000
Description: The interval at which data is flushed. This parameter is available only when `sink.semantic` is `at-least-once`. Valid values: 1000 to 3600000. Unit: ms.
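As an illustration of how these flush thresholds combine, a throughput-oriented at-least-once sink might raise them in its `WITH` clause. This is only a fragment of a `CREATE TABLE ... WITH (...)` clause, and the values are arbitrary picks inside the documented valid ranges:

```sql
'sink.semantic' = 'at-least-once',
'sink.buffer-flush.max-bytes' = '134217728',   -- 128 MB; valid range is 64 MB to 10 GB
'sink.buffer-flush.max-rows' = '1000000',      -- only effective when sink.version is V1
'sink.buffer-flush.interval-ms' = '60000'      -- flush at least once per minute
```

A flush is triggered when any one of the thresholds is reached, so larger values generally mean bigger batches at the cost of higher loading latency.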
sink.max-retries
Required: No
Default value: 3
Description: The number of times that the system retries to perform the Stream Load job. This parameter is available only when you set `sink.version` to `V1`. Valid values: 0 to 10.
sink.connect.timeout-ms
Required: No
Default value: 30000
Description: The timeout for establishing an HTTP connection. Valid values: 100 to 60000. Unit: ms. Before Flink connector v1.2.9, the default value is `1000`.
sink.socket.timeout-ms
Required: No
Default value: -1
Description: Supported since 1.2.10. The time duration for which the HTTP client waits for data. Unit: ms. The default value `-1` means there is no timeout.
sink.wait-for-continue.timeout-ms
Required: No
Default value: 10000
Description: Supported since 1.2.7. The timeout for waiting for a response to the HTTP 100-continue request from the FE. Valid values: `3000` to `60000`. Unit: ms.
sink.ignore.update-before
Required: No
Default value: true
Description: Supported since version 1.2.8. Whether to ignore `UPDATE_BEFORE` records from Flink when loading data into Primary Key tables. If this parameter is set to `false`, such a record is treated as a delete operation on the StarRocks table.
sink.parallelism
Required: No
Default value: NONE
Description: The parallelism of loading. Only available for Flink SQL. If this parameter is not specified, the Flink planner decides the parallelism. In a multi-parallelism scenario, users need to guarantee that data is written in the correct order.
sink.properties.*
Required: No
Default value: NONE
Description: The parameters that are used to control Stream Load behavior. For example, the parameter `sink.properties.format` specifies the format used for Stream Load, such as CSV or JSON. For a list of supported parameters and their descriptions, see STREAM LOAD.
sink.properties.format
Required: No
Default value: csv
Description: The format used for Stream Load. The Flink connector transforms each batch of data to this format before sending it to StarRocks. Valid values: `csv` and `json`.
sink.properties.column_separator
Required: No
Default value: \t
Description: The column separator for CSV-formatted data.
sink.properties.row_delimiter
Required: No
Default value: \n
Description: The row delimiter for CSV-formatted data.
sink.properties.max_filter_ratio
Required: No
Default value: 0
Description: The maximum error tolerance of the Stream Load, that is, the maximum percentage of data records that can be filtered out due to inadequate data quality. Valid values: `0` to `1`. See Stream Load for details.
sink.properties.partial_update
Required: No
Default value: FALSE
Description: Whether to use partial updates. Valid values: `TRUE` and `FALSE`. The default value `FALSE` indicates that this feature is disabled.
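Pulling the required options together, the following is a minimal Flink SQL sketch that creates a StarRocks sink and loads a few rows through it. All hosts, ports, names, and credentials are placeholders, and the target table `test`.`score_board` is assumed to already exist in StarRocks; the two `sink.properties.*` entries illustrate passing Stream Load parameters through that prefix.

```sql
CREATE TABLE score_board_sink (
    id INT,
    name STRING,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    -- Send each batch to Stream Load as JSON instead of the default CSV.
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true'
);

INSERT INTO score_board_sink VALUES (1, 'starrocks', 100), (2, 'flink', 100);
```

With the default `at-least-once` semantic, the rows are flushed to StarRocks once one of the `sink.buffer-flush.*` thresholds is reached.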