From c8617d93904bbc17088bc7e8727b04e0a2be3395 Mon Sep 17 00:00:00 2001
From: Manoj Govindassamy
Date: Fri, 19 Nov 2021 17:02:21 -0800
Subject: [PATCH] [HUDI-2472] Enabling metadata table for
 TestHoodieMergeOnReadTable and TestHoodieCompactor (#4023)

---
 .../testutils/HoodieWriteableTestTable.java     | 36 ++++++++++------
 .../HoodieFlinkWriteableTestTable.java          | 18 +++++---
 .../table/TestHoodieMergeOnReadTable.java       | 43 +++++++++++--------
 .../action/compact/TestHoodieCompactor.java     | 20 ++-------
 .../HoodieSparkWriteableTestTable.java          | 22 ++++++++--
 .../common/testutils/HoodieTestTable.java       |  1 +
 6 files changed, 84 insertions(+), 56 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index 0c4c7712a..e8fda35b3 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -33,8 +33,9 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.FileCreateUtils;
-import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
@@ -47,6 +48,7 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.orc.CompressionKind;
@@ -56,7 +58,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import java.io.IOException;
 import java.nio.file.Paths;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,15 +66,21 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
 
-public class HoodieWriteableTestTable extends HoodieTestTable {
+public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
   private static final Logger LOG = LogManager.getLogger(HoodieWriteableTestTable.class);
 
   protected final Schema schema;
   protected final BloomFilter filter;
   protected final boolean populateMetaFields;
 
-  protected HoodieWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
-    super(basePath, fs, metaClient);
+  protected HoodieWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient,
+                                     Schema schema, BloomFilter filter) {
+    this(basePath, fs, metaClient, schema, filter, null);
+  }
+
+  protected HoodieWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema,
+                                     BloomFilter filter, HoodieTableMetadataWriter metadataWriter) {
+    super(basePath, fs, metaClient, metadataWriter);
     this.schema = schema;
     this.filter = filter;
     this.populateMetaFields = metaClient.getTableConfig().populateMetaFields();
@@ -139,19 +147,18 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
     return this;
   }
 
-  public HoodieWriteableTestTable withLogAppends(HoodieRecord... records) throws Exception {
-    return withLogAppends(Arrays.asList(records));
-  }
-
-  public HoodieWriteableTestTable withLogAppends(List<HoodieRecord> records) throws Exception {
-    for (List<HoodieRecord> groupedRecords: records.stream()
+  public Map<String, List<HoodieLogFile>> withLogAppends(List<HoodieRecord> records) throws Exception {
+    Map<String, List<HoodieLogFile>> partitionToLogfilesMap = new HashMap<>();
+    for (List<HoodieRecord> groupedRecords : records.stream()
         .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
-      appendRecordsToLogFile(groupedRecords);
+      final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(groupedRecords);
+      partitionToLogfilesMap.computeIfAbsent(
+          appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
     }
-    return this;
+    return partitionToLogfilesMap;
   }
 
-  private void appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
+  private Pair<String, HoodieLogFile> appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
     String partitionPath = groupedRecords.get(0).getPartitionPath();
     HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
     try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
@@ -170,6 +177,7 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
           return null;
         }
       }).collect(Collectors.toList()), header));
+      return Pair.of(partitionPath, logWriter.getLogFile());
     }
   }
 }
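The rework above changes `withLogAppends` from a fluent builder (returning `this`) into a method that reports what it wrote: a map from partition path to the log files appended there, which callers can assert against or feed to a metadata writer. A minimal sketch of the new call shape — `testTable` and `updatedRecords` are illustrative names, assuming records already tagged with a current location:

```java
// Hypothetical test snippet: `testTable` is a HoodieWriteableTestTable
// subclass and `updatedRecords` is a List<HoodieRecord> whose records
// carry a current HoodieRecordLocation.
Map<String, List<HoodieLogFile>> partitionToLogFiles = testTable.withLogAppends(updatedRecords);

// Every touched partition should have produced at least one log file.
partitionToLogFiles.forEach((partition, logFiles) ->
    assertFalse(logFiles.isEmpty(), "expected log files under partition " + partition));
```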
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
index 60ae294e6..50e8f776a 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
@@ -38,7 +39,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -105,14 +108,18 @@ public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable {
     return (HoodieFlinkWriteableTestTable) withInserts(partition, fileId, records, new org.apache.hudi.client.FlinkTaskContextSupplier(null));
   }
 
-  public HoodieFlinkWriteableTestTable withLogAppends(List<HoodieRecord> records) throws Exception {
-    for (List<HoodieRecord> groupedRecords: records.stream().collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
-      appendRecordsToLogFile(groupedRecords);
+  public Map<String, List<HoodieLogFile>> withLogAppends(List<HoodieRecord> records) throws Exception {
+    Map<String, List<HoodieLogFile>> partitionToLogfilesMap = new HashMap<>();
+    for (List<HoodieRecord> groupedRecords : records.stream().collect(
+        Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
+      final Pair<String, HoodieLogFile> appendedLogFile = appendRecordsToLogFile(groupedRecords);
+      partitionToLogfilesMap.computeIfAbsent(
+          appendedLogFile.getKey(), k -> new ArrayList<>()).add(appendedLogFile.getValue());
     }
-    return this;
+    return partitionToLogfilesMap;
   }
 
-  private void appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
+  private Pair<String, HoodieLogFile> appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
     String partitionPath = groupedRecords.get(0).getPartitionPath();
     HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
     try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
@@ -131,6 +138,7 @@ public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable {
           return null;
         }
       }).collect(Collectors.toList()), header));
+      return Pair.of(partitionPath, logWriter.getLogFile());
    }
  }
}
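The Flink variant mirrors the common-client change line for line. The accumulation idiom both use — `computeIfAbsent(key, k -> new ArrayList<>()).add(value)` — is shown below as a self-contained sketch with plain JDK types; the partition paths and log-file names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentDemo {
  public static void main(String[] args) {
    // Stand-ins for the (partitionPath, logFile) pairs returned by
    // appendRecordsToLogFile(...); one entry per appended log file.
    String[][] appended = {
        {"2016/03/15", "f1.log.1"},
        {"2016/03/15", "f2.log.1"},
        {"2015/03/16", "f3.log.1"},
    };
    Map<String, List<String>> partitionToLogfiles = new HashMap<>();
    for (String[] pair : appended) {
      // Create the per-partition list on first sight, then append.
      partitionToLogfiles.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
    }
    // e.g. {2015/03/16=[f3.log.1], 2016/03/15=[f1.log.1, f2.log.1]}
    // (HashMap iteration order is not guaranteed)
    System.out.println(partitionToLogfiles);
  }
}
```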
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 595d4df2a..7674c3489 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -34,13 +34,14 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.Transformations;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.action.deltacommit.AbstractSparkDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
@@ -63,6 +64,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -190,11 +192,13 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
     }
   }
 
+  // TODO: Enable metadata virtual keys in this test once the feature HUDI-2593 is completed
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
+  @ValueSource(booleans = {true})
   public void testLogFileCountsAfterCompaction(boolean populateMetaFields) throws Exception {
     // insert 100 records
-    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build());
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig config = cfgBuilder.build();
 
@@ -208,37 +212,40 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
 
     // Update all the 100 records
     newCommitTime = "101";
-    writeClient.startCommitWithTime(newCommitTime);
-
     List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
     JavaRDD<HoodieRecord> updatedRecordsRDD = jsc().parallelize(updatedRecords, 1);
 
     HoodieReadClient readClient = new HoodieReadClient(context(), config);
-    updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
+    JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);
+
+    writeClient.startCommitWithTime(newCommitTime);
+    writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
 
     // Write them to corresponding avro logfiles
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
-    HoodieSparkWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
-        .withLogAppends(updatedRecords);
-    // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
-    ((SyncableFileSystemView) (table.getSliceView())).reset();
+
+    HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
+        writeClient.getEngineContext().getHadoopConf().get(), config, writeClient.getEngineContext());
+    HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable
+        .of(metaClient, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, metadataWriter);
+
+    Set<String> allPartitions = updatedRecords.stream()
+        .map(record -> record.getPartitionPath())
+        .collect(Collectors.groupingBy(partitionPath -> partitionPath))
+        .keySet();
+    assertEquals(allPartitions.size(), testTable.listAllBaseFiles().length);
 
     // Verify that all data file has one log file
+    HoodieTable table = HoodieSparkTable.create(config, context(), metaClient, true);
     for (String partitionPath : dataGen.getPartitionPaths()) {
       List<FileSlice> groupedLogFiles = table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
       for (FileSlice fileSlice : groupedLogFiles) {
-        assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
+        assertEquals(1, fileSlice.getLogFiles().count(),
+            "There should be 1 log file written for the latest data file - " + fileSlice);
       }
     }
 
-    // Mark 2nd delta-instant as completed
-    metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
-        HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
-    metaClient.getActiveTimeline().saveAsComplete(
-        new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
-
     // Do a compaction
     String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
    JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) writeClient.compact(compactionInstantTime);
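With the metadata table enabled, the second delta commit above has to be a real commit: the old path (hand-written log files plus manual instant transitions) never updated the metadata table, which is why the test now tags the records and routes them through `upsertPreppedRecords`. The pattern, extracted from the hunk above (commit time "101" as in the test; `readClient`, `writeClient`, and `jsc()` come from the test harness):

```java
// Tag updates against existing file groups, then write them as a prepped
// upsert under an explicitly started commit time.
JavaRDD<HoodieRecord> updatedRecordsRDD = jsc().parallelize(updatedRecords, 1);
JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);

writeClient.startCommitWithTime("101");
writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, "101").collect();
// Completing this delta commit through the write client also syncs the
// metadata table, so no manual timeline completion is needed afterwards.
```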
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 6b837e317..454c289db 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -46,7 +46,6 @@ import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
-import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
@@ -57,9 +56,6 @@ import org.junit.jupiter.api.Test;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
-import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -163,7 +159,7 @@
     // insert 100 records
     HoodieWriteConfig config = getConfigBuilder()
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
         .build();
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
       String newCommitTime = "100";
@@ -176,19 +172,14 @@
       // Update all the 100 records
       HoodieTable table = HoodieSparkTable.create(config, context);
       newCommitTime = "101";
-      writeClient.startCommitWithTime(newCommitTime);
 
       List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
       JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
       HoodieIndex index = new HoodieBloomIndex<>(config, SparkHoodieBloomIndexHelper.getInstance());
-      updatedRecords = tagLocation(index, updatedRecordsRDD, table).collect();
+      JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
 
-      // Write them to corresponding avro logfiles. Also, set the state transition properly.
-      HoodieSparkWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
-          .withLogAppends(updatedRecords);
-      metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
-          HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
-      writeClient.commit(newCommitTime, jsc.emptyRDD(), Option.empty());
+      writeClient.startCommitWithTime(newCommitTime);
+      writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
 
       metaClient.reloadActiveTimeline();
 
       // Verify that all data file has one log file
@@ -200,9 +191,6 @@
           assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
         }
       }
-      createDeltaCommit(basePath, newCommitTime);
-      createRequestedDeltaCommit(basePath, newCommitTime);
-      createInflightDeltaCommit(basePath, newCommitTime);
 
      // Do a compaction
      table = HoodieSparkTable.create(config, context);
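Same rationale as in TestHoodieMergeOnReadTable: once the delta commit goes through the write client, the hand-built timeline files (`createDeltaCommit` and friends) would clash with the real timeline and are dropped. For context, the compaction step this sets up can be exercised along the lines of the merge-on-read test above — `scheduleCompaction` picks the instant time, and the assertion here is illustrative:

```java
// Schedule a compaction and execute it via the write client, then check
// that no write errors were reported (sketch; mirrors the MOR test usage).
String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) writeClient.compact(compactionInstantTime);
for (WriteStatus status : result.collect()) {
  assertFalse(status.hasErrors(), "compaction should not produce write errors");
}
```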
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
index 8e37c92d3..ca7bb4e01 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkWriteableTestTable.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
@@ -39,12 +40,20 @@ import java.util.UUID;
 public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
   private static final Logger LOG = LogManager.getLogger(HoodieSparkWriteableTestTable.class);
 
-  private HoodieSparkWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
-    super(basePath, fs, metaClient, schema, filter);
+  private HoodieSparkWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema,
+                                        BloomFilter filter, HoodieTableMetadataWriter metadataWriter) {
+    super(basePath, fs, metaClient, schema, filter, metadataWriter);
   }
 
   public static HoodieSparkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
-    return new HoodieSparkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter);
+    return new HoodieSparkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(),
+        metaClient, schema, filter, null);
+  }
+
+  public static HoodieSparkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter,
+                                                 HoodieTableMetadataWriter metadataWriter) {
+    return new HoodieSparkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(),
+        metaClient, schema, filter, metadataWriter);
   }
 
   public static HoodieSparkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) {
@@ -53,6 +62,13 @@ public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
     return of(metaClient, schema, filter);
   }
 
+  public static HoodieSparkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema,
+                                                 HoodieTableMetadataWriter metadataWriter) {
+    BloomFilter filter = BloomFilterFactory
+        .createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
+    return of(metaClient, schema, filter, metadataWriter);
+  }
+
  public static HoodieSparkWriteableTestTable of(HoodieTable hoodieTable, Schema schema) {
    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
    return of(metaClient, schema);
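The overload set keeps existing call sites source-compatible (the old factories delegate with a null metadata writer) while metadata-aware tests can pass one explicitly. A sketch of the two entry points — the writer construction reuses the arguments from the merge-on-read test above; `hadoopConf`, `config`, and `engineContext` are stand-in names:

```java
// Metadata-aware test table: file-level operations are mirrored into the
// metadata table through the supplied writer.
HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
    hadoopConf, config, engineContext);
HoodieSparkWriteableTestTable withMetadata = HoodieSparkWriteableTestTable
    .of(metaClient, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, metadataWriter);

// Legacy behavior: same table, no metadata table updates (writer defaults to null).
HoodieSparkWriteableTestTable withoutMetadata = HoodieSparkWriteableTestTable
    .of(metaClient, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS);
```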
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 95d0657cb..b6ea32db2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -131,6 +131,7 @@ public class HoodieTestTable {
     this.basePath = basePath;
     this.fs = fs;
     this.metaClient = metaClient;
+    testTableState = HoodieTestTableState.of();
   }
 
   public static HoodieTestTable of(HoodieTableMetaClient metaClient) {
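A one-line but deliberate change: re-initializing `testTableState` in the constructor means every `HoodieTestTable` (including the metadata-aware subclasses introduced above) starts from a fresh state object rather than inheriting whatever an earlier instance recorded. Illustrative only, assuming `testTableState` is otherwise retained across instances:

```java
// Each factory call now begins with a fresh HoodieTestTableState.
HoodieTestTable t1 = HoodieTestTable.of(metaClient);
// ... t1's helpers mutate testTableState while staging commits and files ...
HoodieTestTable t2 = HoodieTestTable.of(metaClient);
// t2 does not observe t1's accumulated state.
```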