1
0

[HUDI-3726] Switching from non-partitioned to partitioned key gen does not throw any exception (#5205)

This commit is contained in:
rkkalluri
2022-04-06 12:35:32 -05:00
committed by GitHub
parent ca273274b0
commit 939b3d1b07
3 changed files with 168 additions and 14 deletions

View File

@@ -40,6 +40,8 @@ import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRo
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.index.SparkHoodieIndexFactory
import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -52,11 +54,6 @@ import org.apache.spark.sql._
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.SparkContext
import java.util.Properties
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -93,6 +90,8 @@ object HoodieSparkSqlWriter {
val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestmapBasedKeyGenerator(
originKeyGeneratorClassName, parameters)
//validate datasource and tableconfig keygen are the same
validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim

View File

@@ -18,14 +18,16 @@
package org.apache.hudi
import java.util.Properties
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.command.SqlKeyGenerator
@@ -167,6 +169,29 @@ object HoodieWriterUtils {
}
}
/**
* Detects conflicts between datasourceKeyGen and existing table configuration keyGen
*/
def validateKeyGeneratorConfig(datasourceKeyGen: String, tableConfig: HoodieConfig): Unit = {
val diffConfigs = StringBuilder.newBuilder
if (null != tableConfig) {
val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
if (null != tableConfigKeyGen && null != datasourceKeyGen) {
val nonPartitionedTableConfig = tableConfigKeyGen.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
val simpleKeyDataSourceConfig = datasourceKeyGen.equals(classOf[SimpleKeyGenerator].getCanonicalName)
if (nonPartitionedTableConfig && simpleKeyDataSourceConfig) {
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
}
}
}
if (diffConfigs.nonEmpty) {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
}
}
private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieConfig, key: String): String = {
if (null == tableConfig) {
null

View File

@@ -17,11 +17,13 @@
package org.apache.hudi
import java.io.IOException
import java.time.Instant
import java.util.{Collections, Date, UUID}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -29,7 +31,6 @@ import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import org.apache.hudi.functional.TestBootstrap
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.api.java.JavaSparkContext
@@ -37,7 +38,6 @@ import org.apache.spark.sql._
import org.apache.spark.sql.functions.{expr, lit}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.hudi.command.SqlKeyGenerator
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -46,11 +46,8 @@ import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.Assertions.assertThrows
import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, intercept}
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}
import java.io.IOException
import java.time.Instant
import java.util.{Collections, Date, UUID}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters
@@ -887,6 +884,139 @@ class TestHoodieSparkSqlWriter {
assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "2021-10-16")
}
@Test
def testNonpartitonedToDefaultKeyGen(): Unit = {
val _spark = spark
import _spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
val options = Map(
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
)
// case 1: When commit C1 specificies a key generator and commit C2 does not specify key generator
val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")
// the first write need to specify KEYGENERATOR_CLASS_NAME params
df.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.mode(SaveMode.Overwrite).save(tablePath1)
val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
// raise exception when no KEYGENERATOR_CLASS_NAME is specified and it is expected to default to SimpleKeyGenerator
val configConflictException = intercept[HoodieException] {
df2.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.mode(SaveMode.Append).save(tablePath1)
}
assert(configConflictException.getMessage.contains("Config conflict"))
assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[SimpleKeyGenerator].getName}\t${classOf[NonpartitionedKeyGenerator].getName}"))
}
@Test
def testDefaultKeyGenToNonpartitoned(): Unit = {
val _spark = spark
import _spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
val options = Map(
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
)
// case 1: When commit C1 does not specify key generator and commit C2 specificies a key generator
val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")
// the first write need to specify KEYGENERATOR_CLASS_NAME params
df.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.mode(SaveMode.Overwrite).save(tablePath1)
val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
// raise exception when NonpartitionedKeyGenerator is specified
val configConflictException = intercept[HoodieException] {
df2.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.mode(SaveMode.Append).save(tablePath1)
}
assert(configConflictException.getMessage.contains("Config conflict"))
assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[NonpartitionedKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
}
@Test
def testNoKeyGenToSimpleKeyGen(): Unit = {
val _spark = spark
import _spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
val options = Map(
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
)
// case 1: When commit C1 specificies a key generator and commkt C2 does not specify key generator
val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")
// the first write need to specify KEYGENERATOR_CLASS_NAME params
df.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.mode(SaveMode.Overwrite).save(tablePath1)
val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
// No Exception Should be raised
try {
df2.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
.mode(SaveMode.Append).save(tablePath1)
} catch {
case _ => fail("Switching from no keygen to explicit SimpleKeyGenerator should not fail");
}
}
@Test
def testSimpleKeyGenToNoKeyGen(): Unit = {
val _spark = spark
import _spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
val options = Map(
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
)
// case 1: When commit C1 specificies a key generator and commkt C2 does not specify key generator
val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")
// the first write need to specify KEYGENERATOR_CLASS_NAME params
df.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
.mode(SaveMode.Overwrite).save(tablePath1)
val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
// No Exception Should be raised when default keygen is used
try {
df2.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.mode(SaveMode.Append).save(tablePath1)
} catch {
case _ => fail("Switching from explicit SimpleKeyGenerator to default keygen should not fail");
}
}
@Test
def testGetOriginKeyGenerator(): Unit = {
// for dataframe write