从 HDFS 或外部云存储系统导入数据
StarRocks 提供基于 MySQL 协议的 Broker Load 导入方式,帮助您从 HDFS 或外部云存储系统导入大批量数据。
Broker Load 是一种异步的导入方式。您提交导入作业以后,StarRocks 会异步地执行导入作业。您需要通过 SHOW LOAD 语句或者 curl 命令来查看导入作业的结果。
Broker Load 支持单表导入 (Single-Table Load) 和多表导入 (Multi-Table Load)。您可以通过单次导入操作,把一个或多个数据文件导入单张或多张目标表。而且 Broker Load 能够保证单次导入事务的原子性,即单次导入的多个数据文件都成功或者都失败,而不会出现部分导入成功、部分导入失败的情况。
Broker Load 支持在导入过程中做数据转换、以及通过 UPSERT 和 DELETE 操作实现数据变更。请参见导入过程中实现数据转换和通过导入实现数据变更。
背景信息
在 v2.4 及以前版本,StarRocks 在执行 Broker Load 时需要借助 Broker 才能访问外部存储系统,称为“有 Broker 的导入”。导入语句中需要通过 WITH BROKER "<broker_name>"
来指定使用哪个 Broker。Broker 是一个独立的无状态服务,封装了文件系统接口。通过 Broker,StarRocks 能够访问和读取外部存储系统上的数据文件,并利用自身的计算资源对数据文件中的数据进行预处理和导入。
自 v2.5 起,StarRocks 在执行 Broker Load 时不需要借助 Broker 即可访问外部存储系统,称为“无 Broker 的导入”。导入语句中也不再需要指定 broker_name
,但继续保留 WITH BROKER
关键字。
需要注意的是,无 Broker 的导入在数据源为 HDFS 的某些场景下会受限,例如,在多 HDFS 集群或者多 Kerberos 用户的场景。在这些场景下,可以继续采用有 Broker 的导入,需要确保至少部署了一组独立的 Broker。有关各种场景下如何指定认证方式和 HA 配置,参见 HDFS。
说明
您可以通过 SHOW BROKER 语句来查看 StarRocks 集群中已经部署的 Broker。如果集群中没有部署 Broker,请参见部署 Broker 节点完成 Broker 部署。
支持的数据文件格式
Broker Load 支持如下数据文件格式:
-
CSV
-
Parquet
-
ORC
说明
对于 CSV 格式的数据,需要注意以下两点:
- StarRocks 支持设置长度最大不超过 50 个字节的 UTF-8 编码字符串作为列分隔符,包括常见的逗号 (,)、Tab 和 Pipe (|)。
- 空值 (null) 用
\N
表示。比如,数据文件一共有三列,其中某行数据的第一列、第三列数据分别为a
和b
,第二列没有数据,则第二列需要用\N
来表示空值,写作a,\N,b
,而不是a,,b
。a,,b
表示第二列是一个空字符串。
支持的外部存储系统
Broker Load 支持从如下外部存储系统导入数据:
-
HDFS
-
AWS S3
-
Google GCS
-
阿里云 OSS
-
腾讯云 COS
-
华为云 OBS
-
其他兼容 S3 协议的对象存储(如 MinIO)
基本原理
提交导入作业以后,FE 会生成对应的查询计划,并根据目前可用 BE 的个数和源数据文件的大小,将 查询计划分配给多个 BE 执行。每个 BE 负责执行一部分导入任务。BE 在执行过程中,会从 HDFS 或云存储系统拉取数据,并且会在对数据进行预处理之后将数据导入到 StarRocks 中。所有 BE 均完成导入后,由 FE 最终判断导入作业是否成功。
下图展示了 Broker Load 的主要流程:
基本操作
创建多表导入 (Multi-Table Load) 作业
这里以 CSV 格式的数据为例,介绍如何导入多个数据文件至多张目标表。有关如何导入其他格式的数据、以及 Broker Load 的详细语法和参数说明,请参见 BROKER LOAD。
注意在 StarRocks 中,部分文字是 SQL 语言的保留关键字,不能直接用于 SQL 语句。如果想在 SQL 语句中使用这些保留关键字,必须用反引号 (`) 包裹起来。参见关键字。
数据样例
-
在本地文件系统中创建 CSV 格式的数据文件。
a. 创建一个名为
file1.csv
的数据文件。文件一共包含三列,分别代表用户 ID、用户姓名和用户得分,如下所示:1,Lily,23
2,Rose,23
3,Alice,24
4,Julia,25b. 创建一个名为
file2.csv
的数据文件。文件一共包含两列,分别代表城市 ID 和城市名称,如下所示:200,'北京'
-
把创建好的数据文件
file1.csv
和file2.csv
分别上传到 HDFS 集群的/user/starrocks/
路径下、AWS S3 存储空间bucket_s3
里的input
文件夹下、 Google GCS 存储空间bucket_gcs
里的input
文件夹下、阿里云 OSS 存储空间bucket_oss
里的input
文件夹下、腾讯云 COS 存储空间bucket_cos
里的input
文件夹下、华为云 OBS 存储空间bucket_obs
里的input
文件夹下、以及其他兼容 S3 协议的对象存储空间(如 MinIO)bucket_minio
里的input
文件夹下。 -
在 StarRocks 数据库
test_db
中创建 StarRocks 表。说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 确定分桶数量。
a. 创建一张名为
table1
的主键模型表。表包含id
、name
和score
三列,分别代表用户 ID、 用户名称和用户得分,主键为id
列,如下所示:CREATE TABLE `table1`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
`score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10;b. 创建一张名为
table2
的主键模型表。表包含id
和city
两列,分别代表城市 ID 和城市名称,主键为id
列,如下所示:CREATE TABLE `table2`
(
`id` int(11) NOT NULL COMMENT "城市 ID",
`city` varchar(65533) NULL DEFAULT "" COMMENT "城市名称"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10;
从 HDFS 导入
可以通过如下语句,把 HDFS 集群 /user/starrocks/
路径下的 CSV 文件 file1.csv
和 file2.csv
分别导入到 StarRocks 表 table1
和 table2
中:
LOAD LABEL test_db.label1
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
)
PROPERTIES
(
"timeout" = "3600"
);
以上示例中,StorageCredentialParams
代表一组认证参数,具体包含哪些参数,需要根据您所使用的认证方式来确定,详情请参见 BROKER LOAD。
从 AWS S3 导入
可以通过如下语句,把 AWS S3 存储空间 bucket_s3
里 input
文件夹内的 CSV 文件 file1.csv
和 file2.csv
分别导入到 StarRocks 表 table1
和 table2
中:
LOAD LABEL test_db.label2
(
DATA INFILE("s3a://bucket_s3/input/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("s3a://bucket_s3/input/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
);
说明
由于 Broker Load 只支持通过 S3A 协议访问 AWS S3,因此当从 AWS S3 导入数据时,
DATA INFILE
中传入的目标文件的 S3 URI,前缀必须将s3://
修改为s3a://
。
以上示例中,StorageCredentialParams
代表一组认证参数,具体包含哪些参数,需要根据您所使用的认证方式来确定,详情请参见 BROKER LOAD。
从 Google GCS 导入
可以通过如下语句,把 Google GCS 存储空间 bucket_gcs
里 input
文件夹内的 CSV 文件 file1.csv
和 file2.csv
分别导入到 StarRocks 表 table1
和 table2
中:
LOAD LABEL test_db.label3
(
DATA INFILE("s3a://bucket_gcs/input/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("s3a://bucket_gcs/input/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
);
说明
由于 Broker Load 只支持通过 S3A 协议访问 Google GCS,因此当从 Google GCS 导入数据时,
DATA INFILE
中传入的目标文件的 GCS URI,前缀必须修改为s3a://
。
以上示例中,StorageCredentialParams
代表一组认证参数,具体包含哪些参数,需要根据您所使用的认证方式来确定,详情请参见 BROKER LOAD。
从阿里云 OSS 导入
可以通过如下语句,把阿里云 OSS 存储空间 bucket_oss
里 input
文件夹内的 CSV 文件 file1.csv
和 file2.csv
分别导入到 StarRocks 表 table1
和 table2
中:
LOAD LABEL test_db.label4
(
DATA INFILE("oss://bucket_oss/input/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("oss://bucket_oss/input/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
);
以上示例中,StorageCredentialParams
代表一组认证参数,具体包含哪些参数,需要根据您所使用的认证方式来确定,详情请参见 BROKER LOAD。
从腾讯云 COS 导入
可以通过如下语句,把腾讯云 COS 存储空间 bucket_cos
里 input
文件夹内的 CSV 文件 file1.csv
和 file2.csv
分别导入到 StarRocks 表 table1
和 table2
中:
LOAD LABEL test_db.label5
(
DATA INFILE("cosn://bucket_cos/input/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("cosn://bucket_cos/input/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
);
以上示例中,StorageCredentialParams
代表一组认证参数,具体包含哪些参数,需要根据您所使用的认证方式来确定,详情请参见 BROKER LOAD。
从华为云 OBS 导入
可以通过如下语句,把华为云 OBS 存储空间 bucket_obs
里 input
文件夹内的 CSV 文件 file1.csv
和 file2.csv
分别导入到 StarRocks 表 table1
和 table2
中:
LOAD LABEL test_db.label6
(
DATA INFILE("obs://bucket_obs/input/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("obs://bucket_obs/input/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
);
说明
从华为云 OBS 导入数据时,需要先下载依赖库添加到 $BROKER_HOME/lib/ 路径下并重启 Broker。
以上示例中,StorageCredentialParams
代表一组认证参数,具体包含哪些参数,需要根据您所使用的认证方式来确定,详情请参见 BROKER LOAD。
从其他兼容 S3 协议的对象存储导入
可以通过如下语句,把兼容 S3 协议的对象存储空间(如 MinIO) bucket_minio
里 input
文件夹内的 CSV 文件 file1.csv
和 file2.csv
分别导入到 StarRocks 表 table1
和 table2
中:
LOAD LABEL test_db.label7
(
DATA INFILE("s3://bucket_minio/input/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("s3://bucket_minio/input/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, city)
)
WITH BROKER
(
StorageCredentialParams
);