[HUDI-4404] Fix insert into dynamic partition write misalignment (#6124)
This commit is contained in:
@@ -20,13 +20,14 @@ package org.apache.spark.sql.hudi.command
|
||||
import org.apache.hudi.HoodieSparkSqlWriter
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
|
||||
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
|
||||
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal}
|
||||
import org.apache.spark.sql.catalyst.plans.QueryPlan
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
|
||||
import org.apache.spark.sql.execution.datasources.LogicalRelation
|
||||
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
|
||||
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.types.{StructField, StructType}
|
||||
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
|
||||
|
||||
/**
|
||||
@@ -120,50 +121,45 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
|
||||
|
||||
val staticPartitionValues = insertPartitions.filter(p => p._2.isDefined).mapValues(_.get)
|
||||
assert(staticPartitionValues.isEmpty ||
|
||||
staticPartitionValues.size == targetPartitionSchema.size,
|
||||
s"Required partition columns is: ${targetPartitionSchema.json}, Current static partitions " +
|
||||
insertPartitions.size == targetPartitionSchema.size,
|
||||
s"Required partition columns is: ${targetPartitionSchema.json}, Current input partitions " +
|
||||
s"is: ${staticPartitionValues.mkString("," + "")}")
|
||||
|
||||
val queryOutputWithoutMetaFields = removeMetaFields(query.output)
|
||||
assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
|
||||
== hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
|
||||
== hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
|
||||
s"Required select columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
|
||||
s"Current select columns(including static partition column) count: " +
|
||||
s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size},columns: " +
|
||||
s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
|
||||
|
||||
val queryDataFieldsWithoutMetaFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition
|
||||
queryOutputWithoutMetaFields.dropRight(targetPartitionSchema.fields.length)
|
||||
} else { // insert static partition
|
||||
queryOutputWithoutMetaFields
|
||||
}
|
||||
// Align the data fields of the query with the target table's data fields
|
||||
val dataProjectsWithoutMetaFields = queryDataFieldsWithoutMetaFields.zip(
|
||||
hoodieCatalogTable.dataSchemaWithoutMetaFields.fields).map { case (dataAttr, targetField) =>
|
||||
val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable),
|
||||
targetField.dataType, conf)
|
||||
Alias(castAttr, targetField.name)()
|
||||
}
|
||||
val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
|
||||
hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)))
|
||||
val dataProjectsWithoutMetaFields = getTableFieldsAlias(queryOutputWithoutMetaFields,
|
||||
dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
|
||||
|
||||
val partitionProjects = if (staticPartitionValues.isEmpty) { // insert dynamic partitions
|
||||
// The partition attributes follow the data attributes in the query,
|
||||
// So we init the partitionAttrPosition with the data schema size.
|
||||
var partitionAttrPosition = hoodieCatalogTable.dataSchemaWithoutMetaFields.size
|
||||
targetPartitionSchema.fields.map(f => {
|
||||
val partitionAttr = queryOutputWithoutMetaFields(partitionAttrPosition)
|
||||
partitionAttrPosition = partitionAttrPosition + 1
|
||||
val castAttr = castIfNeeded(partitionAttr.withNullability(f.nullable), f.dataType, conf)
|
||||
Alias(castAttr, f.name)()
|
||||
})
|
||||
} else { // insert static partitions
|
||||
targetPartitionSchema.fields.map(f => {
|
||||
val partitionProjects = targetPartitionSchema.fields.filter(f => staticPartitionValues.contains(f.name))
|
||||
.map(f => {
|
||||
val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
|
||||
s"Missing static partition value for: ${f.name}")
|
||||
s"Missing static partition value for: ${f.name}")
|
||||
val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf)
|
||||
Alias(castAttr, f.name)()
|
||||
})
|
||||
|
||||
Project(dataProjectsWithoutMetaFields ++ partitionProjects, query)
|
||||
}
|
||||
|
||||
/**
 * Builds one [[Alias]] per query output attribute, mapping it onto a field of the given schema.
 *
 * Attributes and schema fields are first paired by position with `zip`, so surplus elements of
 * the longer sequence are dropped. For each pair the target field is chosen as follows:
 *  - if the attribute name does not start with "col", a schema field with exactly the same name
 *    (case-sensitive `equals`) is looked up and used when found;
 *  - otherwise, or when no field has that name, the positionally paired field is used.
 *
 * The attribute is given the target field's nullability, passed to `castIfNeeded` together with
 * the target field's data type, and aliased to the target field's name.
 *
 * @param queryOutputWithoutMetaFields attributes to align
 * @param schemaWithoutMetaFields      fields to align the attributes to
 * @param conf                         SQL configuration forwarded to `castIfNeeded`
 * @return the aliased expressions, in the order of the query attributes
 */
private def getTableFieldsAlias(
|
||||
queryOutputWithoutMetaFields: Seq[Attribute],
|
||||
schemaWithoutMetaFields: Seq[StructField],
|
||||
conf: SQLConf): Seq[Alias] = {
|
||||
queryOutputWithoutMetaFields.zip(schemaWithoutMetaFields).map { case (dataAttr, dataField) =>
|
||||
// NOTE(review): the "col" prefix presumably identifies auto-generated column names
// (e.g. from a VALUES clause) - confirm. Any attribute whose name starts with "col",
// including a genuine user column, skips the by-name lookup and is matched by position.
val targetFieldOption = if (dataAttr.name.startsWith("col")) None else
|
||||
schemaWithoutMetaFields.find(_.name.equals(dataAttr.name))
|
||||
// Fall back to the positionally paired field when no same-named field was selected.
val targetField = if (targetFieldOption.isDefined) targetFieldOption.get else dataField
|
||||
val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable),
|
||||
targetField.dataType, conf)
|
||||
Alias(castAttr, targetField.name)()
|
||||
}
|
||||
// NOTE(review): the two statements below use names that are neither parameters nor locals of
// this method; they look like removed lines left over from the diff rendering - confirm
// against the real file before relying on this listing.
val alignedProjects = dataProjectsWithoutMetaFields ++ partitionProjects
|
||||
Project(alignedProjects, query)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user