从本地文件系统导入
StarRocks 提供两种导入方式帮助您从本地文件系统导入数据:
- 使用 Stream Load 进行同步导入。
- 使用 Broker Load 进行异步导入。
两种导入方式各有优势:
- Stream Load 支持 CSV 和 JSON 两种数据文件格式,适用于数据文件数量较少且单个文件的大小不超过 10 GB 的场景。
- Broker Load 支持 Parquet、ORC、CSV、及 JSON 四种文件格式(JSON 文件格式自 3.2.3 版本起支持),适用于数据文件数量较多且单个文件的大小超过 10 GB 的场景、以及文件存储在 NAS 的场景。
对于 CSV 格式的数据,需要注意以下两点:
- StarRocks 支持设置长度最大不超过 50 个字节的 UTF-8 编码字符串作为列分隔符,包括常见的逗号 (,)、Tab 和 Pipe (|)。
- 空值 (null) 用
\N
表示。比如,数据文件一共有三列,其中某行数据的第一列、第三列数据分别为a
和b
,第二列没有数据,则第二列需要用\N
来表示空值,写作a,\N,b
,而不是a,,b
。a,,b
表示第二列是一个空字符串。
Stream Load 和 Broker Load 均支持在导入过程中做数据转换、以及通过 UPSERT 和 DELETE 操作实现数据变更。请参见导入过程中实现数据转换和通过导入实现数据变更。
准备工作
查看权限
导入操作需要目标表的 INSERT 权限。如果您的用户账号没有 INSERT 权限,请参考 GRANT 给用户赋权,语法为 GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}
。
查看网络配置
确保待导入数据所在的机器能够访问 StarRocks 集群中 FE 节点的 http_port
端口(默认 8030
)、以及 BE 节点的 be_http_port
端口(默认 8040
)。
使用 Stream Load 从本地导入
Stream Load 是一种基于 HTTP PUT 的同步导入方式。提交导入作业以后,StarRocks 会同步地执行导入作业,并返回导入作业的结果信息。您可以通过返回的结果信息来判断导入作业是否成功。
NOTICE
Stream Load 操作会同时更新和 StarRocks 原始表相关的物化视图的数据。
基本原理
您需要在客户端上通过 HTTP 发送导入作业请求给 FE,FE 会通过 HTTP 重定向 (Redirect) 指令将请求转发给某一个 BE(或 CN)。或者,您也可以直接发送导入作业请求给某一个 BE(或 CN)。
如果把导入作业请求发送给 FE,FE 会通过轮询机制选定由哪一个 BE(或 CN)来接收请求,从而实现 StarRocks 集群内的负载均衡。因此,推荐您把导入作业请求发送给 FE。
接收导入作业请求的 BE(或 CN)作为 Coordinator BE(或 CN),将数据按表结构划分、并分发数据到其他各相关的 BE(或 CN)。导入作业的结果信息由 Coordinator BE(或 CN)返回给客户端。需要注意的是,如果您在导入过程中停止 Coordinator BE(或 CN),会导致导入作业失败。
下图展示了 Stream Load 的主要流程:
使用限制
Stream Load 当前不支持导入某一列为 JSON 的 CSV 文件的数据。
操作示例
本文以 curl 工具为例,介绍如何使用 Stream Load 从本地文件系统导入 CSV 或 JSON 格式的数据。有关创建导入作业的详细语法和参数说明,请参见 STREAM LOAD。
注意在 StarRocks 中,部分文字是 SQL 语言的保留关键字,不能直接用于 SQL 语句。如果想在 SQL 语句中使用这些保留关键字,必须用反引号 (`) 包裹起来。参见关键字。
导入 CSV 格式的数据
数据样例
在本地文件系统中创建一个 CSV 格式的数据文件 example1.csv
。文件一共包含三列,分别代表用户 ID、用户姓名和用户得分,如下所示:
1,Lily,23
2,Rose,23
3,Alice,24
4,Julia,25
建库建表
通过如下语句创建数据库、并切换至该数据库:
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
通过如下语句手动创建主键表 table1
,包含 id
、name
和 score
三列,分别代表用户 ID、用户姓名和用户得分,主键为 id
列,如下所示:
CREATE TABLE `table1`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
提交导入作业
通过如下命令,把 example1.csv
文件中的数据导入到 table1
表中:
curl --location-trusted -u <username>:<password> -H "label:123" \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns: id, name, score" \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/mydatabase/table1/_stream_load
- 如果账号没有设置密码,这里只需要传入
<username>:
。 - 您可以通过 SHOW FRONTENDS 命令查看 FE 节点的 IP 地址和 HTTP 端口号。
example1.csv
文件中包含三列,跟 table1
表的 id
、name
、score
三列一一对应,并用逗号 (,) 作为列分隔符。因此,需要通过 column_separator
参数指定列分隔符为逗号 (,),并且在 columns
参数中按顺序把 example1.csv
文件中的三列临时命名为 id
、name
、score
。columns
参数中声明的三列,按名称对应 table1
表中的三列。
导入完成后,您可以查询 table1
表,验证数据导入是否成功,如下所示:
SELECT * FROM table1;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Lily | 23 |
| 2 | Rose | 23 |
| 3 | Alice | 24 |
| 4 | Julia | 25 |
+------+-------+-------+
4 rows in set (0.00 sec)
导入 JSON 格式的数据
从 3.2.7 版本起,STREAM LOAD 支持在传输过程中对 JSON 数据进行压缩,减少网络带宽开销。用户可以通过 compression
或 Content-Encoding
参数指定不同的压缩方式,支持 GZIP、BZIP2、LZ4_FRAME、ZSTD 压缩算法。语法参见STREAM LOAD。
数据样例
在本地文件系统中创建一个 JSON 格式的数据文件 example2.json
。文件一共包含两个字段,分别代表城市名称和城市 ID,如下所示:
{"name": "北京", "code": 2}
建库建表
通过如下语句创建数据库、并切换至该数据库:
CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;
通过如下语句手动创建主键表 table2
,包含 id
和 city
两列,分别代表城市 ID 和城市名称,主键为 id
列,如下所示:
CREATE TABLE `table2`
(
`id` int(11) NOT NULL COMMENT "城市 ID",
`city` varchar(65533) NULL COMMENT "城市名称"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
提交导入作业
通过如下语句把 example2.json
文件中的数据导入到 table2
表中:
curl -v --location-trusted -u <username>:<password> -H "strict_mode: true" \
-H "Expect:100-continue" \
-H "format: json" -H "jsonpaths: [\"$.name\", \"$.code\"]" \
-H "columns: city,tmp_id, id = tmp_id * 100" \
-T example2.json -XPUT \
http://<fe_host>:<fe_http_port>/api/mydatabase/table2/_stream_load
- 如果账号没有设置密码,这里只需要传入
<username>:
。 - 您可以通过 SHOW FRONTENDS 命令查看 FE 节点的 IP 地址和 HTTP 端口号。
example2.json
文件中包含 name
和 code
两个键,跟 table2
表中的列之间的对应关系如下图所示。
上图所示的对应关系描述如下:
- 提取
example2.json
文件中包含的name
和code
两个字段,按顺序依次映射到jsonpaths
参数中声明的name
和code
两个字段。 - 提取
jsonpaths
参数中声明的name
和code
两个字段,按顺序映射到columns
参数中声明的city
和tmp_id
两列。 - 提取
columns
参数声明中的city
和id
两列,按名称映射到table2
表中的city
和id
两列。
上述示例中,在导入过程中先将 example2.json
文件中 code
字段对应的值乘以 100,然后再落入到 table2
表的 id
中。
有关导入 JSON 数据时 jsonpaths
、columns
和 StarRocks 表中的字段之间的对应关系,请参见 STREAM LOAD 文档中“列映射”章节。
导入完成后,您可以查询 table2
表,验证数据导入是否成功,如下所示:
SELECT * FROM table2;
+------+--------+
| id | city |
+------+--------+
| 200 | 北京 |
+------+--------+
4 rows in set (0.01 sec)
查看 Stream Load 导入进度
导入作业结束后,StarRocks 会以 JSON 格式返回本次导入作业的结果信息,具体请参见 STREAM LOAD 文档中“返回值”章节。
Stream Load 不支持通过 SHOW LOAD 语句查看导入作业执行情况。
取消 Stream Load 作业
Stream Load 不支持手动取消导入作业。如果导入作业发生超时或者导入错误,StarRocks 会自动取消该作业。