diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java index dd0d4c5c5..6b059db9d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java @@ -123,6 +123,21 @@ public class RowKeyGeneratorHelper { /** * Fetch the field value located at the positions requested for. + * + * The fetching logic recursively goes into the nested field based on the position list to get the field value. + * For example, given the row [4357686,key1,2020-03-21,pi,[val1,10]] with the following schema, which has the fourth + * field as a nested field, and positions list as [4,0], + * + * 0 = "StructField(timestamp,LongType,false)" + * 1 = "StructField(_row_key,StringType,false)" + * 2 = "StructField(ts_ms,StringType,false)" + * 3 = "StructField(pii_col,StringType,false)" + * 4 = "StructField(nested_col,StructType(StructField(prop1,StringType,false), StructField(prop2,LongType,false)),false)" + * + * the logic fetches the value from field nested_col.prop1. + * If any level of the nested field is null, {@link NULL_RECORDKEY_PLACEHOLDER} is returned. + * If the field value is an empty String, {@link EMPTY_RECORDKEY_PLACEHOLDER} is returned. + * * @param row instance of {@link Row} of interest * @param positions tree style positions where the leaf node need to be fetched and returned * @return the field value as per the positions requested for. 
@@ -137,14 +152,15 @@ public class RowKeyGeneratorHelper { Object toReturn = null; while (index < totalCount) { + if (valueToProcess.isNullAt(positions.get(index))) { + toReturn = NULL_RECORDKEY_PLACEHOLDER; + break; + } + if (index < totalCount - 1) { - if (valueToProcess.isNullAt(positions.get(index))) { - toReturn = NULL_RECORDKEY_PLACEHOLDER; - break; - } valueToProcess = (Row) valueToProcess.get(positions.get(index)); } else { // last index - if (null != valueToProcess.getAs(positions.get(index)) && valueToProcess.getAs(positions.get(index)).toString().isEmpty()) { + if (valueToProcess.getAs(positions.get(index)).toString().isEmpty()) { toReturn = EMPTY_RECORDKEY_PLACEHOLDER; break; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java index 80b85d8ee..161b7c2ff 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java @@ -28,9 +28,14 @@ import org.apache.hudi.testutils.KeyGeneratorTestUtilities; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH; public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities { - private TypedProperties getCommonProps() { TypedProperties properties = new TypedProperties(); properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); @@ -75,6 +80,12 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities { return properties; } + private TypedProperties getPropsWithNestedPartitionPathField() { + TypedProperties properties = 
getCommonProps(); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "nested_col.prop1"); + return properties; + } + @Test public void testNullPartitionPathFields() { Assertions.assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutPartitionPathProp())); @@ -121,4 +132,28 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities { Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686"); } + private static Stream<GenericRecord> nestedColTestRecords() { + return Stream.of(null, getNestedColRecord(null, 10L), + getNestedColRecord("", 10L), getNestedColRecord("val1", 10L)); + } + + @ParameterizedTest + @MethodSource("nestedColTestRecords") + public void testNestedPartitionPathField(GenericRecord nestedColRecord) { + SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getPropsWithNestedPartitionPathField()); + GenericRecord record = getRecord(nestedColRecord); + String partitionPathFieldValue = null; + if (nestedColRecord != null) { + partitionPathFieldValue = (String) nestedColRecord.get("prop1"); + } + String expectedPartitionPath = "nested_col.prop1=" + + (partitionPathFieldValue != null && !partitionPathFieldValue.isEmpty() ?
partitionPathFieldValue : DEFAULT_PARTITION_PATH); + HoodieKey key = keyGenerator.getKey(record); + Assertions.assertEquals("key1", key.getRecordKey()); + Assertions.assertEquals(expectedPartitionPath, key.getPartitionPath()); + + Row row = KeyGeneratorTestUtilities.getRow(record); + Assertions.assertEquals("key1", keyGenerator.getRecordKey(row)); + Assertions.assertEquals(expectedPartitionPath, keyGenerator.getPartitionPath(row)); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java index 53b2abfd9..40f7c9e2d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java @@ -32,22 +32,39 @@ import scala.Function1; public class KeyGeneratorTestUtilities { - public static String exampleSchema = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ " + public static final String NESTED_COL_SCHEMA = "{\"type\":\"record\", \"name\":\"nested_col\",\"fields\": [" + + "{\"name\": \"prop1\",\"type\": \"string\"},{\"name\": \"prop2\", \"type\": \"long\"}]}"; + public static final String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ " + "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + "{\"name\": \"ts_ms\", \"type\": \"string\"}," - + "{\"name\": \"pii_col\", \"type\": \"string\"}]}"; + + "{\"name\": \"pii_col\", \"type\": \"string\"}," + + "{\"name\": \"nested_col\",\"type\": " + + NESTED_COL_SCHEMA + "}" + + "]}"; public static final String TEST_STRUCTNAME = "test_struct_name"; public static final String TEST_RECORD_NAMESPACE = "test_record_namespace"; - public static Schema schema = new Schema.Parser().parse(exampleSchema); + public static Schema schema = new 
Schema.Parser().parse(EXAMPLE_SCHEMA); public static StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema); - public GenericRecord getRecord() { - GenericRecord record = new GenericData.Record(new Schema.Parser().parse(exampleSchema)); + public static GenericRecord getRecord() { + return getRecord(getNestedColRecord("val1", 10L)); + } + + public static GenericRecord getNestedColRecord(String prop1Value, Long prop2Value) { + GenericRecord nestedColRecord = new GenericData.Record(new Schema.Parser().parse(NESTED_COL_SCHEMA)); + nestedColRecord.put("prop1", prop1Value); + nestedColRecord.put("prop2", prop2Value); + return nestedColRecord; + } + + public static GenericRecord getRecord(GenericRecord nestedColRecord) { + GenericRecord record = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA)); record.put("timestamp", 4357686); record.put("_row_key", "key1"); record.put("ts_ms", "2020-03-21"); record.put("pii_col", "pi"); + record.put("nested_col", nestedColRecord); return record; }