跳到主要内容
版本:Latest-4.0

使用 Stream Load 事务接口导入

为了支持和 Apache Flink®、Apache Kafka® 等其他系统之间实现跨系统的两阶段提交,并提升高并发 Stream Load 导入场景下的性能,StarRocks 自 2.4 版本起提供 Stream Load 事务接口。

从 v4.0 版本开始,Stream Load 事务接口支持多表事务,即向同一数据库内的多个表导入数据。

本文介绍 Stream Load 事务接口、以及如何使用该事务接口把数据导入到 StarRocks 中。

接口说明

Stream Load 事务接口支持通过兼容 HTTP 协议的工具或语言发起接口请求。本文以 curl 工具为例介绍如何使用该接口。该接口提供事务管理、数据写入、事务预提交、事务去重和超时管理等功能。

备注

Stream Load 支持导入 CSV 和 JSON 格式的数据,并且建议在导入的数据文件数量较少、单个数据文件的大小不超过 10 GB 时使用。Stream Load 不支持 Parquet 文件格式。如果要导入 Parquet 格式的数据,请使用 INSERT+files().

事务管理

提供如下标准接口,用于管理事务:

  • /api/transaction/begin:开启一个新事务。

  • /api/transaction/prepare: 预提交当前事务并使数据更改暂时持久化。预提交事务后,您可以继续提交或回滚该事务。如果集群在事务预提交后发生故障,您仍可在集群恢复后继续提交该事务。

  • /api/transaction/commit:提交当前事务,持久化变更。

  • /api/transaction/rollback:回滚当前事务,回滚变更。

说明

在事务预提交以后,请勿继续写入数据。继续写入数据的话,写入请求会报错。

下图展示了事务状态与操作之间的关系:

stateDiagram-v2
direction LR
[*] --> PREPARE : begin
PREPARE --> PREPARED : prepare
PREPARE --> ABORTED : rollback
PREPARED --> COMMITTED : commit
PREPARED --> ABORTED : rollback

数据写入

提供 /api/transaction/load 接口,用于写入数据。您可以在同一个事务中多次调用该接口来写入数据。

从 v4.0 版本开始,您可以在不同表上调用 /api/transaction/load 操作,将数据导入到同一数据库中的多个表中。

事务去重

复用 StarRocks 现有的标签机制,通过标签绑定事务,实现事务的 “至多一次 (At-Most-Once)” 语义。

超时管理

当开始事务时,您可以使用 HTTP 请求 Header 中的 timeout 字段来指定从 PREPARE 状态到 PREPARED 状态的超时时间(以秒为单位)。如果在此时间段内事务未完成准备,将自动取消该事务。如果未指定此字段,默认值由 FE 配置 stream_load_default_timeout_second 决定(默认:600 秒)。

当开始事务时,您还可以通过HTTP请求 Header 中的 idle_transaction_timeout 字段指定事务可保持空闲状态的超时时间(以秒为单位)。若在此期间内未写入任何数据,该事务将被自动回滚。

在预提交事务时,您可以通过 HTT P请求 Header 中的 prepared_timeout 字段指定事务从 PREPARED 状态转换为 COMMITTED 状态的超时时间(以秒为单位)。如果在此时间段内事务未完成提交,系统将自动取消该事务。如果未指定此字段,默认值由 FE 配置 prepared_transaction_default_timeout_second 决定(默认:86400 秒)。prepared_timeout 自 v3.5.4 版本起支持。

接口优势

Stream Load 事务接口具有如下优势:

  • Exactly-Once 语义

    通过“预提交事务”、“提交事务”,方便实现跨系统的两阶段提交。例如配合在 Flink 实现“精确一次 (Exactly-Once)”语义的导入。

  • 提升导入性能

    在通过程序提交 Stream Load 作业的场景中,Stream Load 事务接口允许在一个导入作业中按需合并发送多次小批量的数据后“提交事务”,从而能减少数据导入的版本,提升导入性能。

