SHOW ROUTINE LOAD
Description

Shows the execution status of Routine Load jobs.

Routine Load operations require the INSERT privilege on the target table. If your user account does not have the INSERT privilege, follow the instructions in GRANT to grant the privilege to your account.
Syntax
SHOW [ALL] ROUTINE LOAD [ FOR [<db_name>.]<job_name> | FROM <db_name> ]
[ WHERE [ STATE = { "NEED_SCHEDULE" | "RUNNING" | "PAUSED" | "UNSTABLE" | "STOPPED" | "CANCELLED" } ] ]
[ ORDER BY <field_name> [ ASC | DESC ] ]
[ LIMIT { [offset, ] limit | limit OFFSET offset } ]
Note

If the returned result contains a large number of fields, you can use \G to display the result vertically, for example, SHOW ROUTINE LOAD FOR <job_name>\G.
Parameters
Parameter | Required | Description |
---|---|---|
db_name | No | The name of the database to which the load job belongs. Note that this parameter is required when the FROM clause is used. |
job_name | No | The name of the load job. Note that this parameter is required when the FOR clause is used. |
ALL | No | Displays all load jobs, including those in the STOPPED or CANCELLED state. |
STATE | No | The status of the load job. |
ORDER BY field_name [ASC \| DESC] | No | Sorts the returned result in ascending or descending order by the specified field. The following fields (field_name) are supported: Id, Name, CreateTime, PauseTime, EndTime, TableName, State, and CurrentTaskNum. To sort the result in ascending order, specify ORDER BY field_name ASC. To sort the result in descending order, specify ORDER BY field_name DESC. If you specify neither a sort field nor a sort order, the result is sorted by Id in ascending order by default. |
LIMIT limit | No | Displays the specified number of load jobs. For example, LIMIT 10 displays 10 load jobs that match the filter conditions. If this parameter is not specified, all load jobs that match the filter conditions are displayed. |
OFFSET offset | No | The number of load jobs to skip in the returned result. The default value is 0. For example, OFFSET 5 skips the first five load jobs and returns the rest. |
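The clauses above can be combined. The following sketch filters, sorts, and limits the result in one statement; the database name example_db is taken from the examples later on this page:

```sql
-- Show the 10 most recently created load jobs in example_db
-- that are currently in the PAUSED state.
SHOW ROUTINE LOAD FROM example_db
WHERE STATE = "PAUSED"
ORDER BY CreateTime DESC
LIMIT 10;
```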
Return
Parameter | Description |
---|---|
Id | The globally unique ID of the load job, automatically generated by StarRocks. |
Name | The name of the load job. |
CreateTime | The time when the load job was created. |
PauseTime | The time when the load job entered the PAUSED state. |
EndTime | The time when the load job entered the STOPPED state. |
DbName | The database to which the target table of the load job belongs. |
TableName | The target table of the load job. |
State | The status of the load job. Valid values: NEED_SCHEDULE, RUNNING, PAUSED, UNSTABLE, STOPPED, and CANCELLED. |
DataSourceType | The data source type. Fixed value: KAFKA. |
CurrentTaskNum | The current number of tasks in the load job. |
JobProperties | The properties of the load job, such as the partitions to be consumed and the column mapping. |
DataSourceProperties | The properties of the data source, such as the topic and the list of addresses and ports of the brokers in the Kafka cluster. |
CustomProperties | Additional data source-related properties defined in the load job. |
Statistic | Statistics on data loading, such as the number of successfully loaded rows, the total number of rows, and the amount of received data. |
Progress | The progress of consuming messages in each partition of the topic, measured by offset. |
TimestampProgress | The progress of consuming messages in each partition of the topic, measured by timestamp. |
ReasonOfStateChanged | The reason why the load job is in the CANCELLED or PAUSED state. |
ErrorLogUrls | The URLs of the error logs. You can open a URL by using the curl or wget command. |
TrackingSQL | The SQL statement that you can run to directly query the error log information recorded in Information Schema. |
OtherMsg | Information about all failed load tasks. |
LatestSourcePosition | The latest message position in each partition of the topic. |
Examples
If the load job is running normally in the RUNNING state, the returned result may be as follows:
MySQL [example_db]> SHOW ROUTINE LOAD FOR example_tbl_ordertest1\G
*************************** 1. row ***************************
Id: 10204
Name: example_tbl_ordertest1
CreateTime: 2023-12-21 21:01:31
PauseTime: NULL
EndTime: NULL
DbName: example_db
TableName: example_tbl
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","rowDelimiter":"\t","partial_update":"false","columnToColumnExpr":"order_id,pay_dt,customer_name,nationality,temp_gender,price","maxBatchIntervalS":"10","partial_update_mode":"null","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","log_rejected_record_num":"0","taskTimeoutSecond":"60","json_root":"","maxFilterRatio":"1.0","strict_mode":"false","jsonpaths":"","taskConsumeSecond":"15","desireTaskConcurrentNum":"5","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"lilyliuyitest4csv","currentKafkaPartitions":"0","brokerList":"xxx.xx.xx.xxx:9092"}
CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"example_tbl_ordertest1_b05da08f-9b9d-4fe1-b1f2-25d7116d617c"}
Statistic: {"receivedBytes":313,"errorRows":0,"committedTaskNum":1,"loadedRows":6,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":6,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":699}
Progress: {"0":"5"}
TimestampProgress: {"0":"1686143856061"}
ReasonOfStateChanged:
ErrorLogUrls:
TrackingSQL:
OtherMsg:
LatestSourcePosition: {"0":"6"}
1 row in set (0.01 sec)
If the load job is in the PAUSED or CANCELLED state because of an exception, you can troubleshoot the cause based on the ReasonOfStateChanged, ErrorLogUrls, TrackingSQL, and OtherMsg fields in the returned result.
MySQL [example_db]> SHOW ROUTINE LOAD FOR example_tbl_ordertest2\G
*************************** 1. row ***************************
Id: 10204
Name: example_tbl_ordertest2
CreateTime: 2023-12-22 12:13:18
PauseTime: 2023-12-22 12:13:38
EndTime: NULL
DbName: example_db
TableName: example_tbl
State: PAUSED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"partitions":"*","rowDelimiter":"\t","partial_update":"false","columnToColumnExpr":"order_id,pay_dt,customer_name,nationality,temp_gender,price","maxBatchIntervalS":"10","partial_update_mode":"null","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","log_rejected_record_num":"0","taskTimeoutSecond":"60","json_root":"","maxFilterRatio":"1.0","strict_mode":"false","jsonpaths":"","taskConsumeSecond":"15","desireTaskConcurrentNum":"5","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"mytest","currentKafkaPartitions":"0","brokerList":"xxx.xx.xx.xxx:9092"}
CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"example_tbl_ordertest2_b3fada0f-6721-4ad1-920d-e4bf6d6ea7f7"}
Statistic: {"receivedBytes":541,"errorRows":10,"committedTaskNum":1,"loadedRows":6,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":16,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":646}
Progress: {"0":"19"}
TimestampProgress: {"0":"1702623900871"}
ReasonOfStateChanged: ErrorReason{errCode = 102, msg='current error rows is more than max error num'}
ErrorLogUrls: http://xxx.xx.xx.xxx:8040/api/_load_error_log?file=error_log_b25dcc7e642344b2_b0b342b9de0567db
TrackingSQL: select tracking_log from information_schema.load_tracking_logs where job_id=10204
OtherMsg:
LatestSourcePosition: {"0":"20"}
1 row in set (0.00 sec)
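To dig into the rejected rows from the second example, you can run the statement returned in the TrackingSQL field as-is; the job ID 10204 below comes from that example's output:

```sql
-- Query the error log recorded in Information Schema for job 10204.
SELECT tracking_log
FROM information_schema.load_tracking_logs
WHERE job_id = 10204;
```

In this example, the ReasonOfStateChanged field ("current error rows is more than max error num") indicates that the job was paused because the number of error rows exceeded the configured maximum, and the tracking log shows which rows were rejected and why.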