1
0

[HUDI-3889] Do not validate table config if save mode is set to Overwrite (#5619)

Co-authored-by: xicm <xicm@asiainfo.com>
This commit is contained in:
xi chaomin
2022-06-10 07:23:51 +08:00
committed by GitHub
parent ba47904fa2
commit 2b3a85528a
3 changed files with 65 additions and 33 deletions

View File

@@ -83,9 +83,9 @@ object HoodieSparkSqlWriter {
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt) var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig) validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig) val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters) val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator( val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
originKeyGeneratorClassName, parameters) originKeyGeneratorClassName, parameters)
@@ -408,9 +408,9 @@ object HoodieSparkSqlWriter {
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt) val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig) validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig) val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.") val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE) val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH, val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -734,14 +734,14 @@ object HoodieSparkSqlWriter {
} }
private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String], private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = { tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams) val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions) val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
&& mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) { && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key) mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
} }
if (null != tableConfig) { if (null != tableConfig && mode != SaveMode.Overwrite) {
tableConfig.getProps.foreach { case (key, value) => tableConfig.getProps.foreach { case (key, value) =>
mergedParams(key) = value mergedParams(key) = value
} }

View File

@@ -118,47 +118,56 @@ object HoodieWriterUtils {
} }
} }
def validateTableConfig(spark: SparkSession, params: Map[String, String],
tableConfig: HoodieConfig): Unit = {
validateTableConfig(spark, params, tableConfig, false)
}
/** /**
* Detects conflicts between new parameters and existing table configurations * Detects conflicts between new parameters and existing table configurations
*/ */
def validateTableConfig(spark: SparkSession, params: Map[String, String], def validateTableConfig(spark: SparkSession, params: Map[String, String],
tableConfig: HoodieConfig): Unit = { tableConfig: HoodieConfig, isOverWriteMode: Boolean): Unit = {
val resolver = spark.sessionState.conf.resolver // If Overwrite is set as save mode, we don't need to do table config validation.
val diffConfigs = StringBuilder.newBuilder if (!isOverWriteMode) {
params.foreach { case (key, value) => val resolver = spark.sessionState.conf.resolver
val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key) val diffConfigs = StringBuilder.newBuilder
if (null != existingValue && !resolver(existingValue, value)) { params.foreach { case (key, value) =>
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n") val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
if (null != existingValue && !resolver(existingValue, value)) {
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
}
} }
}
if (null != tableConfig) { if (null != tableConfig) {
val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null) val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS) val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
if (null != datasourceRecordKey && null != tableConfigRecordKey if (null != datasourceRecordKey && null != tableConfigRecordKey
&& datasourceRecordKey != tableConfigRecordKey) { && datasourceRecordKey != tableConfigRecordKey) {
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n") diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
} }
val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null) val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD) val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
&& datasourcePreCombineKey != tableConfigPreCombineKey) { && datasourcePreCombineKey != tableConfigPreCombineKey) {
diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n") diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
} }
val datasourceKeyGen = getOriginKeyGenerator(params) val datasourceKeyGen = getOriginKeyGenerator(params)
val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME) val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
if (null != datasourceKeyGen && null != tableConfigKeyGen if (null != datasourceKeyGen && null != tableConfigKeyGen
&& datasourceKeyGen != tableConfigKeyGen) { && datasourceKeyGen != tableConfigKeyGen) {
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n") diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
}
}
if (diffConfigs.nonEmpty) {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
} }
} }
if (diffConfigs.nonEmpty) {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
}
// Check schema evolution for bootstrap table. // Check schema evolution for bootstrap table.
// now we do not support bootstrap table. // now we do not support bootstrap table.
if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL) if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)

View File

@@ -272,6 +272,29 @@ class TestHoodieSparkSqlWriter {
assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl")) assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
} }
/**
* Test case for Do not validate table config if save mode is set to Overwrite
*/
@Test
def testValidateTableConfigWithOverwriteSaveMode(): Unit = {
//create a new table
val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
"hoodie.datasource.write.recordkey.field" -> "uuid")
val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)
//on same path try write with different RECORDKEY_FIELD_NAME and Append SaveMode should throw an exception
val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
"hoodie.datasource.write.recordkey.field" -> "ts")
val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2))
assert(hoodieException.getMessage.contains("Config conflict"))
assert(hoodieException.getMessage.contains(s"RecordKey:\tts\tuuid"))
//on same path try write with different RECORDKEY_FIELD_NAME and Overwrite SaveMode should be successful.
assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier2, dataFrame2)._1)
}
/** /**
* Test case for each bulk insert sort mode * Test case for each bulk insert sort mode
* *