[HUDI-1888] Fix NPE when the nested partition path field has null value (#2957)
This commit is contained in:
@@ -123,6 +123,21 @@ public class RowKeyGeneratorHelper {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch the field value located at the positions requested for.
|
* Fetch the field value located at the positions requested for.
|
||||||
|
*
|
||||||
|
* The fetching logic recursively goes into the nested field based on the position list to get the field value.
|
||||||
|
* For example, given the row [4357686,key1,2020-03-21,pi,[val1,10]] with the following schema, which has the fourth
|
||||||
|
* field as a nested field, and positions list as [4,0],
|
||||||
|
*
|
||||||
|
* 0 = "StructField(timestamp,LongType,false)"
|
||||||
|
* 1 = "StructField(_row_key,StringType,false)"
|
||||||
|
* 2 = "StructField(ts_ms,StringType,false)"
|
||||||
|
* 3 = "StructField(pii_col,StringType,false)"
|
||||||
|
* 4 = "StructField(nested_col,StructType(StructField(prop1,StringType,false), StructField(prop2,LongType,false)),false)"
|
||||||
|
*
|
||||||
|
* the logic fetches the value from field nested_col.prop1.
|
||||||
|
* If any level of the nested field is null, {@link NULL_RECORDKEY_PLACEHOLDER} is returned.
|
||||||
|
* If the field value is an empty String, {@link EMPTY_RECORDKEY_PLACEHOLDER} is returned.
|
||||||
|
*
|
||||||
* @param row instance of {@link Row} of interest
|
* @param row instance of {@link Row} of interest
|
||||||
* @param positions tree style positions where the leaf node need to be fetched and returned
|
* @param positions tree style positions where the leaf node need to be fetched and returned
|
||||||
* @return the field value as per the positions requested for.
|
* @return the field value as per the positions requested for.
|
||||||
@@ -137,14 +152,15 @@ public class RowKeyGeneratorHelper {
|
|||||||
Object toReturn = null;
|
Object toReturn = null;
|
||||||
|
|
||||||
while (index < totalCount) {
|
while (index < totalCount) {
|
||||||
|
if (valueToProcess.isNullAt(positions.get(index))) {
|
||||||
|
toReturn = NULL_RECORDKEY_PLACEHOLDER;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (index < totalCount - 1) {
|
if (index < totalCount - 1) {
|
||||||
if (valueToProcess.isNullAt(positions.get(index))) {
|
|
||||||
toReturn = NULL_RECORDKEY_PLACEHOLDER;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
valueToProcess = (Row) valueToProcess.get(positions.get(index));
|
valueToProcess = (Row) valueToProcess.get(positions.get(index));
|
||||||
} else { // last index
|
} else { // last index
|
||||||
if (null != valueToProcess.getAs(positions.get(index)) && valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
|
if (valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
|
||||||
toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
|
toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,9 +28,14 @@ import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
|
|||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
|
||||||
|
|
||||||
public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
|
public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
|
||||||
|
|
||||||
private TypedProperties getCommonProps() {
|
private TypedProperties getCommonProps() {
|
||||||
TypedProperties properties = new TypedProperties();
|
TypedProperties properties = new TypedProperties();
|
||||||
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
|
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
|
||||||
@@ -75,6 +80,12 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
|
|||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TypedProperties getPropsWithNestedPartitionPathField() {
|
||||||
|
TypedProperties properties = getCommonProps();
|
||||||
|
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "nested_col.prop1");
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNullPartitionPathFields() {
|
public void testNullPartitionPathFields() {
|
||||||
Assertions.assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutPartitionPathProp()));
|
Assertions.assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutPartitionPathProp()));
|
||||||
@@ -121,4 +132,28 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
|
|||||||
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686");
|
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Stream<GenericRecord> nestedColTestRecords() {
|
||||||
|
return Stream.of(null, getNestedColRecord(null, 10L),
|
||||||
|
getNestedColRecord("", 10L), getNestedColRecord("val1", 10L));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource("nestedColTestRecords")
|
||||||
|
public void testNestedPartitionPathField(GenericRecord nestedColRecord) {
|
||||||
|
SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getPropsWithNestedPartitionPathField());
|
||||||
|
GenericRecord record = getRecord(nestedColRecord);
|
||||||
|
String partitionPathFieldValue = null;
|
||||||
|
if (nestedColRecord != null) {
|
||||||
|
partitionPathFieldValue = (String) nestedColRecord.get("prop1");
|
||||||
|
}
|
||||||
|
String expectedPartitionPath = "nested_col.prop1="
|
||||||
|
+ (partitionPathFieldValue != null && !partitionPathFieldValue.isEmpty() ? partitionPathFieldValue : DEFAULT_PARTITION_PATH);
|
||||||
|
HoodieKey key = keyGenerator.getKey(record);
|
||||||
|
Assertions.assertEquals("key1", key.getRecordKey());
|
||||||
|
Assertions.assertEquals(expectedPartitionPath, key.getPartitionPath());
|
||||||
|
|
||||||
|
Row row = KeyGeneratorTestUtilities.getRow(record);
|
||||||
|
Assertions.assertEquals("key1", keyGenerator.getRecordKey(row));
|
||||||
|
Assertions.assertEquals(expectedPartitionPath, keyGenerator.getPartitionPath(row));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,22 +32,39 @@ import scala.Function1;
|
|||||||
|
|
||||||
public class KeyGeneratorTestUtilities {
|
public class KeyGeneratorTestUtilities {
|
||||||
|
|
||||||
public static String exampleSchema = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
|
public static final String NESTED_COL_SCHEMA = "{\"type\":\"record\", \"name\":\"nested_col\",\"fields\": ["
|
||||||
|
+ "{\"name\": \"prop1\",\"type\": \"string\"},{\"name\": \"prop2\", \"type\": \"long\"}]}";
|
||||||
|
public static final String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
|
||||||
+ "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
+ "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"ts_ms\", \"type\": \"string\"},"
|
+ "{\"name\": \"ts_ms\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"pii_col\", \"type\": \"string\"}]}";
|
+ "{\"name\": \"pii_col\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"nested_col\",\"type\": "
|
||||||
|
+ NESTED_COL_SCHEMA + "}"
|
||||||
|
+ "]}";
|
||||||
|
|
||||||
public static final String TEST_STRUCTNAME = "test_struct_name";
|
public static final String TEST_STRUCTNAME = "test_struct_name";
|
||||||
public static final String TEST_RECORD_NAMESPACE = "test_record_namespace";
|
public static final String TEST_RECORD_NAMESPACE = "test_record_namespace";
|
||||||
public static Schema schema = new Schema.Parser().parse(exampleSchema);
|
public static Schema schema = new Schema.Parser().parse(EXAMPLE_SCHEMA);
|
||||||
public static StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
|
public static StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
|
||||||
|
|
||||||
public GenericRecord getRecord() {
|
public static GenericRecord getRecord() {
|
||||||
GenericRecord record = new GenericData.Record(new Schema.Parser().parse(exampleSchema));
|
return getRecord(getNestedColRecord("val1", 10L));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static GenericRecord getNestedColRecord(String prop1Value, Long prop2Value) {
|
||||||
|
GenericRecord nestedColRecord = new GenericData.Record(new Schema.Parser().parse(NESTED_COL_SCHEMA));
|
||||||
|
nestedColRecord.put("prop1", prop1Value);
|
||||||
|
nestedColRecord.put("prop2", prop2Value);
|
||||||
|
return nestedColRecord;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static GenericRecord getRecord(GenericRecord nestedColRecord) {
|
||||||
|
GenericRecord record = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
||||||
record.put("timestamp", 4357686);
|
record.put("timestamp", 4357686);
|
||||||
record.put("_row_key", "key1");
|
record.put("_row_key", "key1");
|
||||||
record.put("ts_ms", "2020-03-21");
|
record.put("ts_ms", "2020-03-21");
|
||||||
record.put("pii_col", "pi");
|
record.put("pii_col", "pi");
|
||||||
|
record.put("nested_col", nestedColRecord);
|
||||||
return record;
|
return record;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user