-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-57839][SQL] Support the nanosecond-precision timestamp types in CBO statistics estimation #56941
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[SPARK-57839][SQL] Support the nanosecond-precision timestamp types in CBO statistics estimation #56941
Changes from all commits
660f2c8
9b8f2c9
b82965d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ColumnStatsMa | |
| import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ | ||
| import org.apache.spark.sql.catalyst.util.DateTimeUtils | ||
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.unsafe.types.TimestampNanosVal | ||
|
|
||
| /** | ||
| * In this test suite, we test predicates containing the following operators: | ||
|
|
@@ -114,6 +115,16 @@ class FilterEstimationSuite extends StatsEstimationTestBase { | |
| val colStatIntSkewHgm = ColumnStat(distinctCount = Some(5), min = Some(1), max = Some(10), | ||
| nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew)) | ||
|
|
||
| // column ctsnanos has 10 nanosecond-precision timestamp values. | ||
| // Values span from epochMicros=1000 nanosWithinMicro=0 to epochMicros=1009 nanosWithinMicro=0 | ||
| // i.e. total epoch nanos 1000000..1009000 (range = 9000 nanos) | ||
| val tsNanosMin = TimestampNanosVal.fromParts(1000L, 0.toShort) | ||
| val tsNanosMax = TimestampNanosVal.fromParts(1009L, 0.toShort) | ||
| val attrTsNanos = AttributeReference("ctsnanos", TimestampNTZNanosType(9))() | ||
| val colStatTsNanos = ColumnStat(distinctCount = Some(10), | ||
| min = Some(tsNanosMin), max = Some(tsNanosMax), | ||
| nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) | ||
|
|
||
| val attributeMap = AttributeMap(Seq( | ||
| attrInt -> colStatInt, | ||
| attrBool -> colStatBool, | ||
|
|
@@ -125,7 +136,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase { | |
| attrInt3 -> colStatInt3, | ||
| attrInt4 -> colStatInt4, | ||
| attrIntHgm -> colStatIntHgm, | ||
| attrIntSkewHgm -> colStatIntSkewHgm | ||
| attrIntSkewHgm -> colStatIntSkewHgm, | ||
| attrTsNanos -> colStatTsNanos | ||
| )) | ||
|
|
||
| test("true") { | ||
|
|
@@ -1020,4 +1032,136 @@ class FilterEstimationSuite extends StatsEstimationTestBase { | |
| } | ||
| } | ||
|
|
||
| // Tests for nanosecond-precision timestamp types (SPARK-57839) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Non-blocking test-coverage suggestion. These nanos cases use
Two optional hardening cases: a |
||
| test("ctsnanos = TimestampNanosVal(1003, 0)") { | ||
| val tsVal = TimestampNanosVal.fromParts(1003L, 0.toShort) | ||
| validateEstimatedStats( | ||
| Filter(EqualTo(attrTsNanos, Literal(tsVal, TimestampNTZNanosType(9))), | ||
| childStatsTestPlan(Seq(attrTsNanos), 10L)), | ||
| Seq(attrTsNanos -> ColumnStat(distinctCount = Some(1), | ||
| min = Some(tsVal), max = Some(tsVal), | ||
| nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), | ||
| expectedRowCount = 1) | ||
| } | ||
|
|
||
| test("ctsnanos < TimestampNanosVal(1003, 0)") { | ||
| // Range [1000000, 1009000], value = 1003000, fraction = 3000/9000 = 1/3 | ||
| // Rows = ceil(10 * 3/9) = 4 (3.33 rounds up), ndv = ceil(10 * 3/9) = 4 | ||
| val tsVal = TimestampNanosVal.fromParts(1003L, 0.toShort) | ||
| validateEstimatedStats( | ||
| Filter(LessThan(attrTsNanos, Literal(tsVal, TimestampNTZNanosType(9))), | ||
| childStatsTestPlan(Seq(attrTsNanos), 10L)), | ||
| Seq(attrTsNanos -> ColumnStat(distinctCount = Some(4), | ||
| min = Some(tsNanosMin), max = Some(tsVal), | ||
| nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), | ||
| expectedRowCount = 4) | ||
| } | ||
|
|
||
| test("ctsnanos IN (TimestampNanosVal(1003, 0), TimestampNanosVal(1005, 0))") { | ||
| val ts3 = TimestampNanosVal.fromParts(1003L, 0.toShort) | ||
| val ts5 = TimestampNanosVal.fromParts(1005L, 0.toShort) | ||
| validateEstimatedStats( | ||
| Filter(In(attrTsNanos, Seq( | ||
| Literal(ts3, TimestampNTZNanosType(9)), | ||
| Literal(ts5, TimestampNTZNanosType(9)))), | ||
| childStatsTestPlan(Seq(attrTsNanos), 10L)), | ||
| Seq(attrTsNanos -> ColumnStat(distinctCount = Some(2), | ||
| min = Some(ts3), max = Some(ts5), | ||
| nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), | ||
| expectedRowCount = 2) | ||
| } | ||
|
|
||
| // column ctsLtzNanos: LTZ nanos type with sub-microsecond values (nanosWithinMicro != 0). | ||
| // Values span epochMicros=1000 nanos=500 to epochMicros=1009 nanos=500 | ||
| // i.e. total epoch nanos 1000500..1009500 (range = 9000 nanos) | ||
| val tsLtzNanosMin = TimestampNanosVal.fromParts(1000L, 500.toShort) | ||
| val tsLtzNanosMax = TimestampNanosVal.fromParts(1009L, 500.toShort) | ||
| val attrTsLtzNanos = AttributeReference("ctsltznanos", TimestampLTZNanosType(9))() | ||
| val colStatTsLtzNanos = ColumnStat(distinctCount = Some(10), | ||
| min = Some(tsLtzNanosMin), max = Some(tsLtzNanosMax), | ||
| nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) | ||
|
|
||
| test("ctsltznanos < TimestampNanosVal(1003, 500) - LTZ type with sub-microsecond") { | ||
| // Range [1000500, 1009500], value = 1003500, fraction = 3000/9000 = 1/3 | ||
| // Rows = ceil(10 * 3/9) = 4, ndv = ceil(10 * 3/9) = 4 | ||
| val tsVal = TimestampNanosVal.fromParts(1003L, 500.toShort) | ||
| val ltzMap = AttributeMap(Seq(attrTsLtzNanos -> colStatTsLtzNanos)) | ||
| validateEstimatedStats( | ||
| Filter(LessThan(attrTsLtzNanos, Literal(tsVal, TimestampLTZNanosType(9))), | ||
| childStatsTestPlan(Seq(attrTsLtzNanos), 10L, ltzMap)), | ||
| Seq(attrTsLtzNanos -> ColumnStat(distinctCount = Some(4), | ||
| min = Some(tsLtzNanosMin), max = Some(tsVal), | ||
| nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), | ||
| expectedRowCount = 4) | ||
| } | ||
|
|
||
| test("ctsnanos = TimestampNanosVal(1003, 456) - sub-microsecond nanosWithinMicro") { | ||
| // Test that sub-microsecond nanosWithinMicro term is exercised in toDouble estimation. | ||
| // The value 1003*1000+456 = 1003456 falls within range [1000000, 1009000]. | ||
| val subMicroMin = TimestampNanosVal.fromParts(1000L, 0.toShort) | ||
| val subMicroMax = TimestampNanosVal.fromParts(1009L, 0.toShort) | ||
| val attrSubMicro = AttributeReference("ctssubmicro", TimestampNTZNanosType(9))() | ||
| val colStatSubMicro = ColumnStat(distinctCount = Some(10), | ||
| min = Some(subMicroMin), max = Some(subMicroMax), | ||
| nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) | ||
| val subMicroMap = AttributeMap(Seq(attrSubMicro -> colStatSubMicro)) | ||
| val tsVal = TimestampNanosVal.fromParts(1003L, 456.toShort) | ||
| validateEstimatedStats( | ||
| Filter(EqualTo(attrSubMicro, Literal(tsVal, TimestampNTZNanosType(9))), | ||
| childStatsTestPlan(Seq(attrSubMicro), 10L, subMicroMap)), | ||
| Seq(attrSubMicro -> ColumnStat(distinctCount = Some(1), | ||
| min = Some(tsVal), max = Some(tsVal), | ||
| nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), | ||
| expectedRowCount = 1) | ||
| } | ||
|
|
||
|   // High-magnitude nanos tests: exercise the lossy toDouble conversion, where | ||
|   // epochMicros*1000 exceeds Double's 2^53 exact-integer range (real-world 2024 timestamps). | ||
|   // epochMicros ~1.7e15 => epoch nanos ~1.7e18 >> 2^53 (~9e15). | ||
|   // The expected bounds asserted below are the original min and literal values, so these tests | ||
|   // do not check any fromDouble reconstruction; they verify that the estimated row count and | ||
|   // ndv remain sensible despite floating-point rounding. | ||
|
|
||
|   test("NTZ nanos filter estimation at high magnitude (2024 timestamps)") { | ||
|     // 2024-01-01T00:00:00Z => epochMicros = 1704067200000000L | ||
|     // 2024-01-01T00:00:09Z => epochMicros = 1704067209000000L | ||
|     // Range = 9000000 micros = 9000000000 nanos (9 s) | ||
|     val hiMin = TimestampNanosVal.fromParts(1704067200000000L, 0.toShort) | ||
|     val hiMax = TimestampNanosVal.fromParts(1704067209000000L, 0.toShort) | ||
|     val attrHiNtz = AttributeReference("ctsnanos_hi", TimestampNTZNanosType(9))() | ||
|     val colStatHi = ColumnStat(distinctCount = Some(10), | ||
|       min = Some(hiMin), max = Some(hiMax), | ||
|       nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) | ||
|     val hiMap = AttributeMap(Seq(attrHiNtz -> colStatHi)) | ||
|     // Filter value at ~1/3 of the range | ||
|     val tsVal = TimestampNanosVal.fromParts(1704067203000000L, 0.toShort) | ||
|     // Expected: fraction ~ 3/9 = 1/3, rows = ceil(10 * 1/3) = 4, ndv = 4 | ||
|     validateEstimatedStats( | ||
|       Filter(LessThan(attrHiNtz, Literal(tsVal, TimestampNTZNanosType(9))), | ||
|         childStatsTestPlan(Seq(attrHiNtz), 10L, hiMap)), | ||
|       Seq(attrHiNtz -> ColumnStat(distinctCount = Some(4), | ||
|         min = Some(hiMin), max = Some(tsVal), | ||
|         nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), | ||
|       expectedRowCount = 4) | ||
|   } | ||
|
|
||
| test("LTZ nanos filter estimation at high magnitude (2024 timestamps)") { | ||
| // Same magnitude as above but using TimestampLTZNanosType. | ||
| val hiMin = TimestampNanosVal.fromParts(1704067200000000L, 0.toShort) | ||
| val hiMax = TimestampNanosVal.fromParts(1704067209000000L, 0.toShort) | ||
| val attrHiLtz = AttributeReference("ctsltznanos_hi", TimestampLTZNanosType(9))() | ||
| val colStatHi = ColumnStat(distinctCount = Some(10), | ||
| min = Some(hiMin), max = Some(hiMax), | ||
| nullCount = Some(0), avgLen = Some(10), maxLen = Some(10)) | ||
| val hiMap = AttributeMap(Seq(attrHiLtz -> colStatHi)) | ||
| // Filter value at ~1/3 of the range | ||
| val tsVal = TimestampNanosVal.fromParts(1704067203000000L, 0.toShort) | ||
| // Expected: fraction ~ 3/9 = 1/3, rows = ceil(10 * 1/3) = 4, ndv = 4 | ||
| validateEstimatedStats( | ||
| Filter(LessThan(attrHiLtz, Literal(tsVal, TimestampLTZNanosType(9))), | ||
| childStatsTestPlan(Seq(attrHiLtz), 10L, hiMap)), | ||
| Seq(attrHiLtz -> ColumnStat(distinctCount = Some(4), | ||
| min = Some(hiMin), max = Some(tsVal), | ||
| nullCount = Some(0), avgLen = Some(10), maxLen = Some(10))), | ||
| expectedRowCount = 4) | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non-blocking. This `fromDouble` nanos arm is lossy at the nanos scale (`epochMicros*1000+nanosWithinMicro` exceeds 2^53 for real-world dates; ULP ~256ns at 2024), which the `toDouble` comment above notes. On the Filter paths that's harmless — the updated min/max are the original values, not `fromDouble` reconstructions. But the join path does round-trip through here: `JoinEstimation` → `ValueInterval.intersect` (ValueInterval.scala:90-91) calls `fromDouble` on the intersected nanos min/max, and those flow into the join output's estimated `ColumnStat` (JoinEstimation.scala:253), so a joined nanos key's estimated bounds can be perturbed by up to ~256ns. Micro `TimestampType` (`double.toLong`) is exact there (1.7e15 < 2^53), so this is a nanos-only divergence.
This is estimation-only (never a query result, never persisted to the catalog — catalog min/max come from `ANALYZE` via the lossless formatter), well below CBO's noise floor, and matches your "Acceptable for estimation only" note — so not a blocker. Consider a one-line comment on the join/intersect path (or here, noting the reconstructed value is used for join-stat propagation) so the approximation is documented where the value is actually persisted into stats, not only at the conversion site.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in the latest commit: added a matching lossy-precision comment on the fromDouble nanos arm (referencing the toDouble comment above), and added two high-magnitude filter estimation tests (NTZ + LTZ) using a 2024 timestamp (epochMicros ~1.7e15, so epoch-nanos ~1.7e18, well past 2^53) that exercise the lossy toDouble/fromDouble path and assert range selectivity is still computed correctly.