diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index ee105aa6b..d822ad658 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -24,12 +24,12 @@ import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
 import org.apache.hudi.cli.testutils.HoodieTestCommitUtilities;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -75,7 +75,6 @@ public class TestArchivedCommitsCommand extends CLIFunctionalTestHarness {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .forTable("test-trip-table").build();
 
     // Create six commits
@@ -90,6 +89,11 @@ public class TestArchivedCommitsCommand extends CLIFunctionalTestHarness {
       HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, hadoopConf());
     }
 
+    // Simulate a compaction commit in metadata table timeline
+    // so the archival in data table can happen
+    HoodieTestUtils.createCompactionCommitInMetadataTable(
+        hadoopConf(), metaClient.getFs(), tablePath, "105");
+
     metaClient = HoodieTableMetaClient.reload(metaClient);
     // reload the timeline and get all the commits before archive
     metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index aefd209fc..b23c6fd15 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -48,6 +48,7 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.springframework.shell.core.CommandResult;
 
 import java.io.IOException;
@@ -60,6 +61,7 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -204,15 +206,16 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
   /**
    * Test case of 'commits showarchived' command.
   */
-  @Test
-  public void testShowArchivedCommits() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testShowArchivedCommits(boolean enableMetadataTable) throws Exception {
     // Generate archive
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
         .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
         .forTable("test-trip-table").build();
 
     // generate data and metadata
@@ -229,6 +232,12 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
           Option.of(value[0]), Option.of(value[1]));
     }
 
+    if (enableMetadataTable) {
+      // Simulate a compaction commit in metadata table timeline
+      // so the archival in data table can happen
+      createCompactionCommitInMetadataTable(hadoopConf(), metaClient.getFs(), tablePath1, "104");
+    }
+
     // archive
     metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
     HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient);
@@ -251,15 +260,16 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
     assertEquals(expected, got);
   }
 
-  @Test
-  public void testShowArchivedCommitsWithMultiCommitsFile() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testShowArchivedCommitsWithMultiCommitsFile(boolean enableMetadataTable) throws Exception {
     // Generate archive
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
         .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
         .forTable("test-trip-table").build();
 
     // generate data and metadata
@@ -269,6 +279,12 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
       data.put(String.valueOf(i), new Integer[] {i, i});
     }
 
+    if (enableMetadataTable) {
+      // Simulate a compaction commit in metadata table timeline
+      // so the archival in data table can happen
+      createCompactionCommitInMetadataTable(hadoopConf(), metaClient.getFs(), tablePath1, "194");
+    }
+
     for (Map.Entry<String, Integer[]> entry : data.entrySet()) {
       String key = entry.getKey();
       Integer[] value = entry.getValue();
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
index c8bb94257..17c1002f6 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
@@ -24,7 +24,9 @@ import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.fs.NoOpConsistencyGuard;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -35,6 +37,7 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.CompactionTestUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -152,7 +155,11 @@ public class TestCompactionCommand extends CLIFunctionalTestHarness {
       activeTimeline.transitionCompactionInflightToComplete(
           new HoodieInstant(HoodieInstant.State.INFLIGHT, COMPACTION_ACTION, timestamp), Option.empty());
     });
-
+    // Simulate a compaction commit in metadata table timeline
+    // so the archival in data table can happen
+    HoodieTestUtils.createCompactionCommitInMetadataTable(hadoopConf(),
+        new HoodieWrapperFileSystem(
+            FSUtils.getFs(tablePath, hadoopConf()), new NoOpConsistencyGuard()), tablePath, "007");
   }
 
   private void generateArchive() throws IOException {
@@ -162,7 +169,6 @@
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .forTable("test-trip-table").build();
     // archive
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index c694e2480..a826cfa08 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -19,8 +19,6 @@
 package org.apache.hudi.client;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
@@ -59,6 +57,8 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -429,7 +429,7 @@ public class HoodieTimelineArchiver {
         .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
             HoodieInstant.getComparableAction(i.getAction()))));
 
-    // If metadata table is enabled, do not archive instants which are more recent that the last compaction on the
+    // If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
     // metadata table.
     if (config.isMetadataTableEnabled()) {
       try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(),
@@ -447,7 +447,7 @@ public class HoodieTimelineArchiver {
         throw new HoodieException("Error limiting instant archival based on metadata table", e);
       }
     }
-
+
     return instants.flatMap(hoodieInstant ->
         groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
             HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index f24c4279b..3aeca0f27 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -540,7 +539,6 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         // Timeline-server-based markers are not used for multi-writer tests
         .withMarkersType(MarkerType.DIRECT.name())
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 457b8b526..df0fed027 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -18,7 +18,6 @@
 package org.apache.hudi.client;
 
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -69,14 +68,9 @@ public class TestMultiFS extends HoodieClientTestHarness {
   }
 
   protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
-    return getHoodieWriteConfig(basePath, HoodieMetadataConfig.ENABLE.defaultValue());
-  }
-
-  protected HoodieWriteConfig getHoodieWriteConfig(String basePath, boolean enableMetadata) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
         .build();
   }
 
@@ -84,21 +78,21 @@ public class TestMultiFS extends HoodieClientTestHarness {
   public void readLocalWriteHDFS() throws Exception {
     // Initialize table and filesystem
     HoodieTableMetaClient.withPropertyBuilder()
-      .setTableType(tableType)
-      .setTableName(tableName)
-      .setPayloadClass(HoodieAvroPayload.class)
-      .initTable(hadoopConf, dfsBasePath);
+        .setTableType(tableType)
+        .setTableName(tableName)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .initTable(hadoopConf, dfsBasePath);
 
     // Create write client to write some records in
-    HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath, false);
-    HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath, false);
+    HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath);
+    HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath);
     HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(tableType)
         .setTableName(tableName)
         .setPayloadClass(HoodieAvroPayload.class)
         .setRecordKeyFields(localConfig.getProps().getProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()))
-        .setPartitionFields(localConfig.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()))
+        .setPartitionFields(localConfig.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()))
         .initTable(hadoopConf, tablePath);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index 154ce5294..2ff67c3c9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.client.functional;
 
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -105,10 +104,10 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
   private HoodieWriteConfig config;
 
   private void setUp(IndexType indexType, boolean populateMetaFields) throws Exception {
-    setUp(indexType, populateMetaFields, true, true);
+    setUp(indexType, populateMetaFields, true);
   }
 
-  private void setUp(IndexType indexType, boolean populateMetaFields, boolean enableMetadata, boolean rollbackUsingMarkers) throws Exception {
+  private void setUp(IndexType indexType, boolean populateMetaFields, boolean rollbackUsingMarkers) throws Exception {
     this.indexType = indexType;
     initPath();
     initSparkContexts();
@@ -122,8 +121,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
     config = getConfigBuilder()
        .withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
        .withRollbackUsingMarkers(rollbackUsingMarkers)
-        .withIndexConfig(indexBuilder
-            .build()).withAutoCommit(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
+        .withIndexConfig(indexBuilder.build())
+        .withAutoCommit(false)
         .withLayoutConfig(HoodieLayoutConfig.newBuilder().fromProperties(indexBuilder.build().getProps())
             .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()).build();
     writeClient = getHoodieWriteClient(config);
@@ -238,7 +237,7 @@
   @ParameterizedTest
   @MethodSource("indexTypeParams")
   public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType, boolean populateMetaFields) throws Exception {
-    setUp(indexType, populateMetaFields, true, false);
+    setUp(indexType, populateMetaFields, false);
     String newCommitTime = writeClient.startCommit();
     int totalRecords = 20 + random.nextInt(20);
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
@@ -385,8 +384,6 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
             .withGlobalSimpleIndexUpdatePartitionPath(true)
             .withBloomIndexUpdatePartitionPath(true)
             .build())
-        .withMetadataConfig(
-            HoodieMetadataConfig.newBuilder().enable(true).build())
         .build();
     writeClient = getHoodieWriteClient(config);
     index = writeClient.getIndex();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
index 057968f6f..bdbc9e72d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
@@ -278,7 +278,7 @@ public class TestHoodieMetadataBootstrap extends TestHoodieMetadataBase {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
         .forTable("test-trip-table").build();
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
index de17ddf0a..87bcad04b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
@@ -21,7 +21,6 @@ package org.apache.hudi.index.hbase;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -342,8 +341,7 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
   public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
     // Load to memory
     HoodieWriteConfig config = getConfigBuilder(100, false, false)
-        .withRollbackUsingMarkers(false)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
+        .withRollbackUsingMarkers(false).build();
     SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
     SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
@@ -431,8 +429,7 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
   public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
     // Load to memory
     HoodieWriteConfig config = getConfigBuilder(100, false, false)
-        .withRollbackUsingMarkers(false)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
+        .withRollbackUsingMarkers(false).build();
     SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
     SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index b2f0ef4ea..652dbcb15 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
@@ -47,7 +48,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 
 import org.apache.hadoop.conf.Configuration;
@@ -58,6 +58,7 @@ import org.apache.log4j.Logger;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -72,6 +73,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -213,7 +215,7 @@
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableArchiveMerge) throws Exception {
-    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 3, 2, enableArchiveMerge, 3, 209715200);
 
     // do ingestion and trigger archive actions here.
     for (int i = 1; i < 8; i++) {
@@ -264,7 +266,7 @@
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testMergeSmallArchiveFilesRecoverFromMergeFailed(boolean enableArchiveMerge) throws Exception {
-    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 3, 2, enableArchiveMerge, 3, 209715200);
 
     // do ingestion and trigger archive actions here.
     for (int i = 1; i < 8; i++) {
@@ -317,7 +319,7 @@
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testMergeSmallArchiveFilesRecoverFromDeleteFailed(boolean enableArchiveMerge) throws Exception {
-    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 3, 2, enableArchiveMerge, 3, 209715200);
 
     // do ingestion and trigger archive actions here.
     for (int i = 1; i < 8; i++) {
@@ -362,7 +364,7 @@
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testLoadArchiveTimelineWithDamagedPlanFile(boolean enableArchiveMerge) throws Exception {
-    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 3, 2, enableArchiveMerge, 3, 209715200);
 
     // do ingestion and trigger archive actions here.
     for (int i = 1; i < 8; i++) {
@@ -390,7 +392,7 @@
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testLoadArchiveTimelineWithUncompletedMergeArchiveFile(boolean enableArchiveMerge) throws Exception {
-    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 3, 2, enableArchiveMerge, 3, 209715200);
     for (int i = 1; i < 8; i++) {
       testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
       archiveAndGetCommitsList(writeConfig);
@@ -451,15 +453,16 @@
     assertEquals(originalCommits, commitsAfterArchival);
   }
 
-  @Test
-  public void testArchiveCommitSavepointNoHole() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable) throws Exception {
     init();
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
         .build();
     HoodieTestDataGenerator.createCommitFile(basePath, "100", wrapperFs.getConf());
@@ -472,6 +475,12 @@
     HoodieTable table = HoodieSparkTable.create(cfg, context);
     HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
 
+    if (enableMetadataTable) {
+      // Simulate a compaction commit in metadata table timeline
+      // so the archival in data table can happen
+      createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, "105");
+    }
+
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
     assertTrue(archiver.archiveIfRequired(context));
@@ -593,8 +602,9 @@
         verifyArchival(archivedInstants, getActiveCommitInstants(Arrays.asList("00000007", "00000008"), HoodieTimeline.DELTA_COMMIT_ACTION), commitsAfterArchival);
   }
 
-  @Test
-  public void testArchiveCommitTimeline() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchiveCommitTimeline(boolean enableMetadataTable) throws Exception {
     init();
     HoodieWriteConfig cfg =
         HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
@@ -602,7 +612,7 @@
             .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
             .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                 .withRemoteServerPort(timelineServicePort).build())
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
             .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -619,6 +629,12 @@
     HoodieTestDataGenerator.createCommitFile(basePath, "4", wrapperFs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "5", wrapperFs.getConf());
 
+    if (enableMetadataTable) {
+      // Simulate a compaction commit in metadata table timeline
+      // so the archival in data table can happen
+      createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, "5");
+    }
+
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
     boolean result = archiver.archiveIfRequired(context);
@@ -713,10 +729,9 @@
 
   @Test
   public void testArchiveRollbacksAndCleanTestTable() throws Exception {
-    boolean enableMetadata = false;
     int minArchiveCommits = 2;
     int maxArchiveCommits = 9;
-    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, minArchiveCommits, maxArchiveCommits, 2);
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, minArchiveCommits, maxArchiveCommits, 2);
 
     // trigger 1 commit to add lot of files so that future cleans can clean them up
     testTable.doWriteOperation("00000001", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 20);
@@ -751,8 +766,8 @@
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testArchiveCompletedRollbackAndClean(boolean isEmpty) throws Exception {
+  @CsvSource({"true,true", "true,false", "false,true", "false,false"})
+  public void testArchiveCompletedRollbackAndClean(boolean isEmpty, boolean enableMetadataTable) throws Exception {
     init();
     int minInstantsToKeep = 2;
     int maxInstantsToKeep = 10;
@@ -762,7 +777,7 @@
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
         .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -775,6 +790,12 @@
       createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false, isEmpty || i % 2 == 0);
     }
 
+    if (enableMetadataTable) {
+      // Simulate a compaction commit in metadata table timeline
+      // so the archival in data table can happen
+      createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, Integer.toString(99));
+    }
+
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
 
@@ -790,8 +811,9 @@
     assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), "Should have min instant");
   }
 
-  @Test
-  public void testArchiveInflightClean() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchiveInflightClean(boolean enableMetadataTable) throws Exception {
     init();
     HoodieWriteConfig cfg =
         HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
@@ -799,7 +821,7 @@
             .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
             .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                 .withRemoteServerPort(timelineServicePort).build())
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
             .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -809,6 +831,12 @@
     HoodieInstant notArchivedInstant2 = createCleanMetadata("13", false);
     HoodieInstant notArchivedInstant3 = createCleanMetadata("14", true);
 
+    if (enableMetadataTable) {
+      // Simulate a compaction commit in metadata table timeline
+      // so the archival in data table can happen
+      createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, "14");
+    }
+
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
 
@@ -889,6 +917,35 @@
         "00000009", "00000010", "00000011", "00000012")),
         getActiveCommitInstants(Arrays.asList("00000013", "00000014")), commitsAfterArchival);
   }
 
+  @Test
+  public void testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 20);
+    int startInstantTime = 100;
+    int numCommits = 15;
+    int numExpectedArchived = 6; // "100" till "105" should be archived in this case
+
+    for (int i = startInstantTime; i < startInstantTime + numCommits; i++) {
+      HoodieTestDataGenerator.createCommitFile(basePath, Integer.toString(i), wrapperFs.getConf());
+    }
+    // Simulate a compaction commit in metadata table timeline
+    // so the archival in data table can happen
+    createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, "105");
+
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context);
+    HoodieTimelineArchiver archiveLog = new HoodieTimelineArchiver(writeConfig, table);
+
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    assertEquals(numCommits, timeline.countInstants(), String.format("Loaded %d commits and the count should match", numCommits));
+    assertTrue(archiveLog.archiveIfRequired(context));
+    timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
+    assertEquals(numCommits - numExpectedArchived, timeline.countInstants(),
+        "Since we have a compaction commit of 105 in metadata table timeline, we should never archive any commit after that");
+    for (int i = startInstantTime + numExpectedArchived; i < startInstantTime + numCommits; i++) {
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, Integer.toString(i))),
+          String.format("Commit %d should not be archived", i));
+    }
+  }
+
   private Pair<List<HoodieInstant>, List<HoodieInstant>> archiveAndGetCommitsList(HoodieWriteConfig writeConfig) throws IOException {
     metaClient.reloadActiveTimeline();
     HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
index 0f308425b..5a19f0afe 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
@@ -36,8 +37,9 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -170,19 +172,29 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
     assertRows(inputRows, result, instantTime, fileNames);
   }
 
-  @Test
-  public void testInstantiationFailure() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testInstantiationFailure(boolean enableMetadataTable) {
     // init config and table
     HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
-        .withPath("/dummypath/abc/").build();
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+        .withPath("/dummypath/abc/")
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+        .build();
 
     try {
+      HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
       new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(),
          "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
       fail("Should have thrown exception");
     } catch (HoodieInsertException ioe) {
-      // expected
+      // expected without metadata table
+      if (enableMetadataTable) {
+        fail("Should have thrown TableNotFoundException");
+      }
+    } catch (TableNotFoundException e) {
+      // expected with metadata table
+      if (!enableMetadataTable) {
+        fail("Should have thrown HoodieInsertException");
+      }
     }
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 8a1f4abd2..f1c83e188 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -114,6 +114,7 @@ import java.util.stream.Stream;
 
 import scala.Tuple3;
 
+import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
 import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
 import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
@@ -272,7 +273,6 @@ public class TestCleaner extends HoodieClientTestBase {
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
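
Note on the recurring config change in TestCleaner (and most test files in this patch): the explicit .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(...).build()) calls are removed so the tests exercise the default behavior. A minimal sketch of inspecting that default, assuming this patch makes HoodieMetadataConfig.ENABLE (the constant referenced by the removed TestMultiFS overload earlier in this diff) default to true; the class name MetadataDefaultCheck is hypothetical:

    import org.apache.hudi.common.config.HoodieMetadataConfig;

    public class MetadataDefaultCheck {
      public static void main(String[] args) {
        // Assumption: with the metadata table on by default, an un-customized builder
        // needs no explicit enable(true), which is what the dropped calls relied on.
        System.out.println("metadata table enabled by default: "
            + HoodieMetadataConfig.ENABLE.defaultValue());
      }
    }
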
@@ -442,7 +442,6 @@ public class TestCleaner extends HoodieClientTestBase {
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
         .build();
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
@@ -519,7 +518,6 @@ public class TestCleaner extends HoodieClientTestBase {
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
         .build();
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
@@ -648,7 +646,7 @@ public class TestCleaner extends HoodieClientTestBase {
   public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath)
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(true).build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
@@ -811,7 +809,7 @@ public class TestCleaner extends HoodieClientTestBase {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath)
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
             .build();
@@ -848,8 +846,8 @@ public class TestCleaner extends HoodieClientTestBase {
   public void testKeepLatestCommitsMOR() throws Exception {
 
     HoodieWriteConfig config =
-        HoodieWriteConfig.newBuilder().withPath(basePath)
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
+        HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
             .build();
@@ -889,12 +887,15 @@
   @Test
   public void testCleanWithReplaceCommits() throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withAssumeDatePartitioning(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
 
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
     String p0 = "2020/01/01";
     String p1 = "2020/01/02";
@@ -903,7 +904,7 @@
     String file1P1C0 = UUID.randomUUID().toString();
     testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
 
-    HoodieCommitMetadata commitMetadata = generateCommitMetadata(
+    HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001",
         Collections.unmodifiableMap(new HashMap<String, List<String>>() {
           {
             put(p0, CollectionUtils.createImmutableList(file1P0C0));
             put(p1, CollectionUtils.createImmutableList(file1P1C0));
           }
         })
     );
+    metadataWriter.update(commitMetadata, "00000000000001", false);
     metaClient.getActiveTimeline().saveAsComplete(
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -926,7 +928,8 @@
     // notice that clustering generates empty inflight commit files
     Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
     String file2P0C1 = partitionAndFileId002.get(p0);
-    Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata = generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1);
+    Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata =
+        generateReplaceCommitMetadata("00000000000002", p0, file1P0C0, file2P0C1);
     testTable.addReplaceCommit("00000000000002", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
 
     // run cleaner
@@ -940,7 +943,7 @@
     // notice that clustering generates empty inflight commit files
     Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
     String file3P1C2 = partitionAndFileId003.get(p1);
-    replaceMetadata = generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2);
+    replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1, file1P1C0, file3P1C2);
     testTable.addReplaceCommit("00000000000003", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
 
     // run cleaner
@@ -955,11 +958,11 @@
     // notice that clustering generates empty inflight commit files
     Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
     String file4P0C3 = partitionAndFileId004.get(p0);
-    replaceMetadata = generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3);
+    replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0, file2P0C1, file4P0C3);
     testTable.addReplaceCommit("00000000000004", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
 
     // run cleaner
-    List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
+    List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, 5);
     assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
     assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
     assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
@@ -969,12 +972,12 @@
     // make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created
     // notice that clustering generates empty inflight commit files
-    Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1);
+    Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000006").getFileIdsWithBaseFilesInPartitions(p1);
     String file4P1C4 = partitionAndFileId005.get(p1);
-    replaceMetadata = generateReplaceCommitMetadata(p0, file3P1C2, file4P1C4);
-    testTable.addReplaceCommit("00000000000005", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
+    replaceMetadata = generateReplaceCommitMetadata("00000000000006", p0, file3P1C2, file4P1C4);
+    testTable.addReplaceCommit("00000000000006", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
 
-    List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2);
+    List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 7);
     assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
     assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
     assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
     assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
   }
 
-  private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> generateReplaceCommitMetadata(String partition,
-                                                                                                          String replacedFileId,
-                                                                                                          String newFileId) {
+  private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> generateReplaceCommitMetadata(
+      String instantTime, String partition, String replacedFileId, String newFileId) {
     HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata();
     requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString());
     requestedReplaceMetadata.setVersion(1);
@@ -1005,7 +1007,7 @@
     if (!StringUtils.isNullOrEmpty(newFileId)) {
       HoodieWriteStat writeStat = new HoodieWriteStat();
       writeStat.setPartitionPath(partition);
-      writeStat.setPath(newFileId);
+      writeStat.setPath(partition + "/" + getBaseFilename(instantTime, newFileId));
       writeStat.setFileId(newFileId);
       replaceMetadata.addWriteStat(partition, writeStat);
     }
@@ -1196,7 +1198,7 @@
   @MethodSource("argumentsForTestKeepLatestCommits")
   public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withIncrementalCleaningMode(enableIncrementalClean)
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
@@ -1216,7 +1218,7 @@
         : UUID.randomUUID().toString();
     testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
 
-    HoodieCommitMetadata commitMetadata = generateCommitMetadata(
+    HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001",
         Collections.unmodifiableMap(new HashMap<String, List<String>>() {
           {
             put(p0, CollectionUtils.createImmutableList(file1P0C0));
@@ -1240,7 +1242,7 @@
     String file2P0C1 = partitionAndFileId002.get(p0);
     String file2P1C1 = partitionAndFileId002.get(p1);
     testTable.forCommit("00000000000002").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
-    commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
+    commitMetadata = generateCommitMetadata("00000000000002", new HashMap<String, List<String>>() {
       {
         put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
        put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
@@ -1261,9 +1263,9 @@
         .withBaseFilesInPartition(p0, file1P0C0)
         .withBaseFilesInPartition(p0, file2P0C1)
         .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
-    commitMetadata = generateCommitMetadata(CollectionUtils
-        .createImmutableMap(p0,
-            CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
+    commitMetadata = generateCommitMetadata("00000000000003",
+        CollectionUtils.createImmutableMap(
+            p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
     metaClient.getActiveTimeline().saveAsComplete(
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -1278,8 +1280,9 @@
         .withBaseFilesInPartition(p0, file1P0C0)
         .withBaseFilesInPartition(p0, file2P0C1)
         .getFileIdsWithBaseFilesInPartitions(p0).get(p0);
-    commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
-        p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
+    commitMetadata = generateCommitMetadata("00000000000004",
+        CollectionUtils.createImmutableMap(
+            p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
     metaClient.getActiveTimeline().saveAsComplete(
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000004"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -1305,8 +1308,8 @@
 
     // No cleaning on partially written file, with no commit.
testTable.forCommit("00000000000005").withBaseFilesInPartition(p0, file3P0C2); - commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(p0, - CollectionUtils.createImmutableList(file3P0C2))); + commitMetadata = generateCommitMetadata("00000000000005", + CollectionUtils.createImmutableMap(p0, CollectionUtils.createImmutableList(file3P0C2))); metaClient.getActiveTimeline().createNewInstant( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005")); metaClient.getActiveTimeline().transitionRequestedToInflight( @@ -1378,7 +1381,7 @@ public class TestCleaner extends HoodieClientTestBase { @Test public void testCleaningWithZeroPartitionPaths() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(true).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) .build(); @@ -1402,7 +1405,7 @@ public class TestCleaner extends HoodieClientTestBase { @Test public void testKeepLatestCommitsWithPendingCompactions() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(true).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) .build(); @@ -1426,7 +1429,7 @@ public class TestCleaner extends HoodieClientTestBase { public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(true).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build()) .build(); @@ -1677,14 +1680,15 @@ public class TestCleaner extends HoodieClientTestBase { return Stream.concat(stream1, stream2); } - private static HoodieCommitMetadata generateCommitMetadata(Map> partitionToFilePaths) { + private static HoodieCommitMetadata generateCommitMetadata( + String instantTime, Map> partitionToFilePaths) { HoodieCommitMetadata metadata = new HoodieCommitMetadata(); - partitionToFilePaths.forEach((key, value) -> value.forEach(f -> { + partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> { HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setPartitionPath(key); - writeStat.setPath(f); + writeStat.setPartitionPath(partitionPath); + writeStat.setPath(partitionPath + "/" + getBaseFilename(instantTime, f)); writeStat.setFileId(f); - metadata.addWriteStat(key, writeStat); + metadata.addWriteStat(partitionPath, writeStat); })); return metadata; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index e6df53740..1b613949c 100644 --- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -208,8 +207,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness boolean populateMetaFields = true; // insert 100 records HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, false, HoodieIndex.IndexType.BLOOM, - 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), preserveCommitMeta) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()); + 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), preserveCommitMeta); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig config = cfgBuilder.build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index 1e5f8029a..61d9d1d03 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -21,7 +21,6 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; -import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; @@ -219,7 +218,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { final String testPartitionPath = "2016/09/26"; int totalInsertNum = 2000; - HoodieWriteConfig config = makeHoodieClientConfigBuilder().withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + HoodieWriteConfig config = makeHoodieClientConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0) .insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java index 7c73c74f3..87d861330 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java @@ -20,7 +20,6 @@ package org.apache.hudi.table.action.compact; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; -import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; import 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 7c73c74f3..87d861330 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -20,7 +20,6 @@ package org.apache.hudi.table.action.compact;
 
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -53,7 +52,6 @@ public class TestAsyncCompaction extends CompactionTestBase {
 
   private HoodieWriteConfig getConfig(Boolean autoCommit) {
     return getConfigBuilder(autoCommit)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
         .build();
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c28758241..9afe5f353 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table.action.compact;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -159,7 +158,6 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     // insert 100 records
     HoodieWriteConfig config = getConfigBuilder()
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
         .build();
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
       String newCommitTime = "100";
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 877f0e29c..86dd3b361 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -20,7 +20,6 @@ package org.apache.hudi.table.action.rollback;
 
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
 import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -163,7 +162,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackTestBase {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .withRollbackUsingMarkers(false)
-        .withPath(basePath).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
+        .withPath(basePath).build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
       client.startCommitWithTime("001");
       client.insert(jsc.emptyRDD(), "001");
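One readability trap in this patch: withMaxNumDeltaCommitsBeforeCompaction exists on two different builders and tunes two different tables. TestHoodieCompactor above uses the HoodieCompactionConfig flavor (data-table compaction), while TestHoodieIncrSource near the end uses the HoodieMetadataConfig flavor (metadata-table compaction). A sketch that puts the two side by side, using only builder calls that appear in this patch:

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.config.HoodieCompactionConfig;

    public class CompactionKnobsSketch {
      public static void main(String[] args) {
        // Data-table knob: compact the MOR data table after every delta commit.
        HoodieCompactionConfig dataTableCompaction = HoodieCompactionConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(1)
            .build();
        // Metadata-table knob: compact the metadata table after every delta
        // commit, which in turn keeps data-table archival unblocked.
        HoodieMetadataConfig metadataTableCompaction = HoodieMetadataConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(1)
            .build();
        System.out.println(dataTableCompaction.getProps().size() + " / "
            + metadataTableCompaction.getProps().size());
      }
    }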
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
index 272208558..5df7b4dae 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
@@ -20,7 +20,6 @@ package org.apache.hudi.table.functional;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -86,7 +85,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientFunctionalTestHarness {
     Properties props = new Properties();
     props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString());
     HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
-    HoodieWriteConfig cfg = getConfigBuilder(true).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
+    HoodieWriteConfig cfg = getConfigBuilder(true).build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
 
       /*
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 47e99353e..d55295503 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -155,7 +155,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
         HoodieMetadataConfig.newBuilder()
             .enable(false)
             .build());
-    
+
     addConfigsForPopulateMetaFields(cfgBuilder, true);
     HoodieWriteConfig cfg = cfgBuilder.build();
@@ -325,8 +325,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
     boolean populateMetaFields = true;
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
         // Timeline-server-based markers are not used for multi-rollback tests
-        .withMarkersType(MarkerType.DIRECT.name())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
+        .withMarkersType(MarkerType.DIRECT.name());
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig cfg = cfgBuilder.build();
@@ -387,8 +386,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
     // WriteClient with custom config (disable small file handling)
     HoodieWriteConfig smallFileWriteConfig = getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields)
         // Timeline-server-based markers are not used for multi-rollback tests
-        .withMarkersType(MarkerType.DIRECT.name())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
+        .withMarkersType(MarkerType.DIRECT.name()).build();
     try (SparkRDDWriteClient nClient = getHoodieWriteClient(smallFileWriteConfig)) {
       nClient.startCommitWithTime(newCommitTime);
@@ -519,8 +517,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
   void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
         // Timeline-server-based markers are not used for multi-rollback tests
-        .withMarkersType(MarkerType.DIRECT.name())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
+        .withMarkersType(MarkerType.DIRECT.name());
     HoodieWriteConfig cfg = cfgBuilder.build();
 
     Properties properties = new Properties();
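The rollback and restore tests above keep pinning MarkerType.DIRECT while dropping their metadata overrides; direct markers write marker files straight to storage, so no timeline service needs to be running. The combination in isolation, as a sketch (path and table name are placeholders):

    import org.apache.hudi.common.table.marker.MarkerType;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class DirectMarkersSketch {
      public static void main(String[] args) {
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hoodie-mor-rollback") // placeholder path
            .forTable("mor_rollback_sketch")      // placeholder table name
            // Timeline-server-based markers need a live timeline service;
            // the multi-rollback tests do not spin one up.
            .withMarkersType(MarkerType.DIRECT.name())
            .build();
        System.out.println(cfg.getBasePath());
      }
    }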
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index c623c2f5d..f9c9898f2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.common.testutils;
 
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -25,6 +26,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
@@ -176,4 +178,17 @@ public class HoodieTestUtils {
     }
     return writeStatList;
   }
+
+  public static void createCompactionCommitInMetadataTable(
+      Configuration hadoopConf, HoodieWrapperFileSystem wrapperFs, String basePath,
+      String instantTime) throws IOException {
+    // Simulate a completed compaction commit in the metadata table timeline,
+    // so that commits on the data table timeline can be archived.
+    // Note that when the metadata table is enabled, instants on the data table
+    // timeline that are more recent than the last compaction on the metadata
+    // table are not archived (see HoodieTimelineArchiveLog::getInstantsToArchive).
+    String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
+    HoodieTestUtils.init(hadoopConf, metadataTableBasePath, HoodieTableType.MERGE_ON_READ);
+    HoodieTestDataGenerator.createCommitFile(metadataTableBasePath, instantTime + "001", wrapperFs.getConf());
+  }
 }
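Two details of the new helper are easy to miss: the metadata table is itself a Hudi MERGE_ON_READ table nested under the data table's .hoodie directory, and the simulated compaction instant is the data-table instant time with a "001" suffix, so plain instant-time string ordering places it after the instant it corresponds to. A sketch of both facts (the base path is a placeholder):

    import org.apache.hudi.metadata.HoodieTableMetadata;

    public class MetadataTablePathSketch {
      public static void main(String[] args) {
        String basePath = "/tmp/warehouse/trips"; // placeholder data-table path
        // Resolves to <basePath>/.hoodie/metadata, where the helper calls init().
        System.out.println(HoodieTableMetadata.getMetadataTableBasePath(basePath));
        // The commit file is written as <instant> + "001", e.g. "105" -> "105001".
        System.out.println("105" + "001");
      }
    }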
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 21a645660..0d7b7ebbc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -253,7 +253,6 @@ public class TestBootstrap extends HoodieClientTestBase {
         .withFullBootstrapInputProvider(TestFullBootstrapDataProvider.class.getName())
         .withBootstrapParallelism(3)
         .withBootstrapModeSelector(bootstrapModeSelectorClass).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .build();
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index 1992b9777..66bf0be0e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -29,7 +29,6 @@ import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -246,7 +245,6 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
         .withFullBootstrapInputProvider(TestFullBootstrapDataProvider.class.getName())
         .withBootstrapParallelism(3)
         .withBootstrapModeSelector(bootstrapModeSelectorClass).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .build();
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index d94c9acfd..58b36f833 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -62,7 +62,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
-    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
   )
 
   val verificationCol: String = "driver"
@@ -465,15 +466,10 @@ class TestCOWDataSource extends HoodieClientTestBase {
   }
 
   private def getDataFrameWriter(keyGenerator: String): DataFrameWriter[Row] = {
-    getDataFrameWriter(keyGenerator, true)
-  }
-
-  private def getDataFrameWriter(keyGenerator: String, enableMetadata: Boolean): DataFrameWriter[Row] = {
     val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
-    val opts = commonOpts ++ Map(HoodieMetadataConfig.ENABLE.key() -> String.valueOf(enableMetadata))
     inputDF.write.format("hudi")
-      .options(opts)
+      .options(commonOpts)
       .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key, keyGenerator)
       .mode(SaveMode.Overwrite)
   }
@@ -501,7 +497,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0)
 
     // Mixed fieldType
-    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false)
+    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
     writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP")
       .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
       .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
@@ -513,7 +509,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
       concat(col("driver"), lit("/"), col("rider"), lit("/"), udf_date_format(col("current_ts")))).count() == 0)
 
     // Test invalid partitionKeyType
-    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, false)
+    writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
     writer = writer.partitionBy("current_ts:DUMMY")
       .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
       .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
@@ -780,7 +776,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .option("hoodie.keep.min.commits", "4")
       .option("hoodie.keep.max.commits", "5")
       .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .option(HoodieMetadataConfig.ENABLE.key(), value = false)
       .mode(SaveMode.Append)
       .save(basePath)
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index afd888e96..a6fdf00d1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -22,16 +22,19 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -44,6 +47,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
@@ -811,7 +815,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
     cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
-    cfg.configs.add(HoodieMetadataConfig.ENABLE.key() + "=false");
+    cfg.configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
@@ -862,8 +866,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
     configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
     configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
-    configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN, asyncClean));
-    configs.add(HoodieMetadataConfig.ENABLE.key() + "=false");
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN.key(), asyncClean));
+    configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
+    if (asyncClean) {
+      configs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
+          WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+      configs.add(String.format("%s=%s", HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+          HoodieFailedWritesCleaningPolicy.LAZY.name()));
+      configs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+          InProcessLockProvider.class.getName()));
+    }
     cfg.configs = configs;
     cfg.continuousMode = false;
     ds = new HoodieDeltaStreamer(cfg, jsc);
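The second DeltaStreamer hunk also fixes a latent bug: HoodieCompactionConfig.ASYNC_CLEAN was being formatted without .key(), so the ConfigProperty object's string form, not its key, ended up in the config line. More importantly, when asyncClean is on, cleaning runs concurrently with ingestion, so the test now switches to optimistic concurrency control, lazy failed-writes cleaning, and an in-process lock provider. The same setup through the typed builder API, as a sketch; the builder methods are assumed equivalents of the raw keys used above:

    import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
    import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
    import org.apache.hudi.common.model.WriteConcurrencyMode;
    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodieLockConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class AsyncCleanOccSketch {
      public static void main(String[] args) {
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hoodie-occ-sketch") // placeholder path
            .forTable("occ_sketch")             // placeholder table name
            // Writer and async cleaner are separate actors; they must coordinate.
            .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                .build())
            .withLockConfig(HoodieLockConfig.newBuilder()
                .withLockProvider(InProcessLockProvider.class)
                .build())
            .build();
        System.out.println(cfg.getWriteConcurrencyMode());
      }
    }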
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 87034bd8f..708d45477 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -64,7 +64,11 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {
   @Test
   public void testHoodieIncrSource() throws IOException {
     HoodieWriteConfig writeConfig = getConfigBuilder(basePath)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2,3).retainCommits(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .archiveCommitsWith(2, 3).retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .build();
 
     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
     Pair<String, JavaRDD<WriteStatus>> inserts = writeRecords(writeClient, true, null, "100");
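Unlike the suites that seed a fake compaction commit, this test keeps the metadata table fully enabled and instead compacts it after every delta commit, so data-table archival is never blocked behind a pending metadata-table compaction. The builder call should be setting the same knob as the COMPACT_NUM_DELTA_COMMITS key used in the DeltaStreamer and TestCOWDataSource changes above; a sketch of that assumed equivalence:

    import org.apache.hudi.common.config.HoodieMetadataConfig;

    public class MetadataCompactionKnobSketch {
      public static void main(String[] args) {
        HoodieMetadataConfig viaBuilder = HoodieMetadataConfig.newBuilder()
            .withMaxNumDeltaCommitsBeforeCompaction(1)
            .build();
        // Expected to print "1", matching the raw-key form used elsewhere:
        // COMPACT_NUM_DELTA_COMMITS.key() -> "1".
        System.out.println(viaBuilder.getString(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS));
      }
    }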