From b7da6cb33d27002525e40913dd63f077aeba26f0 Mon Sep 17 00:00:00 2001
From: liujinhui <965147871@qq.com>
Date: Sat, 14 Aug 2021 14:53:39 +0800
Subject: [PATCH] [HUDI-2307] When using delete_partition with ds should not
 rely on the primary key (#3469)

Co-authored-by: Sivabalan Narayanan
---
 .../org/apache/hudi/DataSourceOptions.scala   |   5 +
 .../apache/hudi/HoodieSparkSqlWriter.scala    |  11 +-
 .../hudi/HoodieSparkSqlWriterSuite.scala      | 109 ++++++++++--------
 style/scalastyle.xml                          |   2 +-
 4 files changed, 72 insertions(+), 55 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index a534ac569..36c049393 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -325,6 +325,11 @@ object DataSourceWriteOptions {
   @Deprecated
   val INSERT_DROP_DUPS_OPT_KEY = INSERT_DROP_DUPS.key()
 
+  val PARTITIONS_TO_DELETE: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.write.partitions.to.delete")
+    .noDefaultValue()
+    .withDocumentation("Comma separated list of partitions to delete")
+
   val STREAMING_RETRY_CNT: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.streaming.retry.count")
     .defaultValue("3")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f41df94db..2c8d33e56 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -29,8 +29,8 @@ import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH, BOOTSTRAP_INDEX_CLASS}
@@ -192,7 +192,12 @@ object HoodieSparkSqlWriter {
       }
 
       // Get list of partitions to delete
-      val partitionsToDelete = genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
+      val partitionsToDelete = if (parameters.contains(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
+        val partitionColsToDelete = parameters.get(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).get.split(",")
+        java.util.Arrays.asList(partitionColsToDelete: _*)
+      } else {
+        genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
+      }
 
       // Create a HoodieWriteClient & issue the delete.
       val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, null, path.get, tblName,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
index e767f8ad9..1ff30e2b6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -679,61 +679,68 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
-  test("test delete partitions") {
-    initSparkContext("test_delete_partitions")
-    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
-    try {
-      val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
-      val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
-      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-      val schema = DataSourceTestUtils.getStructTypeExampleSchema
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      val records = DataSourceTestUtils.generateRandomRows(10)
-      val recordsSeq = convertRowListToSeq(records)
-      val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
-      // write to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+  List(true, false)
+    .foreach(usePartitionsToDeleteConfig => {
+      test("test delete partitions for " + usePartitionsToDeleteConfig) {
+        initSparkContext("test_delete_partitions_" + usePartitionsToDeleteConfig)
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
+        try {
+          val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
+          val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+          var fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+          val schema = DataSourceTestUtils.getStructTypeExampleSchema
+          val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          val records = DataSourceTestUtils.generateRandomRows(10)
+          val recordsSeq = convertRowListToSeq(records)
+          val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
 
-      val snapshotDF1 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(10, snapshotDF1.count())
-      // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf1 = dropMetaFields(snapshotDF1)
-      assert(df1.except(trimmedDf1).count() == 0)
+          val snapshotDF1 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF1.count())
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf1 = dropMetaFields(snapshotDF1)
+          assert(df1.except(trimmedDf1).count() == 0)
 
-      // issue updates so that log files are created for MOR table
-      var updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
-      var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
-      // write updates to Hudi
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
-      val snapshotDF2 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(10, snapshotDF2.count())
+          // issue updates so that log files are created for MOR table
+          var updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
+          var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
+          // write updates to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+          val snapshotDF2 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF2.count())
 
-      // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf2 = dropMetaFields(snapshotDF2)
-      // ensure 2nd batch of updates matches.
-      assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf2 = dropMetaFields(snapshotDF2)
+          // ensure 2nd batch of updates matches.
+          assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
 
-      // delete partitions
-      val recordsToDelete = df1.filter(entry => {
-        val partitionPath : String = entry.getString(1)
-        partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) || partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
-      })
-      val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
-      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
+          if (usePartitionsToDeleteConfig) {
+            fooTableParams = fooTableParams.updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "," + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
+          }
+          // rows for the partitions to delete; these still carry the primary key
+          val recordsToDelete = df1.filter(entry => {
+            val partitionPath : String = entry.getString(1)
+            partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) ||
+              partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
+          })
+          val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
 
-      val snapshotDF3 = spark.read.format("org.apache.hudi")
-        .load(path.toAbsolutePath.toString + "/*/*/*/*")
-      assertEquals(0, snapshotDF3.filter(entry => {
-        val partitionPath = entry.getString(3)
-        !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
-      }).count())
-    } finally {
-      spark.stop()
-      FileUtils.deleteDirectory(path.toFile)
-    }
-  }
+          val snapshotDF3 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(0, snapshotDF3.filter(entry => {
+            val partitionPath = entry.getString(3)
+            !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+          }).count())
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
+      }
+    })
 
   def dropMetaFields(df: Dataset[Row]) : Dataset[Row] = {
     df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
diff --git a/style/scalastyle.xml b/style/scalastyle.xml
index fa2c4d314..89306f36e 100644
--- a/style/scalastyle.xml
+++ b/style/scalastyle.xml
@@ -27,7 +27,7 @@
-
+
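
Note for reviewers (not part of the patch): a minimal usage sketch of the new option. The table name, base path, and partition values below are hypothetical, and the sketch assumes the table's other required write configs (record key, precombine field, and so on) are already in place. With hoodie.datasource.write.partitions.to.delete set, the delete_partition operation takes its partition list from the option, so the input rows no longer need to carry the primary key.

import org.apache.spark.sql.{SaveMode, SparkSession}

object DeletePartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("delete-partitions-sketch")
      .master("local[2]")
      .getOrCreate()

    // Hypothetical Hudi table written earlier at this base path.
    val basePath = "/tmp/hoodie_foo_tbl"

    // When the option names the partitions, the row contents are not consulted
    // for key extraction (see the new branch in HoodieSparkSqlWriter above),
    // so an empty DataFrame is assumed to be acceptable input here.
    spark.emptyDataFrame.write.format("hudi")
      .option("hoodie.table.name", "hoodie_foo_tbl") // hypothetical table name
      .option("hoodie.datasource.write.operation", "delete_partition")
      // comma separated list, per the PARTITIONS_TO_DELETE documentation
      .option("hoodie.datasource.write.partitions.to.delete", "2016/03/15,2015/03/16")
      .mode(SaveMode.Append)
      .save(basePath)

    spark.stop()
  }
}

Omitting hoodie.datasource.write.partitions.to.delete keeps the old behavior, exercised by the false branch of the test above: the partition list is derived as keyGenerator.getKey(record).getPartitionPath over the incoming rows, which is why the input previously had to contain the primary key.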