[HUDI-3972] Fixing hoodie.properties/tableConfig for no preCombine field with writes (#5424)
Fixed instantiation of a new table so that the preCombine field is set to null when the user has not explicitly set it.
This commit is contained in:
committed by
GitHub
parent
f2ba0fead2
commit
762623a15c
@@ -150,7 +150,9 @@ object HoodieSparkSqlWriter {
|
||||
.setBaseFileFormat(baseFileFormat)
|
||||
.setArchiveLogFolder(archiveLogFolder)
|
||||
.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
|
||||
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
|
||||
// We can't fetch the preCombine field from the hoodieConfig object, since it falls back to "ts" as the default value,
|
||||
// but we are interested in what the user has set, hence we fetch it from optParams.
|
||||
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
|
||||
.setPartitionFields(partitionColumns)
|
||||
.setPopulateMetaFields(populateMetaFields)
|
||||
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
|
||||
|
||||
@@ -96,6 +96,26 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
||||
}
|
||||
|
||||
// Writes a batch of inserts using options that do not configure a preCombine field, then reads the
// table back. The test contains no explicit assertions; it passes as long as neither the write nor
// the read throws.
// NOTE(review): no table type is set here, so the writer's default applies — presumably
// COPY_ON_WRITE given the enclosing TestCOWDataSource class; confirm.
@Test def testNoPrecombine() {
|
||||
// Insert Operation
|
||||
// Build the input DataFrame from the 100 records returned by dataGen.generateInserts("000", 100).
val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
|
||||
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
|

||||
// Write options: only parallelism, record key, partition path and table name are set.
// The preCombine field option is intentionally left out of this map.
val commonOptsNoPreCombine = Map(
|
||||
"hoodie.insert.shuffle.parallelism" -> "4",
|
||||
"hoodie.upsert.shuffle.parallelism" -> "4",
|
||||
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
|
||||
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
|
||||
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
|
||||
)
|
||||
// Write with the insert operation, overwriting whatever is already at basePath.
inputDF.write.format("hudi")
|
||||
.options(commonOptsNoPreCombine)
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|

||||
// Read the table back; count() forces the read to execute. The returned count is not checked.
spark.read.format("org.apache.hudi").load(basePath).count()
|
||||
}
|
||||
|
||||
@Test def testHoodieIsDeletedNonBooleanField() {
|
||||
// Insert Operation
|
||||
|
||||
@@ -489,6 +489,28 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
|
||||
hudiSnapshotDF2.show(1)
|
||||
}
|
||||
|
||||
// Writes a batch of inserts to a MERGE_ON_READ table using options that do not configure a
// preCombine field, then reads the table back. The test contains no explicit assertions; it passes
// as long as neither the write nor the read throws.
@Test def testNoPrecombine() {
|
||||
// Insert Operation
|
||||
// Build the input DataFrame from the 100 records returned by dataGen.generateInserts("000", 100).
val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
|
||||
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
|

||||
// Write options: only parallelism, record key, partition path and table name are set.
// The preCombine field option is intentionally left out of this map.
val commonOptsNoPreCombine = Map(
|
||||
"hoodie.insert.shuffle.parallelism" -> "4",
|
||||
"hoodie.upsert.shuffle.parallelism" -> "4",
|
||||
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
|
||||
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
|
||||
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
|
||||
)
|
||||
// Write with the insert operation and an explicit MERGE_ON_READ table type, overwriting
// whatever is already at basePath.
inputDF.write.format("hudi")
|
||||
.options(commonOptsNoPreCombine)
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key(), "MERGE_ON_READ")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|

||||
// Read the table back; count() forces the read to execute. The returned count is not checked.
spark.read.format("org.apache.hudi").load(basePath).count()
|
||||
}
|
||||
|
||||
@Test
|
||||
def testPreCombineFiledForReadMOR(): Unit = {
|
||||
writeData((1, "a0", 10, 100, false))
|
||||
|
||||
@@ -263,83 +263,100 @@ class TestCreateTable extends TestHoodieSqlBase {
|
||||
|
||||
test("Test Create Table As Select") {
|
||||
withTempDir { tmp =>
|
||||
// Create Non-Partitioned table
|
||||
val tableName1 = generateTableName
|
||||
spark.sql(
|
||||
s"""
|
||||
| create table $tableName1 using hudi
|
||||
| tblproperties(primaryKey = 'id')
|
||||
| location '${tmp.getCanonicalPath}/$tableName1'
|
||||
| AS
|
||||
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts
|
||||
Seq("cow", "mor").foreach { tableType =>
|
||||
// Create Non-Partitioned table
|
||||
val tableName1 = generateTableName
|
||||
spark.sql(
|
||||
s"""
|
||||
| create table $tableName1 using hudi
|
||||
| tblproperties(
|
||||
| primaryKey = 'id',
|
||||
| type = '$tableType'
|
||||
| )
|
||||
| location '${tmp.getCanonicalPath}/$tableName1'
|
||||
| AS
|
||||
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts
|
||||
""".stripMargin)
|
||||
checkAnswer(s"select id, name, price, ts from $tableName1")(
|
||||
Seq(1, "a1", 10.0, 1000)
|
||||
)
|
||||
checkAnswer(s"select id, name, price, ts from $tableName1")(
|
||||
Seq(1, "a1", 10.0, 1000)
|
||||
)
|
||||
|
||||
// Create Partitioned table
|
||||
val tableName2 = generateTableName
|
||||
spark.sql(
|
||||
s"""
|
||||
| create table $tableName2 using hudi
|
||||
| partitioned by (dt)
|
||||
| tblproperties(primaryKey = 'id')
|
||||
| location '${tmp.getCanonicalPath}/$tableName2'
|
||||
| AS
|
||||
| select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
|
||||
// Create Partitioned table
|
||||
val tableName2 = generateTableName
|
||||
spark.sql(
|
||||
s"""
|
||||
| create table $tableName2 using hudi
|
||||
| partitioned by (dt)
|
||||
| tblproperties(
|
||||
| primaryKey = 'id',
|
||||
| type = '$tableType'
|
||||
| )
|
||||
| location '${tmp.getCanonicalPath}/$tableName2'
|
||||
| AS
|
||||
| select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
|
||||
""".stripMargin
|
||||
)
|
||||
checkAnswer(s"select id, name, price, dt from $tableName2")(
|
||||
Seq(1, "a1", 10, "2021-04-01")
|
||||
)
|
||||
)
|
||||
checkAnswer(s"select id, name, price, dt from $tableName2")(
|
||||
Seq(1, "a1", 10, "2021-04-01")
|
||||
)
|
||||
|
||||
// Create Partitioned table with timestamp data type
|
||||
val tableName3 = generateTableName
|
||||
// CTAS failed with null primaryKey
|
||||
assertThrows[Exception] {
|
||||
// Create Partitioned table with timestamp data type
|
||||
val tableName3 = generateTableName
|
||||
// CTAS failed with null primaryKey
|
||||
assertThrows[Exception] {
|
||||
spark.sql(
|
||||
s"""
|
||||
| create table $tableName3 using hudi
|
||||
| partitioned by (dt)
|
||||
| tblproperties(
|
||||
| primaryKey = 'id',
|
||||
| type = '$tableType'
|
||||
| )
|
||||
| location '${tmp.getCanonicalPath}/$tableName3'
|
||||
| AS
|
||||
| select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
|
||||
|
|
||||
""".stripMargin
|
||||
)
|
||||
}
|
||||
// Create table with timestamp type partition
|
||||
spark.sql(
|
||||
s"""
|
||||
| create table $tableName3 using hudi
|
||||
| partitioned by (dt)
|
||||
| tblproperties(primaryKey = 'id')
|
||||
| tblproperties(
|
||||
| primaryKey = 'id',
|
||||
| type = '$tableType'
|
||||
| )
|
||||
| location '${tmp.getCanonicalPath}/$tableName3'
|
||||
| AS
|
||||
| select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
|
||||
|
|
||||
""".stripMargin
|
||||
| select cast('2021-05-06 00:00:00' as timestamp) as dt, 1 as id, 'a1' as name, 10 as
|
||||
| price
|
||||
""".stripMargin
|
||||
)
|
||||
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName3")(
|
||||
Seq(1, "a1", 10, "2021-05-06 00:00:00")
|
||||
)
|
||||
// Create table with date type partition
|
||||
val tableName4 = generateTableName
|
||||
spark.sql(
|
||||
s"""
|
||||
| create table $tableName4 using hudi
|
||||
| partitioned by (dt)
|
||||
| tblproperties(
|
||||
| primaryKey = 'id',
|
||||
| type = '$tableType'
|
||||
| )
|
||||
| location '${tmp.getCanonicalPath}/$tableName4'
|
||||
| AS
|
||||
| select cast('2021-05-06' as date) as dt, 1 as id, 'a1' as name, 10 as
|
||||
| price
|
||||
""".stripMargin
|
||||
)
|
||||
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName4")(
|
||||
Seq(1, "a1", 10, "2021-05-06")
|
||||
)
|
||||
}
|
||||
// Create table with timestamp type partition
|
||||
spark.sql(
|
||||
s"""
|
||||
| create table $tableName3 using hudi
|
||||
| partitioned by (dt)
|
||||
| tblproperties(primaryKey = 'id')
|
||||
| location '${tmp.getCanonicalPath}/$tableName3'
|
||||
| AS
|
||||
| select cast('2021-05-06 00:00:00' as timestamp) as dt, 1 as id, 'a1' as name, 10 as
|
||||
| price
|
||||
""".stripMargin
|
||||
)
|
||||
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName3")(
|
||||
Seq(1, "a1", 10, "2021-05-06 00:00:00")
|
||||
)
|
||||
// Create table with date type partition
|
||||
val tableName4 = generateTableName
|
||||
spark.sql(
|
||||
s"""
|
||||
| create table $tableName4 using hudi
|
||||
| partitioned by (dt)
|
||||
| tblproperties(primaryKey = 'id')
|
||||
| location '${tmp.getCanonicalPath}/$tableName4'
|
||||
| AS
|
||||
| select cast('2021-05-06' as date) as dt, 1 as id, 'a1' as name, 10 as
|
||||
| price
|
||||
""".stripMargin
|
||||
)
|
||||
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName4")(
|
||||
Seq(1, "a1", 10, "2021-05-06")
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user