1
0

[HUDI-4149] Drop-Table fails when underlying table directory is broken (#5672)

This commit is contained in:
Jin Xing
2022-05-30 19:09:26 +08:00
committed by GitHub
parent 329da34ee0
commit 918c4f4e0b
4 changed files with 182 additions and 50 deletions

View File

@@ -39,9 +39,12 @@ import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable
/** /**
* A wrapper of hoodie CatalogTable instance and hoodie Table. * Table definition for SQL functionalities. Depending on the way of data generation,
* meta of Hudi table can be from Spark catalog or meta directory on filesystem.
* [[HoodieCatalogTable]] takes both meta sources into consideration when handling
* EXTERNAL and MANAGED tables.
*/ */
class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) extends Logging { class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) extends Logging {
assert(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi", "It's not a Hudi table") assert(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi", "It's not a Hudi table")
@@ -117,23 +120,9 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name() lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name()
/** /**
* The schema of table. * Table schema
* Make StructField nullable and fill the comments in.
*/ */
lazy val tableSchema: StructType = { lazy val tableSchema: StructType = table.schema
val resolver = spark.sessionState.conf.resolver
val originSchema = getTableSqlSchema(metaClient, includeMetadataFields = true).getOrElse(table.schema)
val fields = originSchema.fields.map { f =>
val nullableField: StructField = f.copy(nullable = true)
val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
if (catalogField.isDefined) {
catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
} else {
nullableField
}
}
StructType(fields)
}
/** /**
* The schema without hoodie meta fields * The schema without hoodie meta fields
@@ -168,12 +157,14 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
def isPartitionedTable: Boolean = table.partitionColumnNames.nonEmpty def isPartitionedTable: Boolean = table.partitionColumnNames.nonEmpty
/** /**
* init hoodie table for create table (as select) * Initializes table meta on filesystem when applying CREATE TABLE clause.
*/ */
def initHoodieTable(): Unit = { def initHoodieTable(): Unit = {
logInfo(s"Init hoodie.properties for ${table.identifier.unquotedString}") logInfo(s"Init hoodie.properties for ${table.identifier.unquotedString}")
val (finalSchema, tableConfigs) = parseSchemaAndConfigs() val (finalSchema, tableConfigs) = parseSchemaAndConfigs()
table = table.copy(schema = finalSchema)
// Save all the table config to the hoodie.properties. // Save all the table config to the hoodie.properties.
val properties = new Properties() val properties = new Properties()
properties.putAll(tableConfigs.asJava) properties.putAll(tableConfigs.asJava)
@@ -199,7 +190,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
} }
/** /**
* @return schema, table parameters in which all parameters aren't sql-styled. * Derives the SQL schema and configurations for a Hudi table:
* 1. Columns in the schema fall under two categories -- the data columns described in
* CREATE TABLE clause and meta columns enumerated in [[HoodieRecord#HOODIE_META_COLUMNS]];
* 2. Configurations derived come from config file, PROPERTIES and OPTIONS in CREATE TABLE clause.
*/ */
private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = { private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
@@ -216,24 +210,25 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties) val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties)
validateTableConfig(spark, catalogTableProps, convertMapToHoodieConfig(existingTableConfig)) validateTableConfig(spark, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))
val options = extraTableConfig(spark, hoodieTableExists, currentTableConfig) ++ val options = extraTableConfig(hoodieTableExists, currentTableConfig) ++
HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ currentTableConfig HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ currentTableConfig
ValidationUtils.checkArgument(tableSchema.nonEmpty || table.schema.nonEmpty, val schemaFromMetaOpt = loadTableSchemaByMetaClient()
s"Missing schema for Create Table: $catalogTableName") val schema = if (schemaFromMetaOpt.nonEmpty) {
val schema = if (tableSchema.nonEmpty) { schemaFromMetaOpt.get
tableSchema } else if (table.schema.nonEmpty) {
} else {
addMetaFields(table.schema) addMetaFields(table.schema)
} else {
throw new AnalysisException(
s"Missing schema fields when applying CREATE TABLE clause for ${catalogTableName}")
} }
(schema, options) (schema, options)
case (_, false) => case (_, false) =>
ValidationUtils.checkArgument(table.schema.nonEmpty, ValidationUtils.checkArgument(table.schema.nonEmpty,
s"Missing schema for Create Table: $catalogTableName") s"Missing schema for Create Table: $catalogTableName")
val schema = table.schema val schema = table.schema
val options = extraTableConfig(spark, isTableExists = false, globalTableConfigs) ++ val options = extraTableConfig(tableExists = false, globalTableConfigs) ++
HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
(addMetaFields(schema), options) (addMetaFields(schema), options)
@@ -253,10 +248,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
(finalSchema, tableConfigs) (finalSchema, tableConfigs)
} }
private def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean, private def extraTableConfig(tableExists: Boolean,
originTableConfig: Map[String, String] = Map.empty): Map[String, String] = { originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
val extraConfig = mutable.Map.empty[String, String] val extraConfig = mutable.Map.empty[String, String]
if (isTableExists) { if (tableExists) {
val allPartitionPaths = getPartitionPaths val allPartitionPaths = getPartitionPaths
if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) { if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
@@ -287,6 +282,24 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
extraConfig.toMap extraConfig.toMap
} }
private def loadTableSchemaByMetaClient(): Option[StructType] = {
val resolver = spark.sessionState.conf.resolver
getTableSqlSchema(metaClient, includeMetadataFields = true).map(originSchema => {
// Load table schema from meta on filesystem, and fill in 'comment'
// information from Spark catalog.
val fields = originSchema.fields.map { f =>
val nullableField: StructField = f.copy(nullable = true)
val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
if (catalogField.isDefined) {
catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
} else {
nullableField
}
}
StructType(fields)
})
}
// This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema // This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema
private def verifyDataSchema(tableIdentifier: TableIdentifier, tableType: CatalogTableType, private def verifyDataSchema(tableIdentifier: TableIdentifier, tableType: CatalogTableType,
dataSchema: Seq[StructField]): Unit = { dataSchema: Seq[StructField]): Unit = {

View File

@@ -23,39 +23,44 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.sync.common.util.ConfigUtils import org.apache.hudi.sync.common.util.ConfigUtils
import org.apache.spark.sql._ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog._
import scala.util.control.NonFatal /**
* Physical plan node for dropping a table.
*/
case class DropHoodieTableCommand( case class DropHoodieTableCommand(
tableIdentifier: TableIdentifier, tableIdentifier: TableIdentifier,
ifExists: Boolean, ifExists: Boolean,
isView: Boolean, isView: Boolean,
purge: Boolean) purge: Boolean) extends HoodieLeafRunnableCommand {
extends HoodieLeafRunnableCommand {
val MOR_SNAPSHOT_TABLE_SUFFIX = "_rt" private val MOR_SNAPSHOT_TABLE_SUFFIX = "_rt"
val MOR_READ_OPTIMIZED_TABLE_SUFFIX = "_ro" private val MOR_READ_OPTIMIZED_TABLE_SUFFIX = "_ro"
override def run(sparkSession: SparkSession): Seq[Row] = { override def run(sparkSession: SparkSession): Seq[Row] = {
val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}" logInfo(s"Start executing 'DROP TABLE' on ${tableIdentifier.unquotedString}" +
logInfo(s"start execute drop table command for $fullTableName") s" (ifExists=${ifExists}, purge=${purge}).")
sparkSession.catalog.refreshTable(tableIdentifier.unquotedString) if (!sparkSession.catalog.tableExists(tableIdentifier.unquotedString)) {
sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
try {
// drop catalog table for this hoodie table
dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge)
} catch {
case NonFatal(e) =>
logWarning(s"Failed to drop catalog table in metastore: ${e.getMessage}")
} }
val qualifiedTableName = QualifiedTableName(
tableIdentifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
tableIdentifier.table)
sparkSession.sessionState.catalog.invalidateCachedTable(qualifiedTableName)
logInfo(s"Finish execute drop table command for $fullTableName") dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge)
logInfo(s"Finished executing 'DROP TABLE' on ${tableIdentifier.unquotedString}.")
Seq.empty[Row] Seq.empty[Row]
} }
def dropTableInCatalog(sparkSession: SparkSession, /**
* Drops table in Spark catalog. Note that RO & RT table could coexist with a MOR table.
* If `purge` enabled, RO & RT table and corresponding data directory on filesystem will
* all be removed.
*/
private def dropTableInCatalog(sparkSession: SparkSession,
tableIdentifier: TableIdentifier, tableIdentifier: TableIdentifier,
ifExists: Boolean, ifExists: Boolean,
purge: Boolean): Unit = { purge: Boolean): Unit = {
@@ -67,7 +72,8 @@ extends HoodieLeafRunnableCommand {
val catalog = sparkSession.sessionState.catalog val catalog = sparkSession.sessionState.catalog
// Drop table in the catalog // Drop table in the catalog
if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) { if (hoodieCatalogTable.hoodieTableExists &&
HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
val (rtTableOpt, roTableOpt) = getTableRTAndRO(catalog, hoodieCatalogTable) val (rtTableOpt, roTableOpt) = getTableRTAndRO(catalog, hoodieCatalogTable)
rtTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false)) rtTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
roTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false)) roTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))

