从 RisingWave 持续导入
RisingWave 是 Apache 2.0 协议开源的分布式流数据库。旨在让用户以操作传统数据库的方式来处理流数据。通过创建实时物化视图,RisingWave 可以让用户轻松编写流计算逻辑。用户可以访问物化视图来对流计算结果进行及时、一致的查询。想快速上手 RisingWave 可参考此文档。
RisingWave 提供了直连的 StarRocks Sink 功能,可用于将数据导入至 StarRocks 表,不依赖第三方组件。该功能支持 StarRocks 全部 4 种表类型:明细表、聚合表、更新表和主键表。
前提条件
- 请确保您的 RisingWave 集群升级至 v1.7 或以上。v1.7 版本对 StarRocks Sink 进行了优化。
- 推荐使用 StarRocks v2.5 及以上版本。更早版本未经过充分测试,如遇到稳定性问题可以通过 Github Issues 进行反馈。
- 使用 RisingWave connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。
提示
RisingWave 目前对 StarRocks Sink 仅支持 At-least-once 语义,这意味着在故障发生时,数据可能被重复写入。使用 StarRocks 主键表可对数据进行去重,实现端到端的幂等写。
参数说明
下表解释了 RisingWave 向 StarRocks Sink 数据时需要配置的参数。若未特殊标识,所有的参数都默认为必填。
参数名 | 描述 |
---|---|
connector | 固定设置为 starrocks 。 |
starrocks.host | StarRocks FE 节点的 IP 地址。 |
starrocks.query_port | FE 节点的 MySQL 接口所用端口。 |
starrocks.http_port | FE 节点的 HTTP 接口所用端口。 |
starrocks.user | 用于访问 StarRocks 数据库的用户名。 |
starrocks.password | 用户名所对应的密码。 |
starrocks.database | StarRocks 目标表所在的数据库名。 |
starrocks.table | StarRocks 目标表。 |
starrocks.partial_update | (选填)是否启用部分列更新的优化。在更新列较少时,启用该选项可以提高导入性能。 |
type | 输出数据类型:
|
force_append_only | (选填)当输出类型为 append-only 而 Sink 实际有可能输出 upsert/delete 变更时,将 upsert 和 delete 数据丢弃,强行让 Sink 输出 append-only 流。 |
primary_key | (选填)StarRocks 表的主键。当 type 为 upsert 时,需要填入该项。 |
数据 类型映射
RisingWave 类型 | StarRocks 类型 |
---|---|
BOOLEAN | BOOLEAN |
SMALLINT | SMALLINT |
INTEGER | INT |
BIGINT | BIGINT |
REAL | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
DATE | DATE |
VARCHAR | VARCHAR |
TIME(建议预先转类型为 VARCHAR) | 不支持 |
TIMESTAMP | DATETIME |
TIMESTAMP WITH TIME ZONE(建议预先转类型为 TIMESTAMP) | 不支持 |
INTERVAL(建议预先转类型为 VARCHAR) | 不支持 |
STRUCT | JSON |
ARRAY | ARRAY |
BYTEA(建议预先转类型为 VARCHAR) | 不支持 |
JSONB | JSON |
SERIAL | BIGINT |
使用示例
-
在 StarRocks 上创建数据库
demo
,并创建主键表score_board
。CREATE DATABASE demo;
USE demo;
CREATE TABLE demo.score_board(
id int(11) NOT NULL COMMENT "",
name varchar(65533) NULL DEFAULT "" COMMENT "",
score int(11) NOT NULL DEFAULT "0" COMMENT ""
)
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id); -
使用 RisingWave 向 StarRocks 表写入数据:
-- 创建一张 RisingWave 表。
CREATE TABLE score_board (
id INT PRIMARY KEY,
name VARCHAR,
score INT
);
-- 向表中插入数据。
INSERT INTO score_board VALUES (1, 'starrocks', 100), (2, 'risingwave', 100);
-- 将表中的数据 Sink 到 StarRocks 表中。
CREATE SINK score_board_sink
FROM score_board WITH (
connector = 'starrocks',
type = 'upsert',
starrocks.host = 'starrocks-fe',
starrocks.mysqlport = '9030',
starrocks.httpport = '8030',
starrocks.user = 'users',
starrocks.password = '123456',
starrocks.database = 'demo',
starrocks.table = 'score_board',
primary_key = 'id'
);