[HUDI-3933] Add UT cases to cover different key gen (#5638)
This commit is contained in:
@@ -26,14 +26,14 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
|
|||||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
|
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
|
||||||
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
|
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
|
||||||
import org.apache.hudi.config.HoodieWriteConfig
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
|
import org.apache.hudi.keygen.TimestampBasedKeyGenerator
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
|
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
|
||||||
import org.apache.hudi.keygen.{ComplexKeyGenerator, TimestampBasedKeyGenerator}
|
|
||||||
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
|
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
|
||||||
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
||||||
import org.apache.spark.sql._
|
import org.apache.spark.sql._
|
||||||
import org.apache.spark.sql.functions.{col, lit}
|
import org.apache.spark.sql.functions.{col, lit}
|
||||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
|
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
|
||||||
import org.junit.jupiter.api.{Disabled, Tag}
|
import org.junit.jupiter.api.Tag
|
||||||
import org.junit.jupiter.params.ParameterizedTest
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
|
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
|
||||||
|
|
||||||
@@ -51,31 +51,33 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
|
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
|
||||||
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
|
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
|
||||||
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
|
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
|
||||||
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
|
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
|
||||||
|
DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false"
|
||||||
)
|
)
|
||||||
|
|
||||||
val verificationCol: String = "driver"
|
val verificationCol: String = "driver"
|
||||||
val updatedVerificationVal: String = "driver_update"
|
val updatedVerificationVal: String = "driver_update"
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@CsvSource(Array(
|
@CsvSource(value = Array(
|
||||||
"true,org.apache.hudi.keygen.SimpleKeyGenerator",
|
"true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
|
||||||
"true,org.apache.hudi.keygen.ComplexKeyGenerator",
|
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,nation.bytes",
|
||||||
"true,org.apache.hudi.keygen.TimestampBasedKeyGenerator",
|
"true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
|
||||||
"false,org.apache.hudi.keygen.SimpleKeyGenerator",
|
"false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
|
||||||
"false,org.apache.hudi.keygen.ComplexKeyGenerator",
|
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,nation.bytes",
|
||||||
"false,org.apache.hudi.keygen.TimestampBasedKeyGenerator"
|
"false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
|
||||||
))
|
), delimiter = '|')
|
||||||
def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String): Unit = {
|
def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String): Unit = {
|
||||||
commonOpts += DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass
|
var options: Map[String, String] = commonOpts +
|
||||||
if (classOf[ComplexKeyGenerator].getName.equals(keyGenClass)) {
|
(HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled)) +
|
||||||
commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key, pii_col"
|
(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) +
|
||||||
}
|
(DataSourceWriteOptions.RECORDKEY_FIELD.key() -> recordKeys)
|
||||||
if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
|
val isTimestampBasedKeyGen: Boolean = classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
|
||||||
commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
|
if (isTimestampBasedKeyGen) {
|
||||||
commonOpts += DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "current_ts"
|
options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
|
||||||
commonOpts += Config.TIMESTAMP_TYPE_FIELD_PROP -> "EPOCHMILLISECONDS"
|
options += Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING"
|
||||||
commonOpts += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
|
options += Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd"
|
||||||
|
options += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
|
||||||
}
|
}
|
||||||
val dataGen = new HoodieTestDataGenerator(0xDEED)
|
val dataGen = new HoodieTestDataGenerator(0xDEED)
|
||||||
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
|
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
|
||||||
@@ -83,14 +85,12 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
|
val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
|
||||||
val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
|
val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
|
||||||
inputDF0.write.format("org.apache.hudi")
|
inputDF0.write.format("org.apache.hudi")
|
||||||
.options(commonOpts)
|
.options(options)
|
||||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||||
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
|
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.save(basePath)
|
.save(basePath)
|
||||||
|
|
||||||
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
||||||
val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
|
|
||||||
|
|
||||||
// Snapshot query
|
// Snapshot query
|
||||||
val snapshotDF1 = spark.read.format("org.apache.hudi")
|
val snapshotDF1 = spark.read.format("org.apache.hudi")
|
||||||
@@ -102,7 +102,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
||||||
val verificationRowKey = inputDF1.limit(1).select("_row_key").first.getString(0)
|
val verificationRowKey = inputDF1.limit(1).select("_row_key").first.getString(0)
|
||||||
var updateDf: DataFrame = null
|
var updateDf: DataFrame = null
|
||||||
if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
|
if (isTimestampBasedKeyGen) {
|
||||||
// update current_ts to be same as original record so that partition path does not change with timestamp based key gen
|
// update current_ts to be same as original record so that partition path does not change with timestamp based key gen
|
||||||
val originalRow = snapshotDF1.filter(col("_row_key") === verificationRowKey).collectAsList().get(0)
|
val originalRow = snapshotDF1.filter(col("_row_key") === verificationRowKey).collectAsList().get(0)
|
||||||
updateDf = inputDF1.filter(col("_row_key") === verificationRowKey)
|
updateDf = inputDF1.filter(col("_row_key") === verificationRowKey)
|
||||||
@@ -116,8 +116,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
}
|
}
|
||||||
|
|
||||||
updateDf.write.format("org.apache.hudi")
|
updateDf.write.format("org.apache.hudi")
|
||||||
.options(commonOpts)
|
.options(options)
|
||||||
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
|
|
||||||
.mode(SaveMode.Append)
|
.mode(SaveMode.Append)
|
||||||
.save(basePath)
|
.save(basePath)
|
||||||
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
|
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
|
||||||
@@ -132,7 +131,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
val records2 = recordsToStrings(dataGen.generateUpdates("002", 100)).toList
|
val records2 = recordsToStrings(dataGen.generateUpdates("002", 100)).toList
|
||||||
var inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
var inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
||||||
|
|
||||||
if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
|
if (isTimestampBasedKeyGen) {
|
||||||
// in case of Timestamp based key gen, current_ts should not be updated, but dataGen.generateUpdates() would have updated
|
// in case of Timestamp based key gen, current_ts should not be updated, but dataGen.generateUpdates() would have updated
|
||||||
// the value of current_ts. So, we need to revert it back to original value.
|
// the value of current_ts. So, we need to revert it back to original value.
|
||||||
// here is what we are going to do. Copy values to temp columns, join with original df and update the current_ts
|
// here is what we are going to do. Copy values to temp columns, join with original df and update the current_ts
|
||||||
@@ -152,8 +151,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
|
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
|
||||||
|
|
||||||
inputDF2.write.format("org.apache.hudi")
|
inputDF2.write.format("org.apache.hudi")
|
||||||
.options(commonOpts)
|
.options(options)
|
||||||
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
|
|
||||||
.mode(SaveMode.Append)
|
.mode(SaveMode.Append)
|
||||||
.save(basePath)
|
.save(basePath)
|
||||||
|
|
||||||
@@ -191,8 +189,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
val emptyRecords = recordsToStrings(dataGen.generateUpdates("003", 0)).toList
|
val emptyRecords = recordsToStrings(dataGen.generateUpdates("003", 0)).toList
|
||||||
val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
|
val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
|
||||||
emptyDF.write.format("org.apache.hudi")
|
emptyDF.write.format("org.apache.hudi")
|
||||||
.options(commonOpts)
|
.options(options)
|
||||||
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
|
|
||||||
.mode(SaveMode.Append)
|
.mode(SaveMode.Append)
|
||||||
.save(basePath)
|
.save(basePath)
|
||||||
|
|
||||||
@@ -211,9 +208,10 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
|
val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
|
||||||
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
|
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
|
||||||
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
|
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
|
||||||
.option(DataSourceReadOptions.INCR_PATH_GLOB.key, "/2016/*/*/*")
|
.option(DataSourceReadOptions.INCR_PATH_GLOB.key, if (isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
|
||||||
.load(basePath)
|
.load(basePath)
|
||||||
assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
|
assertEquals(hoodieIncViewDF2
|
||||||
|
.filter(col("_hoodie_partition_path").startsWith("2016")).count(), hoodieIncViewDF3.count())
|
||||||
|
|
||||||
val timeTravelDF = spark.read.format("org.apache.hudi")
|
val timeTravelDF = spark.read.format("org.apache.hudi")
|
||||||
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
|
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
|
||||||
|
|||||||
Reference in New Issue
Block a user