1
0

[HUDI-4103] [HUDI-4001] Filter the properties should not be used when create table for Spark SQL

This commit is contained in:
董可伦
2022-05-16 23:26:23 +08:00
committed by GitHub
parent 43e08193ef
commit a7a42e4490

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
 import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils, HoodieCatalogTable}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange, UpdateColumnComment, UpdateColumnType}
@@ -215,7 +216,7 @@ class HoodieCatalog extends DelegatingCatalogExtension
     val loc = locUriOpt
       .orElse(existingTableOpt.flatMap(_.storage.locationUri))
       .getOrElse(spark.sessionState.catalog.defaultTablePath(id))
-    val storage = DataSource.buildStorageFormatFromOptions(writeOptions)
+    val storage = DataSource.buildStorageFormatFromOptions(writeOptions.--(needFilterProps))
       .copy(locationUri = Option(loc))
     val tableType =
       if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
@@ -233,7 +234,7 @@ class HoodieCatalog extends DelegatingCatalogExtension
       provider = Option("hudi"),
       partitionColumnNames = newPartitionColumns,
       bucketSpec = newBucketSpec,
-      properties = tablePropertiesNew.asScala.toMap,
+      properties = tablePropertiesNew.asScala.toMap.--(needFilterProps),
       comment = commentOpt)
     val hoodieCatalogTable = HoodieCatalogTable(spark, tableDesc)