1
0

[HUDI-3826] Make truncate partition use delete_partition operation (#5272)

Make truncate partition and drop partition behave like drop partition with purge: all records in the partition are deleted via Hudi's DELETE_PARTITION operation, and the partition is removed from the metastore.
This commit is contained in:
ForwardXu
2022-04-14 15:53:05 +08:00
committed by GitHub
parent a081c2b9b5
commit 44b3630b5d
5 changed files with 87 additions and 115 deletions

View File

@@ -29,13 +29,12 @@ import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport} import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedRelation} import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal} import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType} import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession} import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
import java.net.URI import java.net.URI

View File

@@ -18,8 +18,6 @@
package org.apache.spark.sql.hudi.command package org.apache.spark.sql.hudi.command
import org.apache.hudi.HoodieSparkSqlWriter import org.apache.hudi.HoodieSparkSqlWriter
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -30,7 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
case class AlterHoodieTableDropPartitionCommand( case class AlterHoodieTableDropPartitionCommand(
tableIdentifier: TableIdentifier, tableIdentifier: TableIdentifier,
specs: Seq[TablePartitionSpec], partitionSpecs: Seq[TablePartitionSpec],
ifExists : Boolean, ifExists : Boolean,
purge : Boolean, purge : Boolean,
retainData : Boolean) retainData : Boolean)
@@ -49,7 +47,7 @@ case class AlterHoodieTableDropPartitionCommand(
DDLUtils.verifyAlterTableType( DDLUtils.verifyAlterTableType(
sparkSession.sessionState.catalog, hoodieCatalogTable.table, isView = false) sparkSession.sessionState.catalog, hoodieCatalogTable.table, isView = false)
val normalizedSpecs: Seq[Map[String, String]] = specs.map { spec => val normalizedSpecs: Seq[Map[String, String]] = partitionSpecs.map { spec =>
normalizePartitionSpec( normalizePartitionSpec(
spec, spec,
hoodieCatalogTable.partitionFields, hoodieCatalogTable.partitionFields,
@@ -57,6 +55,8 @@ case class AlterHoodieTableDropPartitionCommand(
sparkSession.sessionState.conf.resolver) sparkSession.sessionState.conf.resolver)
} }
// Drop partitions with lazy cleaning (see https://github.com/apache/hudi/pull/4489).
// To delete the partition files, enable the cleaner and set retention policies.
val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs) val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop) val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
HoodieSparkSqlWriter.write( HoodieSparkSqlWriter.write(
@@ -65,17 +65,6 @@ case class AlterHoodieTableDropPartitionCommand(
parameters, parameters,
sparkSession.emptyDataFrame) sparkSession.emptyDataFrame)
// Recursively delete partition directories
if (purge) {
val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
val basePath = hoodieCatalogTable.tableLocation
val fullPartitionPath = FSUtils.getPartitionPath(basePath, partitionsToDrop)
logInfo("Clean partition up " + fullPartitionPath)
val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
FSUtils.deleteDir(engineContext, fs, fullPartitionPath, sparkSession.sparkContext.defaultParallelism)
}
sparkSession.catalog.refreshTable(tableIdentifier.unquotedString) sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
logInfo(s"Finish execute alter table drop partition command for $fullTableName") logInfo(s"Finish execute alter table drop partition command for $fullTableName")
Seq.empty[Row] Seq.empty[Row]

View File

@@ -18,15 +18,16 @@
package org.apache.spark.sql.hudi.command package org.apache.spark.sql.hudi.command
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieSparkSqlWriter
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType, HoodieCatalogTable} import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getPartitionPathToDrop, normalizePartitionSpec} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getPartitionPathToDrop, normalizePartitionSpec}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import scala.util.control.NonFatal
/** /**
* Command for truncate hudi table. * Command for truncate hudi table.
@@ -34,99 +35,67 @@ import scala.util.control.NonFatal
case class TruncateHoodieTableCommand( case class TruncateHoodieTableCommand(
tableIdentifier: TableIdentifier, tableIdentifier: TableIdentifier,
partitionSpec: Option[TablePartitionSpec]) partitionSpec: Option[TablePartitionSpec])
extends HoodieLeafRunnableCommand { extends HoodieLeafRunnableCommand with ProvidesHoodieConfig {
override def run(spark: SparkSession): Seq[Row] = { override def run(sparkSession: SparkSession): Seq[Row] = {
val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}" val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
logInfo(s"start execute truncate table command for $fullTableName") logInfo(s"start execute truncate table command for $fullTableName")
val hoodieCatalogTable = HoodieCatalogTable(spark, tableIdentifier) val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
val properties = hoodieCatalogTable.tableConfig.getProps
try { val catalog = sparkSession.sessionState.catalog
// Delete all data in the table directory
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableIdentifier) val table = catalog.getTableMetadata(tableIdentifier)
val tableIdentWithDB = table.identifier.quotedString val tableId = table.identifier.quotedString
if (table.tableType == CatalogTableType.VIEW) { if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException( throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB") s"Operation not allowed: TRUNCATE TABLE on views: $tableId")
} }
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) { if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException( throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " + s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
s"for tables that are not partitioned: $tableIdentWithDB") s"for tables that are not partitioned: $tableId")
} }
val basePath = hoodieCatalogTable.tableLocation val basePath = hoodieCatalogTable.tableLocation
val partCols = table.partitionColumnNames val properties = hoodieCatalogTable.tableConfig.getProps
val locations = if (partitionSpec.isEmpty || partCols.isEmpty) { val hadoopConf = sparkSession.sessionState.newHadoopConf()
Seq(basePath)
} else {
val normalizedSpec: Seq[Map[String, String]] = Seq(partitionSpec.map { spec =>
normalizePartitionSpec(
spec,
partCols,
table.identifier.quotedString,
spark.sessionState.conf.resolver)
}.get)
val fullPartitionPath = FSUtils.getPartitionPath(basePath, getPartitionPathToDrop(hoodieCatalogTable, normalizedSpec))
Seq(fullPartitionPath)
}
val hadoopConf = spark.sessionState.newHadoopConf()
locations.foreach { location =>
val path = new Path(location.toString)
try {
val fs = path.getFileSystem(hadoopConf)
fs.delete(path, true)
fs.mkdirs(path)
} catch {
case NonFatal(e) =>
throw new AnalysisException(
s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " +
s"because of ${e.toString}")
}
}
// Also try to drop the contents of the table from the columnar cache
try {
spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), cascade = true)
} catch {
case NonFatal(_) =>
}
if (table.stats.nonEmpty) {
// empty table after truncation
val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0))
catalog.alterTableStats(tableIdentifier, Some(newStats))
}
Seq.empty[Row]
} catch {
// TruncateTableCommand will delete the related directories first, and then refresh the table.
// It will fail when refresh table, because the hudi meta directory(.hoodie) has been deleted at the first step.
// So here ignore this failure, and refresh table later.
case NonFatal(e) =>
throw new AnalysisException(s"Exception when attempting to truncate table ${tableIdentifier.quotedString}: " + e)
}
// If we have not specified the partition, truncate will delete all the data in the table path // If we have not specified the partition, truncate will delete all the data in the table path
// include the hoodie.properties. In this case we should reInit the table.
if (partitionSpec.isEmpty) { if (partitionSpec.isEmpty) {
val hadoopConf = spark.sessionState.newHadoopConf() val targetPath = new Path(basePath)
val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
FSUtils.deleteDir(engineContext, fs, targetPath, sparkSession.sparkContext.defaultParallelism)
// ReInit hoodie.properties // ReInit hoodie.properties
HoodieTableMetaClient.withPropertyBuilder() HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties) .fromProperties(properties)
.initTable(hadoopConf, hoodieCatalogTable.tableLocation) .initTable(hadoopConf, hoodieCatalogTable.tableLocation)
} else {
val normalizedSpecs: Seq[Map[String, String]] = Seq(partitionSpec.map { spec =>
normalizePartitionSpec(
spec,
hoodieCatalogTable.partitionFields,
hoodieCatalogTable.tableName,
sparkSession.sessionState.conf.resolver)
}.get)
// drop partitions to lazy clean
val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
HoodieSparkSqlWriter.write(
sparkSession.sqlContext,
SaveMode.Append,
parameters,
sparkSession.emptyDataFrame)
} }
// After deleting the data, refresh the table to make sure we don't keep around a stale // After deleting the data, refresh the table to make sure we don't keep around a stale
// file relation in the metastore cache and cached table data in the cache manager. // file relation in the metastore cache and cached table data in the cache manager.
spark.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString) sparkSession.catalog.refreshTable(table.identifier.quotedString)
logInfo(s"Finish execute truncate table command for $fullTableName")
Seq.empty[Row] Seq.empty[Row]
} }
} }

View File

@@ -539,7 +539,7 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
// Rewrite TruncateTableCommand to TruncateHoodieTableCommand // Rewrite TruncateTableCommand to TruncateHoodieTableCommand
case TruncateTableCommand(tableName, partitionSpec) case TruncateTableCommand(tableName, partitionSpec)
if sparkAdapter.isHoodieTable(tableName, sparkSession) => if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
new TruncateHoodieTableCommand(tableName, partitionSpec) TruncateHoodieTableCommand(tableName, partitionSpec)
case _ => plan case _ => plan
} }
} }

View File

@@ -52,7 +52,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
checkAnswer(s"show partitions $tableName")(Seq.empty: _*) checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
} }
test("Purge drop non-partitioned table") { test("Lazy Clean drop non-partitioned table") {
val tableName = generateTableName val tableName = generateTableName
// create table // create table
spark.sql( spark.sql(
@@ -66,13 +66,14 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
| using hudi | using hudi
| tblproperties ( | tblproperties (
| primaryKey = 'id', | primaryKey = 'id',
| preCombineField = 'ts' | preCombineField = 'ts',
| hoodie.cleaner.commits.retained= '1'
| ) | )
|""".stripMargin) |""".stripMargin)
// insert data // insert data
spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""") spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")
checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01') purge")( checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01')")(
s"$tableName is a non-partitioned table that is not allowed to drop partition") s"$tableName is a non-partitioned table that is not allowed to drop partition")
// show partitions // show partitions
@@ -131,14 +132,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
} }
Seq(false, true).foreach { urlencode => Seq(false, true).foreach { urlencode =>
test(s"Purge drop single-partition table' partitions, urlencode: $urlencode") { test(s"Lazy Clean drop single-partition table' partitions, urlencode: $urlencode") {
withTempDir { tmp => withTempDir { tmp =>
val tableName = generateTableName val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName" val tablePath = s"${tmp.getCanonicalPath}/$tableName"
import spark.implicits._ import spark.implicits._
val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02")) val df = Seq((1, "z3", "v1", "2021/10/01")).toDF("id", "name", "ts", "dt")
.toDF("id", "name", "ts", "dt")
df.write.format("hudi") df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName) .option(HoodieWriteConfig.TBL_NAME.key, tableName)
@@ -158,17 +158,24 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
s""" s"""
|create table $tableName using hudi |create table $tableName using hudi
|location '$tablePath' |location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.cleaner.commits.retained= '1'
| )
|""".stripMargin) |""".stripMargin)
// drop 2021-10-01 partition // drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021/10/01') purge") spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")
spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021/10/02")""")
val partitionPath = if (urlencode) { val partitionPath = if (urlencode) {
PartitionPathEncodeUtils.escapePathName("2021/10/01") PartitionPathEncodeUtils.escapePathName("2021/10/01")
} else { } else {
"2021/10/01" "2021/10/01"
} }
checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02")) checkAnswer(s"select dt from $tableName")(Seq("2021/10/02"))
assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath")) assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
// show partitions // show partitions
@@ -267,14 +274,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
} }
Seq(false, true).foreach { hiveStyle => Seq(false, true).foreach { hiveStyle =>
test(s"Purge drop multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") { test(s"Lazy Clean drop multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") {
withTempDir { tmp => withTempDir { tmp =>
val tableName = generateTableName val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName" val tablePath = s"${tmp.getCanonicalPath}/$tableName"
import spark.implicits._ import spark.implicits._
val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10", "02")) val df = Seq((1, "z3", "v1", "2021", "10", "01")).toDF("id", "name", "ts", "year", "month", "day")
.toDF("id", "name", "ts", "year", "month", "day")
df.write.format("hudi") df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName) .option(HoodieWriteConfig.TBL_NAME.key, tableName)
@@ -294,14 +300,23 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
s""" s"""
|create table $tableName using hudi |create table $tableName using hudi
|location '$tablePath' |location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.cleaner.commits.retained= '1'
| )
|""".stripMargin) |""".stripMargin)
// drop 2021-10-01 partition // drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01') purge") spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")
// insert data
spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")
checkAnswer(s"select id, name, ts, year, month, day from $tableName")( checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
Seq(2, "l4", "v1", "2021", "10", "02") Seq(2, "l4", "v1", "2021", "10", "02")
) )
assertResult(false)(existsPath( assertResult(false)(existsPath(
s"${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01")) s"${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01"))