-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-57858][SQL] Emit BIN BY scaled DISTRIBUTE columns as produced attributes #56930
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,6 +47,7 @@ class ResolveBinBySuite extends AnalysisTest { | |
| private val tsEndNtz = $"ts_end".timestampNTZ | ||
| private val value = $"value".double | ||
| private val label = $"label".string | ||
| private val label2 = $"label2".string | ||
|
|
||
| private val ltzChild: LogicalPlan = LocalRelation(tsStart, tsEnd, value, label) | ||
| private val ntzChild: LogicalPlan = LocalRelation(tsStartNtz, tsEndNtz, value) | ||
|
|
@@ -202,6 +203,67 @@ class ResolveBinBySuite extends AnalysisTest { | |
| assert(bi.distributeColumns.map(_.exprId) == Seq(value.exprId)) | ||
| } | ||
|
|
||
| test("resolved BinBy emits the DISTRIBUTE column as a produced attribute replacing the input") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The PR description lists "multiple […] Multi-column is the case […]
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added a test which covers the `distributeReplacements` multi-column path. Thanks! |
||
| // `value` sits mid-schema (not last) and carries a qualifier + metadata, so this covers | ||
| // in-place replacement, produced identity, and the qualifier/metadata drop in one go. | ||
| val md = new MetadataBuilder().putString("comment", "a measure").build() | ||
| val valueMd = AttributeReference("value", DoubleType, nullable = true, md)() | ||
| val child = SubqueryAlias("m", LocalRelation(tsStart, tsEnd, valueMd, label)) | ||
| val bi = ResolveBinBy.apply( | ||
| unresolved(child = child, distribute = Seq(UnresolvedAttribute(Seq("m", "value"))))) | ||
| .asInstanceOf[BinBy] | ||
|
|
||
| // The input is read (held in distributeColumns) but not forwarded by identity. | ||
| assert(bi.distributeColumns.head.qualifier == Seq("m")) | ||
| assert(bi.distributeColumns.head.metadata == md) | ||
| assert(!bi.output.exists(_.exprId == valueMd.exprId)) | ||
|
|
||
| // It is replaced at its own position by a fresh-id, same-name produced attribute. | ||
| val outValue = bi.output(child.output.indexWhere(_.exprId == valueMd.exprId)) | ||
| assert(outValue.name == "value" && outValue.exprId != valueMd.exprId) | ||
| assert(bi.scaledDistributeColumns.map(_.exprId) == Seq(outValue.exprId)) | ||
| assert(bi.producedAttributes.contains(outValue)) | ||
|
|
||
| // The produced (computed) value drops the input's qualifier and metadata. | ||
| assert(outValue.qualifier.isEmpty && outValue.metadata == Metadata.empty) | ||
|
|
||
| // Forwarded (non-distribute) columns keep their identity. | ||
| assert(bi.output.exists(_.exprId == label.exprId)) | ||
| assert(bi.output.exists(_.exprId == tsStart.exprId)) | ||
| } | ||
|
|
||
| test("resolved BinBy emits each of multiple DISTRIBUTE columns as a produced attribute " + | ||
| "replacing the input") { | ||
| // `v1`, `v2`, `v3` sit at non-adjacent schema positions with forwarded columns between | ||
| // them, so this covers per-slot in-place replacement with distinct fresh ids. | ||
| val v1 = AttributeReference("v1", DoubleType, nullable = true)() | ||
| val v2 = AttributeReference("v2", DoubleType, nullable = true)() | ||
| val v3 = AttributeReference("v3", DoubleType, nullable = true)() | ||
| val child = LocalRelation(tsStart, tsEnd, v1, label, v2, label2, v3) | ||
| val distribute = Seq(v1, v2, v3) | ||
| val bi = ResolveBinBy.apply( | ||
| unresolved(child = child, distribute = distribute)).asInstanceOf[BinBy] | ||
|
|
||
| // The inputs are read (held in distributeColumns) but none is forwarded by identity. | ||
| assert(bi.distributeColumns.map(_.exprId) == distribute.map(_.exprId)) | ||
| assert(distribute.forall(v => !bi.output.exists(_.exprId == v.exprId))) | ||
|
|
||
| // Each is replaced at its own position by a fresh-id, same-name produced attribute. | ||
| val outputs = distribute.map(v => bi.output(child.output.indexWhere(_.exprId == v.exprId))) | ||
| outputs.zip(distribute).foreach { case (out, in) => | ||
| assert(out.name == in.name && out.exprId != in.exprId) | ||
| } | ||
| assert(outputs.map(_.exprId).distinct.length == distribute.length) | ||
| assert(bi.scaledDistributeColumns.map(_.exprId) == outputs.map(_.exprId)) | ||
| assert(outputs.forall(bi.producedAttributes.contains)) | ||
|
|
||
| // The child portion of `output` keeps the child's column order and names. | ||
| assert(bi.output.take(child.output.length).map(_.name) == child.output.map(_.name)) | ||
| assert(bi.output.exists(_.exprId == label.exprId)) | ||
| assert(bi.output.exists(_.exprId == label2.exprId)) | ||
| assert(bi.output.exists(_.exprId == tsStart.exprId)) | ||
| } | ||
|
|
||
| test("multipart identifiers disambiguate same-name columns across a JOIN") { | ||
| val t1Start = AttributeReference("ts_start", TimestampType, nullable = true)() | ||
| val t1End = AttributeReference("ts_end", TimestampType, nullable = true)() | ||
|
|
@@ -330,9 +392,10 @@ class ResolveBinBySuite extends AnalysisTest { | |
|
|
||
| val binBys = analyzed.collect { case b: BinBy => b } | ||
| assert(binBys.size == 2, s"expected two BinBy nodes, got ${binBys.size}") | ||
| val appendedExprIds = binBys.flatMap(_.appendedAttributes.map(_.exprId)) | ||
| assert(appendedExprIds.distinct.size == appendedExprIds.size, | ||
| "appended BinBy attributes must have distinct exprIds across the two join sides") | ||
| val producedExprIds = binBys.flatMap(b => | ||
| (b.scaledDistributeColumns ++ b.appendedAttributes).map(_.exprId)) | ||
| assert(producedExprIds.distinct.size == producedExprIds.size, | ||
| "produced BinBy attributes must have distinct exprIds across the two join sides") | ||
| } | ||
|
|
||
| // `super.test` escapes the suite-wide flag-on wrapper; pin the flag off explicitly. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: the `timeZoneId` invariant right above (line 1826) throws `SparkException.internalError`, while this equal-length invariant uses `assert`. Both are reasonable, but two internal-invariant mechanisms side by side in the same constructor read slightly inconsistently — consider matching the neighbor for uniformity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed the `timeZoneId` guard to `assert` too. Neither can actually fire unless a bug is introduced, so `assert` is more appropriate. Thanks!