diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
index 954f08ce6..68582fc27 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
@@ -21,12 +21,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
-import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
-import org.apache.spark.sql.hive.HiveClientUtils
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive
+import org.apache.spark.sql.catalyst.catalog._
 
 import scala.util.control.NonFatal
 
@@ -69,13 +67,13 @@ extends HoodieLeafRunnableCommand {
     val catalog = sparkSession.sessionState.catalog
 
     // Drop table in the catalog
-    val enableHive = isEnableHive(sparkSession)
-    if (enableHive) {
-      dropHiveDataSourceTable(sparkSession, hoodieCatalogTable)
+    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
+      val (rtTableOpt, roTableOpt) = getTableRTAndRO(catalog, hoodieCatalogTable)
+      rtTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
+      roTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
+      catalog.dropTable(table.identifier.copy(table = hoodieCatalogTable.tableName), ifExists, purge)
     } else {
-      if (catalog.tableExists(tableIdentifier)) {
-        catalog.dropTable(tableIdentifier, ifExists, purge)
-      }
+      catalog.dropTable(table.identifier, ifExists, purge)
     }
 
     // Recursively delete table directories
@@ -88,42 +86,33 @@ extends HoodieLeafRunnableCommand {
     }
   }
 
-  private def dropHiveDataSourceTable(
-      sparkSession: SparkSession,
-      hoodieCatalogTable: HoodieCatalogTable): Unit = {
-    val table = hoodieCatalogTable.table
-    val dbName = table.identifier.database.get
-    val tableName = hoodieCatalogTable.tableName
+  private def getTableRTAndRO(catalog: SessionCatalog,
+      hoodieTable: HoodieCatalogTable): (Option[CatalogTable], Option[CatalogTable]) = {
+    val rtIdt = hoodieTable.table.identifier.copy(
+      table = s"${hoodieTable.tableName}${MOR_SNAPSHOT_TABLE_SUFFIX}")
+    val roIdt = hoodieTable.table.identifier.copy(
+      table = s"${hoodieTable.tableName}${MOR_READ_OPTIMIZED_TABLE_SUFFIX}")
 
-    // check database exists
-    val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName)
-    if (!dbExists) {
-      throw new NoSuchDatabaseException(dbName)
+    var rtTableOpt: Option[CatalogTable] = None
+    var roTableOpt: Option[CatalogTable] = None
+    if (catalog.tableExists(rtIdt)) {
+      val rtTable = catalog.getTableMetadata(rtIdt)
+      if (rtTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) {
+        rtTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
+          case Some(v) if v.equalsIgnoreCase("false") => rtTableOpt = Some(rtTable)
+          case _ => // do-nothing
+        }
+      }
     }
-
-    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
-      val snapshotTableName = tableName + MOR_SNAPSHOT_TABLE_SUFFIX
-      val roTableName = tableName + MOR_READ_OPTIMIZED_TABLE_SUFFIX
-
-      dropHiveTable(sparkSession, dbName, snapshotTableName)
-      dropHiveTable(sparkSession, dbName, roTableName)
-    }
-
-    dropHiveTable(sparkSession, dbName, tableName, purge)
-  }
-
-  private def dropHiveTable(
-      sparkSession: SparkSession,
-      dbName: String,
-      tableName: String,
-      purge: Boolean = false): Unit = {
-    // check table exists
-    if (sparkSession.sessionState.catalog.tableExists(new TableIdentifier(tableName, Option(dbName)))) {
-      val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf,
-        sparkSession.sessionState.newHadoopConf())
-
-      // drop hive table.
-      client.dropTable(dbName, tableName, ifExists, purge)
+    if (catalog.tableExists(roIdt)) {
+      val roTable = catalog.getTableMetadata(roIdt)
+      if (roTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) {
+        roTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
+          case Some(v) if v.equalsIgnoreCase("true") => roTableOpt = Some(roTable)
+          case _ => // do-nothing
+        }
+      }
     }
+    (rtTableOpt, roTableOpt)
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
index ed43d37d0..174835cba 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+
 class TestDropTable extends HoodieSparkSqlTestBase {
 
   test("Test Drop Table") {
@@ -72,4 +75,167 @@ class TestDropTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Drop RO & RT table by purging base table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      spark.sql(s"drop table ${tableName} purge")
+      checkAnswer("show tables")()
+    }
+  }
+
+  test("Test Drop RO & RT table one by one") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      spark.sql(s"drop table ${tableName}_ro")
+      checkAnswer("show tables")(
+        Seq("default", tableName, false), Seq("default", s"${tableName}_rt", false))
+
+      spark.sql(s"drop table ${tableName}_rt")
+      checkAnswer("show tables")(Seq("default", tableName, false))
+
+      spark.sql(s"drop table ${tableName}")
+      checkAnswer("show tables")()
+    }
+  }
+
+  test("Test Drop RO table with purge") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      spark.sql(s"drop table ${tableName}_ro purge")
+      checkAnswer("show tables")()
+    }
+  }
+
+  private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier,
+      newProperties: Map[String, String]): Unit = {
+    val catalogTable = spark.sessionState.catalog.getTableMetadata(tableIdt)
+    val storage = catalogTable.storage
+    val storageProperties = storage.properties ++ newProperties
+    val newCatalogTable = catalogTable.copy(storage = storage.copy(properties = storageProperties))
+    sessionCatalog.alterTable(newCatalogTable)
+  }
 }
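
Reviewer note: the "Test Drop RO table with purge" test works because HoodieCatalogTable reads the table name from the Hudi metadata at the storage location, so purging the `_ro` entry resolves back to the base table and removes all three catalog entries. The matching rule in getTableRTAndRO can be summarized as a small predicate. The sketch below is illustrative only, not part of the patch; isCompanionView is a hypothetical helper name:

    import org.apache.spark.sql.catalyst.catalog.CatalogTable

    // Sketch: a <base>_rt / <base>_ro catalog entry counts as a companion view
    // of the base table only when (1) it points at the same storage location
    // and (2) its serde properties carry the hoodie.query.as.ro.table marker
    // with the expected value: "false" for the RT (snapshot) view, "true" for
    // the RO (read-optimized) view.
    def isCompanionView(candidate: CatalogTable, base: CatalogTable, expectRo: Boolean): Boolean = {
      val sameLocation = candidate.storage.locationUri == base.storage.locationUri
      val marker = candidate.storage.properties.get("hoodie.query.as.ro.table")
      sameLocation && marker.exists(_.equalsIgnoreCase(expectRo.toString))
    }

Dropping either view alone without purge falls through to the plain `catalog.dropTable(table.identifier, ifExists, purge)` branch, which is what the "one by one" test exercises.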