1
0

[HUDI-4346] Fix params not being updated with BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED (#5999)

This commit is contained in:
RexAn
2022-06-30 10:26:00 +08:00
committed by GitHub
parent 6a01f7029c
commit cdaaa3c4c7

View File

@@ -520,7 +520,8 @@ object HoodieSparkSqlWriter {
if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
}
- val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key, schema.toString)
+ val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
+ params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
@@ -535,7 +536,7 @@ object HoodieSparkSqlWriter {
new NonSortPartitionerWithRows()
}
val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted()
- parameters.updated(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, arePartitionRecordsSorted.toString)
+ params(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED) = arePartitionRecordsSorted.toString
val isGlobalIndex = if (populateMetaFields) {
SparkHoodieIndexFactory.isGlobalIndex(writeConfig)
} else {