1
0

[HUDI-2558] Fixing Clustering w/ sort columns with null values fails (#4404)

This commit is contained in:
harshal
2022-01-03 12:19:43 +05:30
committed by GitHub
parent 0273f2e65d
commit 2b2ae34cb9
2 changed files with 33 additions and 1 deletion

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -55,8 +56,17 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
final String[] sortColumns = this.sortColumnNames;
final SerializableSchema schema = this.serializableSchema;
// Sort records by the configured sort columns. The key extractor must never
// return null: Spark's sortBy comparator would NPE on a null key, so a null
// column value is mapped to the empty string (which also yields null-first
// ordering for ascending sorts).
return records.sortBy(
    record -> {
      Object recordValue = HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema);
      // null values are replaced with empty string for null_first order
      if (recordValue == null) {
        return StringUtils.EMPTY_STRING;
      } else {
        // Sort on the extracted column value, NOT the whole record:
        // stringifying `record` here would ignore the sort columns entirely.
        return StringUtils.objToString(recordValue);
      }
    },
    true, outputSparkPartitions);
}
@Override

View File

@@ -715,6 +715,28 @@ class TestMORDataSource extends HoodieClientTestBase {
assertEquals(true, fs.listStatus(tempPath).isEmpty)
}
@Test
// Regression test for HUDI-2558: inline clustering must not fail when the
// configured sort column contains null values (previously the null sort key
// caused the clustering sort to throw).
def testClusteringOnNullableColumn(): Unit = {
val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList
// Build an input frame with two deliberately nullable columns:
//  - cluster_id: null whenever end_lon < 0.2
//  - struct_cluster_col: a struct that is itself null whenever end_lon < 0.1
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
.withColumn("cluster_id", when(expr("end_lon < 0.2 "), lit(null).cast("string"))
.otherwise(col("_row_key")))
.withColumn("struct_cluster_col", when(expr("end_lon < 0.1"), lit(null))
.otherwise(struct(col("cluster_id"), col("_row_key"))))
// Bulk-insert into a MOR table with inline clustering triggered on the very
// first commit, sorting by the nullable struct column. The test passes if the
// write (and thus the clustering sort over null keys) completes without error.
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
// option for clustering
.option("hoodie.clustering.inline", "true")
.option("hoodie.clustering.inline.max.commits", "1")
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
.option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
.option("hoodie.clustering.plan.strategy.sort.columns", "struct_cluster_col")
.mode(SaveMode.Overwrite)
.save(basePath)
}
@Test
def testHoodieIsDeletedMOR(): Unit = {
val numRecords = 100