Commit 658b37c

ObjectHashAggregateExec Physical Operator
1 parent 08dbad7 commit 658b37c

File tree

5 files changed (+115 −23 lines)


docs/AggUtils.md

Lines changed: 46 additions & 1 deletion
@@ -60,6 +60,51 @@ createAggregate(
1. [SortAggregateExec](physical-operators/SortAggregateExec.md)

---

`createAggregate` is used when:

* `AggUtils` is used to [createStreamingAggregate](#createStreamingAggregate), [planAggregateWithoutDistinct](#planAggregateWithoutDistinct), [planAggregateWithOneDistinct](#planAggregateWithOneDistinct)
## <span id="planStreamingAggregation"> Planning Execution of Streaming Aggregation

```scala
planStreamingAggregation(
  groupingExpressions: Seq[NamedExpression],
  functionsWithoutDistinct: Seq[AggregateExpression],
  resultExpressions: Seq[NamedExpression],
  stateFormatVersion: Int,
  child: SparkPlan): Seq[SparkPlan]
```

`planStreamingAggregation`...FIXME
---

`planStreamingAggregation` is used when:

* `StatefulAggregationStrategy` ([Spark Structured Streaming]({{ book.structured_streaming }}/StatefulAggregationStrategy)) execution planning strategy is requested to plan a logical plan of a streaming aggregation (a streaming query with an [Aggregate](logical-operators/Aggregate.md) operator)
## <span id="createStreamingAggregate"> Creating Streaming Aggregate Physical Operator

```scala
createStreamingAggregate(
  requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
  groupingExpressions: Seq[NamedExpression] = Nil,
  aggregateExpressions: Seq[AggregateExpression] = Nil,
  aggregateAttributes: Seq[Attribute] = Nil,
  initialInputBufferOffset: Int = 0,
  resultExpressions: Seq[NamedExpression] = Nil,
  child: SparkPlan): SparkPlan
```

`createStreamingAggregate` [creates an aggregate physical operator](#createAggregate) (with the `isStreaming` flag enabled).
!!! note
    `createStreamingAggregate` is exactly [createAggregate](#createAggregate) with the `isStreaming` flag enabled.

---

`createStreamingAggregate` is used when:

* `AggUtils` is requested to plan a [regular](#planStreamingAggregation) and a [session-windowed](#planStreamingAggregationForSession) streaming aggregation
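The "createAggregate with `isStreaming` enabled" relationship can be sketched with a self-contained toy (hypothetical names and string "operators", not Spark's API): a thin wrapper that forwards its arguments and forces the flag on.

```scala
// Toy sketch (not Spark's code): createStreamingAggregate delegates to
// createAggregate, only forcing isStreaming = true.
object AggUtilsSketch {
  def createAggregate(
      groupingExpressions: Seq[String] = Nil,
      isStreaming: Boolean = false): String =
    s"aggregate(isStreaming=$isStreaming, groupingKeys=${groupingExpressions.size})"

  def createStreamingAggregate(
      groupingExpressions: Seq[String] = Nil): String =
    createAggregate(groupingExpressions, isStreaming = true) // the only difference
}
```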

docs/ObjectAggregationIterator.md

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
* <span id="newMutableProjection"> Function to create a new `MutableProjection` given expressions and attributes (`(Seq[Expression], Seq[Attribute]) => MutableProjection`)
* <span id="originalInputAttributes"> Original Input [Attribute](expressions/Attribute.md)s
* <span id="inputRows"> Input [InternalRow](InternalRow.md)s
* <span id="fallbackCountThreshold"> [spark.sql.objectHashAggregate.sortBased.fallbackThreshold](configuration-properties.md#spark.sql.objectHashAggregate.sortBased.fallbackThreshold)
* <span id="numOutputRows"> `numOutputRows` [SQLMetric](physical-operators/SQLMetric.md)

`ObjectAggregationIterator` is created when:

docs/SQLConf.md

Lines changed: 4 additions & 0 deletions
@@ -713,6 +713,10 @@ Used when:
* [OptimizeSkewedJoin](physical-optimizations/OptimizeSkewedJoin.md) physical optimization is executed

## <span id="objectAggSortBasedFallbackThreshold"><span id="OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD"> objectAggSortBasedFallbackThreshold

[spark.sql.objectHashAggregate.sortBased.fallbackThreshold](configuration-properties.md#spark.sql.objectHashAggregate.sortBased.fallbackThreshold)

## <span id="offHeapColumnVectorEnabled"> offHeapColumnVectorEnabled

[spark.sql.columnVector.offheap.enabled](configuration-properties.md#spark.sql.columnVector.offheap.enabled)

docs/configuration-properties.md

Lines changed: 8 additions & 0 deletions
@@ -50,6 +50,14 @@ Since: `3.2.0`
Use [SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS](SQLConf.md#ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) method to access the property (in a type-safe way).

## <span id="spark.sql.objectHashAggregate.sortBased.fallbackThreshold"> spark.sql.objectHashAggregate.sortBased.fallbackThreshold

**(internal)** The number of entries in the in-memory hash map (of aggregation buffers) before [ObjectHashAggregateExec](physical-operators/ObjectHashAggregateExec.md) ([ObjectAggregationIterator](ObjectAggregationIterator.md#processInputs) precisely) falls back to sort-based aggregation

Default: `128`

Use [SQLConf.objectAggSortBasedFallbackThreshold](SQLConf.md#objectAggSortBasedFallbackThreshold) for the current value
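As an illustration only (toy Scala, not Spark's implementation), the fallback behavior this property controls can be sketched as a hash-based aggregation that switches to a sort-based merge once the in-memory map holds more grouping keys than the threshold:

```scala
// Hypothetical sketch: sum values per key; fall back to a sort-based merge
// once the hash map exceeds fallbackThreshold distinct keys (Spark's default: 128).
def aggregateWithFallback(
    rows: Seq[(String, Int)],
    fallbackThreshold: Int = 128): (Map[String, Int], Boolean) = {
  val buffers = scala.collection.mutable.LinkedHashMap.empty[String, Int]
  val it = rows.iterator
  var fellBack = false
  while (it.hasNext && !fellBack) {
    val (key, value) = it.next()
    buffers(key) = buffers.getOrElse(key, 0) + value
    if (buffers.size > fallbackThreshold) fellBack = true // too many distinct keys
  }
  if (!fellBack) (buffers.toMap, false)
  else {
    // Sort-based path: sort buffered and remaining pairs by key, then merge runs
    val sorted = (buffers.toSeq ++ it.toSeq).sortBy(_._1)
    val merged = scala.collection.mutable.LinkedHashMap.empty[String, Int]
    for ((k, v) <- sorted) merged(k) = merged.getOrElse(k, 0) + v
    (merged.toMap, true)
  }
}
```

The real operator spills sorted runs to disk before merging; this sketch only shows the threshold-triggered strategy switch.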
## <span id="spark.sql.optimizer.decorrelateInnerQuery.enabled"> spark.sql.optimizer.decorrelateInnerQuery.enabled

**(internal)** Decorrelates inner queries by eliminating correlated references and build domain joins

docs/physical-operators/ObjectHashAggregateExec.md

Lines changed: 56 additions & 21 deletions
@@ -2,26 +2,17 @@
`ObjectHashAggregateExec` is an [aggregate unary physical operator](BaseAggregateExec.md) for **object aggregation**.

`ObjectHashAggregateExec` uses [ObjectAggregationIterator](../ObjectAggregationIterator.md) for [aggregation](#doExecute) (one per partition).

![ObjectHashAggregateExec in web UI (Details for Query)](../images/ObjectHashAggregateExec-webui-details-for-query.png)

## Creating Instance
`ObjectHashAggregateExec` takes the following to be created:

* <span id="requiredChildDistributionExpressions"> Required Child Distribution [Expression](../expressions/Expression.md)s
* [isStreaming](#isStreaming) flag
* <span id="numShufflePartitions"> Number of Shuffle Partitions (always `None`)
* <span id="groupingExpressions"> Grouping [NamedExpression](../expressions/NamedExpression.md)s
* <span id="aggregateExpressions"> [AggregateExpression](../expressions/AggregateExpression.md)s
* <span id="aggregateAttributes"> Aggregate [Attribute](../expressions/Attribute.md)s
@@ -31,14 +22,32 @@ supportsAggregate(
`ObjectHashAggregateExec` is created when:

* `AggUtils` is requested to [create a physical operator for aggregation](../AggUtils.md#createAggregate)

### <span id="isStreaming"> isStreaming Flag

`ObjectHashAggregateExec` is given the `isStreaming` flag when [created](#creating-instance).

The `isStreaming` flag is always `false` unless `AggUtils` is requested to [create a streaming aggregate physical operator](../AggUtils.md#createStreamingAggregate).
## <span id="metrics"> Performance Metrics

### <span id="aggTime"> time in aggregation build

The time to [doExecute](#doExecute) a single partition.

### <span id="numOutputRows"> number of output rows

* `1` when there are no input rows in a partition and no [groupingExpressions](#groupingExpressions)
* Used to create an [ObjectAggregationIterator](../ObjectAggregationIterator.md#numOutputRows)

### <span id="numTasksFallBacked"> number of sort fallback tasks

Used to create an [ObjectAggregationIterator](../ObjectAggregationIterator.md#numTasksFallBacked)

### <span id="spillSize"> spill size

Used to create an [ObjectAggregationIterator](../ObjectAggregationIterator.md#spillSize)

## <span id="doExecute"> Executing Physical Operator

@@ -48,9 +57,35 @@ doExecute(): RDD[InternalRow]
`doExecute` is part of the [SparkPlan](SparkPlan.md#doExecute) abstraction.

---

`doExecute` requests the [child physical operator](#child) to [execute](SparkPlan.md#execute) (and generate an `RDD[InternalRow]`) and uses `mapPartitionsWithIndexInternal` to process every partition.

!!! note
    `doExecute` adds a new `MapPartitionsRDD` ([Spark Core]({{ book.spark_core }}/rdd/MapPartitionsRDD)) to the RDD lineage.

For no input records (in a partition) and non-empty [groupingExpressions](#groupingExpressions), `doExecute` returns an empty `Iterator`.

Otherwise, `doExecute` creates an [ObjectAggregationIterator](../ObjectAggregationIterator.md).

For no input records (in a partition) and no [groupingExpressions](#groupingExpressions), `doExecute` increments the [numOutputRows](#numOutputRows) metric (so it is just `1`) and requests the `ObjectAggregationIterator` for the [outputForEmptyGroupingKeyWithoutInput](../ObjectAggregationIterator.md#outputForEmptyGroupingKeyWithoutInput).

Otherwise, `doExecute` returns the `ObjectAggregationIterator`.
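The per-partition branches above can be modeled with plain Scala (a hypothetical sketch where rows and outputs are just strings, not Spark's `InternalRow`s):

```scala
// Toy model of doExecute's per-partition branches (not Spark's code):
// - no input rows and grouping keys     => empty iterator
// - no input rows and no grouping keys  => single row for the empty grouping key
// - otherwise                           => rows from the aggregation iterator
def processPartition(
    rows: Iterator[String],
    hasGroupingKeys: Boolean): Iterator[String] = {
  if (rows.isEmpty && hasGroupingKeys) Iterator.empty
  else if (rows.isEmpty) Iterator("outputForEmptyGroupingKeyWithoutInput") // numOutputRows += 1
  else rows.map(r => s"aggregated($r)")
}
```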
## <span id="supportsAggregate"> Selection Requirements

```scala
supportsAggregate(
  aggregateExpressions: Seq[AggregateExpression]): Boolean
```

`supportsAggregate` is enabled (`true`) when there is a `TypedImperativeAggregate` aggregate function among the [AggregateFunction](../expressions/AggregateFunction.md)s of the given [AggregateExpression](../expressions/AggregateExpression.md)s.

---

`supportsAggregate` is used when:

* `AggUtils` utility is used to [select an aggregate physical operator](../AggUtils.md#createAggregate)
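As a self-contained sketch (toy case classes standing in for Spark's expression types), the check boils down to an `exists` over the aggregate functions:

```scala
// Toy model of the selection check (hypothetical types, not Spark's classes)
sealed trait AggregateFunction
case class DeclarativeAggregate(name: String) extends AggregateFunction
case class TypedImperativeAggregate(name: String) extends AggregateFunction
case class AggregateExpression(aggregateFunction: AggregateFunction)

// true when any aggregate function is a TypedImperativeAggregate
def supportsAggregate(aggregateExpressions: Seq[AggregateExpression]): Boolean =
  aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[TypedImperativeAggregate])
```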

## Demo
