[SPARK-57851][SQL] Shuffle-free single-task execution for small queries#56928

Open: viirya wants to merge 2 commits into apache:master from viirya:single-node-execution

@viirya viirya commented Jul 1, 2026

What changes were proposed in this pull request?

This adds a conservative optimizer rule, MarkSingleTaskExecution, that marks small single-partition scans and in-memory LocalRelations, optionally with a shuffle-inducing operator on top (sort, aggregate, distinct, window, limit/offset, expand), as candidates for single-task execution. A marked scan reports a SinglePartition output partitioning, which lets EnsureRequirements elide the shuffle that would otherwise be inserted below the operator on top.
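To make the shuffle-elision idea concrete, here is a minimal toy model in plain Python. It is not Spark code: the `Scan`, `Sort`, and `Shuffle` classes, the `single_task` field, and the `ensure_requirements` function are hypothetical stand-ins, and the real EnsureRequirements rule handles many more distributions and operators than this sketch does.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Scan:
    # Hypothetical stand-in for the mark set by the optimizer rule.
    single_task: bool = False

    def output_partitioning(self) -> str:
        # A marked scan advertises a single partition; otherwise the
        # partitioning is treated as unknown in this toy model.
        return "SinglePartition" if self.single_task else "UnknownPartitioning"


@dataclass(frozen=True)
class Shuffle:
    child: object


@dataclass(frozen=True)
class Sort:
    # A global sort needs all rows to be totally ordered across partitions.
    child: object


def ensure_requirements(op: Sort) -> Sort:
    """Insert a shuffle below `op` unless its child is already a single partition."""
    if op.child.output_partitioning() == "SinglePartition":
        return op  # requirement already satisfied, no shuffle needed
    return Sort(Shuffle(op.child))


marked_plan = ensure_requirements(Sort(Scan(single_task=True)))
unmarked_plan = ensure_requirements(Sort(Scan(single_task=False)))
```

In this sketch `marked_plan` keeps the scan directly under the sort, while `unmarked_plan` gains a `Shuffle` node in between.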

Details:

  • The rule runs as the last optimizer batch (so it sees the final plan shape) and marks eligible LogicalRelation/LocalRelation nodes with a TreeNodeTag.
  • FileSourceStrategy/SparkStrategies propagate the mark to FileSourceScanExec/LocalTableScanExec.
  • FileSourceScanExec additionally gates on file count and size thresholds using the generic ScanFileListing, reports SinglePartition, and coalesces its input RDD to a single partition as a correctness backstop when the estimate does not match the runtime partition count.
  • LocalTableScanExec reads its data in a single partition and reports SinglePartition.
  • ExpandExec forwards SinglePartition from its child, since Expand only replicates rows within a partition and never moves rows across partitions.
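The file count and size gate described for FileSourceScanExec can be sketched roughly as follows. This is a toy model under stated assumptions: the threshold names and values (`MAX_FILES`, `MAX_TOTAL_BYTES`) and the function `eligible_for_single_task` are hypothetical, since the PR description does not list the actual config keys or defaults.

```python
from typing import List

# Hypothetical thresholds for illustration only; the real limits live under
# spark.sql.optimizer.singleTaskExecution.* and are not given in the description.
MAX_FILES = 1
MAX_TOTAL_BYTES = 64 * 1024 * 1024


def eligible_for_single_task(marked: bool, file_sizes: List[int]) -> bool:
    """Return True only if the scan was marked AND the listing is small enough."""
    if not marked:
        return False  # the optimizer rule did not select this scan
    within_count = len(file_sizes) <= MAX_FILES
    within_size = sum(file_sizes) <= MAX_TOTAL_BYTES
    return within_count and within_size
```

The point of the two-stage check is that the logical mark alone is not enough in this model: the physical scan re-checks the actual file listing before it reports a single partition.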

The feature is controlled by new internal configs under spark.sql.optimizer.singleTaskExecution.* and is disabled by default. Join is intentionally left out for now and can be added as a follow-up; union is already covered by the existing spark.sql.unionOutputPartitioning.

This is part of the SPIP umbrella SPARK-56978 (Faster queries in local laptop mode), covering the "shuffle-free local execution for small queries" category.

Why are the changes needed?

For small, low-latency queries the fixed cost of a shuffle (scheduling, serialization, network) dominates the total runtime. When the input is already a single small partition, the shuffle inserted before a sort/aggregate/window is unnecessary and can be removed to reduce latency, without affecting correctness.

Does this PR introduce any user-facing change?

No. The optimization is behind internal configs (spark.sql.optimizer.singleTaskExecution.*) and is disabled by default.

How was this patch tested?

New MarkSingleTaskExecutionSuite (14 tests) covering:

  • the marking decision for the supported plan shapes;
  • SinglePartition output with no shuffle in the final physical plan;
  • empty-scan correctness (a global aggregation over an empty scan still returns a single row);
  • disabled-flag negatives (master flag and per-operator sub-flags);
  • ineligibility of unsupported shapes (join) and subquery expressions;
  • the leaf-node parallelism override disabling the local-relation case.
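One plausible reading of why the empty-scan case deserves its own test is sketched below as a toy model. It assumes that an empty scan can produce zero partitions and that a shuffle-free global aggregate emits one row per task; both are assumptions of this sketch, and the helper names `coalesce_to_one` and `global_count_without_shuffle` are hypothetical.

```python
from typing import List


def coalesce_to_one(partitions: List[List[int]]) -> List[List[int]]:
    """Always return exactly one partition, even when the input has none."""
    return [[row for part in partitions for row in part]]


def global_count_without_shuffle(partitions: List[List[int]]) -> List[int]:
    """Toy global COUNT(*) with no shuffle: each task emits one result row."""
    return [len(part) for part in partitions]


empty_scan: List[List[int]] = []  # assumed: an empty scan yields zero partitions

# Zero partitions means zero tasks, so no result row is produced at all.
rows_without_backstop = global_count_without_shuffle(empty_scan)

# Coalescing first guarantees one task, which emits the expected single row.
rows_with_backstop = global_count_without_shuffle(coalesce_to_one(empty_scan))
```

Under these assumptions, the coalesce step is what keeps `SELECT COUNT(*)` over an empty input returning one row with value 0 rather than an empty result.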

SQLConfSuite passes as a config-wiring regression check.

Was this patch authored or co-authored using generative AI tooling?

Yes, using Claude Code.

@viirya viirya requested a review from dtenedor July 1, 2026 15:23
### What changes were proposed in this pull request?

This adds a conservative optimizer rule `MarkSingleTaskExecution` that marks
small single-partition scans, optionally with a shuffle-inducing operator on
top (sort, aggregate, distinct, window, limit/offset, expand) or an in-memory
`LocalRelation`, as candidates for single-task execution. Such a scan reports a
`SinglePartition` output partitioning, allowing `EnsureRequirements` to elide
the shuffle that would otherwise be inserted before the operator on top.

The rule runs as the last optimizer batch and marks eligible
`LogicalRelation`/`LocalRelation` nodes with a `TreeNodeTag`. The planning
strategies propagate the mark to `FileSourceScanExec`/`LocalTableScanExec`.
`FileSourceScanExec` additionally gates on file count and size thresholds using
the generic `ScanFileListing`, reports `SinglePartition`, and coalesces its
input RDD to a single partition as a correctness backstop. `ExpandExec`
forwards `SinglePartition` from its child, since Expand never moves rows across
partitions.
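The claim about Expand can be illustrated with a small toy model: every projection is applied to every row inside its own partition, so the number of partitions is unchanged. The function `expand` below is a hypothetical plain-Python sketch, not the `ExpandExec` implementation.

```python
from typing import Callable, List, Sequence, Tuple

Row = Tuple


def expand(
    partitions: List[List[Row]],
    projections: Sequence[Callable[[Row], Row]],
) -> List[List[Row]]:
    """Emit one output row per (input row, projection), partition by partition."""
    return [
        [project(row) for row in part for project in projections]
        for part in partitions
    ]


# Two projections, loosely in the spirit of grouping sets: keep the key, or null it out.
projections = [lambda r: (r[0], r[1]), lambda r: (None, r[1])]

single = expand([[("a", 1), ("b", 2)]], projections)
```

Because rows are only replicated in place, a single-partition input stays a single partition in this model, which is why forwarding the child's partitioning is safe here.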

The feature is controlled by new internal configs under
`spark.sql.optimizer.singleTaskExecution.*` and is disabled by default. Join is
intentionally left out for now; union is already covered by the existing
`spark.sql.unionOutputPartitioning`.

This is part of the SPIP umbrella SPARK-56978 (Faster queries in local laptop
mode), covering the shuffle-free local execution category.

### Why are the changes needed?

For small, low-latency queries the fixed cost of a shuffle (scheduling,
serialization, network) dominates. When the input is already a single small
partition, the shuffle before a sort/aggregate/window is unnecessary and can be
removed to reduce latency.

### Does this PR introduce _any_ user-facing change?

No. The optimization is behind internal configs and is disabled by default.

### How was this patch tested?

New `MarkSingleTaskExecutionSuite` (14 tests) covering the marking decision,
`SinglePartition` output with no shuffle, empty-scan correctness, disabled-flag
negatives, join/subquery ineligibility, and the leaf-parallelism override.
`SQLConfSuite` passes as a config-wiring regression check.

Co-authored-by: Isaac
@viirya viirya force-pushed the single-node-execution branch from e2fff1a to 7e18780 Compare July 1, 2026 15:26