diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java index ce3cd6f09..4db7eb26e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java @@ -68,8 +68,8 @@ public class HoodieRowCreateHandle implements Serializable { private final HoodieTimer currTimer; public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId, - String instantTime, int taskPartitionId, long taskId, long taskEpochId, - StructType structType) { + String instantTime, int taskPartitionId, long taskId, long taskEpochId, + StructType structType) { this.partitionPath = partitionPath; this.table = table; this.writeConfig = writeConfig; @@ -107,16 +107,15 @@ public class HoodieRowCreateHandle implements Serializable { /** * Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter. + * * @param record instance of {@link InternalRow} that needs to be written to the fileWriter. 
* @throws IOException */ public void write(InternalRow record) throws IOException { try { - String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get( - HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString(); - String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement()); - String recordKey = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get( - HoodieRecord.RECORD_KEY_METADATA_FIELD)).toString(); + final String partitionPath = String.valueOf(record.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS)); + final String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement()); + final String recordKey = String.valueOf(record.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_POS)); HoodieInternalRow internalRow = new HoodieInternalRow(instantTime, seqId, recordKey, partitionPath, path.getName(), record); try { @@ -141,6 +140,7 @@ public class HoodieRowCreateHandle implements Serializable { /** * Closes the {@link HoodieRowCreateHandle} and returns an instance of {@link HoodieInternalWriteStatus} containing the stats and * status of the writes to this handle. + * * @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle. 
* @throws IOException */ diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java index fe03f60ee..0642a85c5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java @@ -18,25 +18,25 @@ package org.apache.hudi.keygen; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.ApiMaturityLevel; import org.apache.hudi.AvroConversionUtils; -import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.PublicAPIMethod; -import org.apache.hudi.client.utils.SparkRowSerDe; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.HoodieKeyException; + +import org.apache.avro.generic.GenericRecord; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.StructType; -import scala.Function1; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import scala.Function1; /** * Base class for the built-in key generators. 
Contains methods structured for @@ -46,13 +46,12 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp private static final String STRUCT_NAME = "hoodieRowTopLevelField"; private static final String NAMESPACE = "hoodieRow"; - private transient Function1 converterFn = null; - private SparkRowSerDe sparkRowSerDe; + private Function1 converterFn = null; + private final AtomicBoolean validatePartitionFields = new AtomicBoolean(false); protected StructType structType; - protected Map> recordKeyPositions = new HashMap<>(); - protected Map> partitionPathPositions = new HashMap<>(); - protected Map> partitionPathDataTypes = null; + protected Map, DataType>> recordKeySchemaInfo = new HashMap<>(); + protected Map, DataType>> partitionPathSchemaInfo = new HashMap<>(); protected BuiltinKeyGenerator(TypedProperties config) { super(config); @@ -60,6 +59,7 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp /** * Fetch record key from {@link Row}. + * * @param row instance of {@link Row} from which record key is requested. * @return the record key of interest from {@link Row}. */ @@ -74,6 +74,7 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp /** * Fetch partition path from {@link Row}. + * * @param row instance of {@link Row} from which partition path is requested * @return the partition path of interest from {@link Row}. 
*/ @@ -97,87 +98,41 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) public String getPartitionPath(InternalRow internalRow, StructType structType) { try { - initDeserializer(structType); - Row row = sparkRowSerDe.deserializeRow(internalRow); - return getPartitionPath(row); + buildFieldSchemaInfoIfNeeded(structType); + return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(), + hiveStylePartitioning, partitionPathSchemaInfo); } catch (Exception e) { throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e); } } - private void initDeserializer(StructType structType) { - if (sparkRowSerDe == null) { - sparkRowSerDe = HoodieSparkUtils.getDeserializer(structType); - } - } - - void buildFieldPositionMapIfNeeded(StructType structType) { + void buildFieldSchemaInfoIfNeeded(StructType structType) { if (this.structType == null) { - // parse simple fields - getRecordKeyFields().stream() - .filter(f -> !(f.contains("."))) - .forEach(f -> { - if (structType.getFieldIndex(f).isDefined()) { - recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))); - } else { - throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\""); - } - }); - // parse nested fields - getRecordKeyFields().stream() - .filter(f -> f.contains(".")) - .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true))); - // parse simple fields + getRecordKeyFields() + .stream().filter(f -> !f.isEmpty()) + .forEach(f -> recordKeySchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, true))); if (getPartitionPathFields() != null) { - getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains("."))) - .forEach(f -> { - if (structType.getFieldIndex(f).isDefined()) { - partitionPathPositions.put(f, - 
Collections.singletonList((Integer) (structType.getFieldIndex(f).get()))); - } else { - partitionPathPositions.put(f, Collections.singletonList(-1)); - } - }); - // parse nested fields - getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains(".")) - .forEach(f -> partitionPathPositions.put(f, - RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false))); + getPartitionPathFields().stream().filter(f -> !f.isEmpty()) + .forEach(f -> partitionPathSchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, false))); } this.structType = structType; } } protected String getPartitionPathInternal(InternalRow row, StructType structType) { - buildFieldDataTypesMapIfNeeded(structType); + buildFieldSchemaInfoIfNeeded(structType); validatePartitionFieldsForInternalRow(); return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(row, getPartitionPathFields(), - hiveStylePartitioning, partitionPathPositions, partitionPathDataTypes); + hiveStylePartitioning, partitionPathSchemaInfo); } protected void validatePartitionFieldsForInternalRow() { - partitionPathPositions.entrySet().forEach(entry -> { - if (entry.getValue().size() > 1) { - throw new IllegalArgumentException("Nested column for partitioning is not supported with disabling meta columns"); - } - }); - } - - void buildFieldDataTypesMapIfNeeded(StructType structType) { - buildFieldPositionMapIfNeeded(structType); - if (this.partitionPathDataTypes == null) { - this.partitionPathDataTypes = new HashMap<>(); - if (getPartitionPathFields() != null) { - // populating simple fields are good enough - getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains("."))) - .forEach(f -> { - if (structType.getFieldIndex(f).isDefined()) { - partitionPathDataTypes.put(f, - Collections.singletonList((structType.fields()[structType.fieldIndex(f)].dataType()))); - } else { - partitionPathDataTypes.put(f, Collections.singletonList(null)); - } - }); - } + if 
(!validatePartitionFields.getAndSet(true)) { + partitionPathSchemaInfo.values().forEach(entry -> { + if (entry.getKey().size() > 1) { + throw new IllegalArgumentException("Nested column for partitioning is not supported with disabling meta columns"); + } + }); } } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java index 2e2167f93..9ba3fb876 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java @@ -60,15 +60,15 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator { @Override public String getRecordKey(Row row) { - buildFieldPositionMapIfNeeded(row.schema()); - return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, true); + buildFieldSchemaInfoIfNeeded(row.schema()); + return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true); } @Override public String getPartitionPath(Row row) { - buildFieldPositionMapIfNeeded(row.schema()); + buildFieldSchemaInfoIfNeeded(row.schema()); return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(), - hiveStylePartitioning, partitionPathPositions); + hiveStylePartitioning, partitionPathSchemaInfo); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java index 391ea2c87..77eec748c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java @@ -60,8 +60,8 @@ public class GlobalDeleteKeyGenerator extends 
BuiltinKeyGenerator { @Override public String getRecordKey(Row row) { - buildFieldPositionMapIfNeeded(row.schema()); - return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, true); + buildFieldSchemaInfoIfNeeded(row.schema()); + return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java index 032c750f0..dc8b253b0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java @@ -61,6 +61,12 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator { return nonpartitionedAvroKeyGenerator.getPartitionPathFields(); } + @Override + public String getRecordKey(Row row) { + buildFieldSchemaInfoIfNeeded(row.schema()); + return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false); + } + @Override public String getPartitionPath(Row row) { return nonpartitionedAvroKeyGenerator.getEmptyPartition(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java index 6a28fbe95..c0e10e6f9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java @@ -18,6 +18,7 @@ package org.apache.hudi.keygen; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieKeyException; import org.apache.spark.sql.Row; @@ -52,17 +53,18 @@ public class 
RowKeyGeneratorHelper { /** * Generates record key for the corresponding {@link Row}. - * @param row instance of {@link Row} of interest - * @param recordKeyFields record key fields as a list + * + * @param row instance of {@link Row} of interest + * @param recordKeyFields record key fields as a list * @param recordKeyPositions record key positions for the corresponding record keys in {@code recordKeyFields} - * @param prefixFieldName {@code true} if field name need to be prefixed in the returned result. {@code false} otherwise. + * @param prefixFieldName {@code true} if field name need to be prefixed in the returned result. {@code false} otherwise. * @return the record key thus generated */ - public static String getRecordKeyFromRow(Row row, List recordKeyFields, Map> recordKeyPositions, boolean prefixFieldName) { + public static String getRecordKeyFromRow(Row row, List recordKeyFields, Map, DataType>> recordKeyPositions, boolean prefixFieldName) { AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true); String toReturn = recordKeyFields.stream().map(field -> { String val = null; - List fieldPositions = recordKeyPositions.get(field); + List fieldPositions = recordKeyPositions.get(field).getKey(); if (fieldPositions.size() == 1) { // simple field Integer fieldPos = fieldPositions.get(0); if (row.isNullAt(fieldPos)) { @@ -76,7 +78,7 @@ public class RowKeyGeneratorHelper { } } } else { // nested fields - val = getNestedFieldVal(row, recordKeyPositions.get(field)).toString(); + val = getNestedFieldVal(row, recordKeyPositions.get(field).getKey()).toString(); if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) { keyIsNullOrEmpty.set(false); } @@ -91,17 +93,18 @@ public class RowKeyGeneratorHelper { /** * Generates partition path for the corresponding {@link Row}. 
- * @param row instance of {@link Row} of interest - * @param partitionPathFields partition path fields as a list - * @param hiveStylePartitioning {@code true} if hive style partitioning is set. {@code false} otherwise + * + * @param row instance of {@link Row} of interest + * @param partitionPathFields partition path fields as a list + * @param hiveStylePartitioning {@code true} if hive style partitioning is set. {@code false} otherwise * @param partitionPathPositions partition path positions for the corresponding fields in {@code partitionPathFields} * @return the generated partition path for the row */ - public static String getPartitionPathFromRow(Row row, List partitionPathFields, boolean hiveStylePartitioning, Map> partitionPathPositions) { + public static String getPartitionPathFromRow(Row row, List partitionPathFields, boolean hiveStylePartitioning, Map, DataType>> partitionPathPositions) { return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> { String field = partitionPathFields.get(idx); String val = null; - List fieldPositions = partitionPathPositions.get(field); + List fieldPositions = partitionPathPositions.get(field).getKey(); if (fieldPositions.size() == 1) { // simple Integer fieldPos = fieldPositions.get(0); // for partition path, if field is not found, index will be set to -1 @@ -118,7 +121,7 @@ public class RowKeyGeneratorHelper { val = field + "=" + val; } } else { // nested - Object data = getNestedFieldVal(row, partitionPathPositions.get(field)); + Object data = getNestedFieldVal(row, partitionPathPositions.get(field).getKey()); data = convertToTimestampIfInstant(data); if (data.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || data.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) { val = hiveStylePartitioning ? 
field + "=" + HUDI_DEFAULT_PARTITION_PATH : HUDI_DEFAULT_PARTITION_PATH; @@ -130,20 +133,20 @@ public class RowKeyGeneratorHelper { }).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR)); } - public static String getPartitionPathFromInternalRow(InternalRow row, List partitionPathFields, boolean hiveStylePartitioning, - Map> partitionPathPositions, - Map> partitionPathDataTypes) { + public static String getPartitionPathFromInternalRow(InternalRow internalRow, List partitionPathFields, boolean hiveStylePartitioning, + Map, DataType>> partitionPathPositions) { return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> { String field = partitionPathFields.get(idx); String val = null; - List fieldPositions = partitionPathPositions.get(field); + List fieldPositions = partitionPathPositions.get(field).getKey(); + DataType dataType = partitionPathPositions.get(field).getValue(); if (fieldPositions.size() == 1) { // simple Integer fieldPos = fieldPositions.get(0); // for partition path, if field is not found, index will be set to -1 - if (fieldPos == -1 || row.isNullAt(fieldPos)) { + if (fieldPos == -1 || internalRow.isNullAt(fieldPos)) { val = HUDI_DEFAULT_PARTITION_PATH; } else { - Object value = row.get(fieldPos, partitionPathDataTypes.get(field).get(0)); + Object value = internalRow.get(fieldPos, dataType); if (value == null || value.toString().isEmpty()) { val = HUDI_DEFAULT_PARTITION_PATH; } else { @@ -180,22 +183,22 @@ public class RowKeyGeneratorHelper { /** * Fetch the field value located at the positions requested for. - * + *

* The fetching logic recursively goes into the nested field based on the position list to get the field value. * For example, given the row [4357686,key1,2020-03-21,pi,[val1,10]] with the following schema, which has the fourth * field as a nested field, and positions list as [4,0], - * + *

* 0 = "StructField(timestamp,LongType,false)" * 1 = "StructField(_row_key,StringType,false)" * 2 = "StructField(ts_ms,StringType,false)" * 3 = "StructField(pii_col,StringType,false)" * 4 = "StructField(nested_col,StructType(StructField(prop1,StringType,false), StructField(prop2,LongType,false)),false)" - * + *

* the logic fetches the value from field nested_col.prop1. * If any level of the nested field is null, {@link KeyGenUtils#NULL_RECORDKEY_PLACEHOLDER} is returned. * If the field value is an empty String, {@link KeyGenUtils#EMPTY_RECORDKEY_PLACEHOLDER} is returned. * - * @param row instance of {@link Row} of interest + * @param row instance of {@link Row} of interest * @param positions tree style positions where the leaf node need to be fetched and returned * @return the field value as per the positions requested for. */ @@ -234,13 +237,14 @@ public class RowKeyGeneratorHelper { * @param structType schema of interest * @param field field of interest for which the positions are requested for * @param isRecordKey {@code true} if the field requested for is a record key. {@code false} in case of a partition path. - * @return the positions of the field as per the struct type. + * @return the positions of the field as per the struct type and the leaf field's datatype. */ - public static List getNestedFieldIndices(StructType structType, String field, boolean isRecordKey) { + public static Pair, DataType> getFieldSchemaInfo(StructType structType, String field, boolean isRecordKey) { String[] slices = field.split("\\."); List positions = new ArrayList<>(); int index = 0; int totalCount = slices.length; + DataType leafFieldDataType = null; while (index < totalCount) { String slice = slices[index]; Option curIndexOpt = structType.getFieldIndex(slice); @@ -258,6 +262,9 @@ public class RowKeyGeneratorHelper { } } structType = (StructType) nestedField.dataType(); + } else { + // leaf node. 
+ leafFieldDataType = nestedField.dataType(); } } else { if (isRecordKey) { @@ -269,7 +276,7 @@ public class RowKeyGeneratorHelper { } index++; } - return positions; + return Pair.of(positions, leafFieldDataType); } private static Object convertToTimestampIfInstant(Object data) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index b84a8abdc..2f139a61e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -65,15 +65,15 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator { @Override public String getRecordKey(Row row) { - buildFieldPositionMapIfNeeded(row.schema()); - return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, false); + buildFieldSchemaInfoIfNeeded(row.schema()); + return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false); } @Override public String getPartitionPath(Row row) { - buildFieldPositionMapIfNeeded(row.schema()); + buildFieldSchemaInfoIfNeeded(row.schema()); return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(), - hiveStylePartitioning, partitionPathPositions); + hiveStylePartitioning, partitionPathSchemaInfo); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java index e3a5a3310..004753f24 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java @@ -29,8 +29,8 @@ import 
org.apache.spark.sql.types.StructType; import java.io.IOException; -import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH; import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER; +import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH; import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER; /** @@ -61,24 +61,24 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { @Override public String getRecordKey(Row row) { - buildFieldPositionMapIfNeeded(row.schema()); - return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, false); + buildFieldSchemaInfoIfNeeded(row.schema()); + return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false); } @Override public String getPartitionPath(Row row) { - buildFieldPositionMapIfNeeded(row.schema()); - Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0))); + buildFieldSchemaInfoIfNeeded(row.schema()); + Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathSchemaInfo.get(getPartitionPathFields().get(0)).getKey()); return getTimestampBasedPartitionPath(partitionPathFieldVal); } @Override public String getPartitionPath(InternalRow internalRow, StructType structType) { - buildFieldDataTypesMapIfNeeded(structType); + buildFieldSchemaInfoIfNeeded(structType); validatePartitionFieldsForInternalRow(); Object partitionPathFieldVal = RowKeyGeneratorHelper.getFieldValFromInternalRow(internalRow, - partitionPathPositions.get(getPartitionPathFields().get(0)).get(0), - partitionPathDataTypes.get(getPartitionPathFields().get(0)).get(0)); + partitionPathSchemaInfo.get(getPartitionPathFields().get(0)).getKey().get(0), + partitionPathSchemaInfo.get(getPartitionPathFields().get(0)).getValue()); return 
getTimestampBasedPartitionPath(partitionPathFieldVal); } diff --git a/hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala b/hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala index d4b89e1c9..cd55e381e 100644 --- a/hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala +++ b/hudi-client/hudi-spark-client/src/test/scala/org/apache/hudi/keygen/TestRowGeneratorHelper.scala @@ -19,11 +19,9 @@ package org.apache.hudi.keygen import java.sql.Timestamp - import org.apache.spark.sql.Row - import org.apache.hudi.keygen.RowKeyGeneratorHelper._ - +import org.apache.spark.sql.types.{DataType, DataTypes} import org.junit.jupiter.api.{Assertions, Test} import scala.collection.JavaConverters._ @@ -36,7 +34,9 @@ class TestRowGeneratorHelper { /** single plain partition */ val row1 = Row.fromSeq(Seq(1, "z3", 10.0, "20220108")) val ptField1 = List("dt").asJava - val ptPos1 = Map("dt" -> List(new Integer(3)).asJava).asJava + val mapValue = org.apache.hudi.common.util.collection.Pair.of(List(new Integer(3)).asJava, DataTypes.LongType) + val ptPos1 = Map("dt" -> mapValue).asJava + Assertions.assertEquals("20220108", getPartitionPathFromRow(row1, ptField1, false, ptPos1)) Assertions.assertEquals("dt=20220108", @@ -45,9 +45,9 @@ class TestRowGeneratorHelper { /** multiple plain partitions */ val row2 = Row.fromSeq(Seq(1, "z3", 10.0, "2022", "01", "08")) val ptField2 = List("year", "month", "day").asJava - val ptPos2 = Map("year" -> List(new Integer(3)).asJava, - "month" -> List(new Integer(4)).asJava, - "day" -> List(new Integer(5)).asJava + val ptPos2 = Map("year" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(3)).asJava, DataTypes.StringType), + "month" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(4)).asJava, DataTypes.StringType), + "day" -> org.apache.hudi.common.util.collection.Pair.of(List(new 
Integer(5)).asJava, DataTypes.StringType) ).asJava Assertions.assertEquals("2022/01/08", getPartitionPathFromRow(row2, ptField2, false, ptPos2)) @@ -58,8 +58,8 @@ class TestRowGeneratorHelper { val timestamp = Timestamp.valueOf("2020-01-08 10:00:00") val instant = timestamp.toInstant val ptField3 = List("event", "event_time").asJava - val ptPos3 = Map("event" -> List(new Integer(3)).asJava, - "event_time" -> List(new Integer(4)).asJava + val ptPos3 = Map("event" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(3)).asJava, DataTypes.StringType), + "event_time" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(4)).asJava, DataTypes.TimestampType) ).asJava // with timeStamp type @@ -79,7 +79,7 @@ class TestRowGeneratorHelper { /** mixed case with plain and nested partitions */ val nestedRow4 = Row.fromSeq(Seq(instant, "ad")) val ptField4 = List("event_time").asJava - val ptPos4 = Map("event_time" -> List(new Integer(3), new Integer(0)).asJava).asJava + val ptPos4 = Map("event_time" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(3), new Integer(0)).asJava, DataTypes.TimestampType)).asJava // with instant type val row4 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow4, "click")) Assertions.assertEquals("2020-01-08 10:00:00.0", @@ -90,8 +90,8 @@ class TestRowGeneratorHelper { val nestedRow5 = Row.fromSeq(Seq(timestamp, "ad")) val ptField5 = List("event", "event_time").asJava val ptPos5 = Map( - "event_time" -> List(new Integer(3), new Integer(0)).asJava, - "event" -> List(new Integer(4)).asJava + "event_time" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(3), new Integer(0)).asJava, DataTypes.TimestampType), + "event" -> org.apache.hudi.common.util.collection.Pair.of(List(new Integer(4)).asJava, DataTypes.StringType) ).asJava val row5 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow5, "click")) Assertions.assertEquals("click/2020-01-08 10:00:00.0", diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index f69d5683d..e2b586964 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -400,7 +400,7 @@ public class HoodieAvroUtils { copyOldValueOrSetDefault(genericRecord, newRecord, f); } // do not preserve FILENAME_METADATA_FIELD - newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName); + newRecord.put(HoodieRecord.FILENAME_META_FIELD_POS, fileName); if (!GenericData.get().validate(newSchema, newRecord)) { throw new SchemaCompatibilityException( "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema); @@ -412,7 +412,7 @@ public class HoodieAvroUtils { public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) { GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>()); // do not preserve FILENAME_METADATA_FIELD - newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName); + newRecord.put(HoodieRecord.FILENAME_META_FIELD_POS, fileName); return newRecord; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index 0f21ae1be..e504b7b87 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -42,8 +42,6 @@ public abstract class HoodieRecord implements Serializable { public static final String OPERATION_METADATA_FIELD = "_hoodie_operation"; public static final String HOODIE_IS_DELETED = "_hoodie_is_deleted"; - public static int FILENAME_METADATA_FIELD_POS = 4; - public static final List HOODIE_META_COLUMNS = 
CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD, RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD); @@ -59,6 +57,10 @@ public abstract class HoodieRecord implements Serializable { IntStream.range(0, HOODIE_META_COLUMNS.size()).mapToObj(idx -> Pair.of(HOODIE_META_COLUMNS.get(idx), idx)) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + public static int RECORD_KEY_META_FIELD_POS = HOODIE_META_COLUMNS_NAME_TO_POS.get(RECORD_KEY_METADATA_FIELD); + public static int PARTITION_PATH_META_FIELD_POS = HOODIE_META_COLUMNS_NAME_TO_POS.get(PARTITION_PATH_METADATA_FIELD); + public static int FILENAME_META_FIELD_POS = HOODIE_META_COLUMNS_NAME_TO_POS.get(FILENAME_METADATA_FIELD); + /** * Identifies the record across the table. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 251a990d8..6b10a6282 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -384,12 +384,14 @@ public class HoodieTableMetaClient implements Serializable { throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back"); } - // Meta fields can be disabled only when {@code SimpleKeyGenerator} is used - if (!getTableConfig().populateMetaFields() - && !properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator") - .equals("org.apache.hudi.keygen.SimpleKeyGenerator")) { - throw new HoodieException("Only simple key generator is supported when meta fields are disabled. 
KeyGenerator used : " - + properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key())); + // meta fields can be disabled only with SimpleKeyGenerator, NonPartitioned and ComplexKeyGen. + if (!getTableConfig().populateMetaFields()) { + String keyGenClass = properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator"); + if (!keyGenClass.equals("org.apache.hudi.keygen.SimpleKeyGenerator") && !keyGenClass.equals("org.apache.hudi.keygen.NonpartitionedKeyGenerator") + && !keyGenClass.equals("org.apache.hudi.keygen.ComplexKeyGenerator")) { + throw new HoodieException("Only simple, non partitioned and complex key generator is supported when meta fields are disabled. KeyGenerator used : " + + properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key())); + } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java index b3acf444a..0ed7b23c7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java @@ -22,7 +22,11 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.hive.NonPartitionedExtractor; import org.apache.hudi.keygen.BuiltinKeyGenerator; +import org.apache.hudi.keygen.ComplexKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.log4j.LogManager; @@ -57,18 +61,18 @@ public class HoodieDatasetBulkInsertHelper { /** * Prepares input 
hoodie spark dataset for bulk insert. It does the following steps. - * 1. Uses KeyGenerator to generate hoodie record keys and partition path. - * 2. Add hoodie columns to input spark dataset. - * 3. Reorders input dataset columns so that hoodie columns appear in the beginning. - * 4. Sorts input dataset by hoodie partition path and record key + * 1. Uses KeyGenerator to generate hoodie record keys and partition path. + * 2. Add hoodie columns to input spark dataset. + * 3. Reorders input dataset columns so that hoodie columns appear in the beginning. + * 4. Sorts input dataset by hoodie partition path and record key * * @param sqlContext SQL Context - * @param config Hoodie Write Config - * @param rows Spark Input dataset + * @param config Hoodie Write Config + * @param rows Spark Input dataset * @return hoodie dataset which is ready for bulk insert. */ public static Dataset prepareHoodieDatasetForBulkInsert(SQLContext sqlContext, - HoodieWriteConfig config, Dataset rows, String structName, String recordNamespace, + HoodieWriteConfig config, Dataset rows, String structName, String recordNamespace, BulkInsertPartitioner> bulkInsertPartitionerRows, boolean isGlobalIndex, boolean dropPartitionColumns) { List originalFields = @@ -77,27 +81,46 @@ public class HoodieDatasetBulkInsertHelper { TypedProperties properties = new TypedProperties(); properties.putAll(config.getProps()); String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()); + String recordKeyFields = properties.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()); + String partitionPathFields = properties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) + ? 
properties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()) : ""; BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties); - String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key()); - String recordKeyUdfFn = RECORD_KEY_UDF_FN + tableName; - String partitionPathUdfFn = PARTITION_PATH_UDF_FN + tableName; - sqlContext.udf().register(recordKeyUdfFn, (UDF1) keyGenerator::getRecordKey, DataTypes.StringType); - sqlContext.udf().register(partitionPathUdfFn, (UDF1) keyGenerator::getPartitionPath, DataTypes.StringType); - final Dataset rowDatasetWithRecordKeys = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, - callUDF(recordKeyUdfFn, org.apache.spark.sql.functions.struct( - JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq()))); + Dataset rowDatasetWithRecordKeysAndPartitionPath; + if (keyGeneratorClass.equals(org.apache.hudi.keygen.NonpartitionedKeyGenerator.class.getName())) { + // for non-partitioned key generator, set partition path to empty. + rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields)) + .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType)); + } else if (keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) + || (keyGeneratorClass.equals(ComplexKeyGenerator.class.getName()) && !recordKeyFields.contains(",") && !partitionPathFields.contains(",") + && (!partitionPathFields.contains("timestamp")))) { // in case of ComplexKeyGen, check partition path type. + // simple fields for both record key and partition path: can directly use withColumn + String partitionPathField = keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) ?
partitionPathFields : + partitionPathFields.substring(partitionPathFields.indexOf(":") + 1); + rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType)) + .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.col(partitionPathField).cast(DataTypes.StringType)); + } else { + // use udf + String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key()); + String recordKeyUdfFn = RECORD_KEY_UDF_FN + tableName; + String partitionPathUdfFn = PARTITION_PATH_UDF_FN + tableName; + sqlContext.udf().register(recordKeyUdfFn, (UDF1) keyGenerator::getRecordKey, DataTypes.StringType); + sqlContext.udf().register(partitionPathUdfFn, (UDF1) keyGenerator::getPartitionPath, DataTypes.StringType); - final Dataset rowDatasetWithRecordKeysAndPartitionPath = - rowDatasetWithRecordKeys.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, - callUDF(partitionPathUdfFn, - org.apache.spark.sql.functions.struct( - JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq()))); + final Dataset rowDatasetWithRecordKeys = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, + callUDF(recordKeyUdfFn, org.apache.spark.sql.functions.struct( + JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq()))); + rowDatasetWithRecordKeysAndPartitionPath = + rowDatasetWithRecordKeys.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, + callUDF(partitionPathUdfFn, + org.apache.spark.sql.functions.struct( + JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq()))); + } // Add other empty hoodie fields which will be populated before writing to parquet. 
Dataset rowDatasetWithHoodieColumns = rowDatasetWithRecordKeysAndPartitionPath.withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD, - functions.lit("").cast(DataTypes.StringType)) + functions.lit("").cast(DataTypes.StringType)) .withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType)) .withColumn(HoodieRecord.FILENAME_METADATA_FIELD, @@ -106,7 +129,7 @@ public class HoodieDatasetBulkInsertHelper { Dataset processedDf = rowDatasetWithHoodieColumns; if (dropPartitionColumns) { String partitionColumns = String.join(",", keyGenerator.getPartitionPathFields()); - for (String partitionField: keyGenerator.getPartitionPathFields()) { + for (String partitionField : keyGenerator.getPartitionPathFields()) { originalFields.remove(new Column(partitionField)); } processedDf = rowDatasetWithHoodieColumns.drop(partitionColumns); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java index 823de99fc..9a793c422 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java @@ -25,8 +25,8 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.row.HoodieRowCreateHandleWithoutMetaFields; import org.apache.hudi.io.storage.row.HoodieRowCreateHandle; +import org.apache.hudi.io.storage.row.HoodieRowCreateHandleWithoutMetaFields; import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator; @@ 
-123,8 +123,7 @@ public class BulkInsertDataInternalWriterHelper { try { String partitionPath = null; if (populateMetaFields) { // usual path where meta fields are pre populated in prep step. - partitionPath = record.getUTF8String( - HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString(); + partitionPath = String.valueOf(record.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS)); } else { // if meta columns are disabled. if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen partitionPath = ""; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 4423874ab..89b2ecc1a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -564,8 +564,7 @@ object HoodieSparkSqlWriter { throw new HoodieException("Bulk insert using row writer is not supported with current Spark version." 
+ " To use row writer please switch to spark 2 or spark 3") } - val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(params) - val syncHiveSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema) + val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, basePath, df.schema) (syncHiveSuccess, common.util.Option.ofNullable(instantTime)) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java index 9185d09aa..6b617ca20 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java @@ -24,6 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows; +import org.apache.hudi.keygen.ComplexKeyGenerator; +import org.apache.hudi.keygen.NonpartitionedKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.testutils.DataSourceTestUtils; import org.apache.hudi.testutils.HoodieClientTestBase; @@ -94,20 +97,36 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase { public void testBulkInsertHelperConcurrently() { IntStream.range(0, 2).parallel().forEach(i -> { if (i % 2 == 0) { - testBulkInsertHelperFor("_row_key"); + testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(), "_row_key"); } else { - testBulkInsertHelperFor("ts"); + testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(), "ts"); } }); } - @Test - public void testBulkInsertHelper() { - testBulkInsertHelperFor("_row_key"); + private static Stream provideKeyGenArgs() { + return 
Stream.of( + Arguments.of(SimpleKeyGenerator.class.getName()), + Arguments.of(ComplexKeyGenerator.class.getName()), + Arguments.of(NonpartitionedKeyGenerator.class.getName())); } - private void testBulkInsertHelperFor(String recordKey) { - HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet(recordKey)).combineInput(false, false).build(); + @ParameterizedTest + @MethodSource("provideKeyGenArgs") + public void testBulkInsertHelper(String keyGenClass) { + testBulkInsertHelperFor(keyGenClass, "_row_key"); + } + + private void testBulkInsertHelperFor(String keyGenClass, String recordKey) { + Map props = null; + if (keyGenClass.equals(SimpleKeyGenerator.class.getName())) { + props = getPropsAllSet(recordKey); + } else if (keyGenClass.equals(ComplexKeyGenerator.class.getName())) { + props = getPropsForComplexKeyGen(recordKey); + } else { // NonPartitioned key gen + props = getPropsForNonPartitionedKeyGen(recordKey); + } + HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build(); List rows = DataSourceTestUtils.generateRandomRows(10); Dataset dataset = sqlContext.createDataFrame(rows, structType); Dataset result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", @@ -121,9 +140,10 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase { assertTrue(resultSchema.fieldIndex(entry.getKey()) == entry.getValue()); } + boolean isNonPartitioned = keyGenClass.equals(NonpartitionedKeyGenerator.class.getName()); result.toJavaRDD().foreach(entry -> { assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs(recordKey).toString())); - assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(entry.getAs("partition"))); + assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(isNonPartitioned ? 
"" : entry.getAs("partition"))); assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals("")); assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals("")); assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)).equals("")); @@ -253,6 +273,23 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase { return props; } + private Map getPropsForComplexKeyGen(String recordKey) { + Map props = new HashMap<>(); + props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), ComplexKeyGenerator.class.getName()); + props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey); + props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "simple:partition"); + props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table"); + return props; + } + + private Map getPropsForNonPartitionedKeyGen(String recordKey) { + Map props = new HashMap<>(); + props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), NonpartitionedKeyGenerator.class.getName()); + props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey); + props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table"); + return props; + } + @Test public void testNoPropsSet() { HoodieWriteConfig config = getConfigBuilder(schemaStr).build(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java index 735277d95..6719c2a3d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java @@ -83,7 +83,7 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities { public void testWrongRecordKeyField() { ComplexKeyGenerator keyGenerator = new 
ComplexKeyGenerator(getWrongRecordKeyFieldProps()); Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); - Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType)); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType)); } @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java index 3bd6a60c4..f6c4c8a8b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java @@ -68,7 +68,7 @@ public class TestGlobalDeleteRecordGenerator extends KeyGeneratorTestUtilities { public void testWrongRecordKeyField() { GlobalDeleteKeyGenerator keyGenerator = new GlobalDeleteKeyGenerator(getWrongRecordKeyFieldProps()); Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); - Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType)); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType)); } @Test @@ -78,7 +78,7 @@ public class TestGlobalDeleteRecordGenerator extends KeyGeneratorTestUtilities { HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi"); Assertions.assertEquals(key.getPartitionPath(), ""); - keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType); + keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType); Row row = 
KeyGeneratorTestUtilities.getRow(record); Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi"); Assertions.assertEquals(keyGenerator.getPartitionPath(row), ""); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java index 297b07779..75d9b7da7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestNonpartitionedKeyGenerator.java @@ -94,7 +94,7 @@ public class TestNonpartitionedKeyGenerator extends KeyGeneratorTestUtilities { public void testWrongRecordKeyField() { NonpartitionedKeyGenerator keyGenerator = new NonpartitionedKeyGenerator(getWrongRecordKeyFieldProps()); Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); - Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType)); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType)); } @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java index 7dea9e414..17cff3505 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java @@ -100,7 +100,7 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities { public void testWrongRecordKeyField() { SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getWrongRecordKeyFieldProps()); 
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); - Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType)); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType)); } @Test @@ -116,7 +116,7 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities { public void testComplexRecordKeyField() { SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getComplexRecordKeyProp()); Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); - Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType)); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldSchemaInfoIfNeeded(KeyGeneratorTestUtilities.structType)); } @Test