[HUDI-4087] Support dropping RO and RT table in DropHoodieTableCommand (#5564)
* [HUDI-4087] Support dropping RO and RT table in DropHoodieTableCommand
* Set hoodie.query.as.ro.table in serde properties
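In short: when a MERGE_ON_READ table is dropped with PURGE, the command now also drops the companion read-optimized (_ro) and real-time (_rt) tables registered against the same storage location, using the hoodie.query.as.ro.table serde property to tell them apart. A minimal Spark SQL sketch of the scenario (table name and path are illustrative, mirroring the tests below):

    // Hypothetical session; demo, demo_ro and demo_rt share one Hudi MOR table location.
    spark.sql(
      s"""
         |create table demo (id int, name string, ts long) using hudi
         | location '/tmp/demo'
         | tblproperties (type = 'mor', primaryKey = 'id', preCombineField = 'ts')
       """.stripMargin)
    // demo_ro and demo_rt are registered against the same location, with the serde
    // property hoodie.query.as.ro.table set to "true" and "false" respectively.
    spark.sql("drop table demo purge")  // now also drops demo_ro and demo_rt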
DropHoodieTableCommand.scala

@@ -21,12 +21,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
-import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
-import org.apache.spark.sql.hive.HiveClientUtils
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive
+import org.apache.spark.sql.catalyst.catalog._
 
 import scala.util.control.NonFatal
 
@@ -69,13 +67,13 @@ extends HoodieLeafRunnableCommand {
     val catalog = sparkSession.sessionState.catalog
 
     // Drop table in the catalog
-    val enableHive = isEnableHive(sparkSession)
-    if (enableHive) {
-      dropHiveDataSourceTable(sparkSession, hoodieCatalogTable)
+    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
+      val (rtTableOpt, roTableOpt) = getTableRTAndRO(catalog, hoodieCatalogTable)
+      rtTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
+      roTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
+      catalog.dropTable(table.identifier.copy(table = hoodieCatalogTable.tableName), ifExists, purge)
     } else {
-      if (catalog.tableExists(tableIdentifier)) {
-        catalog.dropTable(tableIdentifier, ifExists, purge)
-      }
+      catalog.dropTable(table.identifier, ifExists, purge)
     }
 
     // Recursively delete table directories
@@ -88,42 +86,33 @@ extends HoodieLeafRunnableCommand {
     }
   }
 
-  private def dropHiveDataSourceTable(
-      sparkSession: SparkSession,
-      hoodieCatalogTable: HoodieCatalogTable): Unit = {
-    val table = hoodieCatalogTable.table
-    val dbName = table.identifier.database.get
-    val tableName = hoodieCatalogTable.tableName
-
-    // check database exists
-    val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName)
-    if (!dbExists) {
-      throw new NoSuchDatabaseException(dbName)
-    }
-
-    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
-      val snapshotTableName = tableName + MOR_SNAPSHOT_TABLE_SUFFIX
-      val roTableName = tableName + MOR_READ_OPTIMIZED_TABLE_SUFFIX
-
-      dropHiveTable(sparkSession, dbName, snapshotTableName)
-      dropHiveTable(sparkSession, dbName, roTableName)
-    }
-
-    dropHiveTable(sparkSession, dbName, tableName, purge)
-  }
-
-  private def dropHiveTable(
-      sparkSession: SparkSession,
-      dbName: String,
-      tableName: String,
-      purge: Boolean = false): Unit = {
-    // check table exists
-    if (sparkSession.sessionState.catalog.tableExists(new TableIdentifier(tableName, Option(dbName)))) {
-      val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf,
-        sparkSession.sessionState.newHadoopConf())
-
-      // drop hive table.
-      client.dropTable(dbName, tableName, ifExists, purge)
-    }
-  }
+  private def getTableRTAndRO(catalog: SessionCatalog,
+      hoodieTable: HoodieCatalogTable): (Option[CatalogTable], Option[CatalogTable]) = {
+    val rtIdt = hoodieTable.table.identifier.copy(
+      table = s"${hoodieTable.tableName}${MOR_SNAPSHOT_TABLE_SUFFIX}")
+    val roIdt = hoodieTable.table.identifier.copy(
+      table = s"${hoodieTable.tableName}${MOR_READ_OPTIMIZED_TABLE_SUFFIX}")
+
+    var rtTableOpt: Option[CatalogTable] = None
+    var roTableOpt: Option[CatalogTable] = None
+    if (catalog.tableExists(rtIdt)) {
+      val rtTable = catalog.getTableMetadata(rtIdt)
+      if (rtTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) {
+        rtTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
+          case Some(v) if v.equalsIgnoreCase("false") => rtTableOpt = Some(rtTable)
+          case _ => // do-nothing
+        }
+      }
+    }
+
+    if (catalog.tableExists(roIdt)) {
+      val roTable = catalog.getTableMetadata(roIdt)
+      if (roTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) {
+        roTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
+          case Some(v) if v.equalsIgnoreCase("true") => roTableOpt = Some(roTable)
+          case _ => // do-nothing
+        }
+      }
+    }
+    (rtTableOpt, roTableOpt)
+  }
 }
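Note on the matching logic above: getTableRTAndRO only treats a <table>_ro / <table>_rt entry as a companion when it points at the same storage location and its hoodie.query.as.ro.table serde property carries the expected value, so ordinary tables that merely share the suffix are left alone. A condensed, self-contained sketch of that predicate (the helper name and shape are ours for illustration, not part of the commit):

    import org.apache.spark.sql.catalyst.catalog.CatalogTable

    // Sketch of the check performed for each candidate companion table:
    // same storage location, and hoodie.query.as.ro.table == "true" (RO) / "false" (RT).
    def isCompanion(candidate: CatalogTable, base: CatalogTable, expectRo: Boolean): Boolean = {
      val sameLocation = candidate.storage.locationUri.equals(base.storage.locationUri)
      val roFlag = candidate.storage.properties.get("hoodie.query.as.ro.table")
      sameLocation && roFlag.exists(_.equalsIgnoreCase(expectRo.toString))
    }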
TestDropTable.scala

@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+
 class TestDropTable extends HoodieSparkSqlTestBase {
 
   test("Test Drop Table") {
@@ -72,4 +75,167 @@ class TestDropTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Drop RO & RT table by purging base table.") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      spark.sql(s"drop table ${tableName} purge")
+      checkAnswer("show tables")()
+    }
+  }
+
+  test("Test Drop RO & RT table by one by one.") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      spark.sql(s"drop table ${tableName}_ro")
+      checkAnswer("show tables")(
+        Seq("default", tableName, false), Seq("default", s"${tableName}_rt", false))
+
+      spark.sql(s"drop table ${tableName}_rt")
+      checkAnswer("show tables")(Seq("default", tableName, false))
+
+      spark.sql(s"drop table ${tableName}")
+      checkAnswer("show tables")()
+    }
+  }
+
+  test("Test Drop RO table with purge") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      spark.sql(s"drop table ${tableName}_ro purge")
+      checkAnswer("show tables")()
+    }
+  }
+
+  private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier,
+      newProperties: Map[String, String]): Unit = {
+    val catalogTable = spark.sessionState.catalog.getTableMetadata(tableIdt)
+    val storage = catalogTable.storage
+    val storageProperties = storage.properties ++ newProperties
+    val newCatalogTable = catalogTable.copy(storage = storage.copy(properties = storageProperties))
+    sessionCatalog.alterTable(newCatalogTable)
+  }
 }