[HUDI-4100] CTAS failed to clean up when given an illegal MANAGED table definition (#5588)
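Before this change, a CTAS whose table definition failed Hudi's validation left the partially staged table behind when the target was MANAGED: the catalog only resolved the table location inside createHoodieTable, and HoodieStagedTable.abortStagedChanges() read the "location" entry from the table properties, which is only present for EXTERNAL tables, so nothing was deleted.

The fix deduces the pair (location URI, CatalogTableType) once, up front, in HoodieCatalog and threads it through HoodieStagedTable and createHoodieTable, so abortStagedChanges() can always clear the staged path. A regression test runs a CTAS that is rejected because compaction is enabled on a COPY_ON_WRITE table (quoted from the new test below):

    create table t using hudi
    tblproperties(primaryKey = 'id', type = 'cow', hoodie.compact.inline = 'true')
    AS select 1 as id, 'a1' as name, 10 as price, 1000 as ts

and asserts that the table path no longer exists afterwards.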
TestCreateTable.scala
@@ -763,4 +763,22 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       assertResult(true)(shown.contains("COMMENT 'This is a simple hudi table'"))
     }
   }
+
+  test("Test CTAS using an illegal definition -- a COW table with compaction enabled.") {
+    val tableName = generateTableName
+    checkExceptionContain(
+      s"""
+         | create table $tableName using hudi
+         | tblproperties(
+         |  primaryKey = 'id',
+         |  type = 'cow',
+         |  hoodie.compact.inline='true'
+         | )
+         | AS
+         | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+         |""".stripMargin)("Compaction is not supported on a CopyOnWrite table")
+    val dbPath = spark.sessionState.catalog.getDatabaseMetadata("default").locationUri.getPath
+    val tablePath = s"${dbPath}/${tableName}"
+    assertResult(false)(existsPath(tablePath))
+  }
 }

HoodieCatalog.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _}
 
+import java.net.URI
 import java.util
 import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}
 
@@ -50,7 +51,9 @@ class HoodieCatalog extends DelegatingCatalogExtension
 
   override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
     if (sparkAdapter.isHoodieTable(properties)) {
-      HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
+      val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
+      HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions,
+        properties, TableCreationMode.STAGE_CREATE)
     } else {
       BasicStagedTable(
         ident,
@@ -61,7 +64,9 @@ class HoodieCatalog extends DelegatingCatalogExtension
 
   override def stageReplace(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
     if (sparkAdapter.isHoodieTable(properties)) {
-      HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE)
+      val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
+      HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions,
+        properties, TableCreationMode.STAGE_REPLACE)
     } else {
       super.dropTable(ident)
       BasicStagedTable(
@@ -76,8 +81,9 @@ class HoodieCatalog extends DelegatingCatalogExtension
                                     partitions: Array[Transform],
                                     properties: util.Map[String, String]): StagedTable = {
     if (sparkAdapter.isHoodieTable(properties)) {
-      HoodieStagedTable(
-        ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE)
+      val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
+      HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions,
+        properties, TableCreationMode.CREATE_OR_REPLACE)
     } else {
       try super.dropTable(ident) catch {
         case _: NoSuchTableException => // ignore the exception
@@ -112,7 +118,9 @@ class HoodieCatalog extends DelegatingCatalogExtension
                            schema: StructType,
                            partitions: Array[Transform],
                            properties: util.Map[String, String]): Table = {
-    createHoodieTable(ident, schema, partitions, properties, Map.empty, Option.empty, TableCreationMode.CREATE)
+    val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
+    createHoodieTable(ident, schema, locUriAndTableType, partitions, properties,
+      Map.empty, Option.empty, TableCreationMode.CREATE)
   }
 
   override def tableExists(ident: Identifier): Boolean = super.tableExists(ident)
@@ -193,8 +201,30 @@ class HoodieCatalog extends DelegatingCatalogExtension
     loadTable(ident)
   }
 
+  private def deduceTableLocationURIAndTableType(
+      ident: Identifier, properties: util.Map[String, String]): (URI, CatalogTableType) = {
+    val locOpt = if (isPathIdentifier(ident)) {
+      Option(ident.name())
+    } else {
+      Option(properties.get("location"))
+    }
+    val tableType = if (locOpt.nonEmpty) {
+      CatalogTableType.EXTERNAL
+    } else {
+      CatalogTableType.MANAGED
+    }
+    val locUriOpt = locOpt.map(CatalogUtils.stringToURI)
+    val tableIdent = ident.asTableIdentifier
+    val existingTableOpt = getExistingTableIfExists(tableIdent)
+    val locURI = locUriOpt
+      .orElse(existingTableOpt.flatMap(_.storage.locationUri))
+      .getOrElse(spark.sessionState.catalog.defaultTablePath(tableIdent))
+    (locURI, tableType)
+  }
+
   def createHoodieTable(ident: Identifier,
                         schema: StructType,
+                        locUriAndTableType: (URI, CatalogTableType),
                         partitions: Array[Transform],
                         allTableProperties: util.Map[String, String],
                         writeOptions: Map[String, String],
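The new helper concentrates the location/table-type decision in one place: an explicit location (a path identifier or a "location" property) marks the table EXTERNAL; otherwise it is MANAGED under the catalog's default path. A minimal standalone sketch of that rule (plain Scala; the object name, simplified signature, and example paths are illustrative, not part of the commit):

  import java.net.URI

  object DeduceLocationSketch {
    sealed trait TableType
    case object External extends TableType
    case object Managed extends TableType

    // Simplified stand-in for deduceTableLocationURIAndTableType: an explicit
    // location means EXTERNAL; otherwise MANAGED at the catalog's default path.
    def deduce(explicitLocation: Option[String], defaultPath: URI): (URI, TableType) =
      explicitLocation match {
        case Some(loc) => (new URI(loc), External)
        case None      => (defaultPath, Managed)
      }

    def main(args: Array[String]): Unit = {
      println(deduce(Some("file:/tmp/t"), new URI("file:/warehouse/t"))) // (file:/tmp/t, External)
      println(deduce(None, new URI("file:/warehouse/t")))                // (file:/warehouse/t, Managed)
    }
  }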
@@ -206,29 +236,17 @@ class HoodieCatalog extends DelegatingCatalogExtension
     val newPartitionColumns = partitionColumns
     val newBucketSpec = maybeBucketSpec
 
-    val isByPath = isPathIdentifier(ident)
-
-    val location = if (isByPath) Option(ident.name()) else Option(allTableProperties.get("location"))
-    val id = ident.asTableIdentifier
-
-    val locUriOpt = location.map(CatalogUtils.stringToURI)
-    val existingTableOpt = getExistingTableIfExists(id)
-    val loc = locUriOpt
-      .orElse(existingTableOpt.flatMap(_.storage.locationUri))
-      .getOrElse(spark.sessionState.catalog.defaultTablePath(id))
     val storage = DataSource.buildStorageFormatFromOptions(writeOptions.--(needFilterProps))
-      .copy(locationUri = Option(loc))
-    val tableType =
-      if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
+      .copy(locationUri = Option(locUriAndTableType._1))
     val commentOpt = Option(allTableProperties.get("comment"))
 
     val tablePropertiesNew = new util.HashMap[String, String](allTableProperties)
     // put path to table properties.
-    tablePropertiesNew.put("path", loc.getPath)
+    tablePropertiesNew.put("path", locUriAndTableType._1.getPath)
 
     val tableDesc = new CatalogTable(
-      identifier = id,
-      tableType = tableType,
+      identifier = ident.asTableIdentifier,
+      tableType = locUriAndTableType._2,
       storage = storage,
       schema = newSchema,
       provider = Option("hudi"),

HoodieStagedTable.scala
@@ -21,16 +21,18 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions.RECORDKEY_FIELD
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, SupportsWrite, TableCapability}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, V1Write, WriteBuilder}
 import org.apache.spark.sql.sources.InsertableRelation
 import org.apache.spark.sql.types.StructType
 
+import java.net.URI
 import java.util
 import scala.collection.JavaConverters.{mapAsScalaMapConverter, setAsJavaSetConverter}
 
 case class HoodieStagedTable(ident: Identifier,
+                             locUriAndTableType: (URI, CatalogTableType),
                              catalog: HoodieCatalog,
                              override val schema: StructType,
                              partitions: Array[Transform],
@@ -59,13 +61,14 @@ case class HoodieStagedTable(ident: Identifier,
     props.putAll(properties)
     props.put("hoodie.table.name", ident.name())
     props.put(RECORDKEY_FIELD.key, properties.get("primaryKey"))
-    catalog.createHoodieTable(ident, schema, partitions, props, writeOptions, sourceQuery, mode)
+    catalog.createHoodieTable(
+      ident, schema, locUriAndTableType, partitions, props, writeOptions, sourceQuery, mode)
   }
 
   override def name(): String = ident.name()
 
   override def abortStagedChanges(): Unit = {
-    clearTablePath(properties.get("location"), catalog.spark.sparkContext.hadoopConfiguration)
+    clearTablePath(locUriAndTableType._1.getPath, catalog.spark.sparkContext.hadoopConfiguration)
   }
 
   private def clearTablePath(tablePath: String, conf: Configuration): Unit = {
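abortStagedChanges() now always receives a concrete path, deduced up front, instead of a "location" property that is absent for MANAGED tables. The body of clearTablePath lies outside this diff; a plausible sketch of such a helper using the Hadoop FileSystem API, assuming hadoop-common on the classpath (an illustration, not the commit's code):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  object ClearPathSketch {
    // Hypothetical: recursively delete a staged table directory.
    def clearTablePath(tablePath: String, conf: Configuration): Unit = {
      val path = new Path(tablePath)
      val fs: FileSystem = path.getFileSystem(conf) // resolve FS for the path's scheme
      fs.delete(path, true)                         // 'true' deletes recursively
    }

    def main(args: Array[String]): Unit = {
      clearTablePath("/tmp/hudi_staged_table", new Configuration())
    }
  }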
@@ -85,13 +88,9 @@ case class HoodieStagedTable(ident: Identifier,
    * WriteBuilder for creating a Hoodie table.
    */
   private class HoodieV1WriteBuilder extends WriteBuilder {
-    override def build(): V1Write = new V1Write {
-      override def toInsertableRelation(): InsertableRelation = {
-        new InsertableRelation {
-          override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-            sourceQuery = Option(data)
-          }
-        }
-      }
+    override def build(): V1Write = () => {
+      (data: DataFrame, overwrite: Boolean) => {
+        sourceQuery = Option(data)
+      }
     }
   }
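The last hunk is a behavior-preserving cleanup: V1Write and InsertableRelation each have a single abstract method, so Scala's SAM conversion lets nested lambdas replace the nested anonymous classes. A self-contained sketch of the same pattern with local stand-in traits (names and types are illustrative, not Spark's):

  // Stand-ins for InsertableRelation and V1Write: one abstract method each.
  trait Insertable { def insert(data: String, overwrite: Boolean): Unit }
  trait Write { def toInsertable(): Insertable }

  object SamSketch {
    var captured: Option[String] = None

    // Nested anonymous classes ...
    val verbose: Write = new Write {
      override def toInsertable(): Insertable = new Insertable {
        override def insert(data: String, overwrite: Boolean): Unit =
          captured = Option(data)
      }
    }

    // ... collapse to nested lambdas, exactly as in the hunk above.
    val concise: Write = () => (data: String, overwrite: Boolean) => captured = Option(data)

    def main(args: Array[String]): Unit = {
      concise.toInsertable().insert("rows", false)
      println(captured) // Some(rows)
    }
  }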