Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, Expression, ExprId}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EmptyRow, Expression, ExprId}
import org.apache.spark.sql.catalyst.plans.logical.BinByOutputAliases
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -154,4 +155,22 @@ object BinByResolution {
timeZoneId = if (isLTZ) Some(sessionZone) else None
)
}

/**
 * Creates the three attributes appended to the operator's output, in this order: `bin_start`,
 * `bin_end`, `bin_distribute_ratio`. Each attribute takes its name from `aliases` and is
 * declared nullable.
 *
 * @param rangeType data type assigned to the `bin_start` and `bin_end` attributes
 * @param aliases   source of the effective name for each of the three attributes
 * @return the start, end, and ratio attributes; the ratio attribute is a `DoubleType`
 */
def appendedAttributesWithAliases(
    rangeType: DataType,
    aliases: BinByOutputAliases): Seq[Attribute] = {
  val binStart = AttributeReference(aliases.effectiveBinStart, rangeType, nullable = true)()
  val binEnd = AttributeReference(aliases.effectiveBinEnd, rangeType, nullable = true)()
  val binRatio = AttributeReference(aliases.effectiveBinRatio, DoubleType, nullable = true)()
  Seq(binStart, binEnd, binRatio)
}

/**
 * Creates one produced output attribute per DISTRIBUTE input column. Only the input's name,
 * data type, and nullability are carried over; the attribute is built anew, so it gets a fresh
 * `ExprId` and the rescaled value is a distinct attribute from the input it was derived from.
 *
 * @param distributeColumns the DISTRIBUTE input columns
 * @return the produced attributes, in the same order as `distributeColumns`
 */
def scaledDistributeAttributes(distributeColumns: Seq[Attribute]): Seq[Attribute] =
  distributeColumns.map { input =>
    AttributeReference(
      name = input.name,
      dataType = input.dataType,
      nullable = input.nullable)()
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
existingRelations,
b,
_.producedAttributes.map(_.exprId.id).toSeq,
newBinBy => newBinBy.copy(appendedAttributes =
newBinBy.appendedAttributes.map(_.newInstance())))
newBinBy => newBinBy.copy(
scaledDistributeColumns = newBinBy.scaledDistributeColumns.map(_.newInstance()),
appendedAttributes = newBinBy.appendedAttributes.map(_.newInstance())))

