[HUDI-3722] Fix truncate hudi table's error (#5140)
@@ -19,30 +19,28 @@ package org.apache.spark.sql.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}

import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Locale, Properties}

import scala.collection.JavaConverters._
import scala.collection.immutable.Map
@@ -321,4 +319,57 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {

        Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child
    }
  }
+
+  def normalizePartitionSpec[T](
+      partitionSpec: Map[String, T],
+      partColNames: Seq[String],
+      tblName: String,
+      resolver: Resolver): Map[String, T] = {
+    val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
+      val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
+        throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
+      }
+      normalizedKey -> value
+    }
+
+    if (normalizedPartSpec.size < partColNames.size) {
+      throw new AnalysisException(
+        "All partition columns need to be specified for Hoodie's partition")
+    }
+
+    val lowerPartColNames = partColNames.map(_.toLowerCase)
+    if (lowerPartColNames.distinct.length != lowerPartColNames.length) {
+      val duplicateColumns = lowerPartColNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => s"`$x`"
+      }
+      throw new AnalysisException(
+        s"Found duplicate column(s) in the partition schema: ${duplicateColumns.mkString(", ")}")
+    }
+
+    normalizedPartSpec.toMap
+  }
+
+  def getPartitionPathToDrop(
+      hoodieCatalogTable: HoodieCatalogTable,
+      normalizedSpecs: Seq[Map[String, String]]): String = {
+    val table = hoodieCatalogTable.table
+    val allPartitionPaths = hoodieCatalogTable.getPartitionPaths
+    val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
+    val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
+    val partitionsToDrop = normalizedSpecs.map { spec =>
+      hoodieCatalogTable.partitionFields.map { partitionColumn =>
+        val encodedPartitionValue = if (enableEncodeUrl) {
+          PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
+        } else {
+          spec(partitionColumn)
+        }
+        if (enableHiveStylePartitioning) {
+          partitionColumn + "=" + encodedPartitionValue
+        } else {
+          encodedPartitionValue
+        }
+      }.mkString("/")
+    }.mkString(",")
+    partitionsToDrop
+  }
}
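For intuition, here is a minimal standalone sketch of the path-building logic in the new getPartitionPathToDrop helper. The escape function and the partitionFields/spec parameters below are simplified stand-ins for PartitionPathEncodeUtils.escapePathName and HoodieCatalogTable, not the real Hudi APIs:

object PartitionPathSketch {
  // Simplified stand-in for PartitionPathEncodeUtils.escapePathName: only escapes '/'.
  def escape(value: String): String = value.replace("/", "%2F")

  // Mirrors getPartitionPathToDrop for a single normalized partition spec.
  def partitionPath(partitionFields: Seq[String],
                    spec: Map[String, String],
                    hiveStyle: Boolean,
                    urlEncode: Boolean): String = {
    partitionFields.map { col =>
      val value = if (urlEncode) escape(spec(col)) else spec(col)
      if (hiveStyle) s"$col=$value" else value
    }.mkString("/")
  }

  def main(args: Array[String]): Unit = {
    val spec = Map("year" -> "2021", "month" -> "10", "day" -> "01")
    val fields = Seq("year", "month", "day")
    println(partitionPath(fields, spec, hiveStyle = true, urlEncode = false))  // year=2021/month=10/day=01
    println(partitionPath(fields, spec, hiveStyle = false, urlEncode = false)) // 2021/10/01
  }
}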
@@ -20,14 +20,12 @@ package org.apache.spark.sql.hudi.command

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor}
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.DDLUtils
@@ -115,57 +113,4 @@ case class AlterHoodieTableDropPartitionCommand(
      )
    }
  }
-
-  def normalizePartitionSpec[T](
-      partitionSpec: Map[String, T],
-      partColNames: Seq[String],
-      tblName: String,
-      resolver: Resolver): Map[String, T] = {
-    val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
-      val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
-        throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
-      }
-      normalizedKey -> value
-    }
-
-    if (normalizedPartSpec.size < partColNames.size) {
-      throw new AnalysisException(
-        "All partition columns need to be specified for Hoodie's dropping partition")
-    }
-
-    val lowerPartColNames = partColNames.map(_.toLowerCase)
-    if (lowerPartColNames.distinct.length != lowerPartColNames.length) {
-      val duplicateColumns = lowerPartColNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => s"`$x`"
-      }
-      throw new AnalysisException(
-        s"Found duplicate column(s) in the partition schema: ${duplicateColumns.mkString(", ")}")
-    }
-
-    normalizedPartSpec.toMap
-  }
-
-  def getPartitionPathToDrop(
-      hoodieCatalogTable: HoodieCatalogTable,
-      normalizedSpecs: Seq[Map[String, String]]): String = {
-    val table = hoodieCatalogTable.table
-    val allPartitionPaths = hoodieCatalogTable.getPartitionPaths
-    val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
-    val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
-    val partitionsToDrop = normalizedSpecs.map { spec =>
-      hoodieCatalogTable.partitionFields.map { partitionColumn =>
-        val encodedPartitionValue = if (enableEncodeUrl) {
-          PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
-        } else {
-          spec(partitionColumn)
-        }
-        if (enableHiveStylePartitioning) {
-          partitionColumn + "=" + encodedPartitionValue
-        } else {
-          encodedPartitionValue
-        }
-      }.mkString("/")
-    }.mkString(",")
-    partitionsToDrop
-  }
}
@@ -17,42 +17,107 @@

package org.apache.spark.sql.hudi.command

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient

-import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.execution.command.TruncateTableCommand
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getPartitionPathToDrop, normalizePartitionSpec}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}

import scala.util.control.NonFatal

/**
 * Command for truncate hudi table.
 */
-class TruncateHoodieTableCommand(
+case class TruncateHoodieTableCommand(
   tableIdentifier: TableIdentifier,
   partitionSpec: Option[TablePartitionSpec])
-  extends TruncateTableCommand(tableIdentifier, partitionSpec) {
+  extends HoodieLeafRunnableCommand {

-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
+  override def run(spark: SparkSession): Seq[Row] = {
+    val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
+    logInfo(s"start execute truncate table command for $fullTableName")
+
+    val hoodieCatalogTable = HoodieCatalogTable(spark, tableIdentifier)
    val properties = hoodieCatalogTable.tableConfig.getProps

    try {
-      // Delete all data in the table directory
-      super.run(sparkSession)
+      val catalog = spark.sessionState.catalog
+      val table = catalog.getTableMetadata(tableIdentifier)
+      val tableIdentWithDB = table.identifier.quotedString
+
+      if (table.tableType == CatalogTableType.VIEW) {
+        throw new AnalysisException(
+          s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
+      }
+
+      if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
+        throw new AnalysisException(
+          s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+            s"for tables that are not partitioned: $tableIdentWithDB")
+      }
+
+      val basePath = hoodieCatalogTable.tableLocation
+      val partCols = table.partitionColumnNames
+      val locations = if (partitionSpec.isEmpty || partCols.isEmpty) {
+        Seq(basePath)
+      } else {
+        val normalizedSpec: Seq[Map[String, String]] = Seq(partitionSpec.map { spec =>
+          normalizePartitionSpec(
+            spec,
+            partCols,
+            table.identifier.quotedString,
+            spark.sessionState.conf.resolver)
+        }.get)
+
+        val fullPartitionPath = FSUtils.getPartitionPath(basePath, getPartitionPathToDrop(hoodieCatalogTable, normalizedSpec))
+
+        Seq(fullPartitionPath)
+      }
+
+      val hadoopConf = spark.sessionState.newHadoopConf()
+      locations.foreach { location =>
+        val path = new Path(location.toString)
+        try {
+          val fs = path.getFileSystem(hadoopConf)
+          fs.delete(path, true)
+          fs.mkdirs(path)
+        } catch {
+          case NonFatal(e) =>
+            throw new AnalysisException(
+              s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " +
+                s"because of ${e.toString}")
+        }
+      }
+
+      // Also try to drop the contents of the table from the columnar cache
+      try {
+        spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), cascade = true)
+      } catch {
+        case NonFatal(_) =>
+      }
+
+      if (table.stats.nonEmpty) {
+        // empty table after truncation
+        val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0))
+        catalog.alterTableStats(tableIdentifier, Some(newStats))
+      }
+      Seq.empty[Row]
    } catch {
-      // TruncateTableCommand will delete the related directories first, and then refresh the table.
-      // It will fail when refresh table, because the hudi meta directory(.hoodie) has been deleted at the first step.
-      // So here ignore this failure, and refresh table later.
-      case NonFatal(_) =>
+      case NonFatal(e) =>
+        throw new AnalysisException(s"Exception when attempting to truncate table ${tableIdentifier.quotedString}: " + e)
    }

    // If we have not specified the partition, truncate will delete all the data in the table path
    // include the hoodie.properties. In this case we should reInit the table.
    if (partitionSpec.isEmpty) {
-      val hadoopConf = sparkSession.sessionState.newHadoopConf()
+      val hadoopConf = spark.sessionState.newHadoopConf()
      // ReInit hoodie.properties
      HoodieTableMetaClient.withPropertyBuilder()
        .fromProperties(properties)
@@ -61,7 +126,7 @@ class TruncateHoodieTableCommand(

    // After deleting the data, refresh the table to make sure we don't keep around a stale
    // file relation in the metastore cache and cached table data in the cache manager.
-    sparkSession.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
+    spark.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
    Seq.empty[Row]
  }
}
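For context, the rewritten command is driven purely through SQL, the same way the tests that follow exercise it. A hedged usage sketch against a hypothetical Hudi table hudi_tbl partitioned by year, month and day:

// Truncate one partition: only the matching partition path under the table location
// is deleted and re-created.
spark.sql("truncate table hudi_tbl partition (year='2021', month='10', day='01')")

// Truncate the whole table: everything under the table path, including hoodie.properties,
// is removed, which is why the command re-initializes the table metadata afterwards.
spark.sql("truncate table hudi_tbl")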
@@ -224,7 +224,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {

    // not specified all partition column
    checkExceptionContain(s"alter table $tableName drop partition (year='2021', month='10')")(
-      "All partition columns need to be specified for Hoodie's dropping partition"
+      "All partition columns need to be specified for Hoodie's partition"
    )
    // drop 2021-10-01 partition
    spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")
@@ -18,9 +18,14 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode

class TestTruncateTable extends TestHoodieSqlBase {

-  test("Test Truncate Table") {
+  test("Test Truncate non-partitioned Table") {
    Seq("cow", "mor").foreach { tableType =>
      val tableName = generateTableName
      // Create table
@@ -51,4 +56,95 @@ class TestTruncateTable extends TestHoodieSqlBase {
      )
    }
  }
+
+  Seq(false, true).foreach { urlencode =>
+    test(s"Test Truncate single-partition table' partitions, urlencode: $urlencode") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        import spark.implicits._
+        val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02"))
+          .toDF("id", "name", "ts", "dt")
+
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+          .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "dt")
+          .option(URL_ENCODE_PARTITIONING.key(), urlencode)
+          .option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+
+        // register meta to spark catalog by creating table
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             |location '$tablePath'
+             |""".stripMargin)
+
+        // truncate 2021-10-01 partition
+        spark.sql(s"truncate table $tableName partition (dt='2021/10/01')")
+
+        checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
+
+        // Truncate table
+        spark.sql(s"truncate table $tableName")
+        checkAnswer(s"select count(1) from $tableName")(Seq(0))
+      }
+    }
+  }
+
+  Seq(false, true).foreach { hiveStyle =>
+    test(s"Test Truncate multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        import spark.implicits._
+        val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02"))
+          .toDF("id", "name", "ts", "year", "month", "day")
+
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+          .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "year,month,day")
+          .option(HIVE_STYLE_PARTITIONING.key, hiveStyle)
+          .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+
+        // register meta to spark catalog by creating table
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             |location '$tablePath'
+             |""".stripMargin)
+
+        // not specified all partition column
+        checkExceptionContain(s"truncate table $tableName partition (year='2021', month='10')")(
+          "All partition columns need to be specified for Hoodie's partition"
+        )
+
+        // truncate 2021-10-01 partition
+        spark.sql(s"truncate table $tableName partition (year='2021', month='10', day='01')")
+
+        checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
+          Seq(2, "l4", "v1", "2021", "10", "02")
+        )
+
+        // Truncate table
+        spark.sql(s"truncate table $tableName")
+        checkAnswer(s"select count(1) from $tableName")(Seq(0))
+      }
+    }
+  }
}
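As a rough illustration of why these tests toggle URL_ENCODE_PARTITIONING and HIVE_STYLE_PARTITIONING, the partition directories the writes above are expected to produce look roughly like this (inferred from the option semantics, not captured test output):

// single-partition table, dt = "2021/10/01"
//   urlencode = false -> <tablePath>/2021/10/01/...      (the value becomes three directory levels)
//   urlencode = true  -> <tablePath>/2021%2F10%2F01/...  (one escaped directory level)
// multi-level table, partition columns year, month, day
//   hiveStyle = false -> <tablePath>/2021/10/01/...
//   hiveStyle = true  -> <tablePath>/year=2021/month=10/day=01/...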