diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala index 8f15acb536e51..8a370e9a9dc78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala @@ -747,7 +747,7 @@ case class ListAgg( private def isCastEqualityPreserving(dt: DataType): Boolean = dt match { case _: IntegerType | LongType | ShortType | ByteType => true case _: DecimalType => true - case _: DateType | TimestampNTZType => true + case _: DateType | TimestampNTZType | _: TimestampNTZNanosType => true case _: TimeType => true case _: CalendarIntervalType => true case _: YearMonthIntervalType => true @@ -757,8 +757,8 @@ case class ListAgg( case st: StringType => st.isUTF8BinaryCollation case _: DoubleType | FloatType => false // During DST fall-back, two distinct UTC epochs can format to the same local time string - // because the default format omits the timezone offset. TimestampNTZType is safe (uses UTC). - case _: TimestampType => false + // because the default format omits the timezone offset. NTZ types are safe (use UTC). + case _: TimestampType | _: TimestampLTZNanosType => false case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosFunctionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosFunctionsSuiteBase.scala index f19f1741479cc..ee350f5c5e2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosFunctionsSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosFunctionsSuiteBase.scala @@ -608,6 +608,64 @@ abstract class TimestampNanosFunctionsSuiteBase extends SharedSparkSession { checkAnswer(df.select(timestamp_nanos(col("n"))), Row(null)) checkAnswer(df.selectExpr("timestamp_nanos(n)"), Row(null)) } + + test("SPARK-57809: listagg(distinct cast(ts as string)) within group (order by ts) " + + "over nanosecond-precision timestamps") { + // isCastEqualityPreserving: NTZ nanos is safe (UTC, no DST ambiguity), LTZ nanos is unsafe + // (same DST fall-back risk as micro TIMESTAMP_LTZ). This mirrors the micro-precision behavior: + // TimestampNTZType -> true, TimestampType -> false. + Seq(7, 8, 9).foreach { p => + val ntzDF = spark.createDataFrame( + spark.sparkContext.parallelize(Seq( + Row(LocalDateTime.parse("2020-01-01T12:00:00.100000000")), + Row(LocalDateTime.parse("2020-01-02T12:00:00.200000000")))), + new StructType().add("ts", TimestampNTZNanosType(p))) + + // NTZ nanos: cast to string is equality-preserving, so LISTAGG(DISTINCT ...) is allowed. + withSQLConf(SQLConf.LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER.key -> "true") { + val result = ntzDF.selectExpr( + "listagg(distinct cast(ts as string), ', ') within group (order by ts)").collect() + assert(result.length == 1 && result.head.getString(0) != null, + s"NTZ nanos p=$p: listagg should succeed with a non-null result") + } + + val ltzDF = spark.createDataFrame( + spark.sparkContext.parallelize(Seq( + Row(Instant.parse("2020-01-01T20:00:00.100000000Z")), + Row(Instant.parse("2020-01-02T20:00:00.200000000Z")))), + new StructType().add("ts", TimestampLTZNanosType(p))) + + withSQLConf(SQLConf.LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER.key -> "true") { + checkError( + exception = intercept[AnalysisException] { + ltzDF.selectExpr( + "listagg(distinct cast(ts as string)) within group (order by ts)") + }, + condition = + "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT_UNSAFE_CAST", + parameters = Map( + "funcName" -> "`listagg`", + "inputType" -> s""""TIMESTAMP_LTZ($p)"""", + "castType" -> "\"STRING\"" + ) + ) + } + withSQLConf(SQLConf.LISTAGG_ALLOW_DISTINCT_CAST_WITH_ORDER.key -> "false") { + checkError( + exception = intercept[AnalysisException] { + ltzDF.selectExpr( + "listagg(distinct cast(ts as string)) within group (order by ts)") + }, + condition = "INVALID_WITHIN_GROUP_EXPRESSION.MISMATCH_WITH_DISTINCT_INPUT", + parameters = Map( + "funcName" -> "`listagg`", + "funcArg" -> "\"CAST(ts AS STRING)\"", + "orderingExpr" -> "\"ts\"" + ) + ) + } + } + } } // Runs the nanosecond timestamp function tests with ANSI mode enabled explicitly.