From 4f6fc726d0d3d2dd427210228bbb36cf18893a92 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 6 Jun 2022 13:21:00 -0400 Subject: [PATCH] [HUDI-4140] Fixing hive style partitioning and default partition with bulk insert row writer with SimpleKeyGen and virtual keys (#5664) Bulk insert row writer code path had a gap wrt hive style partitioning and default partition when virtual keys are enabled with SimpleKeyGen. This patch fixes the issue. --- .../apache/hudi/config/HoodieWriteConfig.java | 4 ++ .../BulkInsertDataInternalWriterHelper.java | 7 ++- ...oodieBulkInsertInternalWriterTestBase.java | 5 ++ ...estHoodieBulkInsertDataInternalWriter.java | 48 +++++++++++++++++++ 4 files changed, 63 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 1603965ea..31ce05173 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1064,6 +1064,10 @@ public class HoodieWriteConfig extends HoodieConfig { return MarkerType.valueOf(markerType.toUpperCase()); } + public boolean isHiveStylePartitioningEnabled() { + return getBooleanOrDefault(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE); + } + public int getMarkersTimelineServerBasedBatchNumThreads() { return getInt(MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java index 9a793c422..c9404afe6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java @@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieInternalWriteStatus; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.row.HoodieRowCreateHandle; @@ -128,7 +129,11 @@ public class BulkInsertDataInternalWriterHelper { if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen partitionPath = ""; } else if (simpleKeyGen) { // SimpleKeyGen - partitionPath = (record.get(simplePartitionFieldIndex, simplePartitionFieldDataType)).toString(); + Object parititionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType); + partitionPath = parititionPathValue != null ? parititionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH; + if (writeConfig.isHiveStylePartitioningEnabled()) { + partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath; + } } else { // only BuiltIn key generators are supported if meta fields are disabled. partitionPath = keyGeneratorOpt.get().getPartitionPath(record, structType); diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java index 95a023abb..54eaadd1e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java +++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java @@ -70,6 +70,10 @@ public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarn } protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields) { + return getWriteConfig(populateMetaFields, DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().defaultValue()); + } + + protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields, String hiveStylePartitioningValue) { Properties properties = new Properties(); if (!populateMetaFields) { properties.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName()); @@ -77,6 +81,7 @@ public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarn properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME); properties.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); } + properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), hiveStylePartitioningValue); return getConfigBuilder(basePath, timelineServicePort).withProperties(properties).build(); } diff --git a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java index fd943b72e..f31a34471 100644 --- a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java @@ -18,15 +18,18 @@ package org.apache.hudi.internal; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.SparkDatasetTestUtils; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -109,6 +112,51 @@ public class TestHoodieBulkInsertDataInternalWriter extends } } + @Test + public void testDataInternalWriterHiveStylePartitioning() throws Exception { + boolean sorted = true; + boolean populateMetaFields = false; + // init config and table + HoodieWriteConfig cfg = getWriteConfig(populateMetaFields, "true"); + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + for (int i = 0; i < 1; i++) { + String instantTime = "00" + i; + // init writer + HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), + STRUCT_TYPE, populateMetaFields, sorted); + + int size = 10 + RANDOM.nextInt(1000); + // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file + int batches = 3; + Dataset totalInputRows = null; + + for (int j = 0; j < batches; j++) { + String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; + Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); + writeRows(inputRows, writer); + if (totalInputRows == null) { + totalInputRows = inputRows; + } else { + totalInputRows = totalInputRows.union(inputRows); + } + } + + BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit(); + Option> fileAbsPaths = Option.of(new ArrayList<>()); + Option> fileNames = Option.of(new ArrayList<>()); + + // verify write statuses + assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames); + + // verify rows + Dataset result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); + assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields); + + result.collectAsList().forEach(entry -> Assertions.assertTrue(entry.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString() + .contains(SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME + "="))); + } + } + /** * Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch2 of invalid records which is expected * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk.