From 504747ecf4c93d53f0fc565cdcf12544549b7903 Mon Sep 17 00:00:00 2001
From: ForwardXu
Date: Wed, 29 Dec 2021 20:23:23 +0800
Subject: [PATCH] [HUDI-3108] Fix Purge Drop MOR Table Cause error (#4455)

---
 .../hudi/command/DropHoodieTableCommand.scala | 56 +++++++++++++------
 1 file changed, 38 insertions(+), 18 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
index 5e4a264a0..aa9d9b812 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.hadoop.fs.Path
-
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
-
+import org.apache.hudi.common.model.HoodieTableType
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
 import org.apache.spark.sql.hive.HiveClientUtils
 import org.apache.spark.sql.hudi.HoodieSqlUtils.isEnableHive
 
@@ -38,6 +37,9 @@ case class DropHoodieTableCommand(
     purge: Boolean)
 extends HoodieLeafRunnableCommand {
 
+  val MOR_SNAPSHOT_TABLE_SUFFIX = "_rt"
+  val MOR_READ_OPTIMIZED_TABLE_SUFFIX = "_ro"
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
     logInfo(s"start execute drop table command for $fullTableName")
@@ -69,9 +71,11 @@ extends HoodieLeafRunnableCommand {
     // Drop table in the catalog
     val enableHive = isEnableHive(sparkSession)
     if (enableHive) {
-      dropHiveDataSourceTable(sparkSession, table, ifExists, purge)
+      dropHiveDataSourceTable(sparkSession, hoodieCatalogTable)
     } else {
-      catalog.dropTable(tableIdentifier, ifExists, purge)
+      if (catalog.tableExists(tableIdentifier)) {
+        catalog.dropTable(tableIdentifier, ifExists, purge)
+      }
     }
 
     // Recursively delete table directories
@@ -85,25 +89,41 @@ extends HoodieLeafRunnableCommand {
   }
 
   private def dropHiveDataSourceTable(
-     sparkSession: SparkSession,
-     table: CatalogTable,
-     ifExists: Boolean,
-     purge: Boolean): Unit = {
+      sparkSession: SparkSession,
+      hoodieCatalogTable: HoodieCatalogTable): Unit = {
+    val table = hoodieCatalogTable.table
     val dbName = table.identifier.database.get
-    val tableName = table.identifier.table
+    val tableName = hoodieCatalogTable.tableName
+
     // check database exists
     val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName)
     if (!dbExists) {
       throw new NoSuchDatabaseException(dbName)
     }
-    // check table exists
-    if (!sparkSession.sessionState.catalog.tableExists(table.identifier)) {
-      throw new NoSuchTableException(dbName, table.identifier.table)
+
+    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
+      val snapshotTableName = tableName + MOR_SNAPSHOT_TABLE_SUFFIX
+      val roTableName = tableName + MOR_READ_OPTIMIZED_TABLE_SUFFIX
+
+      dropHiveTable(sparkSession, dbName, snapshotTableName)
+      dropHiveTable(sparkSession, dbName, roTableName)
     }
-    val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf,
-      sparkSession.sessionState.newHadoopConf())
-    // drop hive table.
-    client.dropTable(dbName, tableName, ifExists, purge)
+    dropHiveTable(sparkSession, dbName, tableName, purge)
+  }
+
+  private def dropHiveTable(
+      sparkSession: SparkSession,
+      dbName: String,
+      tableName: String,
+      purge: Boolean = false): Unit = {
+    // check table exists
+    if (sparkSession.sessionState.catalog.tableExists(new TableIdentifier(tableName, Option(dbName)))) {
+      val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf,
+        sparkSession.sessionState.newHadoopConf())
+
+      // drop hive table.
+      client.dropTable(dbName, tableName, ifExists, purge)
+    }
   }
 }
 
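-- 
Note (not part of the patch): the scenario this change targets is a
DROP TABLE ... PURGE on a MERGE_ON_READ table whose "_ro"
(read-optimized) and "_rt" (snapshot) companion tables are also
registered in the Hive metastore; the new dropHiveTable helper removes
those first, and only if they exist. A minimal Scala sketch of that
scenario follows. It assumes a local Spark session with Hive support
and the Hudi Spark bundle on the classpath; the table name h0 and all
properties below are illustrative, not taken from the patch.

import org.apache.spark.sql.SparkSession

object DropMorPurgeSketch {
  def main(args: Array[String]): Unit = {
    // Assumptions: hudi-spark-bundle is on the classpath and the Hudi
    // SQL extension is enabled, as documented for Hudi 0.10.x.
    val spark = SparkSession.builder()
      .appName("hudi-3108-sketch")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .enableHiveSupport()
      .getOrCreate()

    // Create a MERGE_ON_READ table; with Hive sync enabled, companion
    // tables h0_ro and h0_rt can also appear in the metastore.
    spark.sql(
      """CREATE TABLE h0 (id INT, name STRING, ts BIGINT)
        |USING hudi
        |TBLPROPERTIES (type = 'mor', primaryKey = 'id', preCombineField = 'ts')
        |""".stripMargin)

    // The statement this patch fixes: with purge set on a MOR table,
    // dropHiveDataSourceTable now drops h0_rt and h0_ro before h0
    // instead of failing on the leftover companion tables.
    spark.sql("DROP TABLE h0 PURGE")

    spark.stop()
  }
}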