diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala index 1e1e9c663..fcdbacea5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala @@ -19,30 +19,28 @@ package org.apache.spark.sql.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path - import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.util.PartitionPathEncodeUtils import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport} - import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedRelation} -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable} import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType} -import org.apache.spark.sql.{Column, DataFrame, SparkSession} +import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession} import java.net.URI import java.text.SimpleDateFormat import java.util.{Locale, Properties} - import scala.collection.JavaConverters._ import scala.collection.immutable.Map @@ -321,4 +319,57 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport { Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child } } + + def normalizePartitionSpec[T]( + partitionSpec: Map[String, T], + partColNames: Seq[String], + tblName: String, + resolver: Resolver): Map[String, T] = { + val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) => + val normalizedKey = partColNames.find(resolver(_, key)).getOrElse { + throw new AnalysisException(s"$key is not a valid partition column in table $tblName.") + } + normalizedKey -> value + } + + if (normalizedPartSpec.size < partColNames.size) { + throw new AnalysisException( + "All partition columns need to be specified for Hoodie's partition") + } + + val lowerPartColNames = partColNames.map(_.toLowerCase) + if (lowerPartColNames.distinct.length != lowerPartColNames.length) { + val duplicateColumns = lowerPartColNames.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => s"`$x`" + } + throw new AnalysisException( + s"Found duplicate column(s) in the partition schema: ${duplicateColumns.mkString(", ")}") + } + + normalizedPartSpec.toMap + } + + def getPartitionPathToDrop( + hoodieCatalogTable: HoodieCatalogTable, + normalizedSpecs: Seq[Map[String, String]]): String = { + val table = hoodieCatalogTable.table + val allPartitionPaths = hoodieCatalogTable.getPartitionPaths + val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table) + val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table) + val partitionsToDrop = normalizedSpecs.map { spec => + hoodieCatalogTable.partitionFields.map { partitionColumn => + val encodedPartitionValue = if (enableEncodeUrl) { + PartitionPathEncodeUtils.escapePathName(spec(partitionColumn)) + } else { + spec(partitionColumn) + } + if (enableHiveStylePartitioning) { + partitionColumn + "=" + encodedPartitionValue + } else { + encodedPartitionValue + } + }.mkString("/") + }.mkString(",") + partitionsToDrop + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala index 2e639d78e..bcc397694 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala @@ -20,14 +20,12 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.util.PartitionPathEncodeUtils import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME -import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor} import org.apache.hudi.hive.ddl.HiveSyncMode +import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor} import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.execution.command.DDLUtils @@ -115,57 +113,4 @@ case class AlterHoodieTableDropPartitionCommand( ) } } - - def normalizePartitionSpec[T]( - partitionSpec: Map[String, T], - partColNames: Seq[String], - tblName: String, - resolver: Resolver): Map[String, T] = { - val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) => - val normalizedKey = partColNames.find(resolver(_, key)).getOrElse { - throw new AnalysisException(s"$key is not a valid partition column in table $tblName.") - } - normalizedKey -> value - } - - if (normalizedPartSpec.size < partColNames.size) { - throw new AnalysisException( - "All partition columns need to be specified for Hoodie's dropping partition") - } - - val lowerPartColNames = partColNames.map(_.toLowerCase) - if (lowerPartColNames.distinct.length != lowerPartColNames.length) { - val duplicateColumns = lowerPartColNames.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => s"`$x`" - } - throw new AnalysisException( - s"Found duplicate column(s) in the partition schema: ${duplicateColumns.mkString(", ")}") - } - - normalizedPartSpec.toMap - } - - def getPartitionPathToDrop( - hoodieCatalogTable: HoodieCatalogTable, - normalizedSpecs: Seq[Map[String, String]]): String = { - val table = hoodieCatalogTable.table - val allPartitionPaths = hoodieCatalogTable.getPartitionPaths - val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table) - val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table) - val partitionsToDrop = normalizedSpecs.map { spec => - hoodieCatalogTable.partitionFields.map { partitionColumn => - val encodedPartitionValue = if (enableEncodeUrl) { - PartitionPathEncodeUtils.escapePathName(spec(partitionColumn)) - } else { - spec(partitionColumn) - } - if (enableHiveStylePartitioning) { - partitionColumn + "=" + encodedPartitionValue - } else { - encodedPartitionValue - } - }.mkString("/") - }.mkString(",") - partitionsToDrop - } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala index 4d2debbe9..04936978e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala @@ -17,42 +17,107 @@ package org.apache.spark.sql.hudi.command +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.HoodieTableMetaClient - -import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable -import org.apache.spark.sql.execution.command.TruncateTableCommand +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType, HoodieCatalogTable} +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getPartitionPathToDrop, normalizePartitionSpec} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import scala.util.control.NonFatal /** * Command for truncate hudi table. */ -class TruncateHoodieTableCommand( +case class TruncateHoodieTableCommand( tableIdentifier: TableIdentifier, partitionSpec: Option[TablePartitionSpec]) - extends TruncateTableCommand(tableIdentifier, partitionSpec) { + extends HoodieLeafRunnableCommand { - override def run(sparkSession: SparkSession): Seq[Row] = { - val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier) + override def run(spark: SparkSession): Seq[Row] = { + val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}" + logInfo(s"start execute truncate table command for $fullTableName") + + val hoodieCatalogTable = HoodieCatalogTable(spark, tableIdentifier) val properties = hoodieCatalogTable.tableConfig.getProps try { // Delete all data in the table directory - super.run(sparkSession) + val catalog = spark.sessionState.catalog + val table = catalog.getTableMetadata(tableIdentifier) + val tableIdentWithDB = table.identifier.quotedString + + if (table.tableType == CatalogTableType.VIEW) { + throw new AnalysisException( + s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB") + } + + if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) { + throw new AnalysisException( + s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " + + s"for tables that are not partitioned: $tableIdentWithDB") + } + + val basePath = hoodieCatalogTable.tableLocation + val partCols = table.partitionColumnNames + val locations = if (partitionSpec.isEmpty || partCols.isEmpty) { + Seq(basePath) + } else { + val normalizedSpec: Seq[Map[String, String]] = Seq(partitionSpec.map { spec => + normalizePartitionSpec( + spec, + partCols, + table.identifier.quotedString, + spark.sessionState.conf.resolver) + }.get) + + val fullPartitionPath = FSUtils.getPartitionPath(basePath, getPartitionPathToDrop(hoodieCatalogTable, normalizedSpec)) + + Seq(fullPartitionPath) + } + + val hadoopConf = spark.sessionState.newHadoopConf() + locations.foreach { location => + val path = new Path(location.toString) + try { + val fs = path.getFileSystem(hadoopConf) + fs.delete(path, true) + fs.mkdirs(path) + } catch { + case NonFatal(e) => + throw new AnalysisException( + s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " + + s"because of ${e.toString}") + } + } + + // Also try to drop the contents of the table from the columnar cache + try { + spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), cascade = true) + } catch { + case NonFatal(_) => + } + + if (table.stats.nonEmpty) { + // empty table after truncation + val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0)) + catalog.alterTableStats(tableIdentifier, Some(newStats)) + } + Seq.empty[Row] } catch { // TruncateTableCommand will delete the related directories first, and then refresh the table. // It will fail when refresh table, because the hudi meta directory(.hoodie) has been deleted at the first step. // So here ignore this failure, and refresh table later. - case NonFatal(_) => + case NonFatal(e) => + throw new AnalysisException(s"Exception when attempting to truncate table ${tableIdentifier.quotedString}: " + e) } // If we have not specified the partition, truncate will delete all the data in the table path // include the hoodie.properties. In this case we should reInit the table. if (partitionSpec.isEmpty) { - val hadoopConf = sparkSession.sessionState.newHadoopConf() + val hadoopConf = spark.sessionState.newHadoopConf() // ReInit hoodie.properties HoodieTableMetaClient.withPropertyBuilder() .fromProperties(properties) @@ -61,7 +126,7 @@ class TruncateHoodieTableCommand( // After deleting the data, refresh the table to make sure we don't keep around a stale // file relation in the metastore cache and cached table data in the cache manager. - sparkSession.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString) + spark.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString) Seq.empty[Row] } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index 3e7adec7d..d545e3dad 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -224,7 +224,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { // not specified all partition column checkExceptionContain(s"alter table $tableName drop partition (year='2021', month='10')")( - "All partition columns need to be specified for Hoodie's dropping partition" + "All partition columns need to be specified for Hoodie's partition" ) // drop 2021-10-01 partition spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTruncateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTruncateTable.scala index 6a0f0a406..a61d0f822 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTruncateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTruncateTable.scala @@ -18,9 +18,14 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator} +import org.apache.spark.sql.SaveMode + class TestTruncateTable extends TestHoodieSqlBase { - test("Test Truncate Table") { + test("Test Truncate non-partitioned Table") { Seq("cow", "mor").foreach { tableType => val tableName = generateTableName // Create table @@ -51,4 +56,95 @@ class TestTruncateTable extends TestHoodieSqlBase { ) } } + + Seq(false, true).foreach { urlencode => + test(s"Test Truncate single-partition table' partitions, urlencode: $urlencode") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + + import spark.implicits._ + val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02")) + .toDF("id", "name", "ts", "dt") + + df.write.format("hudi") + .option(HoodieWriteConfig.TBL_NAME.key, tableName) + .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "dt") + .option(URL_ENCODE_PARTITIONING.key(), urlencode) + .option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1") + .mode(SaveMode.Overwrite) + .save(tablePath) + + // register meta to spark catalog by creating table + spark.sql( + s""" + |create table $tableName using hudi + |location '$tablePath' + |""".stripMargin) + + // truncate 2021-10-01 partition + spark.sql(s"truncate table $tableName partition (dt='2021/10/01')") + + checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02")) + + // Truncate table + spark.sql(s"truncate table $tableName") + checkAnswer(s"select count(1) from $tableName")(Seq(0)) + } + } + } + + Seq(false, true).foreach { hiveStyle => + test(s"Test Truncate multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + + import spark.implicits._ + val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10","02")) + .toDF("id", "name", "ts", "year", "month", "day") + + df.write.format("hudi") + .option(HoodieWriteConfig.TBL_NAME.key, tableName) + .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "year,month,day") + .option(HIVE_STYLE_PARTITIONING.key, hiveStyle) + .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1") + .mode(SaveMode.Overwrite) + .save(tablePath) + + // register meta to spark catalog by creating table + spark.sql( + s""" + |create table $tableName using hudi + |location '$tablePath' + |""".stripMargin) + + // not specified all partition column + checkExceptionContain(s"truncate table $tableName partition (year='2021', month='10')")( + "All partition columns need to be specified for Hoodie's partition" + ) + + // truncate 2021-10-01 partition + spark.sql(s"truncate table $tableName partition (year='2021', month='10', day='01')") + + checkAnswer(s"select id, name, ts, year, month, day from $tableName")( + Seq(2, "l4", "v1", "2021", "10", "02") + ) + + // Truncate table + spark.sql(s"truncate table $tableName") + checkAnswer(s"select count(1) from $tableName")(Seq(0)) + } + } + } }