[HUDI-4001] Filter out the properties that should not be used when creating a table for Spark SQL (#5495)
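
The change adds a `needFilterProps` list to `HoodieCatalogTable` and applies it in `CreateHoodieTableCommand`, `CreateHoodieTableAsSelectCommand`, and `ProvidesHoodieConfig`, so that `hoodie.database.name`, `hoodie.table.name`, and `hoodie.datasource.write.operation` supplied through `tblproperties`/`options` are dropped before the table metadata is persisted. Below is a minimal, self-contained sketch of the filtering semantics in plain Scala; the literal keys mirror `needFilterProps`, and the object name `FilterPropsSketch` is only for illustration, not part of the patch.

object FilterPropsSketch extends App {
  // Same keys the patch collects in HoodieCatalogTable.needFilterProps.
  val needFilterProps: List[String] = List(
    "hoodie.database.name",
    "hoodie.table.name",
    "hoodie.datasource.write.operation")

  // Properties as a user might pass them in tblproperties/options.
  val userProps: Map[String, String] = Map(
    "hoodie.database.name" -> "databaseName",
    "hoodie.table.name" -> "tableName",
    "hoodie.datasource.write.operation" -> "upsert",
    "primaryKey" -> "id",
    "preCombineField" -> "ts")

  // Same call shape as catalogProperties.--(needFilterProps) in the patch:
  // the listed keys are dropped, everything else is kept.
  val filtered = userProps -- needFilterProps

  assert(filtered == Map("primaryKey" -> "id", "preCombineField" -> "ts"))
  println(filtered)
}
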
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.DataSourceWriteOptions.OPERATION
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
@@ -321,6 +322,8 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
 }
 
 object HoodieCatalogTable {
+  // The properties should not be used when create table
+  val needFilterProps: List[String] = List(HoodieTableConfig.DATABASE_NAME.key, HoodieTableConfig.NAME.key, OPERATION.key)
 
   def apply(sparkSession: SparkSession, tableIdentifier: TableIdentifier): HoodieCatalogTable = {
     val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
@@ -255,8 +255,7 @@ trait ProvidesHoodieConfig extends Logging {
     val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
     val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
 
-    // operation can not be overwrite
-    val options = hoodieCatalogTable.catalogProperties.-(OPERATION.key())
+    val options = hoodieCatalogTable.catalogProperties
 
     withSparkConf(sparkSession, options) {
       Map(
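
Aside (not part of the patch): the removed comment, "operation can not be overwrite", suggests the old `.-(OPERATION.key())` call existed only to keep a stored write operation out of the per-write options. Once `hoodie.datasource.write.operation` is filtered at create time it never reaches `catalogProperties`, so the properties can be passed through unchanged. A rough REPL-style sketch of the before/after shape, with a plain `Map` standing in for `hoodieCatalogTable.catalogProperties`:

// Stand-in for hoodieCatalogTable.catalogProperties after the patch:
// the operation key was already filtered out at create time.
val catalogProperties: Map[String, String] =
  Map("primaryKey" -> "id", "preCombineField" -> "ts")

// Before the patch the key had to be dropped on every write:
val optionsBefore = catalogProperties - "hoodie.datasource.write.operation"
// After the patch the properties are used directly:
val optionsAfter = catalogProperties

assert(optionsBefore == optionsAfter)
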
@@ -26,6 +26,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
 import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
 import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.hive.HiveClientUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive
@@ -130,8 +131,9 @@ object CreateHoodieTableCommand {
       .copy(table = tableName, database = Some(newDatabaseName))
 
     val partitionColumnNames = hoodieCatalogTable.partitionSchema.map(_.name)
-    // append pk, preCombineKey, type to the properties of table
-    val newTblProperties = hoodieCatalogTable.catalogProperties ++ HoodieOptionConfig.extractSqlOptions(properties)
+    // Remove some properties should not be used;append pk, preCombineKey, type to the properties of table
+    val newTblProperties =
+      hoodieCatalogTable.catalogProperties.--(needFilterProps) ++ HoodieOptionConfig.extractSqlOptions(properties)
     val newTable = table.copy(
       identifier = newTableIdentifier,
       storage = newStorage,
@@ -23,7 +23,8 @@ import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.hive.HiveSyncConfig
 import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.hudi.sql.InsertMode
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -66,9 +67,21 @@ case class CreateHoodieTableAsSelectCommand(
 
     // ReOrder the query which move the partition columns to the last of the project list
     val reOrderedQuery = reOrderPartitionColumn(query, table.partitionColumnNames)
-    val tableWithSchema = table.copy(schema = reOrderedQuery.schema)
+    // Remove some properties should not be used
+    val newStorage = new CatalogStorageFormat(
+      table.storage.locationUri,
+      table.storage.inputFormat,
+      table.storage.outputFormat,
+      table.storage.serde,
+      table.storage.compressed,
+      table.storage.properties.--(needFilterProps))
+    val newTable = table.copy(
+      storage = newStorage,
+      schema = reOrderedQuery.schema,
+      properties = table.properties.--(needFilterProps)
+    )
 
-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableWithSchema)
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
     val tablePath = hoodieCatalogTable.tableLocation
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     assert(HoodieSqlCommonUtils.isEmptyPath(tablePath, hadoopConf),
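
Design note, purely as an aside: `CatalogStorageFormat` is a Spark case class, so the `new CatalogStorageFormat(...)` block above could equivalently be written with `copy`, leaving the untouched storage fields implicit. A sketch of that form, assuming `table`, `reOrderedQuery`, and `needFilterProps` are in scope exactly as in the hunk:

// Equivalent to the new CatalogStorageFormat(...) block in the patch.
val newStorage = table.storage.copy(
  properties = table.storage.properties -- needFilterProps)

val newTable = table.copy(
  storage = newStorage,
  schema = reOrderedQuery.schema,
  properties = table.properties -- needFilterProps)
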
@@ -83,11 +96,11 @@ case class CreateHoodieTableAsSelectCommand(
       val options = Map(
         HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString,
         HiveSyncConfig.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tblProperties.asJava),
-        HiveSyncConfig.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(table.properties.asJava),
+        HiveSyncConfig.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(newTable.properties.asJava),
         DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
         DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
       )
-      val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty,
+      val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, reOrderedQuery, Map.empty,
         mode == SaveMode.Overwrite, refreshTable = false, extraOptions = options)
       if (success) {
         // If write success, create the table in catalog if it has not synced to the
@@ -29,6 +29,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.types._
 
+import org.junit.jupiter.api.Assertions.assertFalse
+
 import scala.collection.JavaConverters._
 
 class TestCreateTable extends HoodieSparkSqlTestBase {
@@ -49,8 +51,11 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         | ts long
         | ) using hudi
         | tblproperties (
+        | hoodie.database.name = "databaseName",
+        | hoodie.table.name = "tableName",
        | primaryKey = 'id',
-        | preCombineField = 'ts'
+        | preCombineField = 'ts',
+        | hoodie.datasource.write.operation = 'upsert'
         | )
       """.stripMargin)
     val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
@@ -65,6 +70,9 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         StructField("price", DoubleType),
         StructField("ts", LongType))
     )(table.schema.fields)
+    assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+    assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+    assertFalse(table.properties.contains(OPERATION.key()))
 
     val tablePath = table.storage.properties("path")
     val metaClient = HoodieTableMetaClient.builder()
@@ -73,6 +81,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       .build()
     val tableConfig = metaClient.getTableConfig
     assertResult(databaseName)(tableConfig.getDatabaseName)
+    assertResult(tableName)(tableConfig.getTableName)
+    assertFalse(tableConfig.contains(OPERATION.key()))
+
+    spark.sql("use default")
   }
 
   test("Test Create Hoodie Table With Options") {
@@ -88,8 +100,11 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         | ) using hudi
         | partitioned by (dt)
         | options (
+        | hoodie.database.name = "databaseName",
+        | hoodie.table.name = "tableName",
        | primaryKey = 'id',
-        | preCombineField = 'ts'
+        | preCombineField = 'ts',
+        | hoodie.datasource.write.operation = 'upsert'
         | )
       """.stripMargin)
     val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
@@ -108,6 +123,9 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         StructField("ts", LongType),
         StructField("dt", StringType))
     )(table.schema.fields)
+    assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+    assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+    assertFalse(table.properties.contains(OPERATION.key()))
 
     val tablePath = table.storage.properties("path")
     val metaClient = HoodieTableMetaClient.builder()
@@ -120,6 +138,9 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     assertResult("id")(tableConfig(HoodieTableConfig.RECORDKEY_FIELDS.key))
     assertResult("ts")(tableConfig(HoodieTableConfig.PRECOMBINE_FIELD.key))
     assertResult(classOf[ComplexKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
+    assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
+    assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
+    assertFalse(tableConfig.contains(OPERATION.key()))
   }
 
   test("Test Create External Hoodie Table") {
@@ -361,6 +382,84 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Create Table As Select With Tblproperties For Filter Props") {
+    Seq("cow", "mor").foreach { tableType =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           | create table $tableName using hudi
+           | partitioned by (dt)
+           | tblproperties(
+           | hoodie.database.name = "databaseName",
+           | hoodie.table.name = "tableName",
+           | primaryKey = 'id',
+           | preCombineField = 'ts',
+           | hoodie.datasource.write.operation = 'upsert',
+           | type = '$tableType'
+           | )
+           | AS
+           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
+        """.stripMargin
+      )
+      checkAnswer(s"select id, name, price, dt from $tableName")(
+        Seq(1, "a1", 10, "2021-04-01")
+      )
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+      assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+      assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+      assertFalse(table.properties.contains(OPERATION.key()))
+
+      val tablePath = table.storage.properties("path")
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
+      assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
+      assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
+      assertFalse(tableConfig.contains(OPERATION.key()))
+    }
+  }
+
+  test("Test Create Table As Select With Options For Filter Props") {
+    Seq("cow", "mor").foreach { tableType =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           | create table $tableName using hudi
+           | partitioned by (dt)
+           | options(
+           | hoodie.database.name = "databaseName",
+           | hoodie.table.name = "tableName",
+           | primaryKey = 'id',
+           | preCombineField = 'ts',
+           | hoodie.datasource.write.operation = 'upsert',
+           | type = '$tableType'
+           | )
+           | AS
+           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
+        """.stripMargin
+      )
+      checkAnswer(s"select id, name, price, dt from $tableName")(
+        Seq(1, "a1", 10, "2021-04-01")
+      )
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+      assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+      assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+      assertFalse(table.properties.contains(OPERATION.key()))
+
+      val tablePath = table.storage.properties("path")
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
+      assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
+      assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
+      assertFalse(tableConfig.contains(OPERATION.key()))
+    }
+  }
+
   test("Test Create Table As Select when 'spark.sql.datetime.java8API.enabled' enables") {
     try {
       // enable spark.sql.datetime.java8API.enabled