
Query Profile Structure and Metrics

Overview

A Query Profile is a detailed report that provides insights into the execution of a SQL query within StarRocks. It offers a comprehensive view of the query's performance, including the time spent on each operation, the amount of data processed, and other relevant metrics. This information is invaluable for optimizing query performance, identifying bottlenecks, and troubleshooting issues.

Why this matters

Roughly 80% of real-world slow queries can be diagnosed by spotting one of three red-flag metrics. This cheat sheet gets you there before you drown in numbers.

Quick-Start

Profile a recent query:

1. List recent query IDs

A query ID is needed to analyze a query profile. Use SHOW PROFILELIST:

SHOW PROFILELIST;
tip

SHOW PROFILELIST is detailed in Text-based Query Profile Visualized Analysis. See that page if you are getting started.
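
If the query you want to analyze does not appear in the list, profile collection was likely disabled for that session. A minimal sketch, assuming the standard enable_profile session variable:

SET enable_profile = true;
-- Re-run the query, then list recent profiles again to pick up its query ID.
SHOW PROFILELIST;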

2. Open the profile side-by-side with your SQL

Run ANALYZE PROFILE FOR <query_id>\G or click Profile in the CelerData Web UI.

3. Skim the “Execution Overview” banner

Examine key metrics for overall execution performance:

  • QueryExecutionWallTime: Total wall clock time for query execution
  • QueryPeakMemoryUsagePerNode: Peak memory usage per node; values exceeding 80% of BE memory indicate a risk of data spill or Out-of-Memory (OOM) errors
  • QueryCumulativeCpuTime / QueryExecutionWallTime < 0.5 * num_cpu_cores means the CPU is mostly waiting, likely on I/O or network. For example, with 16 cores and a wall time of 10s, a cumulative CPU time of 40s gives a ratio of 4, well below 8.

If none of these red flags fire, your query is usually fine—stop here.

4. Drill one level deeper

Identify the operators that consume the most time or the most memory, analyze their metrics, and determine the underlying cause to pinpoint performance bottlenecks.

The "Operator Metrics" section offers numerous guidelines to aid in identifying the root cause of performance issues.

Core Concepts

Query Execution Flow

The comprehensive execution flow of a SQL query involves the following stages:

  1. Planning: The query undergoes parsing, analysis, and optimization, culminating in the generation of a query plan.
  2. Scheduling: The scheduler and coordinator work together to distribute the query plan to all participating backend nodes.
  3. Execution: The query plan is executed using the pipeline execution engine.

[Figure: SQL execution flow]

Query Plan Structure

The StarRocks execution engine is designed to execute queries in a distributed manner, and the structure of a Query Profile reflects this design. The following components make up the distributed query plan:

  • Fragment: The highest level of the execution tree, representing a logical unit of work. A query can be divided into one or more fragments.
  • FragmentInstance: Each fragment is instantiated multiple times, with each instance (FragmentInstance) executed on a different computing node. This allows for parallel processing across nodes.
  • Pipeline: A FragmentInstance is further divided into multiple pipelines, which are sequences of connected Operator instances. Pipelines define the execution path for a FragmentInstance.
  • PipelineDriver: To maximize the utilization of computing resources, each pipeline can have multiple instances, known as PipelineDrivers. These drivers execute the pipeline in parallel, leveraging multiple computing cores.
  • Operator: The fundamental execution unit, an Operator instance is a part of a PipelineDriver. Operators implement specific algorithms, such as aggregation, join, or scan, to process data.

[Figure: structure of the distributed query plan]

Pipeline Execution Engine Concepts

The Pipeline Engine is a key component of the StarRocks execution engine. It is responsible for executing the query plan in a parallel and efficient manner. The Pipeline Engine is designed to handle complex query plans and large volumes of data, ensuring high performance and scalability.

Key concepts in the Pipeline Engine:

  • Operator: A fundamental unit of execution responsible for implementing specific algorithms (e.g., aggregation, join, scan)
  • Pipeline: A sequence of connected Operator instances representing the execution path
  • PipelineDriver: Multiple instances of a pipeline for parallel execution
  • Schedule: Non-blocking scheduling of pipelines using user-space time-slicing

[Figure: pipeline operators]

Metric Merging Strategy

By default, StarRocks merges the FragmentInstance and PipelineDriver layers to reduce profile volume, resulting in a simplified three-layer structure:

  • Fragment
  • Pipeline
  • Operator

You can control this merging behavior through the session variable pipeline_profile_level:

  • 1 (Default): Merged three-layer structure
  • 2: Original five-layer structure
  • Other values: Treated as 1
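
For example, to switch a single session to the detailed structure while debugging and then restore the default:

-- Use the original five-layer profile structure for this session.
SET pipeline_profile_level = 2;
-- Restore the default merged three-layer structure.
SET pipeline_profile_level = 1;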

When merging metrics, different strategies are used based on metric type:

  • Time-related metrics: Take the average

    • Example: OperatorTotalTime is the average time consumption
    • __MAX_OF_OperatorTotalTime and __MIN_OF_OperatorTotalTime record extremes
  • Non-time-related metrics: Sum the values

    • Example: PullChunkNum is the sum across all instances
    • __MAX_OF_PullChunkNum and __MIN_OF_PullChunkNum record extremes
  • Constant metrics: Same value across all instances (e.g., DegreeOfParallelism)

Significant differences between MIN and MAX values often indicate data skew, particularly in aggregation and join operations.

Query Profile Metrics

Summary Metrics

Basic information about the query execution:

  • Total: The total time consumed by the query, including the Planning, Executing, and Profiling phase durations.
  • Query State: Query state; possible states include Finished, Error, and Running.
  • Query ID: Unique identifier for the query.
  • Start Time: Timestamp when the query started.
  • End Time: Timestamp when the query ended.
  • Query Type: Type of the query.
  • StarRocks Version: Version of StarRocks used.
  • User: User who executed the query.
  • Default Db: Default database used for the query.
  • Sql Statement: SQL statement executed.
  • Variables: Important variables used for the query.
  • NonDefaultSessionVariables: Non-default session variables used for the query.
  • Collect Profile Time: Time taken to collect the profile.
  • IsProfileAsync: Indicates whether profile collection was asynchronous.

Planner Metrics

This section provides a comprehensive overview of the planner. Typically, if the total time spent in the planner is less than 10ms, it is not a cause for concern.

In certain scenarios, the planner may require more time:

  1. Complex queries may necessitate additional time for parsing and optimization to ensure an optimal execution plan.
  2. The presence of numerous materialized views can increase the time required for query rewriting.
  3. When multiple concurrent queries exhaust system resources and the query queue is utilized, the Pending time may be prolonged.
  4. Queries involving external tables may incur additional time for communication with the external metadata server.

Example:

     - -- Parser[1] 0
     - -- Total[1] 3ms
     -     -- Analyzer[1] 0
     -         -- Lock[1] 0
     -         -- AnalyzeDatabase[1] 0
     -         -- AnalyzeTemporaryTable[1] 0
     -         -- AnalyzeTable[1] 0
     -     -- Transformer[1] 0
     -     -- Optimizer[1] 1ms
     -         -- MVPreprocess[1] 0
     -         -- MVTextRewrite[1] 0
     -         -- RuleBaseOptimize[1] 0
     -         -- CostBaseOptimize[1] 0
     -         -- PhysicalRewrite[1] 0
     -         -- DynamicRewrite[1] 0
     -         -- PlanValidate[1] 0
     -             -- InputDependenciesChecker[1] 0
     -             -- TypeChecker[1] 0
     -             -- CTEUniqueChecker[1] 0
     -             -- ColumnReuseChecker[1] 0
     -     -- ExecPlanBuild[1] 0
     - -- Pending[1] 0
     - -- Prepare[1] 0
     - -- Deploy[1] 2ms
     -     -- DeployLockInternalTime[1] 2ms
     -         -- DeploySerializeConcurrencyTime[2] 0
     -         -- DeployStageByStageTime[6] 0
     -         -- DeployWaitTime[6] 1ms
     -         -- DeployAsyncSendTime[2] 0
     - DeployDataSize: 10916
Reason:

Execution Overview Metrics

High-level execution statistics:

  • FrontendProfileMergeTime: FE-side profile processing time. Rule of thumb: < 10ms is normal.
  • QueryAllocatedMemoryUsage: Total allocated memory across nodes.
  • QueryDeallocatedMemoryUsage: Total deallocated memory across nodes.
  • QueryPeakMemoryUsagePerNode: Maximum peak memory per node. Rule of thumb: < 80% of capacity is normal.
  • QuerySumMemoryUsage: Total peak memory across nodes.
  • QueryExecutionWallTime: Wall time of execution.
  • QueryCumulativeCpuTime: Total CPU time across nodes. Rule of thumb: compare with walltime * totalCpuCores.
  • QueryCumulativeOperatorTime: Total operator execution time. Serves as the denominator for operator time percentages.
  • QueryCumulativeNetworkTime: Total Exchange node network time.
  • QueryCumulativeScanTime: Total Scan node IO time.
  • QueryPeakScheduleTime: Maximum Pipeline ScheduleTime. Rule of thumb: < 1s is normal for simple queries.
  • QuerySpillBytes: Data spilled to disk. Rule of thumb: < 1GB is normal.

Fragment Metrics

Fragment-level execution details:

  • InstanceNum: Number of FragmentInstances.
  • InstanceIds: IDs of all FragmentInstances.
  • BackendNum: Number of participating BEs.
  • BackendAddresses: BE addresses.
  • FragmentInstancePrepareTime: Duration of the Fragment Prepare phase.
  • InstanceAllocatedMemoryUsage: Total allocated memory for instances.
  • InstanceDeallocatedMemoryUsage: Total deallocated memory for instances.
  • InstancePeakMemoryUsage: Peak memory usage across instances.

Pipeline Metrics

Pipeline execution details and relationships:

[Figure: relationships between pipeline time metrics]

Key relationships:

  • DriverTotalTime = ActiveTime + PendingTime + ScheduleTime
  • ActiveTime = ∑ OperatorTotalTime + OverheadTime
  • PendingTime = InputEmptyTime + OutputFullTime + PreconditionBlockTime + PendingFinishTime
  • InputEmptyTime = FirstInputEmptyTime + FollowupInputEmptyTime

Pipeline metrics:

  • DegreeOfParallelism: Degree of pipeline execution parallelism.
  • TotalDegreeOfParallelism: Sum of degrees of parallelism. Since the same Pipeline may execute on multiple machines, this item aggregates all values.
  • DriverPrepareTime: Time taken by the Prepare phase. This metric is not included in DriverTotalTime.
  • DriverTotalTime: Total execution time of the Pipeline, excluding the time spent in the Prepare phase.
  • ActiveTime: Execution time of the Pipeline, including the execution time of each operator and the overall framework overhead, such as time spent invoking methods like has_output, need_input, etc.
  • PendingTime: Time the Pipeline is blocked from being scheduled for various reasons.
  • InputEmptyTime: Time the Pipeline is blocked due to an empty input queue.
  • FirstInputEmptyTime: Time the Pipeline is first blocked due to an empty input queue. The first blocking time is calculated separately because the first blocking is mainly caused by Pipeline dependencies.
  • FollowupInputEmptyTime: Time the Pipeline is subsequently blocked due to an empty input queue.
  • OutputFullTime: Time the Pipeline is blocked due to a full output queue.
  • PreconditionBlockTime: Time the Pipeline is blocked due to unmet dependencies.
  • PendingFinishTime: Time the Pipeline is blocked waiting for asynchronous tasks to finish.
  • ScheduleTime: Scheduling time of the Pipeline, from entering the ready queue to being scheduled for execution.
  • BlockByInputEmpty: Number of times the pipeline is blocked due to InputEmpty.
  • BlockByOutputFull: Number of times the pipeline is blocked due to OutputFull.
  • BlockByPrecondition: Number of times the pipeline is blocked due to unmet preconditions.

Operator Metrics

  • PrepareTime: Time spent on preparation.
  • OperatorTotalTime: Total time consumed by the Operator. It satisfies the equation: OperatorTotalTime = PullTotalTime + PushTotalTime + SetFinishingTime + SetFinishedTime + CloseTime. It excludes time spent on preparation.
  • PullTotalTime: Total time the Operator spends executing pull_chunk.
  • PushTotalTime: Total time the Operator spends executing push_chunk.
  • SetFinishingTime: Total time the Operator spends executing set_finishing.
  • SetFinishedTime: Total time the Operator spends executing set_finished.
  • PushRowNum: Cumulative number of input rows for the Operator.
  • PullRowNum: Cumulative number of output rows for the Operator.
  • JoinRuntimeFilterEvaluate: Number of times the Join Runtime Filter is evaluated.
  • JoinRuntimeFilterHashTime: Time spent computing hashes for the Join Runtime Filter.
  • JoinRuntimeFilterInputRows: Number of input rows for the Join Runtime Filter.
  • JoinRuntimeFilterOutputRows: Number of output rows for the Join Runtime Filter.
  • JoinRuntimeFilterTime: Time spent on the Join Runtime Filter.

Scan Operator

To facilitate a better understanding of the various metrics within the Scan Operator, the following diagram demonstrates the associations between these metrics and storage structures.

[Figure: relationship between Scan Operator metrics and storage structures]

To retrieve data from disk and apply the predicates, the storage engine utilizes several techniques:

  1. Data Storage: Encoded and compressed data is stored on disk in segments, accompanied by various indices.
  2. Index Filtering: The engine leverages indices such as BitmapIndex, BloomfilterIndex, ZonemapIndex, ShortKeyIndex, and NGramIndex to skip unnecessary data.
  3. Pushdown Predicates: Simple predicates, like a > 1, are pushed down to evaluate on specific columns.
  4. Late Materialization: Only the required columns and filtered rows are retrieved from disk.
  5. Non-Pushdown Predicates: Predicates that cannot be pushed down are evaluated.
  6. Projection Expression: Expressions, such as SELECT a + 1, are computed.

Common Performance Bottlenecks and Their Solutions:

Heavy Raw I/O or Slow Storage

Red-flag metrics / symptoms: BytesRead, RawRowsRead, CompressedBytesRead, ScanTime, IOTaskExecTime dominate

Why it slows down OLAP scan: Disk (or object-store) read bandwidth becomes the constraint

Solutions: Put hot data on NVMe/SSD, Enable storage cache

Poor Predicate

Red-flag metrics / symptoms: PushdownPredicates≈0; ExprFilterRows dominates; LIKE '%x%' or other complicated predicates

Why it slows down OLAP scan: More rows flow into the CPU thread pool because filters aren't applied in the storage layer

Solutions: Rewrite filters to simple comparisons, Build targeted MVs/indexes
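
As a hypothetical illustration (table and column names invented), a leading-wildcard LIKE cannot be pushed down into the storage layer, while a simple comparison on the same table can:

-- Every row is read and then filtered by the expression engine (ExprFilterRows grows).
SELECT count(*) FROM orders WHERE remark LIKE '%refund%';

-- A simple equality predicate can be pushed down and pruned by indexes and zone maps.
SELECT count(*) FROM orders WHERE status = 'REFUNDED';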

Low DOP or Thread-pool Saturation

Red-flag metrics / symptoms: High IOTaskWaitTime; low PeakIOTasks

Why it slows down OLAP scan: Too few parallel scan tasks or threads blocked waiting for I/O slots

Solutions: Increase the disk bandwidth or enlarge the cache

Tablet / Data Skew Across BEs

Red-flag metrics / symptoms: Large max-min gap for OperatorTotalTime or BytesRead; one tablet owns most of the data

Why it slows down OLAP scan: One thread does disproportionate work, all others idle

Solutions: Hash-bucket on high-cardinality key; Increase number of buckets
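
A hedged DDL sketch (table and column names invented) showing bucketing on a high-cardinality key with a larger bucket count:

CREATE TABLE user_events (
    user_id BIGINT,
    event_time DATETIME,
    event_type VARCHAR(32)
)
DUPLICATE KEY(user_id, event_time)
DISTRIBUTED BY HASH(user_id) BUCKETS 32;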

Fragmented Rowsets & Tiny Segments

Red-flag metrics / symptoms: High RowsetsReadCount / SegmentsReadCount; long SegmentInit time

Why it slows down OLAP scan: Many small files force frequent open/seek calls

Solutions: Increase compaction threads or run manual compactions; Batch mini-loads
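
A hedged sketch of triggering a manual compaction on a table whose profile shows high RowsetsReadCount / SegmentsReadCount (the ALTER TABLE ... COMPACT statement is available in recent StarRocks versions; verify support in your deployment, and the table name is hypothetical):

ALTER TABLE user_events COMPACT;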

High number of soft-deleted records

Red-flag metrics / symptoms: High DeleteFilterRows

Why it slows down OLAP scan: Soft deletes apply the delete predicate at read time

Solutions: Compact the data; Reduce the frequency of delete operations

The Scan Operator utilizes an additional thread pool for executing IO tasks. Therefore, the relationship between time metrics for this node is illustrated below:

[Figure: relationships between Scan Operator time metrics]

OLAP Scan Operator

The OLAP_SCAN Operator is responsible for reading data from StarRocks native tables.

  • Table: Table name.
  • Rollup: Materialized view name. If no materialized view is hit, it is equivalent to the table name.
  • SharedScan: Whether the enable_shared_scan session variable is enabled.
  • TabletCount: Number of tablets.
  • MorselsCount: Number of morsels, which are the basic IO execution units.
  • PushdownPredicates: Number of pushdown predicates.
  • Predicates: Predicate expressions.
  • BytesRead: Size of data read.
  • CompressedBytesRead: Size of compressed data read from disk.
  • UncompressedBytesRead: Size of uncompressed data read from disk.
  • RowsRead: Number of rows read (after predicate filtering).
  • RawRowsRead: Number of raw rows read (before predicate filtering).
  • ReadPagesNum: Number of pages read.
  • CachedPagesNum: Number of cached pages.
  • ChunkBufferCapacity: Capacity of the Chunk Buffer.
  • DefaultChunkBufferCapacity: Default capacity of the Chunk Buffer.
  • PeakChunkBufferMemoryUsage: Peak memory usage of the Chunk Buffer.
  • PeakChunkBufferSize: Peak size of the Chunk Buffer.
  • PrepareChunkSourceTime: Time spent preparing the Chunk Source.
  • ScanTime: Cumulative scan time. Scan operations are completed in an asynchronous I/O thread pool.
  • IOTaskExecTime: Execution time of IO tasks.
  • IOTaskWaitTime: Waiting time from successful submission to scheduled execution of IO tasks.
  • SubmitTaskCount: Number of times IO tasks are submitted.
  • SubmitTaskTime: Time spent on task submission.
  • PeakIOTasks: Peak number of IO tasks.
  • PeakScanTaskQueueSize: Peak size of the IO task queue.

Connector Scan Operator

The Connector Scan Operator is similar to the OLAP_SCAN Operator, but it scans external tables such as Iceberg, Hive, Hudi, and Delta Lake tables.

  • DataSourceType: Data source type, which can be HiveDataSource, ESDataSource, and so on.
  • Table: Table name.
  • TabletCount: Number of tablets.
  • MorselsCount: Number of morsels.
  • Predicates: Predicate expressions.
  • PredicatesPartition: Predicate expressions applied to partitions.
  • SharedScan: Whether the enable_shared_scan session variable is enabled.
  • ChunkBufferCapacity: Capacity of the Chunk Buffer.
  • DefaultChunkBufferCapacity: Default capacity of the Chunk Buffer.
  • PeakChunkBufferMemoryUsage: Peak memory usage of the Chunk Buffer.
  • PeakChunkBufferSize: Peak size of the Chunk Buffer.
  • PrepareChunkSourceTime: Time taken to prepare the Chunk Source.
  • ScanTime: Cumulative scan time. Scan operations are completed in an asynchronous I/O thread pool.
  • IOTaskExecTime: Execution time of IO tasks.
  • IOTaskWaitTime: Waiting time from successful submission to scheduled execution of IO tasks.
  • SubmitTaskCount: Number of times IO tasks are submitted.
  • SubmitTaskTime: Time taken to submit tasks.
  • PeakIOTasks: Peak number of IO tasks.
  • PeakScanTaskQueueSize: Peak size of the IO task queue.

Exchange Operator

Exchange Operator is responsible for transmitting data between BE nodes. There can be several kinds of exchange operations: GATHER/BROADCAST/SHUFFLE.

Typical scenarios that can make Exchange Operator the bottleneck of a query:

  1. Broadcast Join: This is a suitable method for a small table. However, in exceptional cases when the optimizer chooses a suboptimal query plan, it can lead to a significant increase in network bandwidth.
  2. Shuffle Aggregation/Join: Shuffling a large table can result in a significant increase in network bandwidth.

Exchange Sink Operator

  • ChannelNum: Number of channels. Generally, the number of channels is equal to the number of receivers.
  • DestFragments: List of destination FragmentInstance IDs.
  • DestID: Destination node ID.
  • PartType: Data distribution mode, including UNPARTITIONED, RANDOM, HASH_PARTITIONED, and BUCKET_SHUFFLE_HASH_PARTITIONED.
  • SerializeChunkTime: Time taken to serialize chunks.
  • SerializedBytes: Size of serialized data.
  • ShuffleChunkAppendCounter: Number of Chunk Append operations when PartType is HASH_PARTITIONED or BUCKET_SHUFFLE_HASH_PARTITIONED.
  • ShuffleChunkAppendTime: Time taken for Chunk Append operations when PartType is HASH_PARTITIONED or BUCKET_SHUFFLE_HASH_PARTITIONED.
  • ShuffleHashTime: Time taken to calculate hashes when PartType is HASH_PARTITIONED or BUCKET_SHUFFLE_HASH_PARTITIONED.
  • RequestSent: Number of data packets sent.
  • RequestUnsent: Number of unsent data packets. This metric is non-zero when there is short-circuit logic; otherwise, it is zero.
  • BytesSent: Size of sent data.
  • BytesUnsent: Size of unsent data. This metric is non-zero when there is short-circuit logic; otherwise, it is zero.
  • BytesPassThrough: If the destination node is the current node, data is not transmitted over the network; this is called passthrough data. This metric indicates the size of such passthrough data. Passthrough is controlled by enable_exchange_pass_through.
  • PassThroughBufferPeakMemoryUsage: Peak memory usage of the PassThrough Buffer.
  • CompressTime: Compression time.
  • CompressedBytes: Size of compressed data.
  • OverallThroughput: Throughput rate.
  • NetworkTime: Time taken for data packet transmission (excluding post-reception processing time).
  • NetworkBandwidth: Estimated network bandwidth.
  • WaitTime: Waiting time due to a full sender queue.
  • OverallTime: Total time for the entire transmission process, i.e., from sending the first data packet to confirming correct reception of the last data packet.
  • RpcAvgTime: Average RPC time.
  • RpcCount: Total number of RPCs.

Exchange Source Operator

  • RequestReceived: Size of received data packets.
  • BytesReceived: Size of received data.
  • DecompressChunkTime: Time taken to decompress chunks.
  • DeserializeChunkTime: Time taken to deserialize chunks.
  • ClosureBlockCount: Number of blocked Closures.
  • ClosureBlockTime: Blocked time for Closures.
  • ReceiverProcessTotalTime: Total time taken for receiver-side processing.
  • WaitLockTime: Lock waiting time.

Aggregate Operator

[Figure: Aggregate Operator]

The Aggregate Operator is responsible for executing aggregation functions, GROUP BY, and DISTINCT.

Multiple Forms of Aggregation Algorithms

  • Hash aggregation: chosen when the keys fit into memory and cardinality is not extreme. Internal data structure: compact hash table with SIMD probing. Default path, excellent for modest key counts.
  • Sorted aggregation: chosen when the input is already ordered on the GROUP BY keys. Internal data structure: simple row comparison plus a running state. Zero hash-table cost, often 2-3× faster on probe-heavy, skewed workloads.
  • Spillable aggregation (3.2+): chosen when the hash table outgrows the memory limit. Internal data structure: hybrid hash/merge with disk spill partitions. Prevents OOM and preserves pipeline parallelism.

Multi-Stage Distributed Aggregation

In StarRocks, aggregation is implemented in a distributed manner and can be multi-stage, depending on the query pattern and the optimizer's decision.

┌─────────┐  local  ┌─────────┐  shard/  ┌─────────┐  gather/ ┌──────────┐  final
│ Stage 0 │ ──────► │ Stage 1 │ ───────► │ Stage 2 │ ───────► │ Stage 3  │ ──────►
│ Partial │         │ Update  │   hash   │ Merge   │   shard  │ Finalize │  output
└─────────┘         └─────────┘          └─────────┘          └──────────┘
  • One-stage: used when the DISTRIBUTED BY columns are a subset of the GROUP BY columns and the partitions are colocated. Partial aggregates immediately become the final result.
  • Two-stage (local + global): the typical distributed GROUP BY. Stage 0 inside each BE collapses duplicates adaptively; Stage 1 shuffles data by the GROUP BY keys and then performs the global aggregation.
  • Three-stage (local + shuffle + final): used for heavy DISTINCT with a high-cardinality GROUP BY. Stage 0 as above; Stage 1 shuffles by GROUP BY, then aggregates by GROUP BY and DISTINCT; Stage 2 merges the partial state by GROUP BY.
  • Four-stage (local + partial + intermediate + final): used for heavy DISTINCT with a low-cardinality GROUP BY. Introduces an additional stage that shuffles by both GROUP BY and DISTINCT columns to avoid a single-point bottleneck.

Common Performance Bottlenecks and Their Solutions:

High-cardinality GROUP BY → oversize hash table

Red-flag metrics / symptoms: HashTableSize, HashTableMemoryUsage, and AggComputeTime balloon; query gets close to memory limit

Why it hurts Agg operators: Hash-aggregate builds one entry per group; if millions of groups land in RAM the hash table becomes CPU- and memory-bound and can even spill

Solutions: Enable sorted streaming aggregate; Add pre-aggregated MVs or roll-ups; Reduce key width / cast to INT
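
A hedged sketch of enabling sorted streaming aggregation for the current session (assuming the enable_sort_aggregate session variable available in recent versions; it only takes effect when the table's sort key is a prefix of the GROUP BY keys):

SET enable_sort_aggregate = true;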

Data-skewed shuffle between partial → final stages

Red-flag metrics / symptoms: Huge gap in HashTableSize or InputRowCount across instances; one fragment’s AggComputeTime dwarfs others

Why it hurts Agg operators: A single backend receives most of the hot keys and blocks the pipeline

Solutions: Add salt column in the aggregation; Use DISTINCT [skew] hint
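
A minimal salting sketch (table and column names hypothetical): pre-aggregate on (key, salt) so a hot key is split across instances, then merge the partial counts:

SELECT event_type, SUM(partial_cnt) AS cnt
FROM (
    SELECT event_type, mod(user_id, 16) AS salt, COUNT(*) AS partial_cnt
    FROM user_events
    GROUP BY event_type, mod(user_id, 16)
) t
GROUP BY event_type;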

Expensive or DISTINCT-style aggregate functions (e.g., ARRAY_AGG, HLL_UNION, BITMAP_UNION, COUNT(DISTINCT))

Red-flag metrics / symptoms: AggregateFunctions dominates operator time; CPU still near 100 % after hash build finishes

Why it hurts Agg operators: State-heavy aggregation functions keep sizable sketches and run SIMD-heavy loops each row

Solutions: Pre-compute HLL/Bitmap columns at ingest; Use approx_count_distinct or multi_distinct_* where accuracy allows
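
Where approximate results are acceptable, a sketch-based aggregate keeps per-group state small (column and table names hypothetical):

-- Exact distinct count: heavy per-group state and expensive merges.
SELECT dt, COUNT(DISTINCT user_id) AS uv FROM user_events GROUP BY dt;

-- Approximate distinct count: fixed-size HLL sketch per group.
SELECT dt, approx_count_distinct(user_id) AS uv FROM user_events GROUP BY dt;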

Poor first-stage (partial) aggregation

Red-flag metrics / symptoms: Very large InputRowCount, but AggComputeTime is modest; PassThroughRowCount is high; upstream EXCHANGE shows massive BytesSent

Why it hurts Agg operators: If partial aggregation on each BE doesn't pre-aggregate the dataset well, most raw rows traverse the network and pile up in the final AGG

Solutions: Confirm plan shows two- or three-stage aggregation; Rewrite query to simple GROUP BY keys so optimizer can push partial AGG; set streaming_preaggregation_mode = 'force_preaggregation'
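
For example, to force the first-stage aggregation to pre-aggregate instead of passing raw rows through, as referenced above:

SET streaming_preaggregation_mode = 'force_preaggregation';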

Heavy expression evaluation on GROUP BY keys

Red-flag metrics / symptoms: ExprComputeTime high relative to AggComputeTime

Why it hurts Agg operators: Complex functions on every row before hashing dominate CPU

Solutions: Materialize computed keys in a sub-query or generated column; Use column dictionary / pre-encoded values; Project downstream instead

Metrics List

  • GroupingKeys: GROUP BY columns.
  • AggregateFunctions: Time taken for aggregate function calculations.
  • AggComputeTime: Time for AggregateFunctions + Group By.
  • ChunkBufferPeakMem: Peak memory usage of the Chunk Buffer.
  • ChunkBufferPeakSize: Peak size of the Chunk Buffer.
  • ExprComputeTime: Time for expression computation.
  • ExprReleaseTime: Time for expression release.
  • GetResultsTime: Time to extract aggregate results.
  • HashTableSize: Size of the Hash Table.
  • HashTableMemoryUsage: Memory size of the Hash Table.
  • InputRowCount: Number of input rows.
  • PassThroughRowCount: In Auto mode, the number of rows processed in streaming mode after low aggregation efficiency causes degradation to streaming mode.
  • ResultAggAppendTime: Time taken to append aggregate result columns.
  • ResultGroupByAppendTime: Time taken to append Group By columns.
  • ResultIteratorTime: Time to iterate over the Hash Table.
  • StreamingTime: Processing time in streaming mode.

Join Operator

[Figure: Join Operator]

The Join Operator is responsible for implementing explicit and implicit joins.

During execution the join operator is split into Build (hash-table construction) and Probe phases that run in parallel inside the pipeline engine. Vector chunks (up to 4096 rows) are batch-hashed with SIMD; consumed keys generate runtime filters—Bloom or IN filters—pushed back to upstream scans to cut probe input early.

Join Strategies

StarRocks relies on a vectorized, pipeline-friendly hash-join core that can be wired into four physical strategies the cost-based optimizer weighs at plan time:

  • Colocate Join: picked when both tables belong to the same colocation group (identical bucket keys, bucket count, and replica layout). No network shuffle: each BE joins only its local buckets.
  • Bucket-Shuffle Join: picked when one of the joined tables has a bucket key that matches the join key. Only one join table needs to be shuffled, which reduces the network cost.
  • Broadcast Join: picked when the build side is very small (row/byte thresholds or an explicit hint). The small table is replicated to every probe node, which avoids shuffling the large table.
  • Shuffle (Hash) Join: the general case, when keys don't align. Each row is hash-partitioned on the join key so probes are balanced across BEs.

When joins become the bottleneck

Build-side table too large for RAM

Profile symptoms / hot metrics: BuildRows, HashTableSize, BuildHashTableTime dominate; memory near limit or spills

Why it hurts: The hash table has to live entirely in memory and becomes slow once it no longer fits in the CPU cache

Solutions:

  • Use the smaller table as the build side
  • Add pre-aggregation or selective projection
  • Boost query/BE memory or enable hash-spill
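
A hedged sketch of enabling spill so an oversized build-side hash table degrades gracefully instead of hitting OOM (assuming the enable_spill and spill_mode session variables introduced in StarRocks 3.0+):

SET enable_spill = true;
SET spill_mode = 'auto';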

Large join probe time

Profile symptoms / hot metrics: High SearchHashTableTime

Why it hurts: Inefficient data clustering can lead to poor CPU cache locality

Solutions: Optimize data clustering by sorting the probe table on join keys

Excessive Output Columns

Profile symptoms / hot metrics: High OutputBuildColumnTime or OutputProbeColumnTime

Why it hurts: The processing of numerous output columns necessitates substantial data copying, which can be CPU-intensive

Solutions: Reduce the number of output columns; Exclude heavy columns from the output; Consider retrieving non-essential columns after the join

Data skew after shuffle

Profile symptoms / hot metrics: One fragment’s ProbeRows ≫ others; OperatorTotalTime highly unbalanced

Why it hurts: A single BE receives most hot keys; others go idle

Solutions:

  • Use higher-cardinality key
  • pad composite key (concat(key,'-',mod(id,16)))

Broadcasting a not-so-small table

Profile symptoms / hot metrics: Join type is BROADCAST; BytesSent and SendBatchTime soar on every BE

Why it hurts: O(N²) network traffic and deserialisation

Solutions:

  • Let optimizer pick shuffle (SET broadcast_row_limit = lower)
  • Force shuffle hint
  • Analyze table to collect statistics.
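
A hedged sketch using a join hint to force a shuffle distribution when the optimizer broadcasts a not-so-small table (table and column names hypothetical):

SELECT f.customer_id, SUM(f.amount) AS total_amount
FROM fact_sales AS f JOIN [SHUFFLE] dim_customer AS d
    ON f.customer_id = d.customer_id
GROUP BY f.customer_id;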

Missing or ineffective runtime filters

Profile symptoms / hot metrics: JoinRuntimeFilterEvaluate small, scans still read full table

Why it hurts: Scans push all rows into probe side, wasting CPU & I/O

Solutions: Rewrite join predicate to pure equality so RF can be generated; Avoid doing type casting in join key
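
A hedged illustration (table and column names hypothetical): when the join keys have mismatched types, the per-row cast costs CPU and can keep the runtime filter from pruning the probe-side scan; storing the keys with matching types keeps the join a pure equality:

-- Mismatched types: order_id is BIGINT, order_id_str is VARCHAR, so every row is cast.
SELECT o.order_id
FROM orders AS o JOIN payments AS p ON CAST(o.order_id AS VARCHAR) = p.order_id_str;

-- Matching types: a pure equality on BIGINT keys lets the runtime filter prune the scan.
SELECT o.order_id
FROM orders AS o JOIN payments AS p ON o.order_id = p.order_id;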

Non-equi (nested-loop) join sneak-in

Profile symptoms / hot metrics: Join node shows CROSS or NESTLOOP; ProbeRows*BuildRows skyrockets

Why it hurts: O(rows×rows) comparisons; no hash key

Solutions:

  • Add proper equality predicate or pre-filter
  • Materialize the predicate result in a temporary table, then re-join

Hash-key casting / expression cost

Profile symptoms / hot metrics: High ExprComputeTime; hash function time rivals probe time

Why it hurts: Keys must be cast or evaluated per row before hashing

Solutions:

  • Store keys with matching types
  • Pre-compute complex expressions into generated columns

No colocation on large join

Profile symptoms / hot metrics: A shuffle join between fact and dimension tables even though their bucket layouts match

Why it hurts: Random placement forces shuffle every query

Solutions:

  • Put two tables in the same colocation group
  • Verify identical bucket count/key before ingest
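
A hedged DDL sketch (names hypothetical): both tables share the same bucket key type, bucket count, and colocation group, so the join can run locally on each BE:

CREATE TABLE fact_sales (
    customer_id BIGINT,
    amount DECIMAL(18, 2)
)
DUPLICATE KEY(customer_id)
DISTRIBUTED BY HASH(customer_id) BUCKETS 32
PROPERTIES ("colocate_with" = "sales_group");

CREATE TABLE dim_customer (
    customer_id BIGINT,
    region VARCHAR(64)
)
DUPLICATE KEY(customer_id)
DISTRIBUTED BY HASH(customer_id) BUCKETS 32
PROPERTIES ("colocate_with" = "sales_group");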

Metrics List

  • DistributionMode: Distribution type, including BROADCAST, PARTITIONED, COLOCATE, etc.
  • JoinPredicates: Join predicates.
  • JoinType: Join type.
  • BuildBuckets: Number of buckets in the Hash Table.
  • BuildKeysPerBucket: Number of keys per bucket in the Hash Table.
  • BuildConjunctEvaluateTime: Time taken for conjunct evaluation during the build phase.
  • BuildHashTableTime: Time taken to build the Hash Table.
  • ProbeConjunctEvaluateTime: Time taken for conjunct evaluation during the probe phase.
  • SearchHashTableTimer: Time taken to search the Hash Table.
  • CopyRightTableChunkTime: Time taken to copy chunks from the right table.
  • OutputBuildColumnTime: Time taken to output the columns of the build side.
  • OutputProbeColumnTime: Time taken to output the columns of the probe side.
  • HashTableMemoryUsage: Memory usage of the Hash Table.
  • RuntimeFilterBuildTime: Time taken to build runtime filters.
  • RuntimeFilterNum: Number of runtime filters.

Window Function Operator

  • ProcessMode: Execution mode, consisting of two parts: the first part includes Materializing and Streaming; the second part includes Cumulative, RemovableCumulative, and ByDefinition.
  • ComputeTime: Time taken for window function calculations.
  • PartitionKeys: Partition columns.
  • AggregateFunctions: Aggregate functions.
  • ColumnResizeTime: Time taken for column resizing.
  • PartitionSearchTime: Time taken to search partition boundaries.
  • PeerGroupSearchTime: Time taken to search Peer Group boundaries. Meaningful only when the window type is RANGE.
  • PeakBufferedRows: Peak number of rows in the buffer.
  • RemoveUnusedRowsCount: Number of times unused buffers are removed.
  • RemoveUnusedTotalRows: Total number of rows removed from unused buffers.

Sort Operator

  • SortKeys: Sorting keys.
  • SortType: Query result sorting method: full sorting or sorting the top N results.
  • MaxBufferedBytes: Peak size of buffered data.
  • MaxBufferedRows: Peak number of buffered rows.
  • NumSortedRuns: Number of sorted runs.
  • BuildingTime: Time taken to maintain internal data structures during sorting.
  • MergingTime: Time taken to merge sorted runs during sorting.
  • SortingTime: Time taken for sorting.
  • OutputTime: Time taken to build the output sorted sequence.

Merge Operator

To ease understanding of the various metrics, the Merge operator can be represented as the following state machine:

                     ┌────────── PENDING ◄──────────┐
                     │                              │
                     │                              │
                     ├──────────────◄───────────────┤
                     │                              │
                     ▼                              │
INIT ──► PREPARE ──► SPLIT_CHUNK ──► FETCH_CHUNK ──► FINISHED
                          │
                          │ one traverse from leaf to root
                          │
                          ▼
                       PROCESS

Metrics (the level of each metric is shown in parentheses):

  • Limit: Limit. (Primary)
  • Offset: Offset. (Primary)
  • StreamingBatchSize: Size of data processed per Merge operation when Merge is performed in Streaming mode. (Primary)
  • LateMaterializationMaxBufferChunkNum: Maximum number of chunks in the buffer when late materialization is enabled. (Primary)
  • OverallStageCount: Total execution count of all stages. (Primary)
  • OverallStageTime: Total execution time for each stage. (Primary)
  • 1-InitStageCount: Execution count of the Init stage. (Secondary)
  • 2-PrepareStageCount: Execution count of the Prepare stage. (Secondary)
  • 3-ProcessStageCount: Execution count of the Process stage. (Secondary)
  • 4-SplitChunkStageCount: Execution count of the SplitChunk stage. (Secondary)
  • 5-FetchChunkStageCount: Execution count of the FetchChunk stage. (Secondary)
  • 6-PendingStageCount: Execution count of the Pending stage. (Secondary)
  • 7-FinishedStageCount: Execution count of the Finished stage. (Secondary)
  • 1-InitStageTime: Execution time for the Init stage. (Secondary)
  • 2-PrepareStageTime: Execution time for the Prepare stage. (Secondary)
  • 3-ProcessStageTime: Execution time for the Process stage. (Secondary)
  • 4-SplitChunkStageTime: Time taken for the Split stage. (Secondary)
  • 5-FetchChunkStageTime: Time taken for the Fetch stage. (Secondary)
  • 6-PendingStageTime: Time taken for the Pending stage. (Secondary)
  • 7-FinishedStageTime: Time taken for the Finished stage. (Secondary)
  • LateMaterializationGenerateOrdinalTime: Time taken to generate ordinal columns during late materialization. (Tertiary)
  • SortedRunProviderTime: Time taken to retrieve data from the provider during the Process stage. (Tertiary)

TableFunction Operator

  • TableFunctionExecTime: Computation time for the Table Function.
  • TableFunctionExecCount: Number of executions of the Table Function.

Project Operator

The Project Operator is responsible for evaluating SELECT expressions (SELECT <expr>). If the query contains expensive expressions, this operator can take significant time.

  • ExprComputeTime: Computation time for expressions.
  • CommonSubExprComputeTime: Computation time for common sub-expressions.

LocalExchange Operator

  • Type: Type of Local Exchange, including Passthrough, Partition, and Broadcast.
  • ShuffleNum: Number of shuffles. This metric is only valid when Type is Partition.
  • LocalExchangePeakMemoryUsage: Peak memory usage.
  • LocalExchangePeakBufferSize: Peak size of the buffer.
  • LocalExchangePeakBufferMemoryUsage: Peak memory usage of the buffer.
  • LocalExchangePeakBufferChunkNum: Peak number of chunks in the buffer.
  • LocalExchangePeakBufferRowNum: Peak number of rows in the buffer.
  • LocalExchangePeakBufferBytes: Peak size of data in the buffer.
  • LocalExchangePeakBufferChunkSize: Peak size of chunks in the buffer.
  • LocalExchangePeakBufferChunkRowNum: Peak number of rows per chunk in the buffer.
  • LocalExchangePeakBufferChunkBytes: Peak size of data per chunk in the buffer.

OlapTableSink Operator

OlapTableSink Operator is responsible for performing the INSERT INTO <table> operation.

tip
  • An excessive difference between the Max and Min values of the PushChunkNum metric of OlapTableSink indicates data skew in the upstream operators, which may lead to a bottleneck in loading performance.
  • RpcClientSideTime equals RpcServerSideTime plus network transmission time plus RPC framework processing time. If there is a significant difference between RpcClientSideTime and RpcServerSideTime, consider enabling compression to reduce transmission time.
  • IndexNum: Number of synchronous materialized views created for the destination table.
  • ReplicatedStorage: Whether Single Leader Replication is enabled.
  • TxnID: ID of the loading transaction.
  • RowsRead: Number of rows read from upstream operators.
  • RowsFiltered: Number of rows filtered out due to inadequate data quality.
  • RowsReturned: Number of rows written to the destination table.
  • RpcClientSideTime: Total RPC time consumption for loading as recorded by the client side.
  • RpcServerSideTime: Total RPC time consumption for loading as recorded by the server side.
  • PrepareDataTime: Total time consumption of the data preparation phase, including data format conversion and data quality checks.
  • SendDataTime: Local time consumption for sending the data, including time for serializing and compressing data and for submitting tasks to the sender queue.