From 94806d5cf7650976892c24d7e77c3bba22cee53d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=91=A3=E5=8F=AF=E4=BC=A6?=
Date: Mon, 14 Feb 2022 11:52:00 +0800
Subject: [PATCH] [HUDI-3272] If `mode==ignore && tableExists`, do not execute
 write logic and sync hive (#4632)

---
 .../apache/hudi/HoodieSparkSqlWriter.scala    | 75 ++++++++++---------
 .../hudi/TestHoodieSparkSqlWriter.scala       |  6 ++
 2 files changed, 45 insertions(+), 36 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e2ba3d595..510102af7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -377,51 +377,54 @@ object HoodieSparkSqlWriter {
       schema = HoodieAvroUtils.getNullSchema.toString
     }
 
-    // Handle various save modes
     if (mode == SaveMode.Ignore && tableExists) {
       log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+      if (!hoodieWriteClient.isEmpty) {
+        hoodieWriteClient.get.close()
+      }
       false
     } else {
+      // Handle various save modes
       handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)
-    }
 
-    if (!tableExists) {
-      val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
-      val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
-      val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
-      val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
-      val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
-      val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
+      if (!tableExists) {
+        val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
+        val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
+        val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
+        val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+        val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
+        val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
 
-      HoodieTableMetaClient.withPropertyBuilder()
-        .setTableType(HoodieTableType.valueOf(tableType))
-        .setTableName(tableName)
-        .setRecordKeyFields(recordKeyFields)
-        .setArchiveLogFolder(archiveLogFolder)
-        .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
-        .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
-        .setBootstrapIndexClass(bootstrapIndexClass)
-        .setBaseFileFormat(baseFileFormat)
-        .setBootstrapBasePath(bootstrapBasePath)
-        .setPartitionFields(partitionColumns)
-        .setPopulateMetaFields(populateMetaFields)
-        .setKeyGeneratorClassProp(keyGenProp)
-        .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
-        .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
-        .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
-        .initTable(sparkContext.hadoopConfiguration, path)
-    }
+        HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(HoodieTableType.valueOf(tableType))
+          .setTableName(tableName)
+          .setRecordKeyFields(recordKeyFields)
+          .setArchiveLogFolder(archiveLogFolder)
+          .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
+          .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
+          .setBootstrapIndexClass(bootstrapIndexClass)
+          .setBaseFileFormat(baseFileFormat)
+          .setBootstrapBasePath(bootstrapBasePath)
+          .setPartitionFields(partitionColumns)
+          .setPopulateMetaFields(populateMetaFields)
+          .setKeyGeneratorClassProp(keyGenProp)
+          .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
+          .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
+          .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
+          .initTable(sparkContext.hadoopConfiguration, path)
+      }
 
-    val jsc = new JavaSparkContext(sqlContext.sparkContext)
-    val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-      schema, path, tableName, mapAsJavaMap(parameters)))
-    try {
-      writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
-    } finally {
-      writeClient.close()
+      val jsc = new JavaSparkContext(sqlContext.sparkContext)
+      val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
+        schema, path, tableName, mapAsJavaMap(parameters)))
+      try {
+        writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
+      } finally {
+        writeClient.close()
+      }
+      val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
+      metaSyncSuccess
     }
-    val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
-    metaSyncSuccess
   }
 
   def bulkInsertAsRow(sqlContext: SQLContext,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index bd520c91f..69a0d1e89 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -543,6 +543,12 @@ class TestHoodieSparkSqlWriter {
     // Verify that HoodieWriteClient is closed correctly
     verify(client, times(1)).close()
+
+    val ignoreResult = HoodieSparkSqlWriter.bootstrap(sqlContext, SaveMode.Ignore, fooTableModifier, spark.emptyDataFrame, Option.empty,
+      Option(client))
+    assertFalse(ignoreResult)
+    verify(client, times(2)).close()
+
     // fetch all records from parquet files generated from write to hudi
     val actualDf = sqlContext.read.parquet(tempBasePath)
     assert(actualDf.count == 100)