使用限制

事务接口当前具有如下使用限制:

  • 从 v4.0 版本起支持单库多表事务。未来将会支持跨库多表事务。

  • 只支持单客户端并发数据写入,未来将会支持多客户端并发数据写入。

  • 支持在单个事务中多次调用数据写入接口 /api/transaction/load 来写入数据,但是要求所有 /api/transaction/load 接口中的参数设置(除 table 外)必须保持一致。

  • 导入 CSV 格式的数据时,需要确保每行数据结尾都有行分隔符。

注意事项

  • 使用 Stream Load 事务接口导入数据的过程中,注意 /api/transaction/begin/api/transaction/load/api/transaction/prepare 接口报错后,事务将失败并自动回滚。
  • 在调用 /api/transaction/begin 接口开启事务时,您必须指定标签 (Label),其后的 /api/transaction/load/api/transaction/prepare/api/transaction/commit 三个接口中,必须使用与 /api/transaction/begin 接口中相同的标签。
  • 重复调用标签相同的 /api/transaction/begin 接口,会导致前面使用相同标签正在进行中的事务失败并回滚。
  • 若使用多表事务将数据导入到不同表中,则必须为事务涉及的所有操作指定参数 -H "transaction_type:multi"
  • StarRocks支持导入的 CSV 格式数据默认的列分隔符是 \t,默认的行分隔符是 \n。如果源数据文件中的列分隔符和行分隔符不是 \t\n,则在调用 /api/transaction/load 接口时必须通过 "column_separator: <column_separator>""row_delimiter: <row_delimiter>" 指定行分隔符和列分隔符。

准备工作

查看权限

导入操作需要目标表的 INSERT 权限。如果您的用户账号没有 INSERT 权限,请参考 GRANT 给用户赋权,语法为 GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}

查看网络配置

确保待导入数据所在的机器能够访问 StarRocks 集群中 FE 节点的 http_port 端口(默认 8030)、以及 BE 节点的 be_http_port 端口(默认 8040)。

基本操作

准备数据样例

