[HUDI-2212] Missing PrimaryKey In Hoodie Properties For CTAS Table (#3332)
This commit is contained in:
@@ -90,7 +90,7 @@ case class HoodieFileIndex(
|
||||
*/
|
||||
private lazy val _partitionSchemaFromProperties: StructType = {
|
||||
val tableConfig = metaClient.getTableConfig
|
||||
val partitionColumns = tableConfig.getPartitionColumns
|
||||
val partitionColumns = tableConfig.getPartitionFields
|
||||
val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
|
||||
|
||||
if (partitionColumns.isPresent) {
|
||||
|
||||
@@ -119,15 +119,17 @@ object HoodieSparkSqlWriter {
|
||||
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP)
|
||||
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
|
||||
val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)
|
||||
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
|
||||
|
||||
val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
|
||||
.setTableType(tableType)
|
||||
.setTableName(tblName)
|
||||
.setRecordKeyFields(recordKeyFields)
|
||||
.setBaseFileFormat(baseFileFormat)
|
||||
.setArchiveLogFolder(archiveLogFolder)
|
||||
.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
|
||||
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
|
||||
.setPartitionColumns(partitionColumns)
|
||||
.setPartitionFields(partitionColumns)
|
||||
.setPopulateMetaFields(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean)
|
||||
.initTable(sparkContext.hadoopConfiguration, path.get)
|
||||
tableConfig = tableMetaClient.getTableConfig
|
||||
@@ -302,15 +304,18 @@ object HoodieSparkSqlWriter {
|
||||
if (!tableExists) {
|
||||
val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
|
||||
val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
|
||||
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
|
||||
|
||||
HoodieTableMetaClient.withPropertyBuilder()
|
||||
.setTableType(HoodieTableType.valueOf(tableType))
|
||||
.setTableName(tableName)
|
||||
.setRecordKeyFields(recordKeyFields)
|
||||
.setArchiveLogFolder(archiveLogFolder)
|
||||
.setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_OPT_KEY))
|
||||
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
|
||||
.setBootstrapIndexClass(bootstrapIndexClass)
|
||||
.setBootstrapBasePath(bootstrapBasePath)
|
||||
.setPartitionColumns(partitionColumns)
|
||||
.setPartitionFields(partitionColumns)
|
||||
.initTable(sparkContext.hadoopConfiguration, path)
|
||||
}
|
||||
|
||||
@@ -471,6 +476,9 @@ object HoodieSparkSqlWriter {
|
||||
hiveSyncConfig.syncAsSparkDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
|
||||
hiveSyncConfig.sparkSchemaLengthThreshold = sqlConf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD)
|
||||
hiveSyncConfig.createManagedTable = hoodieConfig.getBoolean(HIVE_CREATE_MANAGED_TABLE)
|
||||
|
||||
hiveSyncConfig.serdeProperties = hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
|
||||
hiveSyncConfig.tableProperties = hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
|
||||
hiveSyncConfig
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hudi.DataSourceWriteOptions
|
||||
import org.apache.hudi.hive.util.ConfigUtils
|
||||
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
|
||||
@@ -27,6 +28,8 @@ import org.apache.spark.sql.execution.SparkPlan
|
||||
import org.apache.spark.sql.execution.command.DataWritingCommand
|
||||
import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
/**
|
||||
 * Command for the CREATE TABLE AS SELECT (CTAS) statement.
|
||||
*/
|
||||
@@ -71,8 +74,14 @@ case class CreateHoodieTableAsSelectCommand(
|
||||
// Execute the insert query
|
||||
try {
|
||||
// Set if sync as a managed table.
|
||||
sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key(),
|
||||
sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key,
|
||||
(table.tableType == CatalogTableType.MANAGED).toString)
|
||||
// Sync the options to hive serde properties
|
||||
sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key,
|
||||
ConfigUtils.configToString(table.storage.properties.asJava))
|
||||
// Sync the table properties to hive
|
||||
sparkSession.sessionState.conf.setConfString(DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key,
|
||||
ConfigUtils.configToString(table.properties.asJava))
|
||||
val success = InsertIntoHoodieTableCommand.run(sparkSession, tableWithSchema, reOrderedQuery, Map.empty,
|
||||
mode == SaveMode.Overwrite, refreshTable = false)
|
||||
if (success) {
|
||||
|
||||
@@ -103,9 +103,9 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
|
||||
val tableSchema = avroSchema.map(SchemaConverters.toSqlType(_).dataType
|
||||
.asInstanceOf[StructType])
|
||||
|
||||
// Get options from the external table
|
||||
// Get options from the external table, appended with the options specified in the DDL.
|
||||
val options = HoodieOptionConfig.mappingTableConfigToSqlOption(
|
||||
metaClient.getTableConfig.getProps.asScala.toMap)
|
||||
metaClient.getTableConfig.getProps.asScala.toMap) ++ table.storage.properties
|
||||
|
||||
val userSpecifiedSchema = table.schema
|
||||
if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
|
||||
@@ -329,7 +329,7 @@ object CreateHoodieTableCommand extends Logging {
|
||||
.fromProperties(properties)
|
||||
.setTableName(tableName)
|
||||
.setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString())
|
||||
.setPartitionColumns(table.partitionColumnNames.mkString(","))
|
||||
.setPartitionFields(table.partitionColumnNames.mkString(","))
|
||||
.initTable(conf, location)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.spark.sql.hudi
|
||||
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
||||
|
||||
class TestMergeIntoTable2 extends TestHoodieSqlBase {
|
||||
|
||||
test("Test MergeInto for MOR table 2") {
|
||||
@@ -135,4 +137,39 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
|
||||
)("assertion failed: Target table's field(price) cannot be the right-value of the update clause for MOR table.")
|
||||
}
|
||||
}
|
||||
|
||||
test("Test Merge Into CTAS Table") {
|
||||
withTempDir { tmp =>
|
||||
val tableName = generateTableName
|
||||
spark.sql(
|
||||
s"""
|
||||
|create table $tableName using hudi
|
||||
|options(primaryKey = 'id')
|
||||
|location '${tmp.getCanonicalPath}'
|
||||
|as
|
||||
|select 1 as id, 'a1' as name
|
||||
|""".stripMargin
|
||||
)
|
||||
val metaClient = HoodieTableMetaClient.builder()
|
||||
.setBasePath(tmp.getCanonicalPath)
|
||||
.setConf(spark.sessionState.newHadoopConf())
|
||||
.build()
|
||||
// check record key in hoodie.properties
|
||||
assertResult("id")(metaClient.getTableConfig.getRecordKeyFields.get().mkString(","))
|
||||
|
||||
spark.sql(
|
||||
s"""
|
||||
|merge into $tableName h0
|
||||
|using (
|
||||
| select 1 as s_id, 'a1_1' as name
|
||||
|) s0
|
||||
|on h0.id = s0.s_id
|
||||
|when matched then update set *
|
||||
|""".stripMargin
|
||||
)
|
||||
checkAnswer(s"select id, name from $tableName")(
|
||||
Seq(1, "a1_1")
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user