diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index f4f47d375..3b30c5b76 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -21,6 +21,9 @@ package org.apache.hudi.table.functional;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -43,12 +46,16 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
@@ -56,6 +63,17 @@ import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
 @Tag("functional")
 public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFunctionalTestHarness {
 
+  private static Stream<Arguments> writeLogTest() {
+    // enable metadata table, enable embedded timeline server
+    Object[][] data = new Object[][] {
+        {true, true},
+        {true, false},
+        {false, true},
+        {false, false}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private HoodieTestDataGenerator dataGen;
   private SparkRDDWriteClient client;
   private HoodieTableMetaClient metaClient;
@@ -104,6 +122,44 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
     Assertions.assertEquals(300, readTableTotalRecordsNum());
   }
 
+  @ParameterizedTest
+  @MethodSource("writeLogTest")
+  public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .forTable("test-trip-table")
+        .withPath(basePath())
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withAutoCommit(true)
+        .withEmbeddedTimelineServerEnabled(enableTimelineServer)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withLayoutConfig(HoodieLayoutConfig.newBuilder()
+            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
+            .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+    client = getHoodieWriteClient(config);
+
+    final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+
+    // initialize 100 records
+    client.upsert(writeRecords, client.startCommit());
+    // update 100 records
+    client.upsert(writeRecords, client.startCommit());
+    // schedule compaction
+    client.scheduleCompaction(Option.empty());
+    // delete 50 records
+    List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
+    JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
+    client.delete(deleteRecords, client.startCommit());
+    // upsert the same 100 records again, re-inserting the 50 deleted keys
+    client.upsert(writeRecords, client.startCommit());
+    Assertions.assertEquals(100, readTableTotalRecordsNum());
+  }
+
   private long readTableTotalRecordsNum() {
     return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(),
         Arrays.stream(dataGen.getPartitionPaths()).map(p -> Paths.get(basePath(), p).toString()).collect(Collectors.toList()), basePath()).size();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 32c7125e3..4683fd691 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -162,7 +162,7 @@ public class FileSystemViewManager {
     HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
     if (metadataConfig.enabled()) {
       ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. Cannot instantiate metadata file system view");
-      return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+      return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(),
           metadataSupplier.get());
     }
     return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());