1
0

[HUDI-2520] Fix drop table issue when sync to Hive (#5143)

This commit is contained in:
leesf
2022-03-29 10:34:12 +08:00
committed by GitHub
parent 3bf9c5ffe8
commit 8f8a8158e2
2 changed files with 15 additions and 14 deletions

View File

@@ -19,12 +19,10 @@
package org.apache.spark.sql.hudi.catalog
import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
@@ -34,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChan
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CreateHoodieTableCommand}
import org.apache.spark.sql.hudi.command._
import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _}
@@ -116,21 +114,24 @@ class HoodieCatalog extends DelegatingCatalogExtension
override def tableExists(ident: Identifier): Boolean = {
  // Existence checks need no Hudi-specific handling; defer to the
  // delegated session catalog.
  super.tableExists(ident)
}
/**
 * Drops a table from the catalog.
 *
 * For Hudi tables we must not simply delegate to the session catalog:
 * a plain metastore drop would leave Hudi-managed metadata (and the
 * Hive-synced copies of the table) behind. Routing through
 * [[DropHoodieTableCommand]] performs the Hudi-aware cleanup.
 *
 * @param ident identifier of the table to drop
 * @return true when the drop succeeded
 */
override def dropTable(ident: Identifier): Boolean = {
  val table = loadTable(ident)
  table match {
    case _: HoodieInternalV2Table =>
      // ifExists = true: loadTable already proved the table exists, so a
      // concurrent drop should not fail the command.
      // purge = false: dropTable keeps the underlying data (contrast purgeTable).
      DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, isView = false, purge = false).run(spark)
      true
    case _ => super.dropTable(ident) // non-Hudi tables: normal catalog behavior
  }
}
/**
 * Drops a table and purges its underlying data.
 *
 * Mirrors [[dropTable]] but runs [[DropHoodieTableCommand]] with
 * purge = true so the command deletes the table's data/location as part
 * of the Hudi-aware drop, instead of this catalog deleting files
 * manually and then issuing a plain metastore drop (which bypassed the
 * Hive-sync cleanup).
 *
 * @param ident identifier of the table to purge
 * @return true when the purge succeeded
 */
override def purgeTable(ident: Identifier): Boolean = {
  val table = loadTable(ident)
  table match {
    case _: HoodieInternalV2Table =>
      // purge = true delegates data deletion to the command itself.
      DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, isView = false, purge = true).run(spark)
      true
    case _ => super.purgeTable(ident) // non-Hudi tables: normal catalog behavior
  }
}
@throws[NoSuchTableException]