
[HUDI-3099] Purge drop partition for spark sql (#4436)

Author: ForwardXu
Date: 2021-12-28 09:38:26 +08:00 (committed by GitHub)
Parent: c81df99e50
Commit: 282aa68552
5 changed files with 191 additions and 31 deletions
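
The commit threads Spark SQL's PURGE option through to Hudi's drop-partition and drop-table commands: the logical drop is still written through HoodieSparkSqlWriter, and with PURGE the matching directories are also deleted from storage. A minimal usage sketch (table and partition values are illustrative, not from this commit):

    // Assumes a Hudi table h0 partitioned by dt; names are examples only.
    spark.sql("ALTER TABLE h0 DROP IF EXISTS PARTITION (dt = '2021-12-25')")         // logical drop only
    spark.sql("ALTER TABLE h0 DROP IF EXISTS PARTITION (dt = '2021-12-25') PURGE")   // also deletes the partition directory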


@@ -148,6 +148,11 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
    */
   def getAllPartitionPaths: Seq[String] = HoodieSqlUtils.getAllPartitionPaths(spark, table)
 
+  /**
+   * Check if table is a partitioned table
+   */
+  def isPartitionedTable: Boolean = table.partitionColumnNames.nonEmpty
+
   /**
    * init hoodie table for create table (as select)
    */
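
The new helper backs the non-partitioned-table guard added to AlterHoodieTableDropPartitionCommand further down in this commit. A rough sketch of how it reads, assuming two hypothetical tables (h0 partitioned by dt, h1 unpartitioned):

    // Hypothetical usage; h0 and h1 are not part of this commit.
    import org.apache.spark.sql.catalyst.TableIdentifier

    val h0 = HoodieCatalogTable(spark, TableIdentifier("h0"))   // created with PARTITIONED BY (dt)
    val h1 = HoodieCatalogTable(spark, TableIdentifier("h1"))   // no partition columns

    h0.isPartitionedTable   // true  -- table.partitionColumnNames == Seq("dt")
    h1.isPartitionedTable   // false -- table.partitionColumnNames is empty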


@@ -410,9 +410,9 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
         if isHoodieTable(tableName, sparkSession) =>
           DropHoodieTableCommand(tableName, ifExists, isView, purge)
       // Rewrite the AlterTableDropPartitionCommand to AlterHoodieTableDropPartitionCommand
-      case AlterTableDropPartitionCommand(tableName, specs, _, _, _)
+      case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
         if isHoodieTable(tableName, sparkSession) =>
-          AlterHoodieTableDropPartitionCommand(tableName, specs)
+          AlterHoodieTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
       // Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
       // Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand
       case AlterTableAddColumnsCommand(tableId, colsToAdd)
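
By matching the ifExists, purge and retainData fields instead of discarding them, the post-analysis rule hands the parsed flags through to the Hudi command. Roughly, for an illustrative statement (table name hypothetical):

    // Illustration of the rewrite performed by HoodiePostAnalysisRule; values depend on the parsed statement.
    spark.sql("ALTER TABLE h0 DROP IF EXISTS PARTITION (dt = '2021-12-25') PURGE")
    // Spark resolves this to AlterTableDropPartitionCommand(h0, Seq(Map("dt" -> "2021-12-25")), ifExists = true, purge = true, retainData),
    // which the rule above now rewrites to AlterHoodieTableDropPartitionCommand with the same flags.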


@@ -18,6 +18,8 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -33,11 +35,22 @@ import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 case class AlterHoodieTableDropPartitionCommand(
     tableIdentifier: TableIdentifier,
-    specs: Seq[TablePartitionSpec])
+    specs: Seq[TablePartitionSpec],
+    ifExists : Boolean,
+    purge : Boolean,
+    retainData : Boolean)
 extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
     logInfo(s"start execute alter table drop partition command for $fullTableName")
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
+
+    if (!hoodieCatalogTable.isPartitionedTable) {
+      throw new AnalysisException(s"$fullTableName is a non-partitioned table that is not allowed to drop partition")
+    }
+
     DDLUtils.verifyAlterTableType(
       sparkSession.sessionState.catalog, hoodieCatalogTable.table, isView = false)
@@ -49,41 +62,35 @@ extends RunnableCommand {
         sparkSession.sessionState.conf.resolver)
     }
-    val parameters = buildHoodieConfig(sparkSession, hoodieCatalogTable, normalizedSpecs)
+    val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
+    val parameters = buildHoodieConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
     HoodieSparkSqlWriter.write(
       sparkSession.sqlContext,
       SaveMode.Append,
       parameters,
       sparkSession.emptyDataFrame)
+
+    // Recursively delete partition directories
+    if (purge) {
+      val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
+      val basePath = hoodieCatalogTable.tableLocation
+      val fullPartitionPath = FSUtils.getPartitionPath(basePath, partitionsToDrop)
+      logInfo("Clean partition up " + fullPartitionPath)
+      val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
+      FSUtils.deleteDir(engineContext, fs, fullPartitionPath, sparkSession.sparkContext.defaultParallelism)
+    }
+
     sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
     logInfo(s"Finish execute alter table drop partition command for $fullTableName")
     Seq.empty[Row]
   }
 
   private def buildHoodieConfig(
       sparkSession: SparkSession,
       hoodieCatalogTable: HoodieCatalogTable,
-      normalizedSpecs: Seq[Map[String, String]]): Map[String, String] = {
-    val table = hoodieCatalogTable.table
-    val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
-    val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
-    val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
+      partitionsToDrop: String): Map[String, String] = {
     val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
-    val partitionsToDrop = normalizedSpecs.map { spec =>
-      hoodieCatalogTable.partitionFields.map{ partitionColumn =>
-        val encodedPartitionValue = if (enableEncodeUrl) {
-          PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
-        } else {
-          spec(partitionColumn)
-        }
-        if (enableHiveStylePartitioning) {
-          partitionColumn + "=" + encodedPartitionValue
-        } else {
-          encodedPartitionValue
-        }
-      }.mkString("/")
-    }.mkString(",")
     val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, Map.empty) {
       Map(
@@ -137,4 +144,27 @@ extends RunnableCommand {
     normalizedPartSpec.toMap
   }
 
+  def getPartitionPathToDrop(
+      hoodieCatalogTable: HoodieCatalogTable,
+      normalizedSpecs: Seq[Map[String, String]]): String = {
+    val table = hoodieCatalogTable.table
+    val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
+    val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
+    val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
+    val partitionsToDrop = normalizedSpecs.map { spec =>
+      hoodieCatalogTable.partitionFields.map { partitionColumn =>
+        val encodedPartitionValue = if (enableEncodeUrl) {
+          PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
+        } else {
+          spec(partitionColumn)
+        }
+        if (enableHiveStylePartitioning) {
+          partitionColumn + "=" + encodedPartitionValue
+        } else {
+          encodedPartitionValue
+        }
+      }.mkString("/")
+    }.mkString(",")
+    partitionsToDrop
+  }
 }
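
The partition-path construction moves out of buildHoodieConfig into getPartitionPathToDrop so the same comma-separated path string feeds both the writer config and the purge deletion. A standalone sketch of what that construction does, with java.net.URLEncoder standing in for PartitionPathEncodeUtils.escapePathName (simplified, not the Hudi implementation):

    // Simplified illustration of the partition-path construction.
    def partitionPaths(
        partitionFields: Seq[String],
        specs: Seq[Map[String, String]],
        hiveStyle: Boolean,
        urlEncode: Boolean): String = {
      specs.map { spec =>
        partitionFields.map { col =>
          val value = if (urlEncode) java.net.URLEncoder.encode(spec(col), "UTF-8") else spec(col)
          if (hiveStyle) s"$col=$value" else value
        }.mkString("/")
      }.mkString(",")
    }

    partitionPaths(Seq("year", "month"), Seq(Map("year" -> "2021", "month" -> "12")), hiveStyle = true, urlEncode = false)
    // "year=2021/month=12"
    partitionPaths(Seq("dt"), Seq(Map("dt" -> "2021-12-25")), hiveStyle = false, urlEncode = false)
    // "2021-12-25"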


@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -77,10 +78,9 @@ case class DropHoodieTableCommand(
     if (purge) {
       logInfo("Clean up " + basePath)
       val targetPath = new Path(basePath)
+      val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
       val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
-      if (fs.exists(targetPath)) {
-        fs.delete(targetPath, true)
-      }
+      FSUtils.deleteDir(engineContext, fs, targetPath, sparkSession.sparkContext.defaultParallelism)
     }
   }
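
With the HoodieSparkEngineContext in place, both purge paths now delete directories through FSUtils.deleteDir, passing sparkSession.sparkContext.defaultParallelism so the removal of sub-paths can be parallelized instead of relying on a single recursive fs.delete. The SQL surface is unchanged (table name illustrative):

    // Drops the table from the catalog and, because of PURGE, deletes its base path from storage.
    spark.sql("DROP TABLE IF EXISTS h0 PURGE")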