[HUDI-4051] Allow nested field as primary key and preCombineField in spark sql (#5517)
* [HUDI-4051] Allow nested field as preCombineField in spark sql * relax validation for primary key
This commit is contained in:
@@ -18,6 +18,7 @@
|
|||||||
package org.apache.spark.sql.hudi
|
package org.apache.spark.sql.hudi
|
||||||
|
|
||||||
import org.apache.hudi.DataSourceWriteOptions
|
import org.apache.hudi.DataSourceWriteOptions
|
||||||
|
import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName
|
||||||
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
|
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig
|
import org.apache.hudi.common.table.HoodieTableConfig
|
||||||
import org.apache.hudi.common.util.ValidationUtils
|
import org.apache.hudi.common.util.ValidationUtils
|
||||||
@@ -198,14 +199,14 @@ object HoodieOptionConfig {
|
|||||||
.map(_.split(",").filter(_.length > 0))
|
.map(_.split(",").filter(_.length > 0))
|
||||||
ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
|
ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
|
||||||
primaryKeys.get.foreach { primaryKey =>
|
primaryKeys.get.foreach { primaryKey =>
|
||||||
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, primaryKey)),
|
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))),
|
||||||
s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
|
s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
|
||||||
}
|
}
|
||||||
|
|
||||||
// validate preCombine key
|
// validate preCombine key
|
||||||
val preCombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
|
val preCombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
|
||||||
if (preCombineKey.isDefined && preCombineKey.get.nonEmpty) {
|
if (preCombineKey.isDefined && preCombineKey.get.nonEmpty) {
|
||||||
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, preCombineKey.get)),
|
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(preCombineKey.get))),
|
||||||
s"Can't find preCombineKey `${preCombineKey.get}` in ${schema.treeString}.")
|
s"Can't find preCombineKey `${preCombineKey.get}` in ${schema.treeString}.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -663,4 +663,37 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Test nested field as primaryKey and preCombineField") {
|
||||||
|
withTempDir { tmp =>
|
||||||
|
Seq("cow", "mor").foreach { tableType =>
|
||||||
|
val tableName = generateTableName
|
||||||
|
// create table
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
|create table $tableName (
|
||||||
|
| name string,
|
||||||
|
| price double,
|
||||||
|
| ts long,
|
||||||
|
| nestedcol struct<a1:string, a2:struct<b1:string, b2:struct<c1:string, c2:int>>>
|
||||||
|
|) using hudi
|
||||||
|
| location '${tmp.getCanonicalPath}/$tableName'
|
||||||
|
| options (
|
||||||
|
| type = '$tableType',
|
||||||
|
| primaryKey = 'nestedcol.a1',
|
||||||
|
| preCombineField = 'nestedcol.a2.b2.c2'
|
||||||
|
| )
|
||||||
|
""".stripMargin)
|
||||||
|
// insert data to table
|
||||||
|
spark.sql(
|
||||||
|
s"""insert into $tableName values
|
||||||
|
|('name_1', 10, 1000, struct('a', struct('b', struct('c', 999)))),
|
||||||
|
|('name_2', 20, 2000, struct('a', struct('b', struct('c', 333))))
|
||||||
|
|""".stripMargin)
|
||||||
|
checkAnswer(s"select name, price, ts, nestedcol.a1, nestedcol.a2.b2.c2 from $tableName")(
|
||||||
|
Seq("name_1", 10.0, 1000, "a", 999)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user