1
0

[HUDI-4494] keep the fields' order when data is written out of order (#6233)

This commit is contained in:
Yann Byron
2022-07-28 22:15:01 +08:00
committed by GitHub
parent 07eedd3ef6
commit ea1fbc71ec
2 changed files with 25 additions and 6 deletions

View File

@@ -154,12 +154,15 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
schemaWithoutMetaFields: Seq[StructField], schemaWithoutMetaFields: Seq[StructField],
conf: SQLConf): Seq[Alias] = { conf: SQLConf): Seq[Alias] = {
queryOutputWithoutMetaFields.zip(schemaWithoutMetaFields).map { case (dataAttr, dataField) => queryOutputWithoutMetaFields.zip(schemaWithoutMetaFields).map { case (dataAttr, dataField) =>
val targetFieldOption = if (dataAttr.name.startsWith("col")) None else val targetAttrOption = if (dataAttr.name.startsWith("col")) {
schemaWithoutMetaFields.find(_.name.equals(dataAttr.name)) None
val targetField = if (targetFieldOption.isDefined) targetFieldOption.get else dataField } else {
val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable), queryOutputWithoutMetaFields.find(_.name.equals(dataField.name))
targetField.dataType, conf) }
Alias(castAttr, targetField.name)() val targetAttr = targetAttrOption.getOrElse(dataAttr)
val castAttr = castIfNeeded(targetAttr.withNullability(dataField.nullable),
dataField.dataType, conf)
Alias(castAttr, dataField.name)()
} }
} }
} }

View File

@@ -93,6 +93,14 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| insert into $tableName partition(dt = '2021-01-06') | insert into $tableName partition(dt = '2021-01-06')
| select 20 as price, 2000 as ts, 2 as id, 'a2' as name | select 20 as price, 2000 as ts, 2 as id, 'a2' as name
""".stripMargin) """.stripMargin)
// should not mess with the original order after writing the out-of-order data.
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(tmp.getCanonicalPath)
.setConf(spark.sessionState.newHadoopConf())
.build()
val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
assert(schema.getFieldIndex("id").contains(0))
assert(schema.getFieldIndex("price").contains(2))
// Note: Do not write the field alias, the partition field must be placed last. // Note: Do not write the field alias, the partition field must be placed last.
spark.sql( spark.sql(
@@ -133,6 +141,14 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| insert into $tableName partition(dt) | insert into $tableName partition(dt)
| select 1 as id, '2021-01-05' as dt, 'a1' as name, 10 as price, 1000 as ts | select 1 as id, '2021-01-05' as dt, 'a1' as name, 10 as price, 1000 as ts
""".stripMargin) """.stripMargin)
// should not mess with the original order after writing the out-of-order data.
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(tmp.getCanonicalPath)
.setConf(spark.sessionState.newHadoopConf())
.build()
val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
assert(schema.getFieldIndex("id").contains(0))
assert(schema.getFieldIndex("price").contains(2))
spark.sql( spark.sql(
s""" s"""