[HUDI-2214] Residual temporary files after clustering are not cleaned up (#3335)
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
@@ -700,4 +701,28 @@ class TestMORDataSource extends HoodieClientTestBase {
    // Test read log only mor table.
    assertEquals(20, spark.read.format("hudi").load(basePath).count())
  }

  @Test
  def testTempFilesCleanForClustering(): Unit = {
    val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList
    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
    inputDF1.write.format("org.apache.hudi")
      .options(commonOpts)
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      // options for clustering: disable small-file handling on write and
      // trigger inline clustering after every commit
      .option("hoodie.parquet.small.file.limit", "0")
      .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "1")
      .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(12 * 1024 * 1024L))
      .option("hoodie.clustering.plan.strategy.sort.columns", "begin_lat, begin_lon")
      .mode(SaveMode.Overwrite)
      .save(basePath)
    // After inline clustering commits, the temp folder under the table base
    // path should hold no residual files.
    val tempPath = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)
    val fs = tempPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    assertEquals(true, fs.listStatus(tempPath).isEmpty)
  }
}
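
For reference, a minimal standalone sketch (not part of this commit) of how the same check could be run against an existing table: it lists whatever remains under the table's temp folder (HoodieTableMetaClient.TEMPFOLDER_NAME, i.e. ".hoodie/.temp") after a clustering run. The TempFolderCheck object name and the default basePath are hypothetical, chosen for illustration.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.common.table.HoodieTableMetaClient

// Hypothetical helper, not part of the commit: prints any residual files
// left under a Hudi table's temp folder. Pass the table base path as the
// first argument; the fallback path below is an assumed example.
object TempFolderCheck {
  def main(args: Array[String]): Unit = {
    val basePath = args.headOption.getOrElse("/tmp/hudi_trips_mor")
    val tempPath = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)
    val fs = tempPath.getFileSystem(new Configuration())
    val residual = if (fs.exists(tempPath)) fs.listStatus(tempPath) else Array.empty[FileStatus]
    residual.foreach(status => println(status.getPath))
    println(s"${residual.length} residual file(s) under $tempPath")
  }
}

With the cleanup from this commit in place, the listing should come back empty once the clustering instant has committed; before it, leftover temporary files could accumulate there.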