case e: Expand =>
deduplicateAndRenew[Expand](
Expand Down Expand Up @@ -470,6 +471,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
case oldVersion: BinBy
if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
val newVersion = oldVersion.copy(
scaledDistributeColumns = oldVersion.scaledDistributeColumns.map(_.newInstance()),
appendedAttributes = oldVersion.appendedAttributes.map(_.newInstance()))
newVersion.copyTagsFrom(oldVersion)
Seq((oldVersion, newVersion))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ object ResolveBinBy extends Rule[LogicalPlan] {
originExpr = b.originExpr)

val appendedAttributes =
BinBy.appendedAttributesWithAliases(parameters.rangeType, b.outputAliases)
BinByResolution.appendedAttributesWithAliases(parameters.rangeType, b.outputAliases)
val scaledDistributeColumns = BinByResolution.scaledDistributeAttributes(distributeAttributes)

BinBy(
binWidthMicros = parameters.binWidthMicros,
rangeStart = rangeStart,
rangeEnd = rangeEnd,
originMicros = parameters.originMicros,
distributeColumns = distributeAttributes,
scaledDistributeColumns = scaledDistributeColumns,
appendedAttributes = appendedAttributes,
child = child,
timeZoneId = parameters.timeZoneId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.{AliasIdentifier, InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode, WidenStatefulOpNullability}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
Expand Down Expand Up @@ -1798,8 +1797,13 @@ case class UnresolvedBinBy(
* @param rangeEnd Resolved attribute holding each row's window-end timestamp.
* @param originMicros Alignment anchor in microseconds since the epoch: the folded value of
* `ALIGN TO`, or the type-specific default when the clause is omitted.
* @param distributeColumns Resolved columns to proportionally redistribute.
* @param appendedAttributes The three output attributes appended after `child.output`.
* @param distributeColumns Resolved input columns to proportionally redistribute. Read by the
* operator to compute the rescaled values; not part of `output`.
* @param scaledDistributeColumns
* Produced output attributes holding the rescaled values (fresh
* `ExprId`s, same names/types as `distributeColumns`); they replace
* `distributeColumns` in `output`.
* @param appendedAttributes The three output attributes appended after the child columns.
* @param child Input relation.
* @param timeZoneId Captured session local time zone for LTZ inputs; `None` for NTZ.
* Required when `rangeStart.dataType` is `TimestampType`; must be
Expand All @@ -1811,36 +1815,37 @@ case class BinBy(
rangeEnd: Attribute,
originMicros: Long,
distributeColumns: Seq[Attribute],
scaledDistributeColumns: Seq[Attribute],
appendedAttributes: Seq[Attribute],
child: LogicalPlan,
timeZoneId: Option[String])
extends UnaryNode {

if (timeZoneId.isDefined != rangeStart.dataType.isInstanceOf[TimestampType]) {
throw SparkException.internalError(
s"timeZoneId must be set iff rangeStart is TIMESTAMP (LTZ); got rangeStart.dataType=" +
s"${rangeStart.dataType}, timeZoneId=$timeZoneId")
}
assert(timeZoneId.isDefined == rangeStart.dataType.isInstanceOf[TimestampType],
s"timeZoneId must be set iff rangeStart is TIMESTAMP (LTZ); got rangeStart.dataType=" +
s"${rangeStart.dataType}, timeZoneId=$timeZoneId")

assert(distributeColumns.length == scaledDistributeColumns.length,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: the timeZoneId invariant right above (line 1826) throws SparkException.internalError, while this equal-length invariant uses assert. Both are reasonable, but having two internal-invariant mechanisms side by side in the same constructor reads slightly inconsistently — consider matching the neighbor for uniformity.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the timeZoneId guard to assert too. Neither can actually fire unless a bug is introduced, so assert is more appropriate. Thanks!

"BinBy requires one scaled attribute per DISTRIBUTE column, got " +
s"${distributeColumns.length} distribute columns and " +
s"${scaledDistributeColumns.length} scaled attributes")

// In `output`, each DISTRIBUTE input is replaced by its scaled produced counterpart.
private lazy val distributeReplacements: AttributeMap[Attribute] =
AttributeMap(distributeColumns.zip(scaledDistributeColumns))

override def output: Seq[Attribute] = child.output ++ appendedAttributes
override def output: Seq[Attribute] =
child.output.map(a => distributeReplacements.getOrElse(a, a)) ++ appendedAttributes

override def producedAttributes: AttributeSet = AttributeSet(appendedAttributes)
override def producedAttributes: AttributeSet =
AttributeSet(scaledDistributeColumns ++ appendedAttributes)

final override val nodePatterns: Seq[TreePattern] = Seq(BIN_BY)

override protected def withNewChildInternal(newChild: LogicalPlan): BinBy =
copy(child = newChild)
}

object BinBy {
  /**
   * Creates the three appended output attributes (bin start, bin end, bin ratio, in that
   * order), named by `aliases`. The start and end attributes use `rangeType`, the ratio
   * attribute uses `DoubleType`, and all three are nullable.
   */
  def appendedAttributesWithAliases(
      rangeType: DataType,
      aliases: BinByOutputAliases): Seq[Attribute] = {
    // Name/type pairs listed in output order; each becomes one nullable attribute.
    val namedTypes = Seq(
      aliases.effectiveBinStart -> rangeType,
      aliases.effectiveBinEnd -> rangeType,
      aliases.effectiveBinRatio -> DoubleType)
    namedTypes.map { case (name, dataType) =>
      AttributeReference(name, dataType, nullable = true)()
    }
  }
}

/**
* A logical plan node for creating a logical limit, which is split into two separate logical nodes:
* a [[LocalLimit]], which is a partition local limit, followed by a [[GlobalLimit]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ResolveBinBySuite extends AnalysisTest {
private val tsEndNtz = $"ts_end".timestampNTZ
private val value = $"value".double
private val label = $"label".string
private val label2 = $"label2".string

private val ltzChild: LogicalPlan = LocalRelation(tsStart, tsEnd, value, label)
private val ntzChild: LogicalPlan = LocalRelation(tsStartNtz, tsEndNtz, value)
Expand Down Expand Up @@ -202,6 +203,67 @@ class ResolveBinBySuite extends AnalysisTest {
assert(bi.distributeColumns.map(_.exprId) == Seq(value.exprId))
}

test("resolved BinBy emits the DISTRIBUTE column as a produced attribute replacing the input") {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description lists "multiple DISTRIBUTE columns are each replaced in place with distinct fresh ids" as a tested case, but this suite has no such test: this one resolves a single column and asserts scaledDistributeColumns.map(_.exprId) == Seq(outValue.exprId), and the only multi-element distribute list (Seq(value, value) at line 322) is the duplicate-rejection negative test.

Multi-column is the case distributeReplacements (an AttributeMap over a zip) and child.output.map(getOrElse) exist to serve — worth a test with two distinct DISTRIBUTE columns at different schema positions, asserting each is replaced in place with a distinct fresh id. Non-blocking, but it closes the gap and the stale description claim.

@vranes vranes Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test which covers the distributeReplacements multi-column path. Thanks!

// `value` sits mid-schema (not last) and carries a qualifier + metadata, so this covers
// in-place replacement, produced identity, and the qualifier/metadata drop in one go.
val md = new MetadataBuilder().putString("comment", "a measure").build()
val valueMd = AttributeReference("value", DoubleType, nullable = true, md)()
val child = SubqueryAlias("m", LocalRelation(tsStart, tsEnd, valueMd, label))
val bi = ResolveBinBy.apply(
unresolved(child = child, distribute = Seq(UnresolvedAttribute(Seq("m", "value")))))
.asInstanceOf[BinBy]

// The input is read (held in distributeColumns) but not forwarded by identity.
assert(bi.distributeColumns.head.qualifier == Seq("m"))
assert(bi.distributeColumns.head.metadata == md)
assert(!bi.output.exists(_.exprId == valueMd.exprId))

// It is replaced at its own position by a fresh-id, same-name produced attribute.
val outValue = bi.output(child.output.indexWhere(_.exprId == valueMd.exprId))
assert(outValue.name == "value" && outValue.exprId != valueMd.exprId)
assert(bi.scaledDistributeColumns.map(_.exprId) == Seq(outValue.exprId))
assert(bi.producedAttributes.contains(outValue))

// The produced (computed) value drops the input's qualifier and metadata.
assert(outValue.qualifier.isEmpty && outValue.metadata == Metadata.empty)

// Forwarded (non-distribute) columns keep their identity.
assert(bi.output.exists(_.exprId == label.exprId))
assert(bi.output.exists(_.exprId == tsStart.exprId))
}

test("resolved BinBy emits each of multiple DISTRIBUTE columns as a produced attribute " +
  "replacing the input") {
  // Three DISTRIBUTE inputs are interleaved with forwarded columns, so no two of them are
  // adjacent in the child schema. This exercises slot-by-slot replacement, where every
  // replacement must carry its own fresh id.
  val v1 = AttributeReference("v1", DoubleType, nullable = true)()
  val v2 = AttributeReference("v2", DoubleType, nullable = true)()
  val v3 = AttributeReference("v3", DoubleType, nullable = true)()
  val child = LocalRelation(tsStart, tsEnd, v1, label, v2, label2, v3)
  val distribute = Seq(v1, v2, v3)
  val unresolvedPlan = unresolved(child = child, distribute = distribute)
  val bi = ResolveBinBy.apply(unresolvedPlan).asInstanceOf[BinBy]

  val inputIds = distribute.map(_.exprId)
  val outputIds = bi.output.map(_.exprId)

  // The operator keeps the inputs in distributeColumns, yet none of them appears in `output`.
  assert(bi.distributeColumns.map(_.exprId) == inputIds)
  assert(!inputIds.exists(outputIds.contains))

  // At each input's schema position sits a same-name attribute carrying a different id.
  val replaced = distribute.map { in =>
    val slot = child.output.indexWhere(_.exprId == in.exprId)
    bi.output(slot)
  }
  val replacedIds = replaced.map(_.exprId)
  distribute.zip(replaced).foreach { case (in, out) =>
    assert(out.name == in.name && out.exprId != in.exprId)
  }
  assert(replacedIds.toSet.size == distribute.size)
  assert(bi.scaledDistributeColumns.map(_.exprId) == replacedIds)
  replaced.foreach(out => assert(bi.producedAttributes.contains(out)))

  // The leading, child-derived part of `output` preserves the child's column order and names,
  // and the forwarded columns keep their original ids.
  assert(bi.output.map(_.name).take(child.output.size) == child.output.map(_.name))
  Seq(label, label2, tsStart).foreach { forwarded =>
    assert(outputIds.contains(forwarded.exprId))
  }
}

test("multipart identifiers disambiguate same-name columns across a JOIN") {
val t1Start = AttributeReference("ts_start", TimestampType, nullable = true)()
val t1End = AttributeReference("ts_end", TimestampType, nullable = true)()
Expand Down Expand Up @@ -330,9 +392,10 @@ class ResolveBinBySuite extends AnalysisTest {

val binBys = analyzed.collect { case b: BinBy => b }
assert(binBys.size == 2, s"expected two BinBy nodes, got ${binBys.size}")
val appendedExprIds = binBys.flatMap(_.appendedAttributes.map(_.exprId))
assert(appendedExprIds.distinct.size == appendedExprIds.size,
"appended BinBy attributes must have distinct exprIds across the two join sides")
val producedExprIds = binBys.flatMap(b =>
(b.scaledDistributeColumns ++ b.appendedAttributes).map(_.exprId))
assert(producedExprIds.distinct.size == producedExprIds.size,
"produced BinBy attributes must have distinct exprIds across the two join sides")
}

// `super.test` escapes the suite-wide flag-on wrapper; pin the flag off explicitly.
Expand Down