
executor: Support parallel distinct agg spill #69591

Open
xzhangxian1008 wants to merge 2 commits into pingcap:master from xzhangxian1008:agg-distinct-spill

@xzhangxian1008 xzhangxian1008 commented Jul 2, 2026

Contributor

What problem does this PR solve?

Issue Number: close #64065

Problem Summary:

What changed and how does it work?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No need to test
    • I checked and no code files have been changed.

Side effects

  • Performance regression: Consumes more CPU
  • Performance regression: Consumes more Memory
  • Breaking backward compatibility

Documentation

  • Affects user behaviors
  • Contains syntax changes
  • Contains variable changes
  • Contains experimental features
  • Changes MySQL compatibility

Release note

Please refer to Release Notes Language Style Guide to write a quality release note.

None

Summary by CodeRabbit

  • New Features

    • Improved spill support for aggregation so more DISTINCT calculations can run reliably on larger data sets.
    • Added support for additional statistical aggregations, including standard deviation and variance variants.
    • Expanded spill handling for approximate distinct counts and grouped string concatenation.
  • Bug Fixes

    • Improved correctness for DISTINCT aggregate results when execution spills to disk.
    • Enhanced restoration of partial aggregation state after spill, reducing the risk of incorrect results or failures under memory pressure.
  • Tests

    • Added broader validation for spill behavior and result accuracy across multiple distinct aggregation types.

@ti-chi-bot ti-chi-bot Bot added the release-note-none label (denotes a PR that doesn't merit a release note) Jul 2, 2026
@ti-chi-bot

ti-chi-bot Bot commented Jul 2, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign xuhuaiyu for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot Bot added the size/XXL label (denotes a PR that changes 1000+ lines, ignoring generated files) Jul 2, 2026
@coderabbitai

coderabbitai Bot commented Jul 2, 2026

📝 Walkthrough

Adds spill-based serialization and deserialization support for DISTINCT aggregate functions (COUNT, AVG, SUM, VAR_POP, GROUP_CONCAT, APPROX_COUNT_DISTINCT) via new SerializePartialResult/DeserializePartialResult/deserializeForSpill methods and shared set helpers, enables parallel hash-agg spill for distinct aggregations, and adds corresponding round-trip and correctness tests.
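
The round trip described in this walkthrough can be pictured with a minimal sketch. It assumes a length-prefixed layout (element count, then each value) built on the standard library's `encoding/binary`. The names `serializeInt64Set` and `deserializeInt64Set` echo the helpers mentioned in this review, but the bodies below are illustrative only and do not use TiDB's `util` serialization functions or its set types.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// serializeInt64Set appends a length-prefixed encoding of set to buf.
// Assumed layout: uint64 element count, then 8 little-endian bytes per value.
func serializeInt64Set(set map[int64]struct{}, buf []byte) []byte {
	buf = binary.LittleEndian.AppendUint64(buf, uint64(len(set)))
	for v := range set {
		buf = binary.LittleEndian.AppendUint64(buf, uint64(v))
	}
	return buf
}

// deserializeInt64Set rebuilds the set and reports how many bytes it consumed.
func deserializeInt64Set(buf []byte) (map[int64]struct{}, int, error) {
	if len(buf) < 8 {
		return nil, 0, errors.New("buffer too short for element count")
	}
	n := binary.LittleEndian.Uint64(buf)
	if uint64(len(buf)-8)/8 < n {
		return nil, 0, errors.New("buffer too short for elements")
	}
	set := make(map[int64]struct{}, int(n))
	pos := 8
	for i := uint64(0); i < n; i++ {
		set[int64(binary.LittleEndian.Uint64(buf[pos:]))] = struct{}{}
		pos += 8
	}
	return set, pos, nil
}

func main() {
	in := map[int64]struct{}{-3: {}, 7: {}, 42: {}}
	out, used, err := deserializeInt64Set(serializeInt64Set(in, nil))
	fmt.Println(len(out), used, err)
}
```

Because a DISTINCT aggregate must remember every value it has seen, the whole set has to survive the trip to disk, which is why the element count is written first and checked on the way back.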

Changes

Distinct aggregation spill support

| Layer | File(s) | Summary |
|---|---|---|
| Core serialize/deserialize helper infrastructure | pkg/executor/aggfuncs/spill_serialize_helper.go, pkg/executor/aggfuncs/spill_deserialize_helper.go | Adds SerializeHelper/deserializeHelper methods for distinct partial results across count/avg/sum/var_pop/approx_count_distinct/group_concat, plus shared serialize/deserializeInt64Set, Float64Set, StringSet helpers and decimal memory-delta handling via types.MyDecimalStructSize. |
| AVG DISTINCT spill wiring | pkg/executor/aggfuncs/func_avg.go | Adds SerializePartialResult, DeserializePartialResult, deserializeForSpill for baseAvgDistinct4Decimal and baseAvgDistinct4Float64. |
| COUNT DISTINCT / approx-count-distinct spill wiring | pkg/executor/aggfuncs/func_count_distinct.go | Adds the same three spill methods for int/real/decimal/duration/string/multi-arg distinct counts and baseApproxCountDistinct. |
| SUM, VAR_POP, GROUP_CONCAT DISTINCT spill wiring & type assertions | pkg/executor/aggfuncs/func_sum.go, pkg/executor/aggfuncs/func_varpop.go, pkg/executor/aggfuncs/func_group_concat.go, pkg/executor/aggfuncs/aggfuncs.go | Adds spill methods for baseSumDistinct4Float64/Decimal, varPopOriginal4DistinctFloat64, baseGroupConcatDistinct4String, plus compile-time AggFunc assertions for STDDEV_POP/STDDEV_SAMP/VAR_POP/VAR_SAMP. |
| Enable parallel spill tracking for distinct hash aggregation | pkg/executor/aggregate/agg_hash_executor.go, pkg/executor/aggregate/BUILD.bazel | Removes the !e.HasDistinct guard in initForParallelExec so spill tracking applies to distinct aggregations; updates test shard count from 3 to 4. |
| Round-trip and correctness tests | pkg/executor/aggfuncs/spill_helper_test.go, pkg/executor/aggregate/agg_spill_test.go | Adds TestPartialResult4DistinctAgg with serialize/deserialize round-trip helpers, and TestDistinctAggGetCorrectResult with a distinct hash-agg executor builder, schema helpers, and tolerance-based result checking. |
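
Tolerance-based checking is relevant here because floating-point aggregates can differ in their last bits when partial results are combined in a different order, for example after a spill. A minimal sketch of such a check follows; the helper name `approxEqual` and the tolerance of 1e-9 are assumptions for illustration and are not taken from `agg_spill_test.go`.

```go
package main

import (
	"fmt"
	"math"
)

// approxEqual reports whether a and b agree within a relative tolerance.
// For magnitudes below 1 the scale is clamped to 1, so the check becomes
// an absolute comparison near zero.
func approxEqual(a, b, tol float64) bool {
	if a == b {
		return true
	}
	scale := math.Max(math.Abs(a), math.Abs(b))
	if scale < 1 {
		scale = 1
	}
	return math.Abs(a-b) <= tol*scale
}

func main() {
	// Variables (not constants) so the additions happen in float64 at run time.
	a, b, c := 0.1, 0.2, 0.3
	x := (a + b) + c // one summation order
	y := a + (b + c) // another summation order
	fmt.Println(x == y, approxEqual(x, y, 1e-9))
}
```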

Estimated code review effort: 4 (Complex) | ~60 minutes

Possibly related PRs

  • pingcap/tidb#64686: Builds directly on the same distinct partial-result spill serialization/deserialization structures added here.

Suggested labels: approved, lgtm

Suggested reviewers: gengliqi, wshwsh12, windtalker

Poem

🐰 With buffers packed and sets serialized,
Distinct results now spill, memorized,
Count, sum, and avg hop to disk with grace,
Var_pop and group_concat join the race,
A rabbit cheers—no memory overflow in this place!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Description check | ⚠️ Warning | The template is mostly intact, but the Problem Summary and What changed/how it works sections are empty, so the description is incomplete. | Add a short problem summary and implementation overview, and include any key testing details or release-note context. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title accurately summarizes the main change: enabling distinct aggregation spill support in parallel executor paths. |
| Linked Issues check | ✅ Passed | The code adds spill/serialization support and executor test coverage for distinct aggregates in parallel hash agg, matching #64065. |
| Out of Scope Changes check | ✅ Passed | All code changes appear related to distinct aggregation spill and parallel hash agg support; no clear unrelated additions stand out. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.12.2)

level=error msg="Running error: context loading failed: failed to load packages: failed to load packages: failed to load with go/packages: context deadline exceeded"
level=error msg="Timeout exceeded: try increasing it by passing --timeout option"


@codecov

codecov Bot commented Jul 2, 2026

Codecov Report

❌ Patch coverage is 0.27701% with 360 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.8876%. Comparing base (3adc8ec) to head (263162c).
⚠️ Report is 79 commits behind head on master.

Additional details and impacted files
@@               Coverage Diff                @@
##             master     #69591        +/-   ##
================================================
- Coverage   76.3196%   73.8876%   -2.4321%     
================================================
  Files          2041       2047         +6     
  Lines        562141     588636     +26495     
================================================
+ Hits         429024     434929      +5905     
- Misses       132210     152449     +20239     
- Partials        907       1258       +351     
| Flag | Coverage Δ |
|---|---|
| integration | 40.8874% <0.2770%> (+1.2295%) ⬆️ |

Flags with carried forward coverage won't be shown. Click here to find out more.

| Components | Coverage Δ |
|---|---|
| dumpling | 60.4610% <ø> (ø) |
| parser | ∅ <ø> (∅) |
| br | 44.7563% <ø> (-18.0132%) ⬇️ |
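
The headline percentages in the coverage diff can be reproduced from its own Hits and Lines rows, since coverage is hits divided by lines. The sketch below recomputes them from the figures reported in this comment; the function name `coveragePct` is illustrative and is not part of any Codecov tooling.

```go
package main

import "fmt"

// coveragePct returns line coverage as a percentage of hit lines.
func coveragePct(hits, lines int) float64 {
	return 100 * float64(hits) / float64(lines)
}

func main() {
	base := coveragePct(429024, 562141) // master column of the diff
	head := coveragePct(434929, 588636) // #69591 column of the diff
	fmt.Printf("base %.4f%% head %.4f%% delta %.4f%%\n", base, head, head-base)
}
```

The drop is driven mostly by the denominator: the head side counts about 26,000 more lines than the base, while hits grow by only about 5,900.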

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (3)
pkg/executor/aggfuncs/spill_deserialize_helper.go (1)

254-267: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Duplicate decimal deserialization logic.

deserializePartialResult4AvgDistinctDecimal and deserializePartialResult4SumDistinctDecimal are identical. Consider extracting a shared deserializeDecimalSet helper, mirroring deserializeInt64Set/deserializeFloat64Set/deserializeStringSet below.

♻️ Proposed refactor
+func (s *deserializeHelper) deserializeDecimalSet(insert func(key string, val *types.MyDecimal) int64) (bool, int64) {
+	if s.readRowIndex < s.totalRowCnt {
+		memDelta := int64(0)
+		s.pab.Reset(s.column, s.readRowIndex)
+		for range util.DeserializeInt(s.pab) {
+			key := util.DeserializeString(s.pab)
+			val := util.DeserializeMyDecimal(s.pab)
+			memDelta += insert(key, val.Clone()) + int64(len(key)) + types.MyDecimalStructSize
+		}
+		s.readRowIndex++
+		return true, memDelta
+	}
+	return false, 0
+}
+
 func (s *deserializeHelper) deserializePartialResult4AvgDistinctDecimal(dst *partialResult4AvgDistinctDecimal) (bool, int64) {
-	if s.readRowIndex < s.totalRowCnt {
-		memDelta := int64(0)
-		s.pab.Reset(s.column, s.readRowIndex)
-		for range util.DeserializeInt(s.pab) {
-			key := util.DeserializeString(s.pab)
-			val := util.DeserializeMyDecimal(s.pab)
-			memDelta += dst.valSet.Insert(key, val.Clone()) + int64(len(key)) + types.MyDecimalStructSize
-		}
-		s.readRowIndex++
-		return true, memDelta
-	}
-	return false, 0
+	return s.deserializeDecimalSet(func(key string, val *types.MyDecimal) int64 {
+		return dst.valSet.Insert(key, val)
+	})
 }
 
 func (s *deserializeHelper) deserializePartialResult4SumDistinctDecimal(dst *partialResult4SumDistinctDecimal) (bool, int64) {
-	if s.readRowIndex < s.totalRowCnt {
-		memDelta := int64(0)
-		s.pab.Reset(s.column, s.readRowIndex)
-		for range util.DeserializeInt(s.pab) {
-			key := util.DeserializeString(s.pab)
-			val := util.DeserializeMyDecimal(s.pab)
-			memDelta += dst.valSet.Insert(key, val.Clone()) + int64(len(key)) + types.MyDecimalStructSize
-		}
-		s.readRowIndex++
-		return true, memDelta
-	}
-	return false, 0
+	return s.deserializeDecimalSet(func(key string, val *types.MyDecimal) int64 {
+		return dst.valSet.Insert(key, val)
+	})
 }

Also applies to: 275-288
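
The shape of this refactor, one shared loop plus a per-caller insert callback that reports its own memory delta, can be tried in isolation. The sketch below is self-contained and hypothetical: `entry`, `loadSet`, and `decimalStructSize` stand in for the spilled rows, the proposed `deserializeDecimalSet`, and `types.MyDecimalStructSize`. None of them are TiDB APIs, and the value 40 is a placeholder.

```go
package main

import "fmt"

// decimalStructSize is a placeholder for types.MyDecimalStructSize.
const decimalStructSize = 40

// entry is a hypothetical decoded (key, value) pair from one spilled row.
type entry struct {
	key string
	val string // stands in for *types.MyDecimal
}

// loadSet runs the shared loop once and lets the caller decide where each
// pair is stored. insert returns the memory growth of the destination set;
// the key length and the fixed per-value size are added on top.
func loadSet(rows []entry, insert func(key, val string) int64) int64 {
	var memDelta int64
	for _, r := range rows {
		memDelta += insert(r.key, r.val) + int64(len(r.key)) + decimalStructSize
	}
	return memDelta
}

func main() {
	avgSet := map[string]string{}
	sumSet := map[string]string{}
	rows := []entry{{"k1", "1.50"}, {"k2", "2.25"}}

	// Both callers share loadSet and differ only in the destination set.
	d1 := loadSet(rows, func(k, v string) int64 { avgSet[k] = v; return 0 })
	d2 := loadSet(rows, func(k, v string) int64 { sumSet[k] = v; return 0 })
	fmt.Println(d1, d2, len(avgSet), len(sumSet))
}
```

Keeping the memory accounting inside the shared loop means the two decimal paths cannot drift apart if the accounting rule changes later.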

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/executor/aggfuncs/spill_deserialize_helper.go` around lines 254 - 267,
The decimal deserialization logic in deserializePartialResult4AvgDistinctDecimal
is duplicated in deserializePartialResult4SumDistinctDecimal; extract the shared
behavior into a reusable deserializeDecimalSet helper, following the same
pattern used by deserializeInt64Set, deserializeFloat64Set, and
deserializeStringSet. Update both decimal partial-result methods to delegate to
that helper while keeping the existing memDelta accounting, row indexing, and
valSet insertion behavior unchanged.

pkg/executor/aggfuncs/spill_serialize_helper.go (1)

156-164: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Duplicate decimal serialization logic.

serializePartialResult4AvgDistinctDecimal and serializePartialResult4SumDistinctDecimal are identical. Consider factoring out a shared serializeDecimalSet helper alongside serializeInt64Set/serializeFloat64Set/serializeStringSet, matching the suggested deserializeDecimalSet counterpart in spill_deserialize_helper.go.

♻️ Proposed refactor
+func (s *SerializeHelper) serializeDecimalSet(values map[string]*types.MyDecimal) []byte {
+	s.buf = s.buf[:0]
+	s.buf = util.SerializeInt(len(values), s.buf)
+	for key, val := range values {
+		s.buf = util.SerializeString(key, s.buf)
+		s.buf = util.SerializeMyDecimal(val, s.buf)
+	}
+	return s.buf
+}
+
 func (s *SerializeHelper) serializePartialResult4AvgDistinctDecimal(value partialResult4AvgDistinctDecimal) []byte {
-	s.buf = s.buf[:0]
-	s.buf = util.SerializeInt(len(value.valSet.M), s.buf)
-	for key, val := range value.valSet.M {
-		s.buf = util.SerializeString(key, s.buf)
-		s.buf = util.SerializeMyDecimal(val, s.buf)
-	}
-	return s.buf
+	return s.serializeDecimalSet(value.valSet.M)
 }
 
 func (s *SerializeHelper) serializePartialResult4SumDistinctDecimal(value partialResult4SumDistinctDecimal) []byte {
-	s.buf = s.buf[:0]
-	s.buf = util.SerializeInt(len(value.valSet.M), s.buf)
-	for key, val := range value.valSet.M {
-		s.buf = util.SerializeString(key, s.buf)
-		s.buf = util.SerializeMyDecimal(val, s.buf)
-	}
-	return s.buf
+	return s.serializeDecimalSet(value.valSet.M)
 }

Note: this requires importing github.com/pingcap/tidb/pkg/types in this file if not already imported.

Also applies to: 170-178

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/executor/aggfuncs/spill_serialize_helper.go` around lines 156 - 164,
`serializePartialResult4AvgDistinctDecimal` and
`serializePartialResult4SumDistinctDecimal` duplicate the same decimal-set
serialization logic, so factor this into a shared `serializeDecimalSet` helper
in `SerializeHelper` alongside the existing `serializeInt64Set`,
`serializeFloat64Set`, and `serializeStringSet` helpers. Update both decimal
partial-result serializers to call the new helper, and make sure
`pkg/executor/aggfuncs/spill_serialize_helper.go` imports
`github.com/pingcap/tidb/pkg/types` if needed to match the decimal type usage
and the `deserializeDecimalSet` counterpart.

pkg/executor/aggregate/agg_hash_executor.go (1)

420-420: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value

Consider a brief comment explaining the removed HasDistinct exclusion.

This is a meaningful behavioral change (previously distinct aggregations were excluded from parallel spill tracking); a short comment noting why the exclusion was removed and referencing the new distinct-spill support would help future readers.

📝 Suggested comment
+	// Distinct aggregations are now supported for parallel hash-agg spill,
+	// so no longer excluded here (see `#64065`).
 	if isTrackerEnabled && isParallelHashAggSpillEnabled {

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/executor/aggregate/agg_hash_executor.go` at line 420, Add a brief inline
comment near the isTrackerEnabled && isParallelHashAggSpillEnabled check in
agg_hash_executor.go explaining that HasDistinct is no longer excluded because
parallel spill tracking now supports distinct aggregations; mention the new
distinct-spill support so future readers understand the behavior change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: f701298c-c598-4e17-bf89-087531e6fdfb

📥 Commits

Reviewing files that changed from the base of the PR and between 5663a07 and 263162c.

📒 Files selected for processing (12)
  • pkg/executor/aggfuncs/aggfuncs.go
  • pkg/executor/aggfuncs/func_avg.go
  • pkg/executor/aggfuncs/func_count_distinct.go
  • pkg/executor/aggfuncs/func_group_concat.go
  • pkg/executor/aggfuncs/func_sum.go
  • pkg/executor/aggfuncs/func_varpop.go
  • pkg/executor/aggfuncs/spill_deserialize_helper.go
  • pkg/executor/aggfuncs/spill_helper_test.go
  • pkg/executor/aggfuncs/spill_serialize_helper.go
  • pkg/executor/aggregate/BUILD.bazel
  • pkg/executor/aggregate/agg_hash_executor.go
  • pkg/executor/aggregate/agg_spill_test.go

@ti-chi-bot

ti-chi-bot Bot commented Jul 2, 2026

@xzhangxian1008: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

| Test name | Commit | Details | Required | Rerun command |
|---|---|---|---|---|
| idc-jenkins-ci-tidb/check_dev_2 | 263162c | link | true | /test check-dev2 |
| pull-unit-test-next-gen | 263162c | link | true | /test pull-unit-test-next-gen |

Full PR test history. Your PR dashboard.

Labels

release-note-none: Denotes a PR that doesn't merit a release note.
size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support distinct aggregation function for parallel hash aggregation

1 participant