-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-57851][SQL] Shuffle-free single-task execution for small queries #56928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
7e18780
eec3110
98796e2
0ff263a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow, TableIdent | |
| import org.apache.spark.sql.catalyst.catalog.BucketSpec | ||
| import org.apache.spark.sql.catalyst.expressions._ | ||
| import org.apache.spark.sql.catalyst.plans.QueryPlan | ||
| import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} | ||
| import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, SinglePartition, UnknownPartitioning} | ||
| import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap} | ||
| import org.apache.spark.sql.connector.read.streaming.SparkDataStream | ||
| import org.apache.spark.sql.errors.QueryExecutionErrors | ||
|
|
@@ -320,6 +320,34 @@ trait FileSourceScanLike extends DataSourceScanExec with SessionStateHelper { | |
| def requiredSchema: StructType | ||
| // Identifier for the table in the metastore. | ||
| def tableIdentifier: Option[TableIdentifier] | ||
| // When true, the `MarkSingleTaskExecution` optimizer rule has marked this scan's plan shape as a | ||
| // candidate for single-task execution. The scan is only actually executed in a single task when | ||
| // it additionally passes the file count and size thresholds (see `useSingleTaskExecution`). | ||
| def markedForSingleTaskExecution: Boolean | ||
|
|
||
| /** | ||
| * Whether this file scan should run in a single task, reporting a `SinglePartition` output | ||
| * partitioning so that a following shuffle can be elided. This is true when the plan shape was | ||
| * marked eligible by the optimizer and the statically-selected files fall within the configured | ||
| * count and size bounds. Bucketed scans are excluded: they report a `HashPartitioning` over the | ||
| * bucket columns, which coalescing to a single partition would invalidate. It relies on | ||
| * `selectedPartitions`, so it must not be evaluated before the scan's file listing is available. | ||
| */ | ||
| lazy val useSingleTaskExecution: Boolean = { | ||
| if (!markedForSingleTaskExecution || bucketedScan) { | ||
| false | ||
| } else { | ||
| val sqlConf = getSqlConf(relation.sparkSession) | ||
| val minNumFiles = sqlConf.getConf(SQLConf.SINGLE_TASK_EXECUTION_MIN_NUM_FILES) | ||
| val maxNumFiles = sqlConf.getConf(SQLConf.SINGLE_TASK_EXECUTION_MAX_NUM_FILES) | ||
| val minNumBytes = sqlConf.getConf(SQLConf.SINGLE_TASK_EXECUTION_MIN_NUM_BYTES) | ||
| val maxPartitionBytes = sqlConf.getConf(SQLConf.FILES_MAX_PARTITION_BYTES) | ||
| val numFiles = selectedPartitions.totalNumberOfFiles | ||
| val numBytes = selectedPartitions.totalFileSize | ||
| numFiles >= minNumFiles && numFiles <= maxNumFiles && | ||
| numBytes >= minNumBytes && numBytes <= maxPartitionBytes | ||
| } | ||
| } | ||
|
|
||
|
|
||
| lazy val fileConstantMetadataColumns: Seq[AttributeReference] = output.collect { | ||
|
|
@@ -478,6 +506,8 @@ trait FileSourceScanLike extends DataSourceScanExec with SessionStateHelper { | |
| Nil | ||
| } | ||
| (partitioning, sortOrder) | ||
| } else if (useSingleTaskExecution) { | ||
| (SinglePartition, Nil) | ||
| } else { | ||
| (UnknownPartitioning(0), Nil) | ||
| } | ||
|
|
@@ -696,7 +726,8 @@ case class FileSourceScanExec( | |
| override val optionalNumCoalescedBuckets: Option[Int], | ||
| override val dataFilters: Seq[Expression], | ||
| override val tableIdentifier: Option[TableIdentifier], | ||
| override val disableBucketedScan: Boolean = false) | ||
| override val disableBucketedScan: Boolean = false, | ||
| override val markedForSingleTaskExecution: Boolean = false) | ||
| extends FileSourceScanLike { | ||
|
|
||
| // Note that some vals referring the file-based relation are lazy intentionally | ||
|
|
@@ -744,10 +775,28 @@ case class FileSourceScanExec( | |
| inputRDD :: Nil | ||
| } | ||
|
|
||
| /** | ||
| * The input RDD, coalesced to a single partition when this scan runs in single-task mode. This | ||
| * enforces the `SinglePartition` output partitioning reported by `outputPartitioning`, which is | ||
| * estimated from the statically-selected files and may not correspond exactly to the number of | ||
| * partitions the input RDD produces after dynamic pruning. Coalescing here keeps the query | ||
| * correct in either case. | ||
| */ | ||
| private[spark] lazy val maybeCoalesceInputRDD: RDD[InternalRow] = { | ||
| if (useSingleTaskExecution && inputRDD.getNumPartitions > 1) { | ||
| inputRDD.coalesce(1) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this consistent with
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch — the combination was inconsistent: a marked bucketed scan would advertise |
||
| } else if (useSingleTaskExecution && inputRDD.getNumPartitions == 0) { | ||
| // All files were pruned away; produce a single empty partition to match `SinglePartition`. | ||
| sparkContext.parallelize[InternalRow](Nil, 1) | ||
| } else { | ||
| inputRDD | ||
| } | ||
| } | ||
|
|
||
| protected override def doExecute(): RDD[InternalRow] = { | ||
| val numOutputRows = longMetric("numOutputRows") | ||
| if (needsUnsafeRowConversion) { | ||
| inputRDD.mapPartitionsWithIndexInternal { (index, iter) => | ||
| maybeCoalesceInputRDD.mapPartitionsWithIndexInternal { (index, iter) => | ||
| val toUnsafe = UnsafeProjection.create(schema) | ||
| toUnsafe.initialize(index) | ||
| iter.map { row => | ||
|
|
@@ -756,7 +805,7 @@ case class FileSourceScanExec( | |
| } | ||
| } | ||
| } else { | ||
| inputRDD.mapPartitionsInternal { iter => | ||
| maybeCoalesceInputRDD.mapPartitionsInternal { iter => | ||
| iter.map { row => | ||
| numOutputRows += 1 | ||
| row | ||
|
|
@@ -768,7 +817,7 @@ case class FileSourceScanExec( | |
| protected override def doExecuteColumnar(): RDD[ColumnarBatch] = { | ||
| val numOutputRows = longMetric("numOutputRows") | ||
| val scanTime = longMetric("scanTime") | ||
| inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches => | ||
| maybeCoalesceInputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches => | ||
| new Iterator[ColumnarBatch] { | ||
|
|
||
| override def hasNext: Boolean = { | ||
|
|
@@ -921,7 +970,8 @@ case class FileSourceScanExec( | |
| optionalNumCoalescedBuckets, | ||
| QueryPlan.normalizePredicates(dataFilters, output), | ||
| None, | ||
| disableBucketedScan) | ||
| disableBucketedScan, | ||
| markedForSingleTaskExecution) | ||
| } | ||
|
|
||
| override def getStream: Option[SparkDataStream] = stream | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution | |
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} | ||
| import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition, UnknownPartitioning} | ||
| import org.apache.spark.sql.connector.read.streaming.SparkDataStream | ||
| import org.apache.spark.sql.execution.metric.SQLMetrics | ||
| import org.apache.spark.util.ArrayImplicits._ | ||
|
|
@@ -34,7 +35,10 @@ import org.apache.spark.util.ArrayImplicits._ | |
| case class LocalTableScanExec( | ||
| output: Seq[Attribute], | ||
| @transient rows: Seq[InternalRow], | ||
| @transient stream: Option[SparkDataStream]) | ||
| @transient stream: Option[SparkDataStream], | ||
| // When true, the relation is scanned in a single partition, so this node reports a | ||
| // `SinglePartition` output partitioning. Set by the `MarkSingleTaskExecution` optimizer rule. | ||
| useSingleTask: Boolean = false) | ||
| extends LeafExecNode | ||
| with StreamSourceAwareSparkPlan | ||
| with InputRDDCodegen { | ||
|
|
@@ -53,14 +57,33 @@ case class LocalTableScanExec( | |
|
|
||
| @transient private lazy val rdd: RDD[InternalRow] = { | ||
| if (rows.isEmpty) { | ||
| sparkContext.emptyRDD | ||
| if (useSingleTask) { | ||
| // Produce a single empty partition to match the `SinglePartition` reported by | ||
| // `outputPartitioning`. `emptyRDD` has zero partitions, and running e.g. a global | ||
| // aggregation on a zero-partition RDD with the shuffle elided would return no rows | ||
| // instead of the single row expected on empty input. | ||
| sparkContext.parallelize(Seq.empty[InternalRow], 1) | ||
| } else { | ||
| sparkContext.emptyRDD | ||
| } | ||
| } else { | ||
| val numSlices = math.min( | ||
| unsafeRows.length, session.leafNodeDefaultParallelism) | ||
| val numSlices = if (useSingleTask) { | ||
| 1 | ||
| } else { | ||
| math.min(unsafeRows.length, session.leafNodeDefaultParallelism) | ||
| } | ||
| sparkContext.parallelize(unsafeRows.toImmutableArraySeq, numSlices) | ||
| } | ||
| } | ||
|
|
||
| override def outputPartitioning: Partitioning = { | ||
| if (useSingleTask) { | ||
| SinglePartition | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch — this was a real correctness issue. The advertised |
||
| } else { | ||
| UnknownPartitioning(0) | ||
| } | ||
| } | ||
|
|
||
| protected override def doExecute(): RDD[InternalRow] = { | ||
| val numOutputRows = longMetric("numOutputRows") | ||
| rdd.map { r => | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In
isLocalRelationEligible,get(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM)is considered in a way. Do we need to consider SQLConf.LEAF_NODE_DEFAULT_PARALLELISM here too?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, file scans should respect it too. Moved the check up to
apply()so the whole rule is skipped when a leaf-node parallelism override is set.