From fe508376faaa3c36e9f821f3d6fee731983798c4 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 2 Aug 2021 09:45:09 -0400 Subject: [PATCH] [HUDI-2177][HUDI-2200] Adding virtual keys support for MOR table (#3315) --- .../org/apache/hudi/keygen/KeyGenUtils.java | 2 +- ...ExecuteClusteringCommitActionExecutor.java | 19 +- .../TestHoodieClientOnCopyOnWriteStorage.java | 24 +-- .../hudi/client/TestTableSchemaEvolution.java | 2 +- .../metadata/TestHoodieBackedMetadata.java | 160 +++++++---------- .../table/TestHoodieMergeOnReadTable.java | 169 ++++++++++++------ .../action/compact/CompactionTestBase.java | 2 +- .../hudi/testutils/HoodieClientTestBase.java | 6 +- .../testutils/HoodieClientTestHarness.java | 6 +- .../hudi/testutils/HoodieClientTestUtils.java | 43 +++-- .../testutils/HoodieMergeOnReadTestUtils.java | 19 +- .../hudi/common/table/HoodieTableConfig.java | 19 ++ .../common/table/HoodieTableMetaClient.java | 20 +++ .../log/AbstractHoodieLogRecordScanner.java | 25 ++- .../table/log/HoodieFileSliceReader.java | 20 ++- .../hudi/common/util/SpillableMapUtils.java | 12 +- .../metadata/HoodieBackedTableMetadata.java | 12 +- .../common/testutils/SampleTestRecord.java | 16 +- .../hudi/common/testutils/SchemaTestUtil.java | 9 +- .../apache/hudi/hadoop/InputSplitUtils.java | 8 + .../HoodieHFileRealtimeInputFormat.java | 24 +-- .../HoodieParquetRealtimeInputFormat.java | 4 +- .../realtime/HoodieRealtimeFileSplit.java | 18 +- .../hadoop/realtime/HoodieVirtualKeyInfo.java | 65 +++++++ .../RealtimeBootstrapBaseFileSplit.java | 10 ++ .../RealtimeCompactedRecordReader.java | 6 +- .../hudi/hadoop/realtime/RealtimeSplit.java | 27 +++ .../utils/HoodieRealtimeInputFormatUtils.java | 61 +++++-- .../realtime/TestHoodieRealtimeFileSplit.java | 5 +- .../TestHoodieRealtimeRecordReader.java | 21 ++- .../apache/hudi/HoodieMergeOnReadRDD.scala | 4 +- .../apache/hudi/HoodieSparkSqlWriter.scala | 29 +-- .../org/apache/hudi/IncrementalRelation.scala | 3 + .../hudi/MergeOnReadIncrementalRelation.scala | 6 +- .../hudi/MergeOnReadSnapshotRelation.scala | 10 +- .../org/apache/hudi/client/TestBootstrap.java | 6 +- .../HoodieSparkSqlWriterSuite.scala | 2 +- 37 files changed, 633 insertions(+), 261 deletions(-) create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index 899f041d4..05a6adab5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -60,7 +60,7 @@ public class KeyGenUtils { * @return the partition path for the passed in generic record. */ public static String getPartitionPathFromGenericRecord(GenericRecord genericRecord, Option keyGeneratorOpt) { - return keyGeneratorOpt.isPresent() ? keyGeneratorOpt.get().getRecordKey(genericRecord) : genericRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + return keyGeneratorOpt.isPresent() ? 
keyGeneratorOpt.get().getPartitionPath(genericRecord) : genericRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java index 4cee1d2aa..94c7e937d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java @@ -18,10 +18,6 @@ package org.apache.hudi.table.action.cluster; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; @@ -36,6 +32,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieFileSliceReader; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -55,6 +52,11 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -98,7 +100,7 @@ public class SparkExecuteClusteringCommitActionExecutor[] writeStatuses = convertStreamToArray(writeStatusRDDStream); JavaRDD writeStatusRDD = engineContext.union(writeStatuses); - + HoodieWriteMetadata> writeMetadata = buildWriteMetadata(writeStatusRDD); JavaRDD statuses = updateIndex(writeStatusRDD, writeMetadata); writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect()); @@ -129,7 +131,7 @@ public class SparkExecuteClusteringCommitActionExecutor> writeMetadata) { @@ -211,8 +213,11 @@ public class SparkExecuteClusteringCommitActionExecutor recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1); HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) .combineInput(true, true); - addAppropriatePropsForPopulateMetaFields(configBuilder, populateMetaFields); + addConfigsForPopulateMetaFields(configBuilder, populateMetaFields); try (SparkRDDWriteClient client = getHoodieWriteClient(configBuilder.build());) { client.startCommitWithTime(newCommitTime); @@ -365,7 +365,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { @MethodSource("populateMetaFieldsParams") public void testUpserts(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(); - addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields); + 
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsert, false); } @@ -376,7 +376,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { @MethodSource("populateMetaFieldsParams") public void testUpsertsPrepped(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(); - addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsertPreppedRecords, true); } @@ -526,7 +526,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { @MethodSource("populateMetaFieldsParams") public void testInsertsWithHoodieConcatHandle(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(); - addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); testHoodieConcatHandle(cfgBuilder.build(), false); } @@ -537,7 +537,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { @MethodSource("populateMetaFieldsParams") public void testInsertsPreppedWithHoodieConcatHandle(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(); - addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); testHoodieConcatHandle(cfgBuilder.build(), true); } @@ -588,7 +588,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { @MethodSource("populateMetaFieldsParams") public void testDeletes(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY); - addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build()); /** * Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records @@ -639,7 +639,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { @MethodSource("populateMetaFieldsParams") public void testDeletesForInsertsInSameBatch(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY); - addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build()); /** * Write 200 inserts and issue deletes to a subset(50) of inserts. 
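A minimal sketch of the renamed helper in use, assuming the harness members defined later in this patch (addConfigsForPopulateMetaFields, getPropertiesForKeyGen, getConfigBuilder, getHoodieWriteClient); the commit time and write/assert body are elided:

    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY);
    // populateMetaFields = false exercises virtual keys: the helper applies the key-generator
    // table properties from getPropertiesForKeyGen() and switches the index type to SIMPLE.
    addConfigsForPopulateMetaFields(cfgBuilder, false);
    try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build())) {
      client.startCommitWithTime(newCommitTime);
      // write records and assert counts as in the surrounding tests
    }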
@@ -1209,7 +1209,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { // complete another commit after pending clustering HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER); - addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig config = cfgBuilder.build(); SparkRDDWriteClient client = getHoodieWriteClient(config); dataGen = new HoodieTestDataGenerator(); @@ -1582,7 +1582,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { public void testCommitWritesRelativePaths(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false); - addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build());) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); HoodieSparkTable table = HoodieSparkTable.create(cfgBuilder.build(), context, metaClient); @@ -1628,7 +1628,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { @MethodSource("populateMetaFieldsParams") public void testMetadataStatsOnCommit(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false); - addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); SparkRDDWriteClient client = getHoodieWriteClient(cfg); @@ -2002,7 +2002,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { public void testMultiOperationsPerCommit(boolean populateMetaFields) throws IOException { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false) .withAllowMultiWriteOnSameInstant(true); - addAppropriatePropsForPopulateMetaFields(cfgBuilder, populateMetaFields); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); SparkRDDWriteClient client = getHoodieWriteClient(cfg); String firstInstantTime = "0000"; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index 88a95064f..29bad0d94 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -451,7 +451,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException { if (tableType == HoodieTableType.COPY_ON_WRITE) { HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline(); - assertEquals(numExpectedRecords, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, instantTime)); + assertEquals(numExpectedRecords, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of(instantTime))); } else { // TODO: This code fails to read records under the following conditions: // 1. No parquet files yet (i.e. 
no compaction done yet) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index da57fa85e..fd9b1ddb1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -43,7 +43,6 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; -import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; @@ -61,10 +60,10 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.EnumSource; import java.io.IOException; import java.nio.file.Files; @@ -72,9 +71,7 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Properties; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -94,15 +91,16 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { private HoodieTableType tableType; - public void init(HoodieTableType tableType, boolean populateMetaFields) throws IOException { + public void init(HoodieTableType tableType) throws IOException { this.tableType = tableType; initPath(); initSparkContexts("TestHoodieMetadata"); initFileSystem(); fs.mkdirs(new Path(basePath)); - metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType, populateMetaFields ? new Properties() : getPropertiesForKeyGen()); + initMetaClient(tableType); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); + } @AfterEach @@ -110,25 +108,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { cleanupResources(); } - private static Stream populateMetaFieldsParams() { - return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of); - } - - private static Stream tableTypePopulateMetaFieldsParams() { - return Stream.of( - Arguments.of(HoodieTableType.COPY_ON_WRITE, true), - Arguments.of(HoodieTableType.COPY_ON_WRITE, false), - Arguments.of(HoodieTableType.MERGE_ON_READ, true) - ); - } - /** * Metadata Table bootstrap scenarios. 
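* Verifies the table is not bootstrapped while disabled by config or while incomplete instants remain on the timeline, and that it is created and kept in sync once enabled and syncTableMetadata() runs.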
*/ - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testMetadataTableBootstrap(boolean populateMetaFields) throws Exception { - init(HoodieTableType.COPY_ON_WRITE, populateMetaFields); + @Test + public void testMetadataTableBootstrap() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); // Metadata table should not exist until created for the first time @@ -137,7 +122,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // Metadata table is not created if disabled by config String firstCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { client.startCommitWithTime(firstCommitTime); client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 5)), firstCommitTime); assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created"); @@ -146,7 +131,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // Metadata table should not be created if any non-complete instants are present String secondCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, true, populateMetaFields), true)) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, true), true)) { client.startCommitWithTime(secondCommitTime); client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime); // AutoCommit is false so no bootstrap @@ -159,7 +144,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // Metadata table created when enabled by config & sync is called secondCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields), true)) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { client.startCommitWithTime(secondCommitTime); client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime); client.syncTableMetadata(); @@ -182,7 +167,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { }); String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields), true)) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { client.startCommitWithTime(thirdCommitTime); client.insert(jsc.parallelize(dataGen.generateUpdates(thirdCommitTime, 2)), thirdCommitTime); client.syncTableMetadata(); @@ -199,11 +184,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Only valid partition directories are added to the metadata. 
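* Directories lacking partition metadata and directories excluded by the configured directory filter regex must not appear in the metadata table listing.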
*/ - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testOnlyValidPartitionsAdded(boolean populateMetaFields) throws Exception { + @Test + public void testOnlyValidPartitionsAdded() throws Exception { // This test requires local file system - init(HoodieTableType.COPY_ON_WRITE, populateMetaFields); + init(HoodieTableType.COPY_ON_WRITE); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); // Create an empty directory which is not a partition directory (lacks partition metadata) @@ -223,7 +207,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { .addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10); final HoodieWriteConfig writeConfig = - getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false, populateMetaFields) + getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build(); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { client.startCommitWithTime("005"); @@ -253,12 +237,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { * Test various table operations sync to Metadata Table correctly. */ @ParameterizedTest - @MethodSource("tableTypePopulateMetaFieldsParams") - public void testTableOperations(HoodieTableType tableType, boolean populateMetaFields) throws Exception { - init(tableType, populateMetaFields); + @EnumSource(HoodieTableType.class) + public void testTableOperations(HoodieTableType tableType) throws Exception { + init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { // Write 1 (Bulk insert) String newCommitTime = "001"; @@ -341,12 +325,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { * Test rollback of various table operations sync to Metadata Table correctly. 
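* Covers rollback of completed commits as well as partial commits, with both listing-based and marker-based rollback.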
*/ @ParameterizedTest - @MethodSource("tableTypePopulateMetaFieldsParams") - public void testRollbackOperations(HoodieTableType tableType, boolean populateMetaFields) throws Exception { - init(tableType, populateMetaFields); + @EnumSource(HoodieTableType.class) + public void testRollbackOperations(HoodieTableType tableType) throws Exception { + init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { // Write 1 (Bulk insert) String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); List records = dataGen.generateInserts(newCommitTime, 20); @@ -419,7 +403,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // Rollback of partial commits try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, - getWriteConfigBuilder(false, true, false, populateMetaFields).withRollbackUsingMarkers(false).build())) { + getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(false).build())) { // Write updates and inserts String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); @@ -433,7 +417,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // Marker based rollback of partial commits try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, - getWriteConfigBuilder(false, true, false, populateMetaFields).withRollbackUsingMarkers(true).build())) { + getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) { // Write updates and inserts String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); @@ -451,12 +435,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { * Once explicit sync is called, metadata should match. 
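* A commit performed (and rolled back) while the metadata table is disabled leaves it out of sync; the listings must match the filesystem again after syncTableMetadata().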
*/ @ParameterizedTest - @MethodSource("tableTypePopulateMetaFieldsParams") - public void testRollbackUnsyncedCommit(HoodieTableType tableType, boolean populateMetaFields) throws Exception { - init(tableType, populateMetaFields); + @EnumSource(HoodieTableType.class) + public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception { + init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { // Initialize table with metadata String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); List records = dataGen.generateInserts(newCommitTime, 20); @@ -466,7 +450,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { validateMetadata(client); } String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { // Commit with metadata disabled client.startCommitWithTime(newCommitTime); List records = dataGen.generateUpdates(newCommitTime, 10); @@ -475,7 +459,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { client.rollback(newCommitTime); } - try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) { assertFalse(metadata(client).isInSync()); client.syncTableMetadata(); validateMetadata(client); @@ -486,10 +470,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { * Test sync of table operations. 
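* Exercises commits before and after enabling the metadata table, an inflight clean instant blocking bootstrap, and restore ordering (see HUDI-1502); currently disabled.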
*/ @ParameterizedTest - @MethodSource("tableTypePopulateMetaFieldsParams") + @EnumSource(HoodieTableType.class) @Disabled - public void testSync(HoodieTableType tableType, boolean populateMetaFields) throws Exception { - init(tableType, populateMetaFields); + public void testSync(HoodieTableType tableType) throws Exception { + init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); String newCommitTime; @@ -497,7 +481,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { List writeStatuses; // Initial commits without metadata table enabled - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { newCommitTime = HoodieActiveTimeline.createNewInstantTime(); records = dataGen.generateInserts(newCommitTime, 5); client.startCommitWithTime(newCommitTime); @@ -512,7 +496,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } // Enable metadata table so it initialized by listing from file system - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { // inserts newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); @@ -528,7 +512,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { String restoreToInstant; String inflightActionTimestamp; String beforeInflightActionTimestamp; - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { // updates newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); @@ -600,7 +584,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp)); fs.create(inflightCleanPath).close(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details client.syncTableMetadata(); @@ -629,7 +613,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } // Enable metadata table and ensure it is synced - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { client.restoreToInstant(restoreToInstant); assertFalse(metadata(client).isInSync()); @@ -645,14 +629,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Instants on Metadata Table should be archived as per config. Metadata Table should be automatically compacted as per config. 
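* Uses archiveCommitsWith(6, 8), retainCommits(1) and maxNumDeltaCommitsBeforeCompaction = 4 so that archival and compaction of the metadata table trigger within a few commits.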
*/ - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testCleaningArchivingAndCompaction(boolean populateMetaFields) throws Exception { - init(HoodieTableType.COPY_ON_WRITE, populateMetaFields); + @Test + public void testCleaningArchivingAndCompaction() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); final int maxDeltaCommitsBeforeCompaction = 4; - HoodieWriteConfig config = getWriteConfigBuilder(true, true, false, populateMetaFields) + HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) .archiveCommitsWith(6, 8).retainCommits(1) .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build()) @@ -693,15 +676,14 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Test various error scenarios. */ - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testErrorCases(boolean populateMetaFields) throws Exception { - init(HoodieTableType.COPY_ON_WRITE, populateMetaFields); + @Test + public void testErrorCases() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); // TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table // should be rolled back to last valid commit. - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields), true)) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 10); @@ -722,7 +704,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { commitInstantFileName), false)); } - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields), true)) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { String newCommitTime = client.startCommit(); // Next insert List records = dataGen.generateInserts(newCommitTime, 5); @@ -739,11 +721,11 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { */ //@Test public void testNonPartitioned() throws Exception { - init(HoodieTableType.COPY_ON_WRITE, true); + init(HoodieTableType.COPY_ON_WRITE); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); HoodieTestDataGenerator nonPartitionedGenerator = new HoodieTestDataGenerator(new String[] {""}); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, true))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { // Write 1 (Bulk insert) String newCommitTime = "001"; List records = nonPartitionedGenerator.generateInserts(newCommitTime, 10); @@ -759,13 +741,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Test various metrics published by metadata table. 
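* Writes with metrics enabled on the write config and expects metadata-table metrics to be published for the commit.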
*/ - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testMetadataMetrics(boolean populateMetaFields) throws Exception { - init(HoodieTableType.COPY_ON_WRITE, populateMetaFields); + @Test + public void testMetadataMetrics() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, true, populateMetaFields).build())) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, true).build())) { // Write String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); List records = dataGen.generateInserts(newCommitTime, 20); @@ -788,16 +769,15 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Test when reading from metadata table which is out of sync with dataset that results are still consistent. */ - @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testMetadataOutOfSync(boolean populateMetaFields) throws Exception { - init(HoodieTableType.COPY_ON_WRITE, populateMetaFields); + @Test + public void testMetadataOutOfSync() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - SparkRDDWriteClient unsyncedClient = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields)); + SparkRDDWriteClient unsyncedClient = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true)); // Enable metadata so table is initialized - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { // Perform Bulk Insert String newCommitTime = "001"; client.startCommitWithTime(newCommitTime); @@ -806,7 +786,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } // Perform commit operations with metadata disabled - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { // Perform Insert String newCommitTime = "002"; client.startCommitWithTime(newCommitTime); @@ -831,7 +811,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { validateMetadata(unsyncedClient); // Perform clean operation with metadata disabled - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { // One more commit needed to trigger clean so upsert and compact String newCommitTime = "005"; client.startCommitWithTime(newCommitTime); @@ -853,7 +833,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { validateMetadata(unsyncedClient); // Perform restore with metadata disabled - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false, populateMetaFields))) { + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { client.restoreToInstant("004"); } @@ -1028,20 +1008,18 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } } - 
private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileListingMetadata, boolean populateMetaFields) { - return getWriteConfigBuilder(autoCommit, useFileListingMetadata, false, populateMetaFields).build(); + private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileListingMetadata) { + return getWriteConfigBuilder(autoCommit, useFileListingMetadata, false).build(); } - private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics, boolean populateMetaFields) { - return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics, populateMetaFields); + private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { + return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics); } - private HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, - boolean enableMetrics, boolean populateMetaFields) { + private HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2) .withAutoCommit(autoCommit) - .withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024) .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1) .withFailedWritesCleaningPolicy(policy) @@ -1050,7 +1028,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table") .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() .withEnableBackupForRemoteFileSystemView(false).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(populateMetaFields ? 
HoodieIndex.IndexType.BLOOM : HoodieIndex.IndexType.SIMPLE).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder() .enable(useFileListingMetadata) .enableMetrics(enableMetrics).build()) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index ab55462a5..e7deb044d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -82,6 +82,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.File; import java.io.IOException; @@ -93,6 +96,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -113,7 +117,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { private HoodieFileFormat baseFileFormat; - public void init(HoodieFileFormat baseFileFormat) throws IOException { + public void init(HoodieFileFormat baseFileFormat, boolean populateMetaFields) throws IOException { this.baseFileFormat = baseFileFormat; initDFS(); initSparkContexts("TestHoodieMergeOnReadTable"); @@ -122,7 +126,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { context = new HoodieSparkEngineContext(jsc); initPath(); dfs.mkdirs(new Path(basePath)); - metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, baseFileFormat); + + Properties properties = populateMetaFields ? 
new Properties() : getPropertiesForKeyGen(); + properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.key(), baseFileFormat.toString()); + + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, properties); initTestDataGenerator(); roSnapshotJobConf = new JobConf(hadoopConf); @@ -132,7 +140,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { @BeforeEach public void init() throws IOException { - init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue()); + init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), true); } @AfterEach @@ -140,9 +148,19 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { cleanupResources(); } - @Test - public void testSimpleInsertAndUpdate() throws Exception { - HoodieWriteConfig cfg = getConfig(true); + private static Stream populateMetaFieldsParams() { + return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("populateMetaFieldsParams") + public void testSimpleInsertAndUpdate(boolean populateMetaFields) throws Exception { + clean(); + init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields); + + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** @@ -179,17 +197,20 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp(); assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime)); - assertEquals(200, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"), - "Must contain 200 records"); + if (cfg.populateMetaFields()) { + assertEquals(200, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of("000")), + "Must contain 200 records"); + } else { + assertEquals(200, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.empty())); + } } } @Test public void testSimpleInsertAndUpdateHFile() throws Exception { clean(); - init(HoodieFileFormat.HFILE); - - HoodieWriteConfig cfg = getConfig(true); + init(HoodieFileFormat.HFILE, true); + HoodieWriteConfig cfg = getConfigBuilder(true).build(); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** @@ -227,26 +248,35 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp(); assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime)); - assertEquals(200, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"), + assertEquals(200, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of("000")), "Must contain 200 records"); } } - @Test - public void testSimpleClusteringNoUpdates() throws Exception { - testClustering(false); + @ParameterizedTest + @MethodSource("populateMetaFieldsParams") + public void testSimpleClusteringNoUpdates(boolean populateMetaFields) throws Exception { + clean(); + init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields); + testClustering(false, populateMetaFields); } - @Test - public void 
testSimpleClusteringWithUpdates() throws Exception { - testClustering(true); + @ParameterizedTest + @MethodSource("populateMetaFieldsParams") + public void testSimpleClusteringWithUpdates(boolean populateMetaFields) throws Exception { + clean(); + init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields); + testClustering(true, populateMetaFields); } - private void testClustering(boolean doUpdates) throws Exception { + private void testClustering(boolean doUpdates, boolean populateMetaFields) throws Exception { // set low compaction small File Size to generate more file groups. HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); - HoodieWriteConfig cfg = getConfigBuilder(true, 10L, clusteringConfig).build(); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, 10L, clusteringConfig); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** @@ -302,8 +332,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { "Expecting a single commit."); assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp()); assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction()); - assertEquals(400, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"), - "Must contain 200 records"); + if (cfg.populateMetaFields()) { + assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of("000")), + "Must contain 400 records"); + } else { + assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.empty())); + } } } @@ -431,9 +465,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { } } - @Test - public void testSimpleInsertUpdateAndDelete() throws Exception { - HoodieWriteConfig cfg = getConfig(true); + @ParameterizedTest + @MethodSource("populateMetaFieldsParams") + public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws Exception { + clean(); + init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** @@ -505,7 +544,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { assertTrue(dataFilesToRead.findAny().isPresent()); List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath); + List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath, new JobConf(hadoopConf), true, false); // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0 assertEquals(0, recordsRead.size(), "Must contain 0 records"); } @@ -575,8 +614,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { testCOWToMORConvertedTableRollback(true); } - private void testRollbackWithDeltaAndCompactionCommit(Boolean rollbackUsingMarkers) throws Exception { - HoodieWriteConfig 
cfg = getConfig(false, rollbackUsingMarkers); + private void testRollbackWithDeltaAndCompactionCommit(Boolean rollbackUsingMarkers, boolean populateMetaFields) throws Exception { + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, IndexType.SIMPLE); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { @@ -621,7 +662,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { */ final String commitTime1 = "002"; // WriteClient with custom config (disable small file handling) - try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) { + try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false));) { secondClient.startCommitWithTime(commitTime1); List copyOfRecords = new ArrayList<>(records); @@ -723,19 +764,31 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { } } - @Test - public void testRollbackWithDeltaAndCompactionCommitUsingFileList() throws Exception { - testRollbackWithDeltaAndCompactionCommit(false); + @ParameterizedTest + @MethodSource("populateMetaFieldsParams") + public void testRollbackWithDeltaAndCompactionCommitUsingFileList(boolean populateMetaFields) throws Exception { + clean(); + init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields); + testRollbackWithDeltaAndCompactionCommit(false, populateMetaFields); } - @Test - public void testRollbackWithDeltaAndCompactionCommitUsingMarkers() throws Exception { - testRollbackWithDeltaAndCompactionCommit(true); + @ParameterizedTest + @MethodSource("populateMetaFieldsParams") + public void testRollbackWithDeltaAndCompactionCommitUsingMarkers(boolean populateMetaFields) throws Exception { + clean(); + init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields); + testRollbackWithDeltaAndCompactionCommit(true, populateMetaFields); } - @Test - public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { - HoodieWriteConfig cfg = getConfig(false); + @ParameterizedTest + @MethodSource("populateMetaFieldsParams") + public void testMultiRollbackWithDeltaAndCompactionCommit(boolean populateMetaFields) throws Exception { + clean(); + init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); + try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** * Write 1 (only inserts) @@ -776,7 +829,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { */ newCommitTime = "002"; // WriteClient with custom config (disable small file handling) - SparkRDDWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff()); + SparkRDDWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(populateMetaFields)); nClient.startCommitWithTime(newCommitTime); List copyOfRecords = new ArrayList<>(records); @@ -885,20 +938,29 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { } } - protected HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff() { - return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + protected HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean populateMetaFields) { + HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withDeleteParallelism(2) .withAutoCommit(false) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024) .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) .withEmbeddedTimelineServerEnabled(true) - .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024).parquetMaxFileSize(1024).build()).forTable("test-trip-table") - .build(); + .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024).parquetMaxFileSize(1024).build()).forTable("test-trip-table"); + + if (!populateMetaFields) { + addConfigsForPopulateMetaFields(cfgBuilder, false); + } + return cfgBuilder.build(); } - @Test - public void testUpsertPartitioner() throws Exception { - HoodieWriteConfig cfg = getConfig(true); + @ParameterizedTest + @MethodSource("populateMetaFieldsParams") + public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { + clean(); + init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** @@ -969,16 +1031,20 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { List dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, - basePath); + basePath, new JobConf(hadoopConf), true, false); // Wrote 20 records in 2 batches assertEquals(40, recordsRead.size(), "Must contain 40 records"); } } - @Test - public void testLogFileCountsAfterCompaction() throws Exception { + @ParameterizedTest + @MethodSource("populateMetaFieldsParams") + public void testLogFileCountsAfterCompaction(boolean populateMetaFields) throws Exception { // insert 100 records - HoodieWriteConfig config = getConfig(true); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig config = cfgBuilder.build(); + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -1052,6 +1118,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { // insert 100 records // Setting IndexType to be InMemory to simulate Global Index nature HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build(); + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -1094,6 +1161,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { // insert 100 records // Setting IndexType to be InMemory to simulate Global Index nature HoodieWriteConfig config = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY).build(); + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) { String newCommitTime = "100"; 
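// The instant must be registered on the timeline via startCommitWithTime before records are written under it.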
writeClient.startCommitWithTime(newCommitTime); @@ -1253,6 +1321,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY) .withAutoCommit(false).build(); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { metaClient = getHoodieMetaClient(hadoopConf, basePath); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java index 4af39739a..4c9ab3dc9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java @@ -198,7 +198,7 @@ public class CompactionTestBase extends HoodieClientTestBase { assertEquals(latestCompactionCommitTime, compactionInstantTime, "Expect compaction instant time to be the latest commit time"); assertEquals(expectedNumRecs, - HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"), + HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of("000")), "Must contain expected records"); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index 0c88777be..aa8814ad6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -518,12 +518,12 @@ public class HoodieClientTestBase extends HoodieClientTestHarness { if (filterForCommitTimeWithAssert) { // Check that the incremental consumption from prevCommitTime assertEquals(HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, prevCommitTime), + HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of(prevCommitTime)), "Incremental consumption from " + prevCommitTime + " should give all records in latest commit"); if (commitTimesBetweenPrevAndNew.isPresent()) { commitTimesBetweenPrevAndNew.get().forEach(ct -> { assertEquals(HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, ct), + HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of(ct)), "Incremental consumption from " + ct + " should give all records in latest commit"); }); } @@ -590,7 +590,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness { if (filerForCommitTimeWithAssert) { // Check that the incremental consumption from prevCommitTime assertEquals(HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, prevCommitTime), + HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of(prevCommitTime)), "Incremental consumption 
from " + prevCommitTime + " should give no records in latest commit," + " since it is a delete operation"); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index e618fa833..233b679d8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -44,6 +44,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.table.WorkloadStat; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -234,10 +235,13 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im properties.put(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), "false"); properties.put("hoodie.datasource.write.recordkey.field","_row_key"); properties.put("hoodie.datasource.write.partitionpath.field","partition_path"); + properties.put(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS.key(), "_row_key"); + properties.put(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key(), "partition_path"); + properties.put(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key(), SimpleKeyGenerator.class.getName()); return properties; } - protected void addAppropriatePropsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) { + protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) { if (!populateMetaFields) { configBuilder.withProperties(getPropertiesForKeyGen()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 53db293c1..15fb50c26 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -141,33 +141,46 @@ public class HoodieClientTestUtils { } /** - * Obtain all new data written into the Hoodie table since the given timestamp. + * Obtain all new data written into the Hoodie table with an optional from timestamp. */ - public static long countRecordsSince(JavaSparkContext jsc, String basePath, SQLContext sqlContext, - HoodieTimeline commitTimeline, String lastCommitTime) { + public static long countRecordsOptionallySince(JavaSparkContext jsc, String basePath, SQLContext sqlContext, + HoodieTimeline commitTimeline, Option lastCommitTimeOpt) { List commitsToReturn = - commitTimeline.findInstantsAfter(lastCommitTime, Integer.MAX_VALUE).getInstants().collect(Collectors.toList()); + lastCommitTimeOpt.isPresent() ? commitTimeline.findInstantsAfter(lastCommitTimeOpt.get(), Integer.MAX_VALUE).getInstants().collect(Collectors.toList()) : + commitTimeline.getInstants().collect(Collectors.toList()); try { // Go over the commit metadata, and obtain the new files that need to be read. 
HashMap fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn); String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]); if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - return sqlContext.read().parquet(paths) - .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime)) - .count(); + Dataset rows = sqlContext.read().parquet(paths); + if (lastCommitTimeOpt.isPresent()) { + return rows.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTimeOpt.get())) + .count(); + } else { + return rows.count(); + } } else if (paths[0].endsWith(HoodieFileFormat.HFILE.getFileExtension())) { - return readHFile(jsc, paths) - .filter(gr -> HoodieTimeline.compareTimestamps(lastCommitTime, HoodieActiveTimeline.LESSER_THAN, - gr.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString())) - .count(); + Stream genericRecordStream = readHFile(jsc, paths); + if (lastCommitTimeOpt.isPresent()) { + return genericRecordStream.filter(gr -> HoodieTimeline.compareTimestamps(lastCommitTimeOpt.get(), HoodieActiveTimeline.LESSER_THAN, + gr.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString())) + .count(); + } else { + return genericRecordStream.count(); + } } else if (paths[0].endsWith(HoodieFileFormat.ORC.getFileExtension())) { - return sqlContext.read().orc(paths) - .filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTime)) - .count(); + Dataset rows = sqlContext.read().orc(paths); + if (lastCommitTimeOpt.isPresent()) { + return rows.filter(String.format("%s >'%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, lastCommitTimeOpt.get())) + .count(); + } else { + return rows.count(); + } } throw new HoodieException("Unsupported base file format for file :" + paths[0]); } catch (IOException e) { - throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + lastCommitTime, e); + throw new HoodieException("Error pulling data incrementally from commitTimestamp :" + (lastCommitTimeOpt.isPresent() ? lastCommitTimeOpt.get() : "(none)"), e); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java index 5b37b3b2d..8cd68ff83 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java @@ -58,20 +58,30 @@ public class HoodieMergeOnReadTestUtils { public static List getRecordsUsingInputFormat(Configuration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime) { + return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, true); + } + + public static List getRecordsUsingInputFormat(Configuration conf, List inputPaths, + String basePath, JobConf jobConf, boolean realtime, boolean populateMetaFieldsConfigValue) { Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, schema, - HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>()); + HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>(), populateMetaFieldsConfigValue); } public static List getRecordsUsingInputFormat(Configuration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema
rawSchema, String rawHiveColumnTypes, boolean projectCols, List projectedColumns) { + return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, rawSchema, rawHiveColumnTypes, projectCols, projectedColumns, true); + } + + public static List getRecordsUsingInputFormat(Configuration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema, + String rawHiveColumnTypes, boolean projectCols, List projectedColumns, boolean populateMetaFieldsConfigValue) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build(); FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(), realtime, jobConf); Schema schema = HoodieAvroUtils.addMetadataFields(rawSchema); String hiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(rawHiveColumnTypes); - setPropsForInputFormat(inputFormat, jobConf, schema, hiveColumnTypes, projectCols, projectedColumns); + setPropsForInputFormat(inputFormat, jobConf, schema, hiveColumnTypes, projectCols, projectedColumns, populateMetaFieldsConfigValue); final List fields; if (projectCols) { fields = schema.getFields().stream().filter(f -> projectedColumns.contains(f.name())) @@ -112,6 +122,11 @@ public class HoodieMergeOnReadTestUtils { } private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, List projectedCols) { + setPropsForInputFormat(inputFormat, jobConf, schema, hiveColumnTypes, projectCols, projectedCols, true); + } + + private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, List projectedCols, + boolean populateMetaFieldsConfigValue) { List fields = schema.getFields(); final List projectedColNames; if (!projectCols) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 987d91033..853fffdca 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -157,6 +157,11 @@ public class HoodieTableConfig extends HoodieConfig implements Serializable { .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated " + "and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing"); + public static final ConfigProperty HOODIE_TABLE_KEY_GENERATOR_CLASS = ConfigProperty + .key("hoodie.table.keygenerator.class") + .noDefaultValue() + .withDocumentation("Key Generator class property for the hoodie table"); + public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName(); public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) { @@ -276,6 +281,13 @@ public class HoodieTableConfig extends HoodieConfig implements Serializable { return Option.empty(); } + /** + * @return the partition field prop. + */ + public String getPartitionFieldProp() { + return getString(HOODIE_TABLE_PARTITION_FIELDS_PROP); + } + /** * Read the payload class for HoodieRecords from the table properties.
*/ @@ -344,6 +356,13 @@ public class HoodieTableConfig extends HoodieConfig implements Serializable { return Boolean.parseBoolean(getStringOrDefault(HOODIE_POPULATE_META_FIELDS)); } + /** + * @return the record key field prop. + */ + public String getRecordKeyFieldProp() { + return getString(HOODIE_TABLE_RECORDKEY_FIELDS); + } + public Map propsMap() { return props.entrySet().stream() .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 74c985965..71a834e7b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -324,6 +324,14 @@ public class HoodieTableMetaClient implements Serializable { && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()))) { throw new HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key() + " already disabled for the table. It cannot be re-enabled"); } + + // meta fields can be disabled only with SimpleKeyGenerator + if (!getTableConfig().populateMetaFields() + && !properties.getProperty(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key(), "org.apache.hudi.keygen.SimpleKeyGenerator") + .equals("org.apache.hudi.keygen.SimpleKeyGenerator")) { + throw new HoodieException("Only simple key generator is supported when meta fields are disabled. KeyGenerator used: " + + properties.getProperty(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS.key())); + } } /** @@ -617,6 +625,7 @@ public class HoodieTableMetaClient implements Serializable { private String bootstrapIndexClass; private String bootstrapBasePath; private Boolean populateMetaFields; + private String keyGeneratorClassProp; private PropertyBuilder() { @@ -695,6 +704,11 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setKeyGeneratorClassProp(String keyGeneratorClassProp) { + this.keyGeneratorClassProp = keyGeneratorClassProp; + return this; + } + public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) { return setTableType(metaClient.getTableType()) .setTableName(metaClient.getTableConfig().getTableName()) @@ -748,6 +762,9 @@ public class HoodieTableMetaClient implements Serializable { if (hoodieConfig.contains(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS)) { setPopulateMetaFields(hoodieConfig.getBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS)); } + if (hoodieConfig.contains(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS)) { + setKeyGeneratorClassProp(hoodieConfig.getString(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS)); + } return this; } @@ -804,6 +821,9 @@ public class HoodieTableMetaClient implements Serializable { if (null != populateMetaFields) { tableConfig.setValue(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS, Boolean.toString(populateMetaFields)); } + if (null != keyGeneratorClassProp) { + tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS, keyGeneratorClassProp); + } return tableConfig.getProps(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java index
ec7714ec7..95ed80afa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java @@ -18,15 +18,11 @@ package org.apache.hudi.common.table.log; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; @@ -37,8 +33,15 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -80,6 +83,8 @@ public abstract class AbstractHoodieLogRecordScanner { private final HoodieTableMetaClient hoodieTableMetaClient; // Merge strategy to use when combining records from log private final String payloadClassFQN; + // simple key gen fields + private Option> simpleKeyGenFields = Option.empty(); // Log File Paths protected final List logFilePaths; // Read Lazily flag @@ -115,6 +120,10 @@ public abstract class AbstractHoodieLogRecordScanner { this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build(); // load class from the payload fully qualified class name this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass(); + HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig(); + if (!tableConfig.populateMetaFields()) { + this.simpleKeyGenFields = Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp())); + } this.totalLogFiles.addAndGet(logFilePaths.size()); this.logFilePaths = logFilePaths; this.readBlocksLazily = readBlocksLazily; @@ -302,7 +311,11 @@ public abstract class AbstractHoodieLogRecordScanner { } protected HoodieRecord createHoodieRecord(IndexedRecord rec) { - return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN); + if (!simpleKeyGenFields.isPresent()) { + return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN); + } else { + return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.simpleKeyGenFields.get()); + } } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java index 3b73f41cb..1453d09a9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java @@ -18,13 +18,16 @@ package org.apache.hudi.common.table.log; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.SpillableMapUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.io.storage.HoodieFileReader; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.SpillableMapUtils; -import org.apache.hudi.io.storage.HoodieFileReader; import java.io.IOException; import java.util.Iterator; @@ -36,11 +39,14 @@ public class HoodieFileSliceReader implements Iterator> recordsIterator; public static HoodieFileSliceReader getFileSliceReader( - HoodieFileReader baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass) throws IOException { + HoodieFileReader baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, + Option> simpleKeyGenFieldsOpt) throws IOException { Iterator baseIterator = baseFileReader.getRecordIterator(schema); while (baseIterator.hasNext()) { - GenericRecord record = (GenericRecord) baseIterator.next(); - HoodieRecord hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass); + GenericRecord record = (GenericRecord) baseIterator.next(); + HoodieRecord hoodieRecord = simpleKeyGenFieldsOpt.isPresent() + ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get()) + : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass); scanner.processNextRecord(hoodieRecord); } return new HoodieFileSliceReader(scanner.iterator()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index 2dbc9123f..43f8ba5e2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.collection.BitCaskDiskMap.FileEntry; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieCorruptedDataException; import org.apache.avro.generic.GenericRecord; @@ -110,8 +111,15 @@ public class SpillableMapUtils { * Utility method to convert bytes to HoodieRecord using schema and payload class. */ public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz) { - String recKey = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); - String partitionPath = rec.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + return convertToHoodieRecordPayload(rec, payloadClazz, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD)); + } + + /** + * Utility method to convert bytes to HoodieRecord using schema and payload class. 
+ */ + public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, Pair recordKeyPartitionPathPair) { + String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString(); + String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString(); HoodieRecord hoodieRecord = new HoodieRecord<>(new HoodieKey(recKey, partitionPath), ReflectionUtils.loadPayload(payloadClazz, new Object[] {Option.of(rec)}, Option.class)); return (R) hoodieRecord; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 1667f188e..8995ab491 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -37,6 +38,7 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.TableNotFoundException; @@ -70,6 +72,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { private String metadataBasePath; // Metadata table's timeline and metaclient private HoodieTableMetaClient metaClient; + private HoodieTableConfig tableConfig; private List latestFileSystemMetadataSlices; // should we reuse the open file handles, across calls private final boolean reuse; @@ -98,16 +101,19 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath); try { this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataBasePath).build(); + this.tableConfig = metaClient.getTableConfig(); HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); latestFileSystemMetadataSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList()); } catch (TableNotFoundException e) { LOG.warn("Metadata table was not found at path " + metadataBasePath); this.enabled = false; this.metaClient = null; + this.tableConfig = null; } catch (Exception e) { LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e); this.enabled = false; this.metaClient = null; + this.tableConfig = null; } } } @@ -126,8 +132,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { HoodieTimer readTimer = new HoodieTimer().startTimer(); Option baseRecord = baseFileReader.getRecordByKey(key); if (baseRecord.isPresent()) { - hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), - metaClient.getTableConfig().getPayloadClass()); + hoodieRecord = tableConfig.populateMetaFields() ? 
SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), + tableConfig.getPayloadClass()) : SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), + tableConfig.getPayloadClass(), Pair.of(tableConfig.getRecordKeyFieldProp(), + tableConfig.getPartitionFieldProp())); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer())); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SampleTestRecord.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SampleTestRecord.java index f8e6252a2..c4a3d4031 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SampleTestRecord.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SampleTestRecord.java @@ -75,11 +75,17 @@ public class SampleTestRecord implements Serializable { private String[] stringArray; public SampleTestRecord(String instantTime, int recordNumber, String fileId) { - this._hoodie_commit_time = instantTime; - this._hoodie_record_key = "key" + recordNumber; - this._hoodie_partition_path = instantTime; - this._hoodie_file_name = fileId; - this._hoodie_commit_seqno = instantTime + recordNumber; + this(instantTime, recordNumber, fileId, true); + } + + public SampleTestRecord(String instantTime, int recordNumber, String fileId, boolean populateMetaFields) { + if (populateMetaFields) { + this._hoodie_commit_time = instantTime; + this._hoodie_record_key = "key" + recordNumber; + this._hoodie_partition_path = instantTime; + this._hoodie_file_name = fileId; + this._hoodie_commit_seqno = instantTime + recordNumber; + } String commitTimeSuffix = "@" + instantTime; int commitHashCode = instantTime.hashCode(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java index 420c6cce7..cde87d467 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java @@ -185,8 +185,13 @@ public final class SchemaTestUtil { } public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber, String instantTime, - String fileId) throws IOException { - SampleTestRecord record = new SampleTestRecord(instantTime, recordNumber, fileId); + String fileId) throws IOException { + return generateAvroRecordFromJson(schema, recordNumber, instantTime, fileId, true); + } + + public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber, String instantTime, + String fileId, boolean populateMetaFields) throws IOException { + SampleTestRecord record = new SampleTestRecord(instantTime, recordNumber, fileId, populateMetaFields); MercifulJsonConverter converter = new MercifulJsonConverter(); return converter.convert(record.toJsonString(), schema); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java index 745657183..e485e72c2 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java @@ -45,6 +45,14 @@ public class InputSplitUtils { return new String(bytes, StandardCharsets.UTF_8); } + public static void writeBoolean(Boolean valueToWrite, DataOutput out) throws IOException { + out.writeBoolean(valueToWrite); + } + + public static boolean readBoolean(DataInput in) throws 
IOException { + return in.readBoolean(); + } + /** * Return correct base-file schema based on split. * diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java index e75cff641..63728e38f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java @@ -18,9 +18,14 @@ package org.apache.hudi.hadoop.realtime; -import java.io.IOException; -import java.util.Arrays; -import java.util.stream.Stream; +import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.hadoop.HoodieHFileInputFormat; +import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; +import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -31,16 +36,13 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.hadoop.HoodieHFileInputFormat; -import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; -import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat; -import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; -import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.Stream; + /** * HoodieRealtimeInputFormat for HUDI datasets which store data in HFile base file format. */ @@ -90,7 +92,7 @@ public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat { // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction // time. 
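    // Note: this HFile path always passes Option.empty() below, so projection falls back to the
    // _hoodie_record_key / _hoodie_commit_time / _hoodie_partition_path meta columns; only the
    // parquet path (HoodieParquetRealtimeInputFormat, further down) threads the split's virtual
    // key info through instead.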
HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf); - HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf); + HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, Option.empty()); this.conf = jobConf; this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true"); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index 6b827d3ee..028641c62 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; +import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; @@ -36,7 +37,6 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -100,7 +100,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i // time. HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConf); if (!realtimeSplit.getDeltaLogPaths().isEmpty()) { - HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf); + HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getHoodieVirtualKeyInfo()); } this.conf = jobConf; this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true"); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java index fe481f0a2..6423f2cfd 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java @@ -18,6 +18,8 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.hudi.common.util.Option; + import org.apache.hadoop.mapred.FileSplit; import java.io.DataInput; @@ -36,16 +38,20 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit private String basePath; + private Option hoodieVirtualKeyInfo = Option.empty(); + public HoodieRealtimeFileSplit() { super(); } - public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List deltaLogPaths, String maxCommitTime) + public HoodieRealtimeFileSplit(FileSplit baseSplit, String basePath, List deltaLogPaths, String maxCommitTime, + Option hoodieVirtualKeyInfo) throws IOException { super(baseSplit.getPath(), baseSplit.getStart(), baseSplit.getLength(), baseSplit.getLocations()); this.deltaLogPaths = deltaLogPaths; this.maxCommitTime = maxCommitTime; this.basePath = basePath; + this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo; } public List getDeltaLogPaths() { @@ -60,6 +66,16 @@ public class HoodieRealtimeFileSplit extends FileSplit implements RealtimeSplit return basePath; } + 
@Override + public void setHoodieVirtualKeyInfo(Option hoodieVirtualKeyInfo) { + this.hoodieVirtualKeyInfo = hoodieVirtualKeyInfo; + } + + @Override + public Option getHoodieVirtualKeyInfo() { + return hoodieVirtualKeyInfo; + } + public void setDeltaLogPaths(List deltaLogPaths) { this.deltaLogPaths = deltaLogPaths; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java new file mode 100644 index 000000000..763b80d4a --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.realtime; + +import java.io.Serializable; + +/** + * Class to hold virtual key info when meta fields are disabled. + */ +public class HoodieVirtualKeyInfo implements Serializable { + + private final String recordKeyField; + private final String partitionPathField; + private final int recordKeyFieldIndex; + private final int partitionPathFieldIndex; + + public HoodieVirtualKeyInfo(String recordKeyField, String partitionPathField, int recordKeyFieldIndex, int partitionPathFieldIndex) { + this.recordKeyField = recordKeyField; + this.partitionPathField = partitionPathField; + this.recordKeyFieldIndex = recordKeyFieldIndex; + this.partitionPathFieldIndex = partitionPathFieldIndex; + } + + public String getRecordKeyField() { + return recordKeyField; + } + + public String getPartitionPathField() { + return partitionPathField; + } + + public int getRecordKeyFieldIndex() { + return recordKeyFieldIndex; + } + + public int getPartitionPathFieldIndex() { + return partitionPathFieldIndex; + } + + @Override + public String toString() { + return "HoodieVirtualKeyInfo{" + + "recordKeyField='" + recordKeyField + '\'' + + ", partitionPathField='" + partitionPathField + '\'' + + ", recordKeyFieldIndex=" + recordKeyFieldIndex + + ", partitionPathFieldIndex=" + partitionPathFieldIndex + + '}'; + } +} \ No newline at end of file diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java index fd3b5b810..4da310da4 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java @@ -18,6 +18,7 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; import org.apache.hadoop.mapred.FileSplit; @@ -77,6 +78,11 @@ public class 
RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple return basePath; } + @Override + public Option getHoodieVirtualKeyInfo() { + return Option.empty(); + } + @Override public void setDeltaLogPaths(List deltaLogPaths) { this.deltaLogPaths = deltaLogPaths; @@ -91,4 +97,8 @@ public class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit imple public void setBasePath(String basePath) { this.basePath = basePath; } + + @Override + public void setHoodieVirtualKeyInfo(Option hoodieVirtualKeyInfo) {} + } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index c552e3a78..bad5e1982 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -53,6 +53,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader private final Map> deltaRecordMap; private final Set deltaRecordKeys; + private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS; private Iterator deltaItr; public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job, @@ -61,6 +62,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader this.parquetReader = realReader; this.deltaRecordMap = getMergedLogRecordScanner().getRecords(); this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet()); + if (split.getHoodieVirtualKeyInfo().isPresent()) { + this.recordKeyIndex = split.getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex(); + } } /** @@ -102,7 +106,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader // with a new block of values while (this.parquetReader.next(aVoid, arrayWritable)) { if (!deltaRecordMap.isEmpty()) { - String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString(); + String key = arrayWritable.get()[recordKeyIndex].toString(); if (deltaRecordMap.containsKey(key)) { // mark the key as handled this.deltaRecordKeys.remove(key); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java index 0fab73434..108613c18 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java @@ -18,6 +18,7 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.InputSplitUtils; import org.apache.hadoop.fs.Path; @@ -52,8 +53,15 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo { */ String getBasePath(); + /** + * Returns virtual key info if meta fields are disabled. + * @return the virtual key info, or Option.empty() when meta fields are populated. + */ + Option getHoodieVirtualKeyInfo(); + /** * Update Log File Paths.
+ * * @param deltaLogPaths */ void setDeltaLogPaths(List deltaLogPaths); @@ -70,6 +78,8 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo { */ void setBasePath(String basePath); + void setHoodieVirtualKeyInfo(Option hoodieVirtualKeyInfo); + default void writeToOutput(DataOutput out) throws IOException { InputSplitUtils.writeString(getBasePath(), out); InputSplitUtils.writeString(getMaxCommitTime(), out); @@ -77,6 +87,15 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo { for (String logFilePath : getDeltaLogPaths()) { InputSplitUtils.writeString(logFilePath, out); } + if (!getHoodieVirtualKeyInfo().isPresent()) { + InputSplitUtils.writeBoolean(false, out); + } else { + InputSplitUtils.writeBoolean(true, out); + InputSplitUtils.writeString(getHoodieVirtualKeyInfo().get().getRecordKeyField(), out); + InputSplitUtils.writeString(getHoodieVirtualKeyInfo().get().getPartitionPathField(), out); + InputSplitUtils.writeString(String.valueOf(getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex()), out); + InputSplitUtils.writeString(String.valueOf(getHoodieVirtualKeyInfo().get().getPartitionPathFieldIndex()), out); + } } default void readFromInput(DataInput in) throws IOException { @@ -88,6 +107,14 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo { deltaLogPaths.add(InputSplitUtils.readString(in)); } setDeltaLogPaths(deltaLogPaths); + boolean hoodieVirtualKeyPresent = InputSplitUtils.readBoolean(in); + if (hoodieVirtualKeyPresent) { + String recordKeyField = InputSplitUtils.readString(in); + String partitionPathField = InputSplitUtils.readString(in); + int recordFieldIndex = Integer.parseInt(InputSplitUtils.readString(in)); + int partitionPathIndex = Integer.parseInt(InputSplitUtils.readString(in)); + setHoodieVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex))); + } } /** diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index 22d30899c..f84e34405 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -18,14 +18,15 @@ package org.apache.hudi.hadoop.utils; -import org.apache.hadoop.mapred.JobConf; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewManager; @@ -37,21 +38,24 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; +import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo; import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit; +import org.apache.hudi.hadoop.realtime.RealtimeSplit; 
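// For reference, a sketch of the wire format of the virtual-key section that RealtimeSplit.writeToOutput
// (above) appends after the delta log paths, in the order readFromInput consumes it:
//   boolean  presence flag            -- false when meta fields are populated
//   string   recordKeyField           -- the remaining fields only when the flag is true
//   string   partitionPathField
//   string   recordKeyFieldIndex      -- int written as a string
//   string   partitionPathFieldIndex  -- int written as a string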
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.SplitLocationInfo; -import org.apache.hudi.hadoop.realtime.RealtimeSplit; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.parquet.schema.MessageType; import java.io.IOException; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -77,6 +81,24 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { // grouped on file id List rtSplits = new ArrayList<>(); try { + // Pre process tableConfig from first partition to fetch virtual key info + Option hoodieVirtualKeyInfo = Option.empty(); + if (partitionsToParquetSplits.size() > 0) { + HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionsToParquetSplits.keySet().iterator().next()); + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + if (!tableConfig.populateMetaFields()) { + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); + try { + MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema(); + hoodieVirtualKeyInfo = Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), + tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), + parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp()))); + } catch (Exception exception) { + throw new HoodieException("Fetching table schema failed with exception ", exception); + } + } + } + Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; partitionsToParquetSplits.keySet().forEach(partitionPath -> { // for each partition path obtain the data & log file groupings, then map back to inputsplits HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); @@ -121,7 +143,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit, metaClient.getBasePath(), logFilePaths, maxCommitTime, eSplit.getBootstrapFileSplit())); } else { - rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime)); + rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime, finalHoodieVirtualKeyInfo)); } } catch (IOException e) { throw new HoodieIOException("Error creating hoodie real time split ", e); @@ -173,7 +195,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { }); return baseAndLogsList; } - + /** * Add a field to the existing fields projected. 
@@ -204,23 +226,34 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { return conf; } - public static void addRequiredProjectionFields(Configuration configuration) { + public static void addRequiredProjectionFields(Configuration configuration, Option hoodieVirtualKeyInfo) { // Need this to do merge records in HoodieRealtimeRecordReader - addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS); - addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS); - addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS); + if (!hoodieVirtualKeyInfo.isPresent()) { + addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS); + addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS); + addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_PARTITION_PATH_COL_POS); + } else { + HoodieVirtualKeyInfo hoodieVirtualKey = hoodieVirtualKeyInfo.get(); + addProjectionField(configuration, hoodieVirtualKey.getRecordKeyField(), hoodieVirtualKey.getRecordKeyFieldIndex()); + addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField(), hoodieVirtualKey.getPartitionPathFieldIndex()); + } } - public static boolean requiredProjectionFieldsExistInConf(Configuration configuration) { + public static boolean requiredProjectionFieldsExistInConf(Configuration configuration, Option hoodieVirtualKeyInfo) { String readColNames = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""); - return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD) - && readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD) - && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD); + if (!hoodieVirtualKeyInfo.isPresent()) { + return readColNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD) + && readColNames.contains(HoodieRecord.COMMIT_TIME_METADATA_FIELD) + && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD); + } else { + return readColNames.contains(hoodieVirtualKeyInfo.get().getRecordKeyField()) + && readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField()); + } } public static boolean canAddProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) { return jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null - || (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf)); + || (!realtimeSplit.getDeltaLogPaths().isEmpty() && !HoodieRealtimeInputFormatUtils.requiredProjectionFieldsExistInConf(jobConf, realtimeSplit.getHoodieVirtualKeyInfo())); } /** diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java index 4e0adb0e3..ac857868c 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java @@ -18,6 +18,8 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.hudi.common.util.Option; + import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; @@ -67,7 +69,7 @@ public class TestHoodieRealtimeFileSplit { baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {}); maxCommitTime = "10001"; - split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime); + split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime, Option.empty()); } @Test @@ -97,6 +99,7 @@ public class TestHoodieRealtimeFileSplit { inorder.verify(out, times(1)).writeInt(eq(deltaLogPaths.size())); inorder.verify(out, times(1)).writeInt(eq(deltaLogPaths.get(0).length())); inorder.verify(out, times(1)).write(aryEq(deltaLogPaths.get(0).getBytes(StandardCharsets.UTF_8))); + inorder.verify(out, times(1)).writeBoolean(false); // verify there are no more interactions happened on the mocked object inorder.verifyNoMoreInteractions(); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index 278539775..39112c14f 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -31,6 +32,7 @@ import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; @@ -69,6 +71,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -119,6 +122,16 @@ public class TestHoodieRealtimeRecordReader { jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveOrderedColumnNames); } + protected Properties getPropertiesForKeyGen() { + Properties properties = new Properties(); + properties.put(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), "false"); + properties.put("hoodie.datasource.write.recordkey.field","_row_key"); + properties.put("hoodie.datasource.write.partitionpath.field","partition_path"); + properties.put(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS.key(), "_row_key"); + properties.put(HoodieTableConfig.HOODIE_TABLE_PARTITION_FIELDS_PROP.key(), "partition_path"); + return properties; + } + @ParameterizedTest @MethodSource("testArguments") public void testReader(ExternalSpillableMap.DiskMapType diskMapType, @@ -175,7 +188,7 @@ public class TestHoodieRealtimeRecordReader { new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + baseInstant + ".parquet"), 0, 1, baseJobConf), basePath.toUri().toString(), 
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) .map(h -> h.getPath().toString()).collect(Collectors.toList()), - instantTime); + instantTime, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -246,7 +259,7 @@ public class TestHoodieRealtimeRecordReader { String logFilePath = writer.getLogFile().getPath().toString(); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), - basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime); + basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -326,7 +339,7 @@ public class TestHoodieRealtimeRecordReader { String logFilePath = writer.getLogFile().getPath().toString(); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + instantTime + ".parquet"), 0, 1, baseJobConf), - basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime); + basePath.toUri().toString(), Collections.singletonList(logFilePath), newCommitTime, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -472,7 +485,7 @@ public class TestHoodieRealtimeRecordReader { // create a split with baseFile (parquet file written earlier) and new log file(s) HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1_" + instantTime + ".parquet"), 0, 1, baseJobConf), - basePath.toUri().toString(), logFilePaths, newCommitTime); + basePath.toUri().toString(), logFilePaths, newCommitTime, Option.empty()); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 45b145288..398377dc6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -50,6 +50,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private val confBroadcast = sc.broadcast(new SerializableWritable(config)) private val preCombineField = tableState.preCombineField + private val recordKeyFieldOpt = tableState.recordKeyFieldOpt private val payloadProps = if (preCombineField.isDefined) { Some(HoodiePayloadConfig.newBuilder.withPayloadOrderingField(preCombineField.get).build.getProps) } else { @@ -209,6 +210,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala private val keyToSkip = mutable.Set.empty[String] + private val recordKeyPosition = if (recordKeyFieldOpt.isEmpty) HOODIE_RECORD_KEY_COL_POS else tableState.tableStructSchema.fieldIndex(recordKeyFieldOpt.get) private var recordToLoad: InternalRow = _ @@ -216,7 +218,7 @@ class 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 45b145288..398377dc6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -50,6 +50,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
   private val confBroadcast = sc.broadcast(new SerializableWritable(config))
   private val preCombineField = tableState.preCombineField
+  private val recordKeyFieldOpt = tableState.recordKeyFieldOpt
   private val payloadProps = if (preCombineField.isDefined) {
     Some(HoodiePayloadConfig.newBuilder.withPayloadOrderingField(preCombineField.get).build.getProps)
   } else {
@@ -209,6 +210,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
     private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
     private val keyToSkip = mutable.Set.empty[String]
+    private val recordKeyPosition = if (recordKeyFieldOpt.isEmpty) HOODIE_RECORD_KEY_COL_POS else tableState.tableStructSchema.fieldIndex(recordKeyFieldOpt.get)
 
     private var recordToLoad: InternalRow = _
 
@@ -216,7 +218,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     override def hasNext: Boolean = {
       if (baseFileIterator.hasNext) {
         val curRow = baseFileIterator.next()
-        val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+        val curKey = curRow.getString(recordKeyPosition)
         if (logRecords.containsKey(curKey)) {
           // duplicate key found, merging
           keyToSkip.add(curKey)
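Note: the merge iterator above now resolves the key column in two modes: with meta fields enabled the key sits at the fixed _hoodie_record_key position, and with virtual keys it is looked up by name in the table schema. A self-contained sketch of that rule, where the constant's value reflects the conventional meta-column layout and is an assumption here:

// Sketch of the key-column resolution applied per base-file row.
import org.apache.hudi.common.util.Option;
import org.apache.spark.sql.types.StructType;

class RecordKeyPositionSketch {
  // Meta columns: _hoodie_commit_time(0), _hoodie_commit_seqno(1), _hoodie_record_key(2), ...
  static final int HOODIE_RECORD_KEY_COL_POS = 2;

  static int recordKeyPosition(StructType tableStructSchema, Option<String> recordKeyFieldOpt) {
    return recordKeyFieldOpt.isPresent()
        ? tableStructSchema.fieldIndex(recordKeyFieldOpt.get())   // virtual keys: look up by name
        : HOODIE_RECORD_KEY_COL_POS;                              // meta fields: fixed position
  }
}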
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 47981d6cc..c283e93ba 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -120,6 +120,7 @@ object HoodieSparkSqlWriter {
       val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
       val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator)
       val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
+      val populateMetaFields = parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean
 
       val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(tableType)
@@ -130,7 +131,9 @@ object HoodieSparkSqlWriter {
         .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
         .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
         .setPartitionFields(partitionColumns)
-        .setPopulateMetaFields(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean)
+        .setPopulateMetaFields(populateMetaFields)
+        .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD_OPT_KEY))
+        .setKeyGeneratorClassProp(hoodieConfig.getString(KEYGENERATOR_CLASS_OPT_KEY))
         .initTable(sparkContext.hadoopConfiguration, path.get)
       tableConfig = tableMetaClient.getTableConfig
     }
@@ -284,18 +287,22 @@ object HoodieSparkSqlWriter {
       val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP)
       val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
       val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
+      val keyGenProp = hoodieConfig.getString(HoodieTableConfig.HOODIE_TABLE_KEY_GENERATOR_CLASS)
+      val populateMetaFields = parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean
 
       HoodieTableMetaClient.withPropertyBuilder()
-        .setTableType(HoodieTableType.valueOf(tableType))
-        .setTableName(tableName)
-        .setRecordKeyFields(recordKeyFields)
-        .setArchiveLogFolder(archiveLogFolder)
-        .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_OPT_KEY))
-        .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
-        .setBootstrapIndexClass(bootstrapIndexClass)
-        .setBootstrapBasePath(bootstrapBasePath)
-        .setPartitionFields(partitionColumns)
-        .initTable(sparkContext.hadoopConfiguration, path)
+        .setTableType(HoodieTableType.valueOf(tableType))
+        .setTableName(tableName)
+        .setRecordKeyFields(recordKeyFields)
+        .setArchiveLogFolder(archiveLogFolder)
+        .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_OPT_KEY))
+        .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+        .setBootstrapIndexClass(bootstrapIndexClass)
+        .setBootstrapBasePath(bootstrapBasePath)
+        .setPartitionFields(partitionColumns)
+        .setPopulateMetaFields(populateMetaFields)
+        .setKeyGeneratorClassProp(keyGenProp)
+        .initTable(sparkContext.hadoopConfiguration, path)
     }
 
     val jsc = new JavaSparkContext(sqlContext.sparkContext)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index af0d7a6c3..d7022f23b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -65,6 +65,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
     throw new HoodieException(s"Specify the begin instant time to pull from using " +
       s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key}")
   }
+  if (!metaClient.getTableConfig.populateMetaFields()) {
+    throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
+  }
 
   val useEndInstantSchema = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY.key,
     DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY.defaultValue).toBoolean
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 68da5f7ae..0d72698a4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -59,6 +59,9 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
     throw new HoodieException(s"Specify the begin instant time to pull from using " +
       s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY.key}")
   }
+  if (!metaClient.getTableConfig.populateMetaFields()) {
+    throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
+  }
 
   private val lastInstant = commitTimeline.lastInstant().get()
 
   private val mergeType = optParams.getOrElse(
@@ -125,7 +128,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
       tableAvroSchema.toString,
       requiredAvroSchema.toString,
       fileIndex,
-      preCombineField
+      preCombineField,
+      Option.empty
     )
     val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
       sparkSession = sqlContext.sparkSession,
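Note: both incremental relations now fail fast when the table was written without meta fields, since incremental pulls depend on the _hoodie_commit_time meta column. From the user's side the guard surfaces as below; the option keys are the 0.x-era names used elsewhere in this patch, and basePath is a placeholder.

// Usage-level sketch of the new guard on a virtual-keys table.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class IncrementalQueryGuardSketch {
  static Dataset<Row> incrementalRead(SparkSession spark, String basePath) {
    // Throws HoodieException("Incremental queries are not supported when meta fields are disabled")
    // when the table config has hoodie.populate.meta.fields=false.
    return spark.read().format("hudi")
        .option("hoodie.datasource.query.type", "incremental")
        .option("hoodie.datasource.read.begin.instanttime", "000")
        .load(basePath);
  }
}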
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 381df1132..07213feac 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -49,7 +49,8 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType,
                                        tableAvroSchema: String,
                                        requiredAvroSchema: String,
                                        hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit],
-                                       preCombineField: Option[String])
+                                       preCombineField: Option[String],
+                                       recordKeyFieldOpt: Option[String])
 
 class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
                                   val optParams: Map[String, String],
@@ -87,6 +88,10 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
       optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key)
     }
   }
+  private var recordKeyFieldOpt = Option.empty[String]
+  if (!metaClient.getTableConfig.populateMetaFields()) {
+    recordKeyFieldOpt = Option(metaClient.getTableConfig.getRecordKeyFieldProp)
+  }
 
   override def schema: StructType = tableStructSchema
 
   override def needConversion: Boolean = false
@@ -104,7 +109,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
       tableAvroSchema.toString,
       requiredAvroSchema.toString,
       fileIndex,
-      preCombineField
+      preCombineField,
+      recordKeyFieldOpt
     )
     val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
       sparkSession = sqlContext.sparkSession,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
index 2df811374..34cb811d3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
@@ -110,7 +110,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class TestBootstrap extends HoodieClientTestBase {
 
-  public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,double,double,double,double,"
+  public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,string,double,double,double,double,"
       + "struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
 
   @TempDir
@@ -576,11 +576,11 @@ public class TestBootstrap extends HoodieClientTestBase {
     if (isPartitioned) {
       df = df.withColumn("datestr", callUDF("partgen", new Column("_row_key")));
       // Order the columns to ensure generated avro schema aligns with Hive schema
-      df = df.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon",
+      df = df.select("timestamp", "_row_key", "partition_path", "rider", "driver", "begin_lat", "begin_lon",
           "end_lat", "end_lon", "fare", "tip_history", "_hoodie_is_deleted", "datestr");
     } else {
       // Order the columns to ensure generated avro schema aligns with Hive schema
-      df = df.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon",
+      df = df.select("timestamp", "_row_key", "partition_path", "rider", "driver", "begin_lat", "begin_lon",
           "end_lat", "end_lon", "fare", "tip_history", "_hoodie_is_deleted");
     }
     return df;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 512760a11..be95d7737 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -394,7 +394,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     List((DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name(), true),
       (DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.ORC.name(), true),
       (DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name(), true),
       (DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, HoodieFileFormat.ORC.name(), true),
-      (DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name(), false))
+      (DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name(), false), (DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, HoodieFileFormat.PARQUET.name(), false))
       .foreach(t => {
         val tableType = t._1
         val baseFileFormat = t._2
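Note: the extended test matrix above now covers MOR with meta fields disabled. An end-to-end sketch of what that configuration looks like from the Spark datasource, tying together the setRecordKeyFields/setPopulateMetaFields table-config plumbing added in HoodieSparkSqlWriter; the populate-meta-fields key string is assumed from HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.

// Sketch: writing a MOR table with virtual keys (no meta columns materialized).
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

class VirtualKeyWriteSketch {
  static void writeMorTable(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.table.name", "trips")
        .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
        .option("hoodie.datasource.write.recordkey.field", "_row_key")
        .option("hoodie.datasource.write.partitionpath.field", "partition_path")
        .option("hoodie.populate.meta.fields", "false")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}

Snapshot reads on such a table go through the recordKeyFieldOpt path added above, while incremental reads are rejected by the new guard.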