[HUDI-2343] Fix the exception in mergeInto when the primaryKey and preCombineField of the source table and the target table differ only in case (#3517)
This commit is contained in:
@@ -196,9 +196,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
|
|||||||
}
|
}
|
||||||
|
|
||||||
private def isEqualToTarget(targetColumnName: String, sourceExpression: Expression): Boolean = {
|
private def isEqualToTarget(targetColumnName: String, sourceExpression: Expression): Boolean = {
|
||||||
|
val sourceColNameMap = sourceDFOutput.map(attr => (attr.name.toLowerCase, attr.name)).toMap
|
||||||
|
|
||||||
sourceExpression match {
|
sourceExpression match {
|
||||||
case attr: AttributeReference if attr.name.equalsIgnoreCase(targetColumnName) => true
|
case attr: AttributeReference if sourceColNameMap(attr.name.toLowerCase).equals(targetColumnName) => true
|
||||||
case Cast(attr: AttributeReference, _, _) if attr.name.equalsIgnoreCase(targetColumnName) => true
|
case Cast(attr: AttributeReference, _, _) if sourceColNameMap(attr.name.toLowerCase).equals(targetColumnName) => true
|
||||||
case _=> false
|
case _=> false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -375,4 +375,73 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Test MergeInto When PrimaryKey And PreCombineField Of Source Table And Target Table Differ In Case Only") {
|
||||||
|
withTempDir { tmp =>
|
||||||
|
val tableName = generateTableName
|
||||||
|
// Create table
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
|create table $tableName (
|
||||||
|
| id int,
|
||||||
|
| name string,
|
||||||
|
| price double,
|
||||||
|
| ts long
|
||||||
|
|) using hudi
|
||||||
|
| location '${tmp.getCanonicalPath}/$tableName'
|
||||||
|
| options (
|
||||||
|
| primaryKey ='id',
|
||||||
|
| preCombineField = 'ts'
|
||||||
|
| )
|
||||||
|
""".stripMargin)
|
||||||
|
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
| merge into $tableName
|
||||||
|
| using (
|
||||||
|
| select 1 as ID, 'a1' as NAME, 10 as PRICE, 1000 as TS, '1' as FLAG
|
||||||
|
| ) s0
|
||||||
|
| on s0.ID = $tableName.id
|
||||||
|
| when matched and FLAG = '1' then update set
|
||||||
|
| id = s0.ID, name = s0.NAME, price = s0.PRICE, ts = s0.TS
|
||||||
|
| when not matched and FLAG = '1' then insert *
|
||||||
|
|""".stripMargin)
|
||||||
|
checkAnswer(s"select id, name, price, ts from $tableName")(
|
||||||
|
Seq(1, "a1", 10.0, 1000)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test the case where the column names used in the condition and the action differ in case from those of the source table
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
| merge into $tableName
|
||||||
|
| using (
|
||||||
|
| select 1 as ID, 'a1' as NAME, 11 as PRICE, 1001 as TS, '1' as FLAG
|
||||||
|
| ) s0
|
||||||
|
| on s0.id = $tableName.id
|
||||||
|
| when matched and FLAG = '1' then update set
|
||||||
|
| id = s0.id, name = s0.NAME, price = s0.PRICE, ts = s0.ts
|
||||||
|
| when not matched and FLAG = '1' then insert *
|
||||||
|
|""".stripMargin)
|
||||||
|
checkAnswer(s"select id, name, price, ts from $tableName")(
|
||||||
|
Seq(1, "a1", 11.0, 1001)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test the case where the column name used in a cast condition differs in case from that of the source table
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
| merge into $tableName
|
||||||
|
| using (
|
||||||
|
| select 2 as ID, 'a2' as NAME, 12 as PRICE, 1002 as TS, '1' as FLAG
|
||||||
|
| ) s0
|
||||||
|
| on cast(s0.id as int) = $tableName.id
|
||||||
|
| when matched and FLAG = '1' then update set
|
||||||
|
| id = s0.id, name = s0.NAME, price = s0.PRICE, ts = s0.ts
|
||||||
|
| when not matched and FLAG = '1' then insert *
|
||||||
|
|""".stripMargin)
|
||||||
|
checkAnswer(s"select id, name, price, ts from $tableName")(
|
||||||
|
Seq(1, "a1", 11.0, 1001),
|
||||||
|
Seq(2, "a2", 12.0, 1002)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user