From 282aa68552e3872cf2758824de1d2efd004ae1c1 Mon Sep 17 00:00:00 2001
From: ForwardXu
Date: Tue, 28 Dec 2021 09:38:26 +0800
Subject: [PATCH] [HUDI-3099] Purge drop partition for spark sql (#4436)

---
 .../catalyst/catalog/HoodieCatalogTable.scala |   5 +
 .../sql/hudi/analysis/HoodieAnalysis.scala    |   4 +-
 ...AlterHoodieTableDropPartitionCommand.scala |  74 +++++++---
 .../hudi/command/DropHoodieTableCommand.scala |   6 +-
 .../hudi/TestAlterTableDropPartition.scala    | 133 +++++++++++++++++-
 5 files changed, 191 insertions(+), 31 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index fdf1b062e..21a60087f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -148,6 +148,11 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
    */
   def getAllPartitionPaths: Seq[String] = HoodieSqlUtils.getAllPartitionPaths(spark, table)
 
+  /**
+   * Check if table is a partitioned table
+   */
+  def isPartitionedTable: Boolean = table.partitionColumnNames.nonEmpty
+
   /**
    * init hoodie table for create table (as select)
    */
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index a8e074603..09164f182 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -410,9 +410,9 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
         if isHoodieTable(tableName, sparkSession) =>
           DropHoodieTableCommand(tableName, ifExists, isView, purge)
       // Rewrite the AlterTableDropPartitionCommand to AlterHoodieTableDropPartitionCommand
-      case AlterTableDropPartitionCommand(tableName, specs, _, _, _)
+      case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
         if isHoodieTable(tableName, sparkSession) =>
-          AlterHoodieTableDropPartitionCommand(tableName, specs)
+          AlterHoodieTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
       // Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
       // Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand
       case AlterTableAddColumnsCommand(tableId, colsToAdd)
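For context on the rule change above: Spark's `AlterTableDropPartitionCommand` carries five fields, and the old match arm discarded the last three, so `PURGE` (and `IF EXISTS`) never reached Hudi. A minimal sketch of the forwarding, with the `isHoodieTable` guard and the rule's other match arms omitted:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand
import org.apache.spark.sql.hudi.command.AlterHoodieTableDropPartitionCommand

object DropPartitionRewriteSketch {
  // All five fields are forwarded, so ifExists/purge/retainData survive the rewrite.
  def rewrite(plan: LogicalPlan): LogicalPlan = plan match {
    case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) =>
      AlterHoodieTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
    case other => other
  }
}
```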
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
index 1c295fb5b..d14783556 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -33,11 +35,22 @@ import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 case class AlterHoodieTableDropPartitionCommand(
     tableIdentifier: TableIdentifier,
-    specs: Seq[TablePartitionSpec])
+    specs: Seq[TablePartitionSpec],
+    ifExists: Boolean,
+    purge: Boolean,
+    retainData: Boolean)
 extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val fullTableName = tableIdentifier.unquotedString
+    logInfo(s"Start executing ALTER TABLE DROP PARTITION command for $fullTableName")
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
+
+    if (!hoodieCatalogTable.isPartitionedTable) {
+      throw new AnalysisException(
+        s"$fullTableName is a non-partitioned table that does not allow dropping partitions")
+    }
+
     DDLUtils.verifyAlterTableType(
       sparkSession.sessionState.catalog, hoodieCatalogTable.table, isView = false)
@@ -49,41 +62,35 @@ extends RunnableCommand {
         sparkSession.sessionState.conf.resolver)
     }
 
-    val parameters = buildHoodieConfig(sparkSession, hoodieCatalogTable, normalizedSpecs)
+    val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
+    val parameters = buildHoodieConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
     HoodieSparkSqlWriter.write(
      sparkSession.sqlContext,
      SaveMode.Append,
      parameters,
      sparkSession.emptyDataFrame)
+
+    // Recursively delete the dropped partitions' directories on storage.
+    if (purge) {
+      val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
+      val basePath = hoodieCatalogTable.tableLocation
+      val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
+      // partitionsToDrop is a comma-separated list, so purge each path in turn
+      partitionsToDrop.split(",").foreach { relativePath =>
+        val fullPartitionPath = FSUtils.getPartitionPath(basePath, relativePath)
+        logInfo(s"Cleaning up partition path $fullPartitionPath")
+        FSUtils.deleteDir(engineContext, fs, fullPartitionPath,
+          sparkSession.sparkContext.defaultParallelism)
+      }
+    }
+
+    sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
+    logInfo(s"Finished executing ALTER TABLE DROP PARTITION command for $fullTableName")
     Seq.empty[Row]
   }
 
   private def buildHoodieConfig(
       sparkSession: SparkSession,
       hoodieCatalogTable: HoodieCatalogTable,
-      normalizedSpecs: Seq[Map[String, String]]): Map[String, String] = {
-    val table = hoodieCatalogTable.table
-    val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
-    val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
-    val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
+      partitionsToDrop: String): Map[String, String] = {
     val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
-    val partitionsToDrop = normalizedSpecs.map { spec =>
-      hoodieCatalogTable.partitionFields.map{ partitionColumn =>
-        val encodedPartitionValue = if (enableEncodeUrl) {
-          PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
-        } else {
-          spec(partitionColumn)
-        }
-        if (enableHiveStylePartitioning) {
-          partitionColumn + "=" + encodedPartitionValue
-        } else {
-          encodedPartitionValue
-        }
-      }.mkString("/")
-    }.mkString(",")
-
     val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, Map.empty) {
       Map(
@@ -137,4 +144,27 @@
     normalizedPartSpec.toMap
   }
 
+  /** Builds the comma-separated list of partition paths to drop from the normalized specs. */
+  def getPartitionPathToDrop(
+      hoodieCatalogTable: HoodieCatalogTable,
+      normalizedSpecs: Seq[Map[String, String]]): String = {
+    val table = hoodieCatalogTable.table
+    val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
+    val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
+    val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
+    val partitionsToDrop = normalizedSpecs.map { spec =>
+      hoodieCatalogTable.partitionFields.map { partitionColumn =>
+        val encodedPartitionValue = if (enableEncodeUrl) {
+          PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
+        } else {
+          spec(partitionColumn)
+        }
+        if (enableHiveStylePartitioning) {
+          partitionColumn + "=" + encodedPartitionValue
+        } else {
+          encodedPartitionValue
+        }
+      }.mkString("/")
+    }.mkString(",")
+    partitionsToDrop
+  }
 }
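To make the path layout produced by `getPartitionPathToDrop` concrete, here is a self-contained sketch (not Hudi code; `java.net.URLEncoder` stands in for `PartitionPathEncodeUtils.escapePathName`, whose escaping rules differ in detail):

```scala
object PartitionPathSketch extends App {
  // Mirrors getPartitionPathToDrop for a single partition spec.
  def partitionPath(
      spec: Map[String, String],
      partitionFields: Seq[String],
      hiveStyle: Boolean,
      urlEncode: Boolean): String =
    partitionFields.map { col =>
      val value = if (urlEncode) java.net.URLEncoder.encode(spec(col), "UTF-8") else spec(col)
      if (hiveStyle) s"$col=$value" else value
    }.mkString("/")

  val spec = Map("year" -> "2021", "month" -> "10", "day" -> "01")
  val fields = Seq("year", "month", "day")
  println(partitionPath(spec, fields, hiveStyle = true, urlEncode = false))  // year=2021/month=10/day=01
  println(partitionPath(spec, fields, hiveStyle = false, urlEncode = false)) // 2021/10/01
}
```

Multiple `PARTITION` specs are joined with `,` before being handed to the write config, which is why the purge branch in `run` splits on `,` before deleting directories.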
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
index ed61153f5..4b4426e16 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -77,10 +78,9 @@ case class DropHoodieTableCommand(
     if (purge) {
       logInfo("Clean up " + basePath)
       val targetPath = new Path(basePath)
+      val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
       val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
-      if (fs.exists(targetPath)) {
-        fs.delete(targetPath, true)
-      }
+      FSUtils.deleteDir(engineContext, fs, targetPath, sparkSession.sparkContext.defaultParallelism)
     }
   }
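Both commands now delegate to `FSUtils.deleteDir`, which distributes the per-file deletes via the engine context rather than issuing one recursive `fs.delete` on the driver. A usage sketch, runnable in a Hudi-enabled spark-shell (the base path is illustrative):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils

val basePath = "/tmp/hudi/h0" // illustrative
val engineContext = new HoodieSparkEngineContext(spark.sparkContext)
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
// Parallelism is capped by the Spark context's defaultParallelism, as in the patch.
FSUtils.deleteDir(engineContext, fs, new Path(basePath), spark.sparkContext.defaultParallelism)
```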
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index f8326386d..22d20cdd0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -18,12 +18,11 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
 import org.apache.spark.sql.SaveMode
 
-import scala.util.control.NonFatal
-
 class TestAlterTableDropPartition extends TestHoodieSqlBase {
 
   test("Drop non-partitioned table") {
@@ -47,7 +46,31 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
     spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")
 
     checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01')")(
-      s"dt is not a valid partition column in table")
+      s"$tableName is a non-partitioned table that does not allow dropping partitions")
+  }
+
+  test("Purge drop non-partitioned table") {
+    val tableName = generateTableName
+    // create table
+    spark.sql(
+      s"""
+         | create table $tableName (
+         |  id bigint,
+         |  name string,
+         |  ts string,
+         |  dt string
+         | )
+         | using hudi
+         | tblproperties (
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts'
+         | )
+         |""".stripMargin)
+    // insert data
+    spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")
+
+    checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01') purge")(
+      s"$tableName is a non-partitioned table that does not allow dropping partitions")
   }
 
   Seq(false, true).foreach { urlencode =>
@@ -88,7 +111,62 @@
       // drop 2021-10-01 partition
       spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")
 
-      checkAnswer(s"select dt from $tableName") (Seq(s"2021/10/02"))
+      val partitionPath = if (urlencode) {
+        PartitionPathEncodeUtils.escapePathName("2021/10/01")
+      } else {
+        "2021/10/01"
+      }
+      checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
+      // without purge, the dropped partition's directory is retained on storage
+      assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
     }
   }
   }
+
+  Seq(false, true).foreach { urlencode =>
+    test(s"Purge drop single-partition table's partitions, urlencode: $urlencode") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        import spark.implicits._
+        val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02"))
+          .toDF("id", "name", "ts", "dt")
+
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+          .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "dt")
+          .option(URL_ENCODE_PARTITIONING.key, urlencode)
+          .option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+
+        // register meta to spark catalog by creating table
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             |tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             |)
+             |partitioned by (dt)
+             |location '$tablePath'
+             |""".stripMargin)
+
+        // drop 2021-10-01 partition with purge
+        spark.sql(s"alter table $tableName drop partition (dt='2021/10/01') purge")
+
+        val partitionPath = if (urlencode) {
+          PartitionPathEncodeUtils.escapePathName("2021/10/01")
+        } else {
+          "2021/10/01"
+        }
+        checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
+        // with purge, the dropped partition's directory is removed from storage
+        assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
+      }
+    }
+  }
@@ -172,4 +250,51 @@
       }
     }
   }
+
+  Seq(false, true).foreach { hiveStyle =>
+    test(s"Purge drop multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        import spark.implicits._
+        val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10", "02"))
+          .toDF("id", "name", "ts", "year", "month", "day")
+
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+          .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "year,month,day")
+          .option(HIVE_STYLE_PARTITIONING.key, hiveStyle)
+          .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+
+        // register meta to spark catalog by creating table
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             |tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             |)
+             |partitioned by (year, month, day)
+             |location '$tablePath'
+             |""".stripMargin)
+
+        // drop the 2021-10-01 partition with purge
+        spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01') purge")
+
+        checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
+          Seq(2, "l4", "v1", "2021", "10", "02")
+        )
+        // the dropped partition's directory must be gone from storage;
+        // its on-disk layout depends on hive-style partitioning
+        val partitionPath = if (hiveStyle) "year=2021/month=10/day=01" else "2021/10/01"
+        assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
+      }
+    }
+  }
 }
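Taken together, the tests pin down the user-facing contract. A condensed sketch of the SQL surface, runnable in a Hudi-enabled spark-shell (the table name `h0` is illustrative):

```scala
spark.sql(
  """create table h0 (id bigint, name string, ts string, dt string)
    |using hudi
    |tblproperties (primaryKey = 'id', preCombineField = 'ts')
    |partitioned by (dt)
    |""".stripMargin)
spark.sql("insert into h0 values (1, 'a1', 'v1', '2021-10-01'), (2, 'a2', 'v1', '2021-10-02')")

// Without PURGE: the partition's records are removed, but its directory
// is left on storage (asserted by the existing urlencode tests).
spark.sql("alter table h0 drop partition (dt='2021-10-01')")

// With PURGE: the partition directory is deleted from storage as well
// (asserted by the new purge tests).
spark.sql("alter table h0 drop partition (dt='2021-10-02') purge")

// On a non-partitioned table, both forms fail analysis with
// "... is a non-partitioned table that does not allow dropping partitions".
```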