使用 DataX 导入
本文介绍如何利用 DataX 基于 StarRocks 开发的 StarRocks Writer 插件将 MySQL、Oracle 等数据库中的数据导入至 StarRocks。该插件将数据转化为 CSV 或 JSON 格式并将其通过 Stream Load 方式批量导入至 StarRocks。
DataX 导入支持多种数据源,您可以参考 DataX - Support Data Channels 了解详情。
前提条件
使用 DataX 导入数据前,请确保已安装以下依赖:
安装 StarRocks Writer
下载 StarRocks Writer 安装包,然后运行如下命令将安装包解压至 datax/plugin/writer 路径下。
tar -xzvf starrockswriter.tar.gz
您也可以编译 StarRocks Writer 源码。
创建配置文件
为导入作业创建 JSON 格式配置文件。
以下示例模拟 MySQL 至 StarRocks 导入作业的配置文件。您可以根据实际使用场景修改相应参数。
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "xxxx",
"password": "xxxx",
"column": [ "k1", "k2", "v1", "v2" ],
"connection": [
{
"table": [ "table1", "table2" ],
"jdbcUrl": [
"jdbc:mysql://x.x.x.x:3306/datax_test1"
]
},
{
"table": [ "table3", "table4" ],
"jdbcUrl": [
"jdbc:mysql://x.x.x.x:3306/datax_test2"
]
}
]
}
},
"writer": {
"name": "starrockswriter",
"parameter": {
"username": "xxxx",
"password": "xxxx",
"database": "xxxx",
"table": "xxxx",
"column": ["k1", "k2", "v1", "v2"],
"preSql": [],
"postSql": [],
"jdbcUrl": "jdbc:mysql://y.y.y.y:9030/",
"loadUrl": ["y.y.y.y:8030", "y.y.y.y:8030"],
"loadProps": {}
}
}
}
]
}
}
您需要在 writer
部分配置以下参数:
参数 | 说明 | 必选 | 默认值 |
---|---|---|---|
username | StarRocks 集群用户名。 导入操作需要目标表的 INSERT 权限。如果您的用户账号没有 INSERT 权限,请参考 GRANT 给用户赋权。 | 是 | 无 |
password | StarRocks 集群用户密码。 | 是 | 无 |
database | StarRocks 目标数据库名称。 | 是 | 无 |
table | StarRocks 目标表名称。 | 是 | 无 |
loadUrl | StarRocks FE 节点的地址,格式为 "fe_ip:fe_http_port" 。多个地址使用逗号(,)分隔。 | 是 | 无 |
column | 目标表需要写入数据的列,多列使用逗号(,)分隔。例如: "column": ["id","name","age"] 。该参数的对应关系与列名无关,但与其顺序一一对应。建议与 reader 中的 column 一样。 | 是 | 无 |
preSql | 标准 SQL 语句,在数据写入到目标表前执行。 | 否 | 无 |
postSql | 标准 SQL 语句,在数据写入到目标表后执行。 | 否 | 无 |
jdbcUrl | 目标数据库的 JDBC 连接信息,用于执行 preSql 及 postSql 。 | 否 | 无 |
maxBatchRows | 单次 Stream Load 导入的最大行数。导入大量数据时,StarRocks Writer 将根据 maxBatchRows 或 maxBatchSize 将数据分为多个 Stream Load 作业分批导入。 | 否 | 500000 |
maxBatchSize | 单次 Stream Load 导入的最大字节数,单位为 Byte。导入大量数据时,StarRocks Writer 将根据 maxBatchRows 或 maxBatchSize 将数据分为多个 Stream Load 作业分批导入。 | 否 | 104857600 |
flushInterval | 上一次 Stream Load 结束至下一次开始的时间间隔,单位为 ms。 | 否 | 300000 |
loadProps | Stream Load 的导入参数,详情参考 STREAM LOAD。 | 否 | 无 |
注意
column
参数不能留空。如果您希望导入所有列,可以配置该项为"*"
。writer
部分中column
的数量必须与reader
部分一致。
配置特殊参数
- 设置分隔符
默认设置下,数据会被转化为字符串,以 CSV 格式通过 Stream Load 导入至 StarRocks。字符串以 \t
作为列分隔符,\n
作为行分隔符。
您可以通过在参数 loadProps
中添加以下配置,以更改分隔符:
"loadProps": {
"column_separator": "\\x01",
"row_delimiter": "\\x02"
}
其中,\x
代表 16 进制,额外的 \
做转义 x
用。
说明
StarRocks 支持设置长度最大不超过 50 个字节的 UTF-8 编码字符串作为列分隔符。
- 设置导入格式
当前导入方式的默认导入格式为 CSV。您可以通过在参数 loadProps
中添加以下配置更改数据格式为 JSON:
"loadProps": {
"format": "json",
"strip_outer_array": true
}
- 导入衍生列
如果您希望在向 StarRocks 表中导入原始数据的同时导入衍生列,则需要在 loadProps
里额外设置 columns
参数。
以下代码片段展示导入原始数据列 a
、b
以及衍生列 c
。
"writer": {
"column": ["a", "b"],
"loadProps": {
"columns": "a,b,c=murmur_hash3_32(a)"
}
...
}
其中,您需要在 column
项中填写原始数据中的列名,并在 columns
项中额外填写衍生列名和其对应的数据处理方式。