1
0

[HUDI-3272] If mode==ignore && tableExists, do not execute write logic and sync hive (#4632)

This commit is contained in:
董可伦
2022-02-14 11:52:00 +08:00
committed by GitHub
parent 93ee09fee8
commit 94806d5cf7
2 changed files with 45 additions and 36 deletions

View File

@@ -377,51 +377,54 @@ object HoodieSparkSqlWriter {
(side-by-side diff flattened to unified form; the unchanged statements below were also
re-indented one level because they moved inside the `else` branch — those
indentation-only add/remove pairs are shown once, as context at their new depth)
       schema = HoodieAvroUtils.getNullSchema.toString
     }
-    // Handle various save modes
     if (mode == SaveMode.Ignore && tableExists) {
       log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+      if (!hoodieWriteClient.isEmpty) {
+        hoodieWriteClient.get.close()
+      }
       false
     } else {
+      // Handle various save modes
       handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)
-    }
       if (!tableExists) {
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
         val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
         val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
         val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
         val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
         val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
         HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(HoodieTableType.valueOf(tableType))
           .setTableName(tableName)
           .setRecordKeyFields(recordKeyFields)
           .setArchiveLogFolder(archiveLogFolder)
           .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
           .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
           .setBootstrapIndexClass(bootstrapIndexClass)
           .setBaseFileFormat(baseFileFormat)
           .setBootstrapBasePath(bootstrapBasePath)
           .setPartitionFields(partitionColumns)
           .setPopulateMetaFields(populateMetaFields)
           .setKeyGeneratorClassProp(keyGenProp)
           .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
           .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
           .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
           .initTable(sparkContext.hadoopConfiguration, path)
       }
       val jsc = new JavaSparkContext(sqlContext.sparkContext)
       val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
         schema, path, tableName, mapAsJavaMap(parameters)))
       try {
         writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
       } finally {
         writeClient.close()
       }
-    val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
-    metaSyncSuccess
+      val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
+      metaSyncSuccess
+    }
   }

   def bulkInsertAsRow(sqlContext: SQLContext,

View File

@@ -543,6 +543,12 @@ class TestHoodieSparkSqlWriter {
(side-by-side diff flattened to unified form; the hunk header reports six added lines,
of which only the four marked "+" below survived this page extraction)
     // Verify that HoodieWriteClient is closed correctly
     verify(client, times(1)).close()
+    val ignoreResult = HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Ignore, fooTableModifier, spark.emptyDataFrame, Option.empty,
+      Option(client))
+    assertFalse(ignoreResult)
+    verify(client, times(2)).close()
     // fetch all records from parquet files generated from write to hudi
     val actualDf = sqlContext.read.parquet(tempBasePath)
     assert(actualDf.count == 100)