From 75f847691f0bdaf226d4713a8cb8c7639cffd5e5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=91=A3=E5=8F=AF=E4=BC=A6?=
Date: Mon, 16 May 2022 09:50:29 +0800
Subject: [PATCH] [HUDI-4001] Filter out the properties that should not be used when creating a table for Spark SQL (#5495)

---
 .../catalyst/catalog/HoodieCatalogTable.scala |   3 +
 .../spark/sql/hudi/ProvidesHoodieConfig.scala |   3 +-
 .../command/CreateHoodieTableCommand.scala    |   6 +-
 .../CreateHoodieTableAsSelectCommand.scala    |  23 +++-
 .../spark/sql/hudi/TestCreateTable.scala      | 103 +++++++++++++++++-
 5 files changed, 127 insertions(+), 11 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 7ee8f6ad5..76cea362a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.DataSourceWriteOptions.OPERATION
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
@@ -321,6 +322,8 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
 }
 
 object HoodieCatalogTable {
+  // Properties that should not be used when creating a table
+  val needFilterProps: List[String] = List(HoodieTableConfig.DATABASE_NAME.key, HoodieTableConfig.NAME.key, OPERATION.key)
 
   def apply(sparkSession: SparkSession, tableIdentifier: TableIdentifier): HoodieCatalogTable = {
     val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 31fb0ad6c..131ebebe8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -255,8 +255,7 @@ trait ProvidesHoodieConfig extends Logging {
     val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
     val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
 
-    // operation can not be overwrite
-    val options = hoodieCatalogTable.catalogProperties.-(OPERATION.key())
+    val options = hoodieCatalogTable.catalogProperties
 
     withSparkConf(sparkSession, options) {
       Map(
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 195bf4153..9bf1d7215 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
 import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
 import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.hive.HiveClientUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive
@@ -130,8 +131,9 @@ object CreateHoodieTableCommand {
       .copy(table = tableName, database = Some(newDatabaseName))
 
     val partitionColumnNames = hoodieCatalogTable.partitionSchema.map(_.name)
-    // append pk, preCombineKey, type to the properties of table
-    val newTblProperties = hoodieCatalogTable.catalogProperties ++ HoodieOptionConfig.extractSqlOptions(properties)
+    // Remove the properties that should not be used, then append pk, preCombineKey and type to the table properties
+    val newTblProperties =
+      hoodieCatalogTable.catalogProperties.--(needFilterProps) ++ HoodieOptionConfig.extractSqlOptions(properties)
     val newTable = table.copy(
       identifier = newTableIdentifier,
       storage = newStorage,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index 1d2cea10a..66aeb850e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -23,7 +23,8 @@ import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.hive.HiveSyncConfig
 import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.hudi.sql.InsertMode
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -66,9 +67,21 @@
 
     // ReOrder the query which move the partition columns to the last of the project list
     val reOrderedQuery = reOrderPartitionColumn(query, table.partitionColumnNames)
-    val tableWithSchema = table.copy(schema = reOrderedQuery.schema)
+    // Remove the properties that should not be used
+    val newStorage = new CatalogStorageFormat(
+      table.storage.locationUri,
+      table.storage.inputFormat,
+      table.storage.outputFormat,
+      table.storage.serde,
+      table.storage.compressed,
+      table.storage.properties.--(needFilterProps))
+    val newTable = table.copy(
+      storage = newStorage,
+      schema = reOrderedQuery.schema,
+      properties = table.properties.--(needFilterProps)
+    )
 
-    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableWithSchema)
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
     val tablePath = hoodieCatalogTable.tableLocation
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     assert(HoodieSqlCommonUtils.isEmptyPath(tablePath, hadoopConf),
@@ -83,11 +96,11 @@
         val options = Map(
           HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString,
           HiveSyncConfig.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tblProperties.asJava),
-          HiveSyncConfig.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(table.properties.asJava),
+          HiveSyncConfig.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(newTable.properties.asJava),
           DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
           DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
         )
-        val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty,
+        val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, reOrderedQuery, Map.empty,
           mode == SaveMode.Overwrite, refreshTable = false, extraOptions = options)
         if (success) {
           // If write success, create the table in catalog if it has not synced to the
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 69147272d..5435aad05 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -29,6 +29,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.types._
 
+import org.junit.jupiter.api.Assertions.assertFalse
+
 import scala.collection.JavaConverters._
 
 class TestCreateTable extends HoodieSparkSqlTestBase {
@@ -49,8 +51,11 @@
          |  ts long
          | ) using hudi
          | tblproperties (
+         |  hoodie.database.name = "databaseName",
+         |  hoodie.table.name = "tableName",
          |  primaryKey = 'id',
-         |  preCombineField = 'ts'
+         |  preCombineField = 'ts',
+         |  hoodie.datasource.write.operation = 'upsert'
          | )
        """.stripMargin)
     val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
@@ -65,6 +70,9 @@
         StructField("price", DoubleType),
         StructField("ts", LongType))
     )(table.schema.fields)
+    assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+    assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+    assertFalse(table.properties.contains(OPERATION.key()))
 
     val tablePath = table.storage.properties("path")
     val metaClient = HoodieTableMetaClient.builder()
@@ -73,6 +81,10 @@
       .build()
     val tableConfig = metaClient.getTableConfig
     assertResult(databaseName)(tableConfig.getDatabaseName)
+    assertResult(tableName)(tableConfig.getTableName)
+    assertFalse(tableConfig.contains(OPERATION.key()))
+
+    spark.sql("use default")
   }
 
   test("Test Create Hoodie Table With Options") {
@@ -88,8 +100,11 @@
        | ) using hudi
        | partitioned by (dt)
        | options (
+       |  hoodie.database.name = "databaseName",
+       |  hoodie.table.name = "tableName",
        |  primaryKey = 'id',
-       |  preCombineField = 'ts'
+       |  preCombineField = 'ts',
+       |  hoodie.datasource.write.operation = 'upsert'
        | )
      """.stripMargin)
     val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
@@ -108,6 +123,9 @@
         StructField("ts", LongType),
         StructField("dt", StringType))
     )(table.schema.fields)
+    assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+    assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+    assertFalse(table.properties.contains(OPERATION.key()))
 
     val tablePath = table.storage.properties("path")
     val metaClient = HoodieTableMetaClient.builder()
@@ -120,6 +138,9 @@
     assertResult("id")(tableConfig(HoodieTableConfig.RECORDKEY_FIELDS.key))
     assertResult("ts")(tableConfig(HoodieTableConfig.PRECOMBINE_FIELD.key))
     assertResult(classOf[ComplexKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
+    assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
+    assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
+    assertFalse(tableConfig.contains(OPERATION.key()))
   }
 
   test("Test Create External Hoodie Table") {
@@ -361,6 +382,84 @@
     }
   }
 
+  test("Test Create Table As Select With Tblproperties For Filter Props") {
+    Seq("cow", "mor").foreach { tableType =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           | create table $tableName using hudi
+           | partitioned by (dt)
+           | tblproperties(
+           |  hoodie.database.name = "databaseName",
+           |  hoodie.table.name = "tableName",
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  hoodie.datasource.write.operation = 'upsert',
+           |  type = '$tableType'
+           | )
+           | AS
+           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
+        """.stripMargin
+      )
+      checkAnswer(s"select id, name, price, dt from $tableName")(
+        Seq(1, "a1", 10, "2021-04-01")
+      )
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+      assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+      assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+      assertFalse(table.properties.contains(OPERATION.key()))
+
+      val tablePath = table.storage.properties("path")
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
+      assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
+      assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
+      assertFalse(tableConfig.contains(OPERATION.key()))
+    }
+  }
+
+  test("Test Create Table As Select With Options For Filter Props") {
+    Seq("cow", "mor").foreach { tableType =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           | create table $tableName using hudi
+           | partitioned by (dt)
+           | options(
+           |  hoodie.database.name = "databaseName",
+           |  hoodie.table.name = "tableName",
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  hoodie.datasource.write.operation = 'upsert',
+           |  type = '$tableType'
+           | )
+           | AS
+           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt, 1000 as ts
+        """.stripMargin
+      )
+      checkAnswer(s"select id, name, price, dt from $tableName")(
+        Seq(1, "a1", 10, "2021-04-01")
+      )
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+      assertFalse(table.properties.contains(HoodieTableConfig.DATABASE_NAME.key()))
+      assertFalse(table.properties.contains(HoodieTableConfig.NAME.key()))
+      assertFalse(table.properties.contains(OPERATION.key()))
+
+      val tablePath = table.storage.properties("path")
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+      val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key())) + assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key())) + assertFalse(tableConfig.contains(OPERATION.key())) + } + } + test("Test Create Table As Select when 'spark.sql.datetime.java8API.enabled' enables") { try { // enable spark.sql.datetime.java8API.enabled