executor: Support parallel distinct agg spill#69591
Conversation
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
📝 WalkthroughWalkthroughAdds spill-based serialization and deserialization support for DISTINCT aggregate functions (COUNT, AVG, SUM, VAR_POP, GROUP_CONCAT, APPROX_COUNT_DISTINCT) via new SerializePartialResult/DeserializePartialResult/deserializeForSpill methods and shared set helpers, enables parallel hash-agg spill for distinct aggregations, and adds corresponding round-trip and correctness tests. ChangesDistinct aggregation spill support
Estimated code review effort: 4 (Complex) | ~60 minutes Possibly related PRs
Suggested labels: Suggested reviewers: Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 golangci-lint (2.12.2)level=error msg="Running error: context loading failed: failed to load packages: failed to load packages: failed to load with go/packages: context deadline exceeded" Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #69591 +/- ##
================================================
- Coverage 76.3196% 73.8876% -2.4321%
================================================
Files 2041 2047 +6
Lines 562141 588636 +26495
================================================
+ Hits 429024 434929 +5905
- Misses 132210 152449 +20239
- Partials 907 1258 +351
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
🧹 Nitpick comments (3)
pkg/executor/aggfuncs/spill_deserialize_helper.go (1)
254-267: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winDuplicate decimal deserialization logic.
deserializePartialResult4AvgDistinctDecimalanddeserializePartialResult4SumDistinctDecimalare identical. Consider extracting a shareddeserializeDecimalSethelper, mirroringdeserializeInt64Set/deserializeFloat64Set/deserializeStringSetbelow.♻️ Proposed refactor
+func (s *deserializeHelper) deserializeDecimalSet(insert func(key string, val *types.MyDecimal) int64) (bool, int64) { + if s.readRowIndex < s.totalRowCnt { + memDelta := int64(0) + s.pab.Reset(s.column, s.readRowIndex) + for range util.DeserializeInt(s.pab) { + key := util.DeserializeString(s.pab) + val := util.DeserializeMyDecimal(s.pab) + memDelta += insert(key, val.Clone()) + int64(len(key)) + types.MyDecimalStructSize + } + s.readRowIndex++ + return true, memDelta + } + return false, 0 +} + func (s *deserializeHelper) deserializePartialResult4AvgDistinctDecimal(dst *partialResult4AvgDistinctDecimal) (bool, int64) { - if s.readRowIndex < s.totalRowCnt { - memDelta := int64(0) - s.pab.Reset(s.column, s.readRowIndex) - for range util.DeserializeInt(s.pab) { - key := util.DeserializeString(s.pab) - val := util.DeserializeMyDecimal(s.pab) - memDelta += dst.valSet.Insert(key, val.Clone()) + int64(len(key)) + types.MyDecimalStructSize - } - s.readRowIndex++ - return true, memDelta - } - return false, 0 + return s.deserializeDecimalSet(func(key string, val *types.MyDecimal) int64 { + return dst.valSet.Insert(key, val) + }) } func (s *deserializeHelper) deserializePartialResult4SumDistinctDecimal(dst *partialResult4SumDistinctDecimal) (bool, int64) { - if s.readRowIndex < s.totalRowCnt { - memDelta := int64(0) - s.pab.Reset(s.column, s.readRowIndex) - for range util.DeserializeInt(s.pab) { - key := util.DeserializeString(s.pab) - val := util.DeserializeMyDecimal(s.pab) - memDelta += dst.valSet.Insert(key, val.Clone()) + int64(len(key)) + types.MyDecimalStructSize - } - s.readRowIndex++ - return true, memDelta - } - return false, 0 + return s.deserializeDecimalSet(func(key string, val *types.MyDecimal) int64 { + return dst.valSet.Insert(key, val) + }) }Also applies to: 275-288
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/executor/aggfuncs/spill_deserialize_helper.go` around lines 254 - 267, The decimal deserialization logic in deserializePartialResult4AvgDistinctDecimal is duplicated in deserializePartialResult4SumDistinctDecimal; extract the shared behavior into a reusable deserializeDecimalSet helper, following the same pattern used by deserializeInt64Set, deserializeFloat64Set, and deserializeStringSet. Update both decimal partial-result methods to delegate to that helper while keeping the existing memDelta accounting, row indexing, and valSet insertion behavior unchanged.pkg/executor/aggfuncs/spill_serialize_helper.go (1)
156-164: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winDuplicate decimal serialization logic.
serializePartialResult4AvgDistinctDecimalandserializePartialResult4SumDistinctDecimalare identical. Consider factoring out a sharedserializeDecimalSethelper alongsideserializeInt64Set/serializeFloat64Set/serializeStringSet, matching the suggesteddeserializeDecimalSetcounterpart inspill_deserialize_helper.go.♻️ Proposed refactor
+func (s *SerializeHelper) serializeDecimalSet(values map[string]*types.MyDecimal) []byte { + s.buf = s.buf[:0] + s.buf = util.SerializeInt(len(values), s.buf) + for key, val := range values { + s.buf = util.SerializeString(key, s.buf) + s.buf = util.SerializeMyDecimal(val, s.buf) + } + return s.buf +} + func (s *SerializeHelper) serializePartialResult4AvgDistinctDecimal(value partialResult4AvgDistinctDecimal) []byte { - s.buf = s.buf[:0] - s.buf = util.SerializeInt(len(value.valSet.M), s.buf) - for key, val := range value.valSet.M { - s.buf = util.SerializeString(key, s.buf) - s.buf = util.SerializeMyDecimal(val, s.buf) - } - return s.buf + return s.serializeDecimalSet(value.valSet.M) } func (s *SerializeHelper) serializePartialResult4SumDistinctDecimal(value partialResult4SumDistinctDecimal) []byte { - s.buf = s.buf[:0] - s.buf = util.SerializeInt(len(value.valSet.M), s.buf) - for key, val := range value.valSet.M { - s.buf = util.SerializeString(key, s.buf) - s.buf = util.SerializeMyDecimal(val, s.buf) - } - return s.buf + return s.serializeDecimalSet(value.valSet.M) }Note: this requires importing
github.com/pingcap/tidb/pkg/typesin this file if not already imported.Also applies to: 170-178
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/executor/aggfuncs/spill_serialize_helper.go` around lines 156 - 164, `serializePartialResult4AvgDistinctDecimal` and `serializePartialResult4SumDistinctDecimal` duplicate the same decimal-set serialization logic, so factor this into a shared `serializeDecimalSet` helper in `SerializeHelper` alongside the existing `serializeInt64Set`, `serializeFloat64Set`, and `serializeStringSet` helpers. Update both decimal partial-result serializers to call the new helper, and make sure `pkg/executor/aggfuncs/spill_serialize_helper.go` imports `github.com/pingcap/tidb/pkg/types` if needed to match the decimal type usage and the `deserializeDecimalSet` counterpart.pkg/executor/aggregate/agg_hash_executor.go (1)
420-420: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low valueConsider a brief comment explaining the removed
HasDistinctexclusion.This is a meaningful behavioral change (previously distinct aggregations were excluded from parallel spill tracking); a short comment noting why the exclusion was removed and referencing the new distinct-spill support would help future readers.
📝 Suggested comment
+ // Distinct aggregations are now supported for parallel hash-agg spill, + // so no longer excluded here (see `#64065`). if isTrackerEnabled && isParallelHashAggSpillEnabled {🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/executor/aggregate/agg_hash_executor.go` at line 420, Add a brief inline comment near the isTrackerEnabled && isParallelHashAggSpillEnabled check in agg_hash_executor.go explaining that HasDistinct is no longer excluded because parallel spill tracking now supports distinct aggregations; mention the new distinct-spill support so future readers understand the behavior change.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@pkg/executor/aggfuncs/spill_deserialize_helper.go`:
- Around line 254-267: The decimal deserialization logic in
deserializePartialResult4AvgDistinctDecimal is duplicated in
deserializePartialResult4SumDistinctDecimal; extract the shared behavior into a
reusable deserializeDecimalSet helper, following the same pattern used by
deserializeInt64Set, deserializeFloat64Set, and deserializeStringSet. Update
both decimal partial-result methods to delegate to that helper while keeping the
existing memDelta accounting, row indexing, and valSet insertion behavior
unchanged.
In `@pkg/executor/aggfuncs/spill_serialize_helper.go`:
- Around line 156-164: `serializePartialResult4AvgDistinctDecimal` and
`serializePartialResult4SumDistinctDecimal` duplicate the same decimal-set
serialization logic, so factor this into a shared `serializeDecimalSet` helper
in `SerializeHelper` alongside the existing `serializeInt64Set`,
`serializeFloat64Set`, and `serializeStringSet` helpers. Update both decimal
partial-result serializers to call the new helper, and make sure
`pkg/executor/aggfuncs/spill_serialize_helper.go` imports
`github.com/pingcap/tidb/pkg/types` if needed to match the decimal type usage
and the `deserializeDecimalSet` counterpart.
In `@pkg/executor/aggregate/agg_hash_executor.go`:
- Line 420: Add a brief inline comment near the isTrackerEnabled &&
isParallelHashAggSpillEnabled check in agg_hash_executor.go explaining that
HasDistinct is no longer excluded because parallel spill tracking now supports
distinct aggregations; mention the new distinct-spill support so future readers
understand the behavior change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: f701298c-c598-4e17-bf89-087531e6fdfb
📒 Files selected for processing (12)
pkg/executor/aggfuncs/aggfuncs.gopkg/executor/aggfuncs/func_avg.gopkg/executor/aggfuncs/func_count_distinct.gopkg/executor/aggfuncs/func_group_concat.gopkg/executor/aggfuncs/func_sum.gopkg/executor/aggfuncs/func_varpop.gopkg/executor/aggfuncs/spill_deserialize_helper.gopkg/executor/aggfuncs/spill_helper_test.gopkg/executor/aggfuncs/spill_serialize_helper.gopkg/executor/aggregate/BUILD.bazelpkg/executor/aggregate/agg_hash_executor.gopkg/executor/aggregate/agg_spill_test.go
|
@xzhangxian1008: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
What problem does this PR solve?
Issue Number: close #64065
Problem Summary:
What changed and how does it work?
Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.
Summary by CodeRabbit
New Features
DISTINCTcalculations can run reliably on larger data sets.Bug Fixes
DISTINCTaggregate results when execution spills to disk.Tests