View File

@@ -34,6 +34,7 @@ set hoodie.delete.shuffle.parallelism = 1;
# CTAS # CTAS
create table h0 using hudi options(type = '${tableType}', primaryKey = 'id') create table h0 using hudi options(type = '${tableType}', primaryKey = 'id')
location '${tmpDir}/h0'
as select 1 as id, 'a1' as name, 10 as price; as select 1 as id, 'a1' as name, 10 as price;
+----------+ +----------+
| ok | | ok |
@@ -46,6 +47,7 @@ select id, name, price from h0;
create table h0_p using hudi partitioned by(dt) create table h0_p using hudi partitioned by(dt)
options(type = '${tableType}', primaryKey = 'id') options(type = '${tableType}', primaryKey = 'id')
location '${tmpDir}/h0_p'
as select cast('2021-05-07 00:00:00' as timestamp) as dt, as select cast('2021-05-07 00:00:00' as timestamp) as dt,
1 as id, 'a1' as name, 10 as price; 1 as id, 'a1' as name, 10 as price;
+----------+ +----------+

View File

@@ -17,6 +17,8 @@
package org.apache.spark.sql.hudi package org.apache.spark.sql.hudi
import org.apache.hadoop.fs.{LocalFileSystem, Path}
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -230,6 +232,115 @@ class TestDropTable extends HoodieSparkSqlTestBase {
} }
} }
test("Drop an EXTERNAL table which path is lost.") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration);
spark.sql(
s"""
|create table $tableName (
|id int,
|ts int,
|value string
|)using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
|""".stripMargin)
assert(filesystem.exists(new Path(tablePath)), s"Table path doesn't exists (${tablePath}).")
filesystem.delete(new Path(tablePath), true)
spark.sql(s"drop table ${tableName}")
checkAnswer("show tables")()
}
}
test("Drop an MOR table and related RT & RO when path is lost.") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration);
spark.sql(
s"""
|create table $tableName (
|id int,
|ts int,
|value string
|)using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'mor'
| )
|""".stripMargin)
assert(filesystem.exists(new Path(tablePath)), s"Table path doesn't exist (${tablePath}).")
spark.sql(
s"""
|create table ${tableName}_ro using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
Map("hoodie.query.as.ro.table" -> "true"))
spark.sql(
s"""
|create table ${tableName}_rt using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
Map("hoodie.query.as.ro.table" -> "false"))
filesystem.delete(new Path(tablePath), true)
spark.sql(s"drop table ${tableName}")
spark.sql(s"drop table ${tableName}_ro")
spark.sql(s"drop table ${tableName}_rt")
checkAnswer("show tables")()
}
}
test("Drop an MANAGED table which path is lost.") {
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
|id int,
|ts int,
|value string
|)using hudi
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
|""".stripMargin)
val tablePath = new Path(
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location)
val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration);
assert(filesystem.exists(tablePath), s"Table path doesn't exists ($tablePath).")
filesystem.delete(tablePath, true)
spark.sql(s"drop table ${tableName}")
checkAnswer("show tables")()
}
private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier, private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier,
newProperties: Map[String, String]): Unit = { newProperties: Map[String, String]): Unit = {
val catalogTable = spark.sessionState.catalog.getTableMetadata(tableIdt) val catalogTable = spark.sessionState.catalog.getTableMetadata(tableIdt)