这里以 CSV 格式的数据为例。

  1. 在本地文件系统 /home/disk1/ 路径下创建一个 CSV 格式的数据文件 example1.csv。文件一共包含三列,分别代表用户 ID、用户姓名和用户得分,如下所示:

    1,Lily,23
    2,Rose,23
    3,Alice,24
    4,Julia,25
  2. 在数据库 test_db 中创建一张名为 table1 的主键表。表包含 idnamescore 三列,主键为 id 列,如下所示:

    CREATE TABLE `table1`
    (
    `id` int(11) NOT NULL COMMENT "用户 ID",
    `name` varchar(65533) NULL COMMENT "用户姓名",
    `score` int(11) NOT NULL COMMENT "用户得分"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`) BUCKETS 10;

开始事务

语法

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
[-H "transaction_type:multi"]\ # 可选。启动多表事务。
-H "db:<database_name>" -H "table:<table_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin

说明

若需在事务内向不同表导入数据,请在命令中指定 -H "transaction_type:multi"

示例

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin

说明

上述示例中,指定事务的标签为 streamload_txn_example1_table1

返回结果

  • 如果事务开始成功,则返回如下结果:

    {
    "Status": "OK",
    "Message": "",
    "Label": "streamload_txn_example1_table1",
    "TxnId": 9032,
    "BeginTxnTimeMs": 0
    }
  • 如果事务的标签重复,则返回如下结果:

    {
    "Status": "LABEL_ALREADY_EXISTS",
    "ExistingJobStatus": "RUNNING",
    "Message": "Label [streamload_txn_example1_table1] has already been used."
    }
  • 如果发生标签重复以外的其他错误,则返回如下结果:

    {
    "Status": "FAILED",
    "Message": ""
    }

写入数据

语法

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
[-H "transaction_type:multi"]\ # 可选。通过多表事务导入数据。
-H "db:<database_name>" -H "table:<table_name>" \
-T <file_path> \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load

说明

  • 调用 /api/transaction/load 接口时,必须通过 -T <file_path> 指定数据文件所在的路径。
  • 您可以通过调用 /api/transaction/load 操作并传入不同的 table 参数值,将数据导入到同一数据库中的不同表中。此时,您必须在命令中指定 -H "transaction_type:multi"

示例

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-T /home/disk1/example1.csv \
-H "column_separator: ," \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load

说明

上述示例中,由于数据文件 example1.csv 中使用的列分隔符为逗号 (,),而不是 StarRocks 默认的列分隔符 (\t),因此在调用 /api/transaction/load 接口时必须通过 "column_separator: <column_separator>" 指定列分隔符为逗号 (,)。

返回结果

  • 如果数据写入成功,则返回如下结果:

    {
    "TxnId": 1,
    "Seq": 0,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    }
  • 如果事务被判定为未知,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "TXN_NOT_EXISTS"
    }
  • 如果事务状态无效,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation State Invalid"
    }
  • 如果发生事务未知和状态无效以外的其他错误,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

预提交事务

语法

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
[-H "transaction_type:multi"]\ # 可选。预提交多表事务。
-H "db:<database_name>" \
[-H "prepared_timeout:<timeout_seconds>"] \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare

说明

若要预提交的事务为多表事务,请在命令中指定 -H "transaction_type:multi"

示例

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-H "prepared_timeout:300" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare

说明

prepared_timeout 字段为可选。如果未指定该字段,其默认值由 FE 配置中的 prepared_transaction_default_timeout_second 决定(默认值:86400 秒)。prepared_timeout 自 v3.5.4 版本起支持。

返回结果

  • 如果事务预提交成功,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    "WriteDataTimeMs": 417851
    "CommitAndPublishTimeMs": 1393
    }
  • 如果事务被判定为不存在,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • 如果事务预提交超时,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "commit timeout",
    }
  • 如果发生事务不存在和预提交超时以外的其他错误,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "publish timeout"
    }

提交事务

语法

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
[-H "transaction_type:multi"]\ # 可选。提交多表事务。
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit

说明

若要提交的事务为多表事务,请在命令中指定 -H "transaction_type:multi"

示例

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit

返回结果

  • 如果事务提交成功,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "",
    "NumberTotalRows": 5265644,
    "NumberLoadedRows": 5265644,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 10737418067,
    "LoadTimeMs": 418778,
    "StreamLoadPutTimeMs": 68,
    "ReceivedDataTimeMs": 38964,
    "WriteDataTimeMs": 417851
    "CommitAndPublishTimeMs": 1393
    }
  • 如果事务已经提交过,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": "Transaction already commited",
    }
  • 如果事务被判定为不存在,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • 如果事务提交超时,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "commit timeout",
    }
  • 如果数据发布超时,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "publish timeout",
    "CommitAndPublishTimeMs": 1393
    }
  • 如果发生事务不存在和超时以外的其他错误,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

回滚事务

语法

curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
[-H "transaction_type:multi"]\ # 可选。回滚多表事务。
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback

说明

若要回滚的事务为多表事务,请在命令中指定 -H "transaction_type:multi"

示例

curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback

返回结果

  • 如果事务回滚成功,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "OK",
    "Message": ""
    }
  • 如果事务被判定为不存在,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": "Transcation Not Exist"
    }
  • 如果发生事务不存在以外的其他错误,则返回如下结果:

    {
    "TxnId": 1,
    "Label": "streamload_txn_example1_table1",
    "Status": "FAILED",
    "Message": ""
    }

相关文档

有关 Stream Load 适用的业务场景、支持的数据文件格式、基本原理等信息,参见使用 Stream Load 从本地导入

有关创建 Stream Load 作业的语法和参数,参见STREAM LOAD

Rocky the happy otterStarRocks Assistant

AI generated answers are based on docs and other sources. Please test answers in non-production environments.