Skip to content

Commit

Permalink
[SPARK-50689][SQL] Enforce deterministic ordering in LCA project lists
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Using `Set` to produce project lists may make the order of those lists non-deterministic, because a hash-based `Set` does not guarantee iteration order. Instead, we switch to `LinkedHashSet`, which iterates in insertion order.

### Why are the changes needed?

It's better for the analyzer to produce stable query plans, regardless of the Java/Scala version.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49319 from mihailotim-db/mihailotim-db/linked_set_lca.

Authored-by: Mihailo Timotic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
mihailotim-db authored and cloud-fan committed Dec 27, 2024
1 parent 939129e commit af53ee4
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.analysis

import java.util.LinkedHashSet

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.WindowExpression.hasWindowExpression
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
Expand Down Expand Up @@ -147,7 +149,7 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
&& pOriginal.projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
val p @ Project(projectList, child) = pOriginal.mapChildren(apply0)
var aliasMap = AttributeMap.empty[AliasEntry]
val referencedAliases = collection.mutable.Set.empty[AliasEntry]
val referencedAliases = new LinkedHashSet[AliasEntry]
def unwrapLCAReference(e: NamedExpression): NamedExpression = {
e.transformWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
case lcaRef: LateralColumnAliasReference if aliasMap.contains(lcaRef.a) =>
Expand All @@ -156,7 +158,7 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
// and unwrap the LateralColumnAliasReference to the NamedExpression inside
// If there is chaining, don't resolve and save to future rounds
if (!aliasEntry.alias.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
referencedAliases += aliasEntry
referencedAliases.add(aliasEntry)
lcaRef.ne
} else {
lcaRef
Expand All @@ -182,7 +184,7 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
val outerProjectList = collection.mutable.Seq(newProjectList: _*)
val innerProjectList =
collection.mutable.ArrayBuffer(child.output.map(_.asInstanceOf[NamedExpression]): _*)
referencedAliases.foreach { case AliasEntry(alias: Alias, idx) =>
referencedAliases.forEach { case AliasEntry(alias: Alias, idx) =>
outerProjectList.update(idx, alias.toAttribute)
innerProjectList += alias
}
Expand Down

0 comments on commit af53ee4

Please sign in to comment.