From 5353243449ea16c415b4fb5517eb6d2e2723ba76 Mon Sep 17 00:00:00 2001
From: xiarixiaoyao
Date: Tue, 27 Jul 2021 01:26:20 +0800
Subject: [PATCH] [HUDI-2214] residual temporary files after clustering are not
 cleaned up (#3335)

---
 .../hudi/client/SparkRDDWriteClient.java    |  3 ++-
 .../hudi/functional/TestMORDataSource.scala | 25 +++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index d7fa218a8..2a9cee16f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -53,6 +53,7 @@ import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.SparkCompactHelpers;
 import org.apache.hudi.table.upgrade.AbstractUpgradeDowngrade;
@@ -370,7 +371,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     } catch (IOException e) {
       throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
     }
-
+    new MarkerFiles(table, clusteringCommitTime).quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
     if (clusteringTimer != null) {
       long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
       try {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 1028eefd2..1ad35f8fd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
@@ -700,4 +701,28 @@ class TestMORDataSource extends HoodieClientTestBase {
     // Test read log only mor table.
     assertEquals(20, spark.read.format("hudi").load(basePath).count())
   }
+
+  @Test
+  def testTempFilesCleanForClustering(): Unit = {
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      // option for clustering
+      .option("hoodie.parquet.small.file.limit", "0")
+      .option("hoodie.clustering.inline", "true")
+      .option("hoodie.clustering.inline.max.commits", "1")
+      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
+      .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
+      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
+      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(12 * 1024 * 1024L))
+      .option("hoodie.clustering.plan.strategy.sort.columns", "begin_lat, begin_lon")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val tempPath = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)
+    val fs = tempPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    assertEquals(true, fs.listStatus(tempPath).isEmpty)
+  }
 }
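
Note (not part of the patch): the fix deletes the clustering instant's marker directory
once the clustering commit has transitioned to complete, using the "quiet" variant, which
by its name logs deletion failures instead of failing the already-completed commit. To
hand-check an existing table for the residual files this patch addresses, the minimal
standalone sketch below uses only the Hadoop FileSystem API; args[0] is a placeholder for
the table base path, and the ".hoodie/.temp" literal assumes the current value of
HoodieTableMetaClient.TEMPFOLDER_NAME.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Mirrors the assertion in testTempFilesCleanForClustering above,
    // but runnable against any table path from the command line.
    public class TempFolderCheck {
      public static void main(String[] args) throws Exception {
        Path tempPath = new Path(args[0], ".hoodie/.temp");
        FileSystem fs = tempPath.getFileSystem(new Configuration());
        // After a clean clustering commit the temp folder should be absent or empty.
        FileStatus[] leftovers = fs.exists(tempPath) ? fs.listStatus(tempPath) : new FileStatus[0];
        if (leftovers.length == 0) {
          System.out.println("OK: no residual temp files under " + tempPath);
        } else {
          System.out.println("Found " + leftovers.length + " residual entries under " + tempPath);
        }
      }
    }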