Support referencing subquery with column aliases by table alias in merge into (#3380)
This commit is contained in:
@@ -175,7 +175,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
test("Test Merge With Complex Data Type") {
|
test("Test Merge With Complex Data Type") {
|
||||||
withTempDir{tmp =>
|
withTempDir { tmp =>
|
||||||
val tableName = generateTableName
|
val tableName = generateTableName
|
||||||
spark.sql(
|
spark.sql(
|
||||||
s"""
|
s"""
|
||||||
@@ -320,4 +320,59 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Test MergeInto For Source Table With Column Aliases") {
|
||||||
|
withTempDir { tmp =>
|
||||||
|
val tableName = generateTableName
|
||||||
|
// Create table
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
|create table $tableName (
|
||||||
|
| id int,
|
||||||
|
| name string,
|
||||||
|
| price double,
|
||||||
|
| ts long
|
||||||
|
|) using hudi
|
||||||
|
| location '${tmp.getCanonicalPath}/$tableName'
|
||||||
|
| options (
|
||||||
|
| primaryKey ='id',
|
||||||
|
| preCombineField = 'ts'
|
||||||
|
| )
|
||||||
|
""".stripMargin)
|
||||||
|
|
||||||
|
// Merge with an extra input field 'flag' (insert a new record)
|
||||||
|
val mergeSql =
|
||||||
|
s"""
|
||||||
|
| merge into $tableName
|
||||||
|
| using (
|
||||||
|
| select 1, 'a1', 10, 1000, '1'
|
||||||
|
| ) s0(id,name,price,ts,flag)
|
||||||
|
| on s0.id = $tableName.id
|
||||||
|
| when matched and flag = '1' then update set
|
||||||
|
| id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
|
||||||
|
| when not matched and flag = '1' then insert *
|
||||||
|
|""".stripMargin
|
||||||
|
|
||||||
|
if (HoodieSqlUtils.isSpark3) {
|
||||||
|
checkException(mergeSql)(
|
||||||
|
"\nColumns aliases are not allowed in MERGE.(line 5, pos 5)\n\n" +
|
||||||
|
"== SQL ==\n\r\n" +
|
||||||
|
s" merge into $tableName\r\n" +
|
||||||
|
" using (\r\n" +
|
||||||
|
" select 1, 'a1', 10, 1000, '1'\r\n" +
|
||||||
|
" ) s0(id,name,price,ts,flag)\r\n" +
|
||||||
|
"-----^^^\n" +
|
||||||
|
s" on s0.id = $tableName.id\r\n" +
|
||||||
|
" when matched and flag = '1' then update set\r\n" +
|
||||||
|
" id = s0.id, name = s0.name, price = s0.price, ts = s0.ts\r\n" +
|
||||||
|
" when not matched and flag = '1' then insert *\r\n"
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
spark.sql(mergeSql)
|
||||||
|
checkAnswer(s"select id, name, price, ts from $tableName")(
|
||||||
|
Seq(1, "a1", 10.0, 1000)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -185,12 +185,12 @@ class HoodieSpark2ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterface
|
|||||||
*/
|
*/
|
||||||
protected def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = {
|
protected def mayApplyAliasPlan(tableAlias: TableAliasContext, plan: LogicalPlan): LogicalPlan = {
|
||||||
if (tableAlias.strictIdentifier != null) {
|
if (tableAlias.strictIdentifier != null) {
|
||||||
val subquery = SubqueryAlias(tableAlias.strictIdentifier.getText, plan)
|
val alias = tableAlias.strictIdentifier.getText
|
||||||
if (tableAlias.identifierList != null) {
|
if (tableAlias.identifierList != null) {
|
||||||
val columnNames = visitIdentifierList(tableAlias.identifierList)
|
val columnNames = visitIdentifierList(tableAlias.identifierList)
|
||||||
UnresolvedSubqueryColumnAliases(columnNames, subquery)
|
SubqueryAlias(alias, UnresolvedSubqueryColumnAliases(columnNames, plan))
|
||||||
} else {
|
} else {
|
||||||
subquery
|
SubqueryAlias(alias, plan)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
plan
|
plan
|
||||||
|
|||||||
Reference in New Issue
Block a user