CREATE ROUTINE LOAD
Routine Load can continuously consume messages from Apache Kafka® and load data into StarRocks. Routine Load can consume CSV, JSON, and Avro (supported since v3.0.1) data from a Kafka cluster and access Kafka via multiple security protocols, including plaintext
, ssl
, sasl_plaintext
, and sasl_ssl
.
This topic describes the syntax, parameters, and examples of the CREATE ROUTINE LOAD statement.
NOTE
- For information about the application scenarios, principles, and basic operations of Routine Load, see Load data using Routine Load.
- You can load data into StarRocks tables only as a user who has the INSERT privilege on those StarRocks tables. If you do not have the INSERT privilege, follow the instructions provided in GRANT to grant the INSERT privilege to the user that you use to connect to your StarRocks cluster.
Syntax
CREATE ROUTINE LOAD <database_name>.<job_name> ON <table_name>
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
Parameters
database_name
, job_name
, table_name
database_name
Optional. The name of the StarRocks database.
job_name
Required. The name of the Routine Load job. A table may receive data from multiple Routine Load jobs. We recommend that you set a meaningful Routine Load job name by using identifiable information, for example, Kafka topic name and the approximate job creation time, to distinguish multiple Routine Load jobs. The name of the Routine Load job must be unique within the same database.
table_name
Required. The name of the StarRocks table to which data is loaded.
load_properties
Optional. The properties of the data. Syntax:
[COLUMNS TERMINATED BY '<column_separator>'],
[ROWS TERMINATED BY '<row_separator>'],
[COLUMNS (<column1_name>[, <column2_name>, <column_assignment>, ... ])],
[WHERE <expr>],
[PARTITION (<partition1_name>[, <partition2_name>, ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name>, ...])]
COLUMNS TERMINATED BY
The column separator for CSV-formatted data. The default column separator is \t
(Tab). For example, you can use COLUMNS TERMINATED BY ","
to specify the column separator as a comma.
Note
- Ensure that the column separator specified here is the same as the column separator in the data to be ingested.
- You can use a UTF-8 string, such as a comma (,), tab, or pipe (|), whose length does not exceed 50 bytes as a text delimiter.
- Null values are denoted by using
\N
. For example, a data record consists of three columns, and the data record holds data in the first and third columns but does not hold data in the second column. In this situation, you need to use\N
in the second column to denote a null value. This means the record must be compiled asa,\N,b
instead ofa,,b
.a,,b
denotes that the second column of the record holds an empty string.
ROWS TERMINATED BY
The row separator for CSV-formatted data. The default row separator is \n
.
COLUMNS
The mapping between the columns in the source data and the columns in the StarRocks table. For more information, see Column mapping in this topic.
column_name
: If a column of the source data can be mapped to a column of the StarRocks table without any computation, you only need to specify the column name. These columns can be referred to as mapped columns.column_assignment
: If a column of the source data cannot be directly mapped to a column of the StarRocks table, and the column's values must be computed by using functions before data loading, you must specify the computation function inexpr
. These columns can be referred to as derived columns. It is recommended to place derived columns after mapped columns because StarRocks first parses mapped columns.
WHERE
The filter condition. Only data that meets the filter condition can be loaded into StarRocks. For example, if you only want to ingest rows whose col1
value is greater than 100
and col2
value is equal to 1000
, you can use WHERE col1 > 100 and col2 = 1000
.
NOTE
The columns specified in the filter condition can be source columns or derived columns.
PARTITION
If a StarRocks table is distributed on partitions p0, p1, p2 and p3, and you want to load the data only to p1, p2, and p3 in StarRocks and filter out the data that will be stored in p0, then you can specify PARTITION(p1, p2, p3)
as a filter condition. By default, if you do not specify this parameter, the data will be loaded into all the partitions. Example:
PARTITION (p1, p2, p3)
TEMPORARY PARTITION
The name of the temporary partition into which you want to load data. You can specify multiple temporary partitions, which must be separated by commas (,).
job_properties
Required. The properties of the load job. Syntax:
PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
Property | Required | Description |
---|---|---|
desired_concurrent_number | No | The expected task parallelism of a single Routine Load job. Default value: 3 . The actual task parallelism is determined by the minimum value of the multiple parameters: min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num) .
|
max_batch_interval | No | The scheduling interval for a task, that is, how often a task is executed. Unit: seconds. Value range: 5 ~ 60 . Default value: 10 . It is recommended to set a value larger than 10 . If the scheduling is shorter than 10 seconds, too many tablet versions are generated due to an excessively high loading frequency. |
max_batch_rows | No | This property is only used to define the window of error detection. The window is the number of rows of data consumed by a single Routine Load task. The value is 10 * max_batch_rows . The default value is 10 * 200000 = 2000000 . The Routine Load task detects error data in the error detection window. Error data refers to data that StarRocks cannot parse, such as invalid JSON-formatted data. |
max_error_number | No | The maximum number of error data rows allowed within an error detection window. If the number of error data rows exceeds this value, the load job will pause. You can execute SHOW ROUTINE LOAD and view the error logs by using ErrorLogUrls . After that, you can correct the error in Kafka according to the error logs. The default value is 0 , which means error rows are not allowed. NOTE
|
max_filter_ratio | No | The maximum error tolerance of the load job. The error tolerance is the maximum percentage of data records that can be filtered out due to inadequate data quality in all data records requested by the load job. Valid values: 0 to 1 . Default value: 1 (That means it won't actually take effect).We recommend that you set it to 0 . This way, if unqualified data records are detected, the load job pauses, thereby ensuring data correctness.If you want to ignore unqualified data records, you can set this parameter to a value greater than 0 . This way, the load job can succeed even if the data file contains unqualified data records. NOTE
|
strict_mode | No | Specifies whether to enable the strict mode. Valid values: true and false . Default value: false . When the strict mode is enabled, if the value for a column in the loaded data is NULL but the target table does not allow a NULL value for this column, the data row will be filtered out. |
log_rejected_record_num | No | Specifies the maximum number of unqualified data rows that can be logged. This parameter is supported from v3.1 onwards. Valid values: 0 , -1 , and any non-zero positive integer. Default value: 0 .
|
timezone | No | The time zone used by the load job. Default value: Asia/Shanghai . The value of this parameter affects the results returned by functions such as strftime(), alignment_timestamp(), and from_unixtime(). The time zone specified by this parameter is a session-level time zone. For more information, see Configure a time zone. |
merge_condition | No | Specifies the name of the column you want to use as the condition to determine whether to update data. Data will be updated only when the value of the data to be loaded into this column is greater than or equal to the current value of this column. NOTE Only Primary Key tables support conditional updates. The column that you specify cannot be a primary key column. |
format | No | The format of the data to be loaded. Valid values: CSV , JSON , and Avro (supported since v3.0.1). Default value: CSV . |
trim_space | No | Specifies whether to remove spaces preceding and following column separators from the data file when the data file is in CSV format. Type: BOOLEAN. Default value: false .For some databases, spaces are added to column separators when you export data as a CSV-formatted data file. Such spaces are called leading spaces or trailing spaces depending on their locations. By setting the trim_space parameter, you can enable StarRocks to remove such unnecessary spaces during data loading.Note that StarRocks does not remove the spaces (including leading spaces and trailing spaces) within a field wrapped in a pair of enclose -specified characters. For example, the following field values use pipe (| ) as the column separator and double quotation marks (" ) as the enclose -specified character: | "Love StarRocks" | . If you set trim_space to true , StarRocks processes the preceding field values as |"Love StarRocks"| . |
enclose | No | Specifies the character that is used to wrap the field values in the data file according to RFC4180 when the data file is in CSV format. Type: single-byte character. Default value: NONE . The most prevalent characters are single quotation mark (' ) and double quotation mark (" ).All special characters (including row separators and column separators) wrapped by using the enclose -specified character are considered normal symbols. StarRocks can do more than RFC4180 as it allows you to specify any single-byte character as the enclose -specified character.If a field value contains an enclose -specified character, you can use the same character to escape that enclose -specified character. For example, you set enclose to " , and a field value is a "quoted" c . In this case, you can enter the field value as "a ""quoted"" c" into the data file. |
escape | No | Specifies the character that is used to escape various special characters, such as row separators, column separators, escape characters, and enclose -specified characters, which are then considered by StarRocks to be common characters and are parsed as part of the field values in which they reside. Type: single-byte character. Default value: NONE . The most prevalent character is slash (\ ), which must be written as double slashes (\\ ) in SQL statements.NOTE The character specified by escape is applied to both inside and outside of each pair of enclose -specified characters.Two examples are as follows:
|
strip_outer_array | No | Specifies whether to strip the outermost array structure of the JSON-formatted data. Valid values: true and false . Default value: false . In real-world business scenarios, JSON-formatted data may have an outermost array structure as indicated by a pair of square brackets [] . In this situation, we recommend that you set this parameter to true , so StarRocks removes the outermost square brackets [] and loads each inner array as a separate data record. If you set this parameter to false , StarRocks parses the entire JSON-formatted data into one array and loads the array as a single data record. Use the JSON-formatted data [{"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] as an example. If you set this parameter to true , {"category" : 1, "author" : 2} and {"category" : 3, "author" : 4} are parsed as two separate data records and are loaded into two StarRocks data rows. |
jsonpaths | No | The names of the fields that you want to load from JSON-formatted data. The value of this parameter is a valid JsonPath expression. For more information, see StarRocks table contains derived columns whose values are generated by using expressions) in this topic. |
json_root | No | The root element of the JSON-formatted data to load. StarRocks extracts the elements of the root node through json_root for parsing. By default, the value of this parameter is empty, indicating that all JSON-formatted data will be loaded. For more information, see Specify the root element of the JSON-formatted data to be loaded in this topic. |
task_consume_second | No | The maximum time for each Routine Load task within the specified Routine Load job to consume data. Unit: second. Unlike the FE dynamic parameters routine_load_task_consume_second (which applies to all Routine Load jobs within the cluster), this parameter is specific to an individual Routine Load job, which is more flexible. This parameter is supported since v3.1.0.
|
task_timeout_second | No | The timeout duration for each Routine Load task within the specified Routine Load job. Unit: second. Unlike the FE dynamic parameter routine_load_task_timeout_second (which applies to all Routine Load jobs within the cluster), this parameter is specific to an individual Routine Load job, which is more flexible. This parameter is supported since v3.1.0.
|
data_source
, data_source_properties
Required. The data source and relevant properties.
FROM <data_source>
("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
data_source
Required. The source of the data you want to load. Valid value: KAFKA
.
data_source_properties
The properties of the data source.
Property | Required | Description |
---|---|---|
kafka_broker_list | Yes | Kafka's broker connection information. The format is <kafka_broker_ip>:<broker_ port> . Multiple brokers are separated by commas (,). The default port used by Kafka brokers is 9092 . Example:"kafka_broker_list" = ""xxx.xx.xxx.xx:9092,xxx.xx.xxx.xx:9092" . |
kafka_topic | Yes | The Kafka topic to be consumed. A Routine Load job can only consume messages from one topic. |
kafka_partitions | No | The Kafka partitions to be consumed, for example, "kafka_partitions" = "0, 1, 2, 3" . If this property is not specified, all partitions are consumed by default. |
kafka_offsets | No | The starting offset from which to consume data in a Kafka partition as specified in kafka_partitions . If this property is not specified, the Routine Load job consumes data starting from the latest offsets in kafka_partitions . Valid values:
"kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000" . |
property.kafka_default_offsets | No | The default starting offset for all consumer partitions. The supported values for this property are same as those for the kafka_offsets property. |
confluent.schema.registry.url | No | The URL of the Schema Registry where the Avro schema is registered. StarRocks retrieves the Avro schema by using this URL. The format is as follows:confluent.schema.registry.url = http[s]://[<schema-registry-api-key>:<schema-registry-api-secret>@]<hostname or ip address>[:<port>] |
More data source-related properties
You can specify additional data source (Kafka) related properties, which are equivalent to using the Kafka command line --property
. For more supported properties, see the properties for a Kafka consumer client in librdkafka configuration properties.
NOTE
If the value of a property is a file name, add the keyword
FILE:
preceding the file name. For information about how to create a file, see CREATE FILE.
- Specify the default initial offset for all the partitions to be consumed
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
- Specify the ID of the consumer group used by the Routine Load job
"property.group.id" = "group_id_0"
If property.group.id
is not specified, StarRocks generates a random value based on the name of the Routine Load job, in the format of {job_name}_{random uuid}
, such as simple_job_0a64fe25-3983-44b2-a4d8-f52d3af4c3e8
.
-
Specify the security protocol and relevant parameters used by BE to access Kafka
The security protocol can be specified as
plaintext
(default),ssl
,sasl_plaintext
, orsasl_ssl
. And you need to configure the related parameters according to the specified security protocol.When the security protocol is set to
sasl_plaintext
orsasl_ssl
, the following SASL authentication mechanisms are supported:- PLAIN
- SCRAM-SHA-256 and SCRAM-SHA-512
- OAUTHBEARER
- GSSAPI (Kerberos)
Examples:
-
Access Kafka using the SSL security protocol:
"property.security.protocol" = "ssl", -- Specify the security protocol as SSL.
"property.ssl.ca.location" = "FILE:ca-cert", -- File or directory path to CA certificate(s) for verifying the kafka broker's key.
-- If the Kafka server enables client authentication, the following three parameters are also required:
"property.ssl.certificate.location" = "FILE:client.pem", -- Path to the client's public key used for authentication.
"property.ssl.key.location" = "FILE:client.key", -- Path to the client's private key used for authentication.
"property.ssl.key.password" = "xxxxxx" -- Password for the client's private key. -
Access Kafka using the SASL_PLAINTEXT security protocol and SASL/PLAIN authentication mechanism:
"property.security.protocol" = "SASL_PLAINTEXT", -- Specify the security protocol as SASL_PLAINTEXT.
"property.sasl.mechanism" = "PLAIN", -- Specify the SASL mechanism as PLAIN which is a simple username/password authentication mechanism.
"property.sasl.username" = "admin", -- SASL username.
"property.sasl.password" = "xxxxxx" -- SASL password. -
Access Kafka using the SASL_PLAINTEXT security protocol and SASL/GSSAPI (Kerberos) authentication mechanism:
"property.security.protocol" = "SASL_PLAINTEXT", -- Specify the security protocol as SASL_PLAINTEXT.
"property.sasl.mechanism" = "GSSAPI", -- Specify the SASL authentication mechanism as GSSAPI. Default value is GSSAPI.
"property.sasl.kerberos.service.name" = "kafka", -- The broker service name. Default value is kafka.
"property.sasl.kerberos.keytab" = "/home/starrocks/starrocks.keytab", -- The client keytab location.
"property.sasl.kerberos.principal" = "starrocks@YOUR.COM" -- The Kerberos principal.note-
Since StarRocks v3.1.4, SASL/GSSAPI (Kerberos) authentication is supported.
-
SASL related modules need to be installed on the BE machine.
# Debian/Ubuntu:
sudo apt-get install libsasl2-modules-gssapi-mit libsasl2-dev
# CentOS/Redhat:
sudo yum install cyrus-sasl-gssapi cyrus-sasl-devel
-
FE and BE configuration items
For FE and BE configuration items related to Routine Load, see configuration items.
Column mapping
Configure column mapping for loading CSV-formatted data
If the columns of the CSV-formatted data can be mapped one on one in sequence to the columns of the StarRocks table, you do not need to configure the column mapping between the data and the StarRocks table.
If the columns of the CSV-formatted data cannot be mapped one on one in sequence to the columns of the StarRocks table, you need to use the columns
parameter to configure the column mapping between the data file and the StarRocks table. This includes the following two use cases:
-
Same number of columns but different column sequence. Also, the data from the data file does not need to be computed by functions before it is loaded into the matching StarRocks table columns.
-
In the
columns
parameter, you need to specify the names of the StarRocks table columns in the same sequence as how the data file columns are arranged. -
For example, the StarRocks table consists of three columns, which are
col1
,col2
, andcol3
in sequence, and the data file also consists of three columns, which can be mapped to the StarRocks table columnscol3
,col2
, andcol1
in sequence. In this case, you need to specify"columns: col3, col2, col1"
.
-
-
Different number of columns and different column sequence. Also, the data from the data file needs to be computed by functions before it is loaded into the matching StarRocks table columns.
In the
columns
parameter, you need to specify the names of the StarRocks table columns in the same sequence as how the data file columns are arranged and specify the functions you want to use to compute the data. Two examples are as follows:- The StarRocks table consists of three columns, which are
col1
,col2
, andcol3
in sequence. The data file consists of four columns, among which the first three columns can be mapped in sequence to the StarRocks table columnscol1
,col2
, andcol3
and the fourth column cannot be mapped to any of the StarRocks table columns. In this case, you need to temporarily specify a name for the fourth column of the data file, and the temporary name must be different from any of the StarRocks table column names. For example, you can specify"columns: col1, col2, col3, temp"
, in which the fourth column of the data file is temporarily namedtemp
. - The StarRocks table consists of three columns, which are
year
,month
, andday
in sequence. The data file consists of only one column that accommodates date and time values inyyyy-mm-dd hh:mm:ss
format. In this case, you can specify"columns: col, year = year(col), month=month(col), day=day(col)"
, in whichcol
is the temporary name of the data file column and the functionsyear = year(col)
,month=month(col)
, andday=day(col)
are used to extract data from the data file columncol
and loads the data into the mapping StarRocks table columns. For example,year = year(col)
is used to extract theyyyy
data from the data file columncol
and loads the data into the StarRocks table columnyear
.
- The StarRocks table consists of three columns, which are
For more examples, see Configure column mapping.
Configure column mapping for loading JSON-formatted or Avro-formatted data
NOTE
Since v3.0.1, StarRocks supports loading Avro data by using Routine Load. When you load JSON or Avro data, the configuration for column mapping and transformation is the same. Therefore, in this section, JSON data is used as an example to introduce the configuration.
If the keys of the JSON-formatted data have the same names as the columns of the StarRocks table, you can load the JSON-formatted data by using the simple mode. In simple mode, you do not need to specify the jsonpaths
parameter. This mode requires that the JSON-formatted data must be an object as indicated by curly brackets {}
, such as {"category": 1, "author": 2, "price": "3"}
. In this example, category
, author
, and price
are key names, and these keys can be mapped one on one by name to the columns category
, author
, and price
of the StarRocks table. For examples, please see simple mode.
If the keys of the JSON-formatted data have different names than the columns of the StarRocks table, you can load the JSON-formatted data by using the matched mode. In matched mode, you need to use the jsonpaths
and COLUMNS
parameters to specify the column mapping between the JSON-formatted data and the StarRocks table:
- In the
jsonpaths
parameter, specify the JSON keys in the sequence as how they are arranged in the JSON-formatted data. - In the
COLUMNS
parameter, specify the mapping between the JSON keys and the StarRocks table columns:- The column names specified in the
COLUMNS
parameter are mapped one on one in sequence to the JSON-formatted data. - The column names specified in the
COLUMNS
parameter are mapped one on one by name to the StarRocks table columns.
- The column names specified in the
For examples, see StarRocks table contains derived columns whose values are generated by using expressions.
Examples
Load CSV-formatted data
This section uses CSV-formatted data as an example to describe how you can employ various parameter settings and combinations to meet your diverse loading requirements.
Prepare a dataset
Suppose you want to load CSV-formatted data from a Kafka topic named ordertest1
.Every message in the dataset includes six columns: order ID, payment date, customer name, nationality, gender, and price.
2020050802,2020-05-08,Johann Georg Faust,Deutschland,male,895
2020050802,2020-05-08,Julien Sorel,France,male,893
2020050803,2020-05-08,Dorian Grey,UK,male,1262
2020050901,2020-05-09,Anna Karenina,Russia,female,175
2020051001,2020-05-10,Tess Durbeyfield,US,female,986
2020051101,2020-05-11,Edogawa Conan,japan,male,8924
Create a table
According to the columns of CSV-formatted data, create a table named example_tbl1
in the database example_db
.
CREATE TABLE example_db.example_tbl1 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`gender` varchar(26) NULL COMMENT "Gender",
`price` double NULL COMMENT "Price")
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(`order_id`);
Consume data starting from specified offsets for specified partitions
If the Routine Load job needs to consume data starting from specified partitions and offsets, you need to configure the parameters kafka_partitions
and kafka_offsets
.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1",
"kafka_partitions" ="0,1,2,3,4", -- partitions to be consumed
"kafka_offsets" = "1000, OFFSET_BEGINNING, OFFSET_END, 2000" -- corresponding initial offsets
);
Improve loading performance by increasing task parallelism
To improve loading performance and avoid accumulative consumption, you can increase task parallelism by increasing the desired_concurrent_number
value when you create the Routine Load job. Task parallelism allows splitting one Routine Load job into as many parallel tasks as possible.
Note that the actual task parallelism is determined by the minimum value among the following multiple parameters:
min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)
Note
The maximum actual task parallelism is either the number of alive BE nodes or the number of partitions to be consumed.
Therefore, when the number of alive BE nodes and the number of partitions to be consumed are greater than the values of the other two parameters max_routine_load_task_concurrent_num
and desired_concurrent_number
, you can increase the values of the other two parameters to increase the actual task parallelism.
Assume that the number of partitions to be consumed is 7, the number of alive BE nodes is 5, and max_routine_load_task_concurrent_num
is the default value 5
. If you want to increase the actual task parallelism, you can set desired_concurrent_number
to 5
(the default value is 3
). In this case, the actual task parallelism min(5,7,5,5)
is configured at 5
.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"desired_concurrent_number" = "5" -- set the value of desired_concurrent_number to 5
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Configure column mapping
If the sequence of columns in the CSV-formatted data is inconsistent with the columns in the target table, assuming that the fifth column in the CSV-formatted data does not need to be imported to the target table, you need to specify the column mapping between the CSV-formatted data and the target table through the COLUMNS
parameter.
Target database and table
Create the target table example_tbl2
in the target database example_db
according to the columns in the CSV-formatted data. In this scenario, you need to create five columns corresponding to the five columns in the CSV-formatted data, except for the fifth column that stores gender.
CREATE TABLE example_db.example_tbl2 (
`order_id` bigint NOT NULL COMMENT "Order ID",
`pay_dt` date NOT NULL COMMENT "Payment date",
`customer_name` varchar(26) NULL COMMENT "Customer name",
`nationality` varchar(26) NULL COMMENT "Nationality",
`price` double NULL COMMENT "Price"
)
DUPLICATE KEY (order_id,pay_dt)
DISTRIBUTED BY HASH(order_id);
Routine Load job
In this example, since the fifth column in the CSV-formatted data does not need to be loaded to the target table, the fifth column is temporarily named temp_gender
in COLUMNS
, and the other columns are directly mapped to the table example_tbl2
.
CREATE ROUTINE LOAD example_db.example_tbl2_ordertest1 ON example_tbl2
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Set filter conditions
If you want to load only data that meets certain conditions, you can set filter conditions in the WHERE
clause, for example, price > 100.
CREATE ROUTINE LOAD example_db.example_tbl2_ordertest1 ON example_tbl2
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price),
WHERE price > 100 -- set the filter condition
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Enable strict mode to filter out rows with NULL values
In PROPERTIES
, you can set "strict_mode" = "true"
, which means that the Routine Load job is in strict mode. If there is a NULL
value in a source column, but the destination StarRocks table column does not allow NULL values, the row that holds a NULL value in the source column is filtered out.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"strict_mode" = "true" -- enable the strict mode
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Set error tolerance
If your business scenario has low tolerance for unqualified data, you need to set the error detection window and the maximum number of error data rows by configuring the parameters max_batch_rows
and max_error_number
. When the number of error data rows within an error detection window exceeds the value of max_error_number
, the Routine Load job pauses.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"max_batch_rows" = "100000",-- The value of max_batch_rows multiplied by 10 equals the error detection window.
"max_error_number" = "100" -- The maximum number of error data rows allowed within an error detection window.
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Specify the security protocol as SSL and configure the related parameters
If you need to specify the security protocol as SSL used by BE to access Kafka, you need to configure "property.security.protocol" = "ssl"
and related parameters.
CREATE ROUTINE LOAD example_db.example_tbl1_ordertest1 ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
-- Specify the security protocol as SSL.
"property.security.protocol" = "ssl",
-- The location of the CA certificate.
"property.ssl.ca.location" = "FILE:ca-cert",
-- If authentication is enabled for Kafka clients, you need to configure the following properties:
-- The location of the Kafka client's public key.
"property.ssl.certificate.location" = "FILE:client.pem",
-- The location of the Kafka client's private key.
"property.ssl.key.location" = "FILE:client.key",
-- The password to the Kafka client's private key.
"property.ssl.key.password" = "abcdefg"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest1"
);
Setting trim_space, enclose, and escape
Suppose you want to load CSV-formatted data from a Kafka topic named test_csv
. Every message in the dataset includes six columns: order ID, payment date, customer name, nationality, gender, and price.
"2020050802" , "2020-05-08" , "Johann Georg Faust" , "Deutschland" , "male" , "895"
"2020050802" , "2020-05-08" , "Julien Sorel" , "France" , "male" , "893"
"2020050803" , "2020-05-08" , "Dorian Grey\,Lord Henry" , "UK" , "male" , "1262"
"2020050901" , "2020-05-09" , "Anna Karenina" , "Russia" , "female" , "175"
"2020051001" , "2020-05-10" , "Tess Durbeyfield" , "US" , "female" , "986"
"2020051101" , "2020-05-11" , "Edogawa Conan" , "japan" , "male" , "8924"
If you want to load all data from the Kafka topic test_csv
into example_tbl1
, with the intention of removing the spaces preceding and following column separators and setting enclose
to "
and escape
to \
, run the following command:
CREATE ROUTINE LOAD example_db.example_tbl1_test_csv ON example_tbl1
COLUMNS TERMINATED BY ",",
COLUMNS (order_id, pay_dt, customer_name, nationality, gender, price)
PROPERTIES
(
"trim_space"="true",
"enclose"="\"",
"escape"="\\",
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic"="test_csv",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);
Load JSON-formatted data
StarRocks table column names consistent with JSON key names
Prepare a dataset
For example, the following JSON-formatted data exists in the Kafka topic ordertest2
.
{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}
Note Each JSON object must be in one Kafka message. Otherwise, an error that indicates a failure in parsing JSON-formatted data occurs.
Target database and table
Create table example_tbl3
in the target database example_db
in the StarRocks cluster. The column names are consistent with the keys names in the JSON-formatted data.
CREATE TABLE example_db.example_tbl3 (
commodity_id varchar(26) NULL,
customer_name varchar(26) NULL,
country varchar(26) NULL,
pay_time bigint(20) NULL,
price double SUM NULL COMMENT "Price")
AGGREGATE KEY(commodity_id,customer_name,country,pay_time)
DISTRIBUTED BY HASH(commodity_id);
Routine Load job
You can use the simple mode for the Routine Load job. That is, you do not need to specify jsonpaths
and COLUMNS
parameters when creating the Routine Load job. StarRocks extracts the keys of JSON-formatted data in the topic ordertest2
of the Kafka cluster according to the column names of the target table example_tbl3
and loads the JSON-formatted data into the target table.
CREATE ROUTINE LOAD example_db.example_tbl3_ordertest2 ON example_tbl3
PROPERTIES
(
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);
Note
- If the outermost layer of the JSON-formatted data is an array structure, you need to set
"strip_outer_array"="true"
inPROPERTIES
to strip the outermost array structure. Additionally, when you need to specifyjsonpaths
, the root element of the entire JSON-formatted data is the flattened JSON object because the outermost array structure of the JSON-formatted data is stripped.- You can use
json_root
to specify the root element of the JSON-formatted data.
StarRocks table contains derived columns whose values are generated by using expressions
Prepare a dataset
For example, the following JSON-formatted data exists in the topic ordertest2
of the Kafka cluster.
{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875}
{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895}
{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}
Target database and table
Create a table named example_tbl4
in the database example_db
in the StarRocks cluster. The column pay_dt
is a derived column whose values are generated by computing values of the key pay_time
in the JSON-formatted data.
CREATE TABLE example_db.example_tbl4 (
`commodity_id` varchar(26) NULL,
`customer_name` varchar(26) NULL,
`country` varchar(26) NULL,
`pay_time` bigint(20) NULL,
`pay_dt` date NULL,
`price` double SUM NULL)
AGGREGATE KEY(`commodity_id`,`customer_name`,`country`,`pay_time`,`pay_dt`)
DISTRIBUTED BY HASH(`commodity_id`);
Routine Load job
You can use the matched mode for the Routine Load job. That is, you need to specify jsonpaths
and COLUMNS
parameters when creating the Routine Load job.
You need to specify the keys of the JSON-formatted data and arrange them in sequence in the jsonpaths
parameter.
And since the values in the key pay_time
of the JSON-formatted data need to be converted to the DATE type before the values are stored in the pay_dt
column of the example_tbl4
table, you need to specify the computation by using pay_dt=from_unixtime(pay_time,'%Y%m%d')
in COLUMNS
. The values of other keys in the JSON-formatted data can be directly mapped to the example_tbl4
table.
CREATE ROUTINE LOAD example_db.example_tbl4_ordertest2 ON example_tbl4
COLUMNS(commodity_id, customer_name, country, pay_time, pay_dt=from_unixtime(pay_time, '%Y%m%d'), price)
PROPERTIES
(
"format" = "json",
"jsonpaths" = "[\"$.commodity_id\",\"$.customer_name\",\"$.country\",\"$.pay_time\",\"$.price\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);
Note
- If the outermost layer of the JSON data is an array structure, you need to set
"strip_outer_array"="true"
in thePROPERTIES
to strip the outermost array structure. Additionally, when you need to specifyjsonpaths
, the root element of the entire JSON data is the flattened JSON object because the outermost array structure of the JSON data is stripped.- You can use
json_root
to specify the root element of the JSON-formatted data.
StarRocks table contains derived column whose values are generated by using CASE expression
Prepare a dataset
For example, the following JSON-formatted data exists in the Kafka topic topic-expr-test
.
{"key1":1, "key2": 21}
{"key1":12, "key2": 22}
{"key1":13, "key2": 23}
{"key1":14, "key2": 24}
Target database and table
Create a table named tbl_expr_test
in the database example_db
in the StarRocks cluster. The target table tbl_expr_test
contains two columns, where the values of the col2
column need to be computed by using case expression on JSON data.
CREATE TABLE tbl_expr_test (
col1 string, col2 string)
DISTRIBUTED BY HASH (col1);
Routine Load job
Because the values in the col2
column in the target table are generated using a CASE expression, you need to specify the corresponding expression in the COLUMNS
parameter for the Routine load job.
CREATE ROUTINE LOAD rl_expr_test ON tbl_expr_test
COLUMNS (
key1,
key2,
col1 = key1,
col2 = CASE WHEN key1 = "1" THEN "key1=1"
WHEN key1 = "12" THEN "key1=12"
ELSE "nothing" END)
PROPERTIES ("format" = "json")
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "topic-expr-test"
);
Query StarRocks table
Query the StarRocks table. The result shows that the values in the col2
column are the output of the CASE expression.
MySQL [example_db]> SELECT * FROM tbl_expr_test;
+------+---------+
| col1 | col2 |
+------+---------+
| 1 | key1=1 |
| 12 | key1=12 |
| 13 | nothing |
| 14 | nothing |
+------+---------+
4 rows in set (0.015 sec)
Specify the root element of the JSON-formatted data to be loaded
You need to use json_root
to specify the root element of the JSON-formatted data to be loaded and the value must be a valid JsonPath expression.
Prepare a dataset
For example, the following JSON-formatted data exists in the topic ordertest3
of the Kafka cluster. And the root element of the JSON-formatted data to be loaded is $.RECORDS
.
{"RECORDS":[{"commodity_id": "1", "customer_name": "Mark Twain", "country": "US","pay_time": 1589191487,"price": 875},{"commodity_id": "2", "customer_name": "Oscar Wilde", "country": "UK","pay_time": 1589191487,"price": 895},{"commodity_id": "3", "customer_name": "Antoine de Saint-Exupéry","country": "France","pay_time": 1589191487,"price": 895}]}
Target database and table
Create a table named example_tbl3
in the database example_db
in the StarRocks cluster.
CREATE TABLE example_db.example_tbl3 (
commodity_id varchar(26) NULL,
customer_name varchar(26) NULL,
country varchar(26) NULL,
pay_time bigint(20) NULL,
price double SUM NULL)
AGGREGATE KEY(commodity_id,customer_name,country,pay_time)
ENGINE=OLAP
DISTRIBUTED BY HASH(commodity_id);
Routine Load job
You can set "json_root" = "$.RECORDS"
in PROPERTIES
to specify the root element of the JSON-formatted data to be loaded. Also, since the JSON-formatted data to be loaded is in an array structure, you must also set "strip_outer_array" = "true"
to strip the outermost array structure.
CREATE ROUTINE LOAD example_db.example_tbl3_ordertest3 ON example_tbl3
PROPERTIES
(
"format" = "json",
"json_root" = "$.RECORDS",
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>",
"kafka_topic" = "ordertest2"
);
Load Avro-formatted data
Since v3.0.1, StarRocks supports loading Avro data by using Routine Load.
Avro schema is simple
Suppose the Avro schema is relatively simple, and you need to load all fields of the Avro data.
Prepare a dataset
-
Avro schema
-
Create the following Avro schema file
avro_schema1.avsc
:{
"type": "record",
"name": "sensor_log",
"fields" : [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "checked", "type" : "boolean"},
{"name": "data", "type": "double"},
{"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}}
]
} -
Register the Avro schema in the Schema Registry.
-
-
Avro data
Prepare the Avro data and send it to the Kafka topic topic_1
.
Target database and table
According to the fields of Avro data, create a table sensor_log1
in the target database sensor
in the StarRocks cluster. The column names of the table must match the field names in the Avro data. For the data types mapping when Avro data is loaded into StarRocks, see [Data types mapping](#Data types mapping).
CREATE TABLE sensor.sensor_log1 (
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`data` double NULL COMMENT "sensor data",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);
Routine Load job
You can use the simple mode for the Routine Load job. That is, you do not need to specify the parameter jsonpaths
when creating the Routine Load job. Execute the following statement to submit a Routine Load job named sensor_log_load_job1
to consume the Avro messages in the Kafka topic topic_1
and load the data into the table sensor_log1
in the database sensor
.
CREATE ROUTINE LOAD sensor.sensor_log_load_job1 ON sensor_log1
PROPERTIES
(
"format" = "avro"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic"= "topic_1",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Avro schema contains a nested record-type field
Suppose the Avro schema contains a nested record-type field, and you need to load the subfield in a nested record-type field into StarRocks.
Prepare a dataset
-
Avro schema
-
Create the following Avro schema file
avro_schema2.avsc
. The outer Avro record includes five fields which areid
,name
,checked
,sensor_type
, anddata
in sequence. And the fielddata
has a nested recorddata_record
.{
"type": "record",
"name": "sensor_log",
"fields" : [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "checked", "type" : "boolean"},
{"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}},
{"name": "data", "type":
{
"type": "record",
"name": "data_record",
"fields" : [
{"name": "data_x", "type" : "boolean"},
{"name": "data_y", "type": "long"}
]
}
}
]
} -
Register the Avro schema in the Schema Registry.
-
-
Avro data
Prepare the Avro data and send it to the Kafka topic topic_2
.
Target database and table
According to the fields of Avro data, create a table sensor_log2
in the target database sensor
in the StarRocks cluster.
Suppose that in addition to loading the fields id
, name
, checked
, and sensor_type
of the outer Record, you also need to load the subfield data_y
in the nested Record data_record
.
CREATE TABLE sensor.sensor_log2 (
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type",
`data_y` long NULL COMMENT "sensor data"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);
Routine Load job
Submit the load job, use jsonpaths
to specify the fields of the Avro data that need to be loaded. Note that for the subfield data_y
in the nested Record, you need to specify its jsonpath
as "$.data.data_y"
.
CREATE ROUTINE LOAD sensor.sensor_log_load_job2 ON sensor_log2
PROPERTIES
(
"format" = "avro",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.checked\",\"$.sensor_type\",\"$.data.data_y\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_1",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Avro schema contains a Union field
Prepare a dataset
Suppose the Avro schema contains a Union field, and you need to load the Union field into StarRocks.
-
Avro schema
-
Create the following Avro schema file
avro_schema3.avsc
. The outer Avro record includes five fields which areid
,name
,checked
,sensor_type
, anddata
in sequence. And the fielddata
is of Union type and includes two elements,null
and a nested recorddata_record
.{
"type": "record",
"name": "sensor_log",
"fields" : [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "checked", "type" : "boolean"},
{"name": "sensor_type", "type": {"type": "enum", "name": "sensor_type_enum", "symbols" : ["TEMPERATURE", "HUMIDITY", "AIR-PRESSURE"]}},
{"name": "data", "type": [null,
{
"type": "record",
"name": "data_record",
"fields" : [
{"name": "data_x", "type" : "boolean"},
{"name": "data_y", "type": "long"}
]
}
]
}
]
} -
Register the Avro schema in the Schema Registry.
-
-
Avro data
Prepare the Avro data and send it to the Kafka topic topic_3
.
Target database and table
According to the fields of Avro data, create a table sensor_log3
in the target database sensor
in the StarRocks cluster.
Suppose that in addition to loading the fields id
, name
, checked
, and sensor_type
of the outer Record, you also need to load the field data_y
of the element data_record
in the Union type field data
.
CREATE TABLE sensor.sensor_log3 (
`id` bigint NOT NULL COMMENT "sensor id",
`name` varchar(26) NOT NULL COMMENT "sensor name",
`checked` boolean NOT NULL COMMENT "checked",
`sensor_type` varchar(26) NOT NULL COMMENT "sensor type",
`data_y` long NULL COMMENT "sensor data"
)
ENGINE=OLAP
DUPLICATE KEY (id)
DISTRIBUTED BY HASH(`id`);
Routine Load job
Submit the load job, use jsonpaths
to specify the fields that need to be loaded in the Avro data. Note that for the field data_y
, you need to specify its jsonpath
as "$.data.data_y"
.
CREATE ROUTINE LOAD sensor.sensor_log_load_job3 ON sensor_log3
PROPERTIES
(
"format" = "avro",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.checked\",\"$.sensor_type\",\"$.data.data_y\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker1_ip>:<kafka_broker1_port>,<kafka_broker2_ip>:<kafka_broker2_port>,...",
"confluent.schema.registry.url" = "http://172.xx.xxx.xxx:8081",
"kafka_topic" = "topic_1",
"kafka_partitions" = "0,1,2,3,4,5",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
When the value for the Union type field data
is null
, the value loaded into the column data_y
in the StarRocks table is null
. When the value for the Union type field data
is a data record, the value loaded into the column data_y
is of Long type.