Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7350,6 +7350,112 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val SINGLE_TASK_EXECUTION_ENABLED =
buildConf("spark.sql.optimizer.singleTaskExecution.enabled")
.doc("When true, eligible query fragments that read a small single-partition scan can run " +
"in a single task, skipping the shuffle that would otherwise be inserted before an " +
"operator such as a sort or aggregation. This avoids the scheduling overhead of an " +
"unnecessary shuffle for small, low-latency queries.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.booleanConf
.createWithDefault(false)

val SINGLE_TASK_EXECUTION_AGGREGATION =
buildConf("spark.sql.optimizer.singleTaskExecution.aggregation")
.internal()
.doc("When true, and 'spark.sql.optimizer.singleTaskExecution.enabled' is also true, " +
"enable the single-task optimization for query plans with aggregation operators.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.fallbackConf(SINGLE_TASK_EXECUTION_ENABLED)

val SINGLE_TASK_EXECUTION_EXPAND =
buildConf("spark.sql.optimizer.singleTaskExecution.expand")
.internal()
.doc("When true, and 'spark.sql.optimizer.singleTaskExecution.enabled' is also true, " +
"enable the single-task optimization for query plans with expand operators.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.fallbackConf(SINGLE_TASK_EXECUTION_ENABLED)

val SINGLE_TASK_EXECUTION_LIMIT_OFFSET =
buildConf("spark.sql.optimizer.singleTaskExecution.limitOffset")
.internal()
.doc("When true, and 'spark.sql.optimizer.singleTaskExecution.enabled' is also true, " +
"enable the single-task optimization for query plans with limit or offset operators.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.fallbackConf(SINGLE_TASK_EXECUTION_ENABLED)

val SINGLE_TASK_EXECUTION_SORT =
buildConf("spark.sql.optimizer.singleTaskExecution.sort")
.internal()
.doc("When true, and 'spark.sql.optimizer.singleTaskExecution.enabled' is also true, " +
"enable the single-task optimization for query plans with sort operators.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.fallbackConf(SINGLE_TASK_EXECUTION_ENABLED)

val SINGLE_TASK_EXECUTION_WINDOW =
buildConf("spark.sql.optimizer.singleTaskExecution.window")
.internal()
.doc("When true, and 'spark.sql.optimizer.singleTaskExecution.enabled' is also true, " +
"enable the single-task optimization for query plans with window operators.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.fallbackConf(SINGLE_TASK_EXECUTION_ENABLED)

val SINGLE_TASK_EXECUTION_MAX_NUM_FILES =
buildConf("spark.sql.optimizer.singleTaskExecution.maxNumFiles")
.internal()
.doc("The maximum number of files that a file scan may have for the single-task " +
"optimization to apply to it.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.intConf
.createWithDefault(1)

val SINGLE_TASK_EXECUTION_MIN_NUM_FILES =
buildConf("spark.sql.optimizer.singleTaskExecution.minNumFiles")
.internal()
.doc("The minimum number of files that a file scan may have for the single-task " +
"optimization to apply to it.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.intConf
.createWithDefault(1)

val SINGLE_TASK_EXECUTION_MIN_NUM_BYTES =
buildConf("spark.sql.optimizer.singleTaskExecution.minNumBytes")
.internal()
.doc("The minimum total size in bytes that a file scan may have for the single-task " +
"optimization to apply to it.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.longConf
.createWithDefault(1)

val SINGLE_TASK_EXECUTION_LOCAL_TABLE_SCAN_MIN_ROWS =
buildConf("spark.sql.optimizer.singleTaskExecution.localTableScan.minRows")
.internal()
.doc("The minimum number of rows that a local in-memory relation may have for the " +
"single-task optimization to apply to it.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.intConf
.createWithDefault(1)

val SINGLE_TASK_EXECUTION_LOCAL_TABLE_SCAN_THRESHOLD =
buildConf("spark.sql.optimizer.singleTaskExecution.localTableScan.threshold")
.internal()
.doc("The maximum number of rows that a local in-memory relation may have for the " +
"single-task optimization to apply to it.")
.version("4.3.0")
.withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
.intConf
.createWithDefault(1000)

val LEGACY_PARSE_QUERY_WITHOUT_EOF = buildConf("spark.sql.legacy.parseQueryWithoutEof")
.internal()
.doc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder)
}
}
dataframe.queryExecution.executedPlan match {
case LocalTableScanExec(_, rows, _) =>
case LocalTableScanExec(_, rows, _, _) =>
executePlan.eventsManager.postFinished(Some(rows.length))
var offset = 0L
converter(rows.iterator).foreach { case (bytes, count) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow, TableIdent
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, SinglePartition, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.errors.QueryExecutionErrors
Expand Down Expand Up @@ -320,6 +320,34 @@ trait FileSourceScanLike extends DataSourceScanExec with SessionStateHelper {
def requiredSchema: StructType
// Identifier for the table in the metastore.
def tableIdentifier: Option[TableIdentifier]
// When true, the `MarkSingleTaskExecution` optimizer rule has marked this scan's plan shape as a
// candidate for single-task execution. The scan is only actually executed in a single task when
// it additionally passes the file count and size thresholds (see `useSingleTaskExecution`).
def markedForSingleTaskExecution: Boolean

/**
* Whether this file scan should run in a single task, reporting a `SinglePartition` output
* partitioning so that a following shuffle can be elided. This is true when the plan shape was
* marked eligible by the optimizer and the statically-selected files fall within the configured
* count and size bounds. Bucketed scans are excluded: they report a `HashPartitioning` over the
* bucket columns, which coalescing to a single partition would invalidate. It relies on
* `selectedPartitions`, so it must not be evaluated before the scan's file listing is available.
*/
lazy val useSingleTaskExecution: Boolean = {
if (!markedForSingleTaskExecution || bucketedScan) {
false
} else {
val sqlConf = getSqlConf(relation.sparkSession)
val minNumFiles = sqlConf.getConf(SQLConf.SINGLE_TASK_EXECUTION_MIN_NUM_FILES)
val maxNumFiles = sqlConf.getConf(SQLConf.SINGLE_TASK_EXECUTION_MAX_NUM_FILES)
val minNumBytes = sqlConf.getConf(SQLConf.SINGLE_TASK_EXECUTION_MIN_NUM_BYTES)
val maxPartitionBytes = sqlConf.getConf(SQLConf.FILES_MAX_PARTITION_BYTES)
val numFiles = selectedPartitions.totalNumberOfFiles
val numBytes = selectedPartitions.totalFileSize
numFiles >= minNumFiles && numFiles <= maxNumFiles &&
numBytes >= minNumBytes && numBytes <= maxPartitionBytes

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In isLocalRelationEligible, get(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM) is considered in a way. Do we need to consider SQLConf.LEAF_NODE_DEFAULT_PARALLELISM here too?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, file scans should respect it too. Moved the check up to apply() so the whole rule is skipped when a leaf-node parallelism override is set.

}
}


lazy val fileConstantMetadataColumns: Seq[AttributeReference] = output.collect {
Expand Down Expand Up @@ -478,6 +506,8 @@ trait FileSourceScanLike extends DataSourceScanExec with SessionStateHelper {
Nil
}
(partitioning, sortOrder)
} else if (useSingleTaskExecution) {
(SinglePartition, Nil)
} else {
(UnknownPartitioning(0), Nil)
}
Expand Down Expand Up @@ -696,7 +726,8 @@ case class FileSourceScanExec(
override val optionalNumCoalescedBuckets: Option[Int],
override val dataFilters: Seq[Expression],
override val tableIdentifier: Option[TableIdentifier],
override val disableBucketedScan: Boolean = false)
override val disableBucketedScan: Boolean = false,
override val markedForSingleTaskExecution: Boolean = false)
extends FileSourceScanLike {

// Note that some vals referring the file-based relation are lazy intentionally
Expand Down Expand Up @@ -744,10 +775,28 @@ case class FileSourceScanExec(
inputRDD :: Nil
}

/**
* The input RDD, coalesced to a single partition when this scan runs in single-task mode. This
* enforces the `SinglePartition` output partitioning reported by `outputPartitioning`, which is
* estimated from the statically-selected files and may not correspond exactly to the number of
* partitions the input RDD produces after dynamic pruning. Coalescing here keeps the query
* correct in either case.
*/
private[spark] lazy val maybeCoalesceInputRDD: RDD[InternalRow] = {
if (useSingleTaskExecution && inputRDD.getNumPartitions > 1) {
inputRDD.coalesce(1)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this consistent with outputPartitioning code path? In outputPartitioning code, bucketedScan is handled before useSingleTaskExecution.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — the combination was inconsistent: a marked bucketed scan would advertise HashPartitioning while maybeCoalesceInputRDD coalesced it to one partition. useSingleTaskExecution now returns false for bucketed scans, which keeps both code paths consistent by construction. Added a test.

} else if (useSingleTaskExecution && inputRDD.getNumPartitions == 0) {
// All files were pruned away; produce a single empty partition to match `SinglePartition`.
sparkContext.parallelize[InternalRow](Nil, 1)
} else {
inputRDD
}
}

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
if (needsUnsafeRowConversion) {
inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
maybeCoalesceInputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
val toUnsafe = UnsafeProjection.create(schema)
toUnsafe.initialize(index)
iter.map { row =>
Expand All @@ -756,7 +805,7 @@ case class FileSourceScanExec(
}
}
} else {
inputRDD.mapPartitionsInternal { iter =>
maybeCoalesceInputRDD.mapPartitionsInternal { iter =>
iter.map { row =>
numOutputRows += 1
row
Expand All @@ -768,7 +817,7 @@ case class FileSourceScanExec(
protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
val scanTime = longMetric("scanTime")
inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
maybeCoalesceInputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
new Iterator[ColumnarBatch] {

override def hasNext: Boolean = {
Expand Down Expand Up @@ -921,7 +970,8 @@ case class FileSourceScanExec(
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
None,
disableBucketedScan)
disableBucketedScan,
markedForSingleTaskExecution)
}

override def getStream: Option[SparkDataStream] = stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition, UnknownPartitioning}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf

Expand All @@ -36,15 +36,28 @@ import org.apache.spark.sql.internal.SQLConf
case class ExpandExec(
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: SparkPlan)
child: SparkPlan,
// When true, this Expand is part of a plan marked for single-task execution by the
// `MarkSingleTaskExecution` optimizer rule, and forwards the child's `SinglePartition`
// output partitioning (see `outputPartitioning`).
useSingleTask: Boolean = false)
extends UnaryExecNode with CodegenSupport {

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

// The GroupExpressions can output data with arbitrary partitioning, so set it
// as UNKNOWN partitioning
override def outputPartitioning: Partitioning = UnknownPartitioning(0)
// as UNKNOWN partitioning. Expand only replicates rows within a partition and never moves rows
// across partitions, so when this Expand is part of a plan marked for single-task execution
// and the child produces a single partition, we can forward the `SinglePartition` property to
// avoid an unneeded shuffle.
override def outputPartitioning: Partitioning = {
if (useSingleTask && child.outputPartitioning == SinglePartition) {
SinglePartition
} else {
UnknownPartitioning(0)
}
}

@transient
override lazy val references: AttributeSet =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition, UnknownPartitioning}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.ArrayImplicits._
Expand All @@ -34,7 +35,10 @@ import org.apache.spark.util.ArrayImplicits._
case class LocalTableScanExec(
output: Seq[Attribute],
@transient rows: Seq[InternalRow],
@transient stream: Option[SparkDataStream])
@transient stream: Option[SparkDataStream],
// When true, the relation is scanned in a single partition, so this node reports a
// `SinglePartition` output partitioning. Set by the `MarkSingleTaskExecution` optimizer rule.
useSingleTask: Boolean = false)
extends LeafExecNode
with StreamSourceAwareSparkPlan
with InputRDDCodegen {
Expand All @@ -53,14 +57,33 @@ case class LocalTableScanExec(

@transient private lazy val rdd: RDD[InternalRow] = {
if (rows.isEmpty) {
sparkContext.emptyRDD
if (useSingleTask) {
// Produce a single empty partition to match the `SinglePartition` reported by
// `outputPartitioning`. `emptyRDD` has zero partitions, and running e.g. a global
// aggregation on a zero-partition RDD with the shuffle elided would return no rows
// instead of the single row expected on empty input.
sparkContext.parallelize(Seq.empty[InternalRow], 1)
} else {
sparkContext.emptyRDD
}
} else {
val numSlices = math.min(
unsafeRows.length, session.leafNodeDefaultParallelism)
val numSlices = if (useSingleTask) {
1
} else {
math.min(unsafeRows.length, session.leafNodeDefaultParallelism)
}
sparkContext.parallelize(unsafeRows.toImmutableArraySeq, numSlices)
}
}

override def outputPartitioning: Partitioning = {
if (useSingleTask) {
SinglePartition

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When rows.isEmpty=true, we need to return 0 partition instead of SinglePartition, don't we?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — this was a real correctness issue. The advertised SinglePartition did not match the zero-partition emptyRDD: with the shuffle elided, a global aggregation over an empty marked relation ran zero tasks and returned no rows instead of the single row expected on empty input. Fixed by producing one empty partition when the scan is marked, matching how maybeCoalesceInputRDD handles the all-files-pruned case in FileSourceScanExec, and added a test that failed before the fix.

} else {
UnknownPartitioning(0)
}
}

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
rdd.map { r =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.execution.datasources.{PruneFileSourcePartitions, PushVariantIntoScan, SchemaPruning, V1Writes}
import org.apache.spark.sql.execution.datasources.{MarkSingleTaskExecution, PruneFileSourcePartitions, PushVariantIntoScan, SchemaPruning, V1Writes}
import org.apache.spark.sql.execution.datasources.v2.{GroupBasedRowLevelOperationScanPlanning, OptimizeMetadataOnlyDeleteFromTable, V2ScanPartitioningAndOrdering, V2ScanRelationPushDown, V2Writes}
import org.apache.spark.sql.execution.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning, RowLevelOperationRuntimeGroupFiltering}
import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs, ExtractPythonUDTFs}
Expand Down Expand Up @@ -100,7 +100,10 @@ class SparkOptimizer(
ConstantFolding,
EliminateLimits),
Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*),
Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition)))
Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition),
// Must run last: it inspects the final plan shape to mark scans that can run in a single task,
// and no subsequent rule should reshape the plan or copy the marked scan nodes.
Batch("MarkSingleTaskExecution", Once, MarkSingleTaskExecution)))

override def nonExcludableRules: Seq[String] = super.nonExcludableRules ++
Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case f: logical.TypedFilter =>
execution.FilterExec(f.typedCondition(f.deserializer), planLater(f.child)) :: Nil
case e @ logical.Expand(_, _, child) =>
execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
val useSingleTask = e.getTagValue(
datasources.MarkSingleTaskExecution.markTag).getOrElse(false)
execution.ExpandExec(e.projections, e.output, planLater(child), useSingleTask) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child, sampleMethod) =>
if (sampleMethod == logical.SampleMethod.System) {
// V2ScanRelationPushDown is non-excludable and always handles SYSTEM samples
Expand All @@ -1161,8 +1163,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
"TABLESAMPLE SYSTEM node was not properly handled by V2ScanRelationPushDown.")
}
execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data, _, stream) =>
LocalTableScanExec(output, data, stream) :: Nil
case r @ logical.LocalRelation(output, data, _, stream) =>
val useSingleTask = r.getTagValue(
datasources.MarkSingleTaskExecution.markTag).getOrElse(false)
LocalTableScanExec(output, data, stream, useSingleTask) :: Nil
case logical.EmptyRelation(l) => EmptyRelationExec(l) :: Nil
case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil
// We should match the combination of limit and offset first, to get the optimal physical
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
bucketSet,
None,
rebindFileSourceMetadataAttributesInFilters(expandedDataFilters),
table.map(_.identifier))
table.map(_.identifier),
markedForSingleTaskExecution =
l.getTagValue(MarkSingleTaskExecution.markTag).getOrElse(false))

// extra Project node: wrap flat metadata columns to a metadata struct
val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
Expand Down
Loading