From eeafd24f4c3a65e107867e30eb499b4aec69d7e5 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 24 Sep 2021 01:10:11 +0530 Subject: [PATCH] [HUDI-2395] Metadata tests rewrite (#3695) - Added commit metadata infra to test table so that we can test entire metadata using test table itself. These tests don't care about the contents of files as such and hence we should be able to test all code paths for metadata using test table. Co-authored-by: Sivabalan Narayanan --- .../functional/TestHoodieBackedMetadata.java | 1195 ++++++----------- .../testutils/HoodieClientTestHarness.java | 204 ++- .../org/apache/hudi/common/fs/FSUtils.java | 3 + .../apache/hudi/common/fs/TestFSUtils.java | 4 + .../common/testutils/FileCreateUtils.java | 18 + .../common/testutils/FileSystemTestUtils.java | 6 +- .../common/testutils/HoodieTestTable.java | 543 +++++++- 7 files changed, 1197 insertions(+), 776 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index b091359b2..803c5b9d8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -18,7 +18,6 @@ package org.apache.hudi.client.functional; -import org.apache.hudi.client.HoodieWriteResult; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -27,6 +26,7 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroup; @@ -52,10 +52,8 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; import org.apache.hudi.exception.HoodieMetadataException; -import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; -import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.HoodieMetadataMetrics; import org.apache.hudi.metadata.HoodieTableMetadata; @@ -67,32 +65,42 @@ import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Time; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; +import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; +import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT; +import static org.apache.hudi.common.model.WriteOperationType.DELETE; +import static org.apache.hudi.common.model.WriteOperationType.INSERT; +import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("functional") @@ -100,12 +108,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class); - @TempDir - public java.nio.file.Path tempFolder; - + private static HoodieTestTable testTable; private String metadataTableBasePath; - private HoodieTableType tableType; + private HoodieWriteConfig writeConfig; public void init(HoodieTableType tableType) throws IOException { this.tableType = tableType; @@ -116,7 +122,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { initMetaClient(tableType); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); - + writeConfig = getWriteConfig(true, true); + testTable = HoodieTestTable.of(metaClient); } @AfterEach @@ -124,136 +131,51 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { cleanupResources(); } - /** - * Metadata Table bootstrap scenarios. - */ - @Test - public void testMetadataTableBootstrap() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - // Metadata table should not exist until created for the first time - assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); - assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build()); - - // Metadata table is not created if disabled by config - String firstCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - client.startCommitWithTime(firstCommitTime); - client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 5)), firstCommitTime); - assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created"); - assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build()); - } - - // Metadata table should not be created if any non-complete instants are present - String secondCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, true), true)) { - client.startCommitWithTime(secondCommitTime); - client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime); - // AutoCommit is false so no bootstrap - client.syncTableMetadata(); - assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created"); - assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build()); - // rollback this commit - client.rollback(secondCommitTime); - } - - // Metadata table created when enabled by config & sync is called - secondCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - client.startCommitWithTime(secondCommitTime); - client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime); - client.syncTableMetadata(); - assertTrue(fs.exists(new Path(metadataTableBasePath))); - validateMetadata(client); - } - - // Delete all existing instants on dataset to simulate archiving. This should trigger a re-bootstrap of the metadata - // table as last synched instant has been "archived". - final String metadataTableMetaPath = metadataTableBasePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME; - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime)))); - - Arrays.stream(fs.listStatus(new Path(metaClient.getMetaPath()))).filter(status -> status.getPath().getName().matches("^\\d+\\..*")) - .forEach(status -> { - try { - fs.delete(status.getPath(), false); - } catch (IOException e) { - LOG.warn("Error when deleting instant " + status + ": " + e); - } - }); - - String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - client.startCommitWithTime(thirdCommitTime); - client.insert(jsc.parallelize(dataGen.generateUpdates(thirdCommitTime, 2)), thirdCommitTime); - client.syncTableMetadata(); - assertTrue(fs.exists(new Path(metadataTableBasePath))); - validateMetadata(client); - - // Metadata Table should not have previous delta-commits as it was re-bootstrapped - assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime)))); - assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime)))); - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(thirdCommitTime)))); - } + public static List bootstrapAndTableOperationTestArgs() { + return asList( + Arguments.of(COPY_ON_WRITE, true), + Arguments.of(COPY_ON_WRITE, false), + Arguments.of(MERGE_ON_READ, true), + Arguments.of(MERGE_ON_READ, false) + ); } /** - * Test enable/disable sync via the config. + * Metadata Table bootstrap scenarios. */ - @Test - public void testSyncConfig() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + @ParameterizedTest + @MethodSource("bootstrapAndTableOperationTestArgs") + public void testMetadataTableBootstrap(HoodieTableType tableType, boolean addRollback) throws Exception { + init(tableType); + // bootstrap with few commits + doWriteOperationsAndBootstrapMetadata(testTable); - // Create the metadata table - String firstCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - client.startCommitWithTime(firstCommitTime); - client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 2)), firstCommitTime); - client.syncTableMetadata(); - assertTrue(fs.exists(new Path(metadataTableBasePath))); - validateMetadata(client); + if (addRollback) { + // trigger an UPSERT that will be rolled back + testTable.doWriteOperation("003", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 2); + syncTableMetadata(writeConfig); + // rollback last commit + testTable = testTable.doRollback("003", "004"); + syncAndValidate(testTable); } - // If sync is disabled, the table will not sync - HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(true).enableMetrics(false).enableSync(false).build()).build(); - final String metadataTableMetaPath = metadataTableBasePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME; - String secondCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config, true)) { - client.startCommitWithTime(secondCommitTime); - client.insert(jsc.parallelize(dataGen.generateInserts(secondCommitTime, 2)), secondCommitTime); - client.syncTableMetadata(); + testTable.doWriteOperation("005", INSERT, asList("p1", "p2"), 4); + syncAndValidate(testTable); - // Metadata Table should not have synced - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime)))); - assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime)))); - } - - // If sync is enabled, the table will sync - String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - client.startCommitWithTime(thirdCommitTime); - client.insert(jsc.parallelize(dataGen.generateInserts(thirdCommitTime, 2)), thirdCommitTime); - client.syncTableMetadata(); - - // Metadata Table should have synced - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime)))); - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime)))); - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(thirdCommitTime)))); - } + // trigger an upsert and validate + testTable.doWriteOperation("006", UPSERT, singletonList("p3"), + asList("p1", "p2", "p3"), 4); + syncAndValidate(testTable, true); } /** * Only valid partition directories are added to the metadata. */ - @Test - public void testOnlyValidPartitionsAdded() throws Exception { + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Exception { // This test requires local file system - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - + init(tableType); // Create an empty directory which is not a partition directory (lacks partition metadata) final String nonPartitionDirectory = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-nonpartition"; Files.createDirectories(Paths.get(basePath, nonPartitionDirectory)); @@ -265,49 +187,82 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { final String filteredDirectoryThree = ".backups"; // Create some commits - HoodieTestTable testTable = HoodieTestTable.of(metaClient); testTable.withPartitionMetaFiles("p1", "p2", filteredDirectoryOne, filteredDirectoryTwo, filteredDirectoryThree) .addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10) .addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10); - final HoodieWriteConfig writeConfig = - getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false) + writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { - client.startCommitWithTime("005"); - client.insert(jsc.emptyRDD(), "005"); + testTable.doWriteOperation("003", UPSERT, emptyList(), asList("p1", "p2"), 1, true); + syncTableMetadata(writeConfig); - List partitions = metadataWriter(client).metadata().getAllPartitionPaths(); - assertFalse(partitions.contains(nonPartitionDirectory), - "Must not contain the non-partition " + nonPartitionDirectory); - assertTrue(partitions.contains("p1"), "Must contain partition p1"); - assertTrue(partitions.contains("p2"), "Must contain partition p2"); + List partitions = metadataWriter(writeConfig).metadata().getAllPartitionPaths(); + assertFalse(partitions.contains(nonPartitionDirectory), + "Must not contain the non-partition " + nonPartitionDirectory); + assertTrue(partitions.contains("p1"), "Must contain partition p1"); + assertTrue(partitions.contains("p2"), "Must contain partition p2"); - assertFalse(partitions.contains(filteredDirectoryOne), - "Must not contain the filtered directory " + filteredDirectoryOne); - assertFalse(partitions.contains(filteredDirectoryTwo), - "Must not contain the filtered directory " + filteredDirectoryTwo); - assertFalse(partitions.contains(filteredDirectoryThree), - "Must not contain the filtered directory " + filteredDirectoryThree); + assertFalse(partitions.contains(filteredDirectoryOne), + "Must not contain the filtered directory " + filteredDirectoryOne); + assertFalse(partitions.contains(filteredDirectoryTwo), + "Must not contain the filtered directory " + filteredDirectoryTwo); + assertFalse(partitions.contains(filteredDirectoryThree), + "Must not contain the filtered directory " + filteredDirectoryThree); - FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1")); - assertEquals(2, statuses.length); - statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2")); - assertEquals(5, statuses.length); - Map partitionsToFilesMap = metadata(client).getAllFilesInPartitions( - Arrays.asList(basePath + "/p1", basePath + "/p2")); - assertEquals(2, partitionsToFilesMap.size()); - assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length); - assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length); - } + FileStatus[] statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p1")); + assertEquals(tableType == COPY_ON_WRITE ? 3 : 4, statuses.length); + statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p2")); + assertEquals(tableType == COPY_ON_WRITE ? 6 : 7, statuses.length); + Map partitionsToFilesMap = metadata(writeConfig, context).getAllFilesInPartitions(asList(basePath + "/p1", basePath + "/p2")); + assertEquals(2, partitionsToFilesMap.size()); + assertEquals(tableType == COPY_ON_WRITE ? 3 : 4, partitionsToFilesMap.get(basePath + "/p1").length); + assertEquals(tableType == COPY_ON_WRITE ? 6 : 7, partitionsToFilesMap.get(basePath + "/p2").length); } /** * Test various table operations sync to Metadata Table correctly. */ @ParameterizedTest + @MethodSource("bootstrapAndTableOperationTestArgs") + public void testTableOperations(HoodieTableType tableType, boolean doNotSyncFewCommits) throws Exception { + init(tableType); + // bootstrap w/ 2 commits + doWriteOperationsAndBootstrapMetadata(testTable); + + // trigger an upsert + testTable.doWriteOperation("003", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 3); + syncAndValidate(testTable); + + // trigger compaction + if (MERGE_ON_READ.equals(tableType)) { + testTable = testTable.doCompaction("004", asList("p1", "p2")); + syncAndValidate(testTable); + } + + // trigger an upsert + testTable.doWriteOperation("005", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + if (doNotSyncFewCommits) { + syncAndValidate(testTable, emptyList(), true, false, true); + } + + // trigger clean + testTable.doCleanBasedOnCommits("006", singletonList("001")); + if (doNotSyncFewCommits) { + syncAndValidate(testTable, emptyList(), true, false, false); + } + + // trigger delete + testTable.doWriteOperation("007", DELETE, emptyList(), asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, emptyList(), true, true, false); + } + + /** + * Test several table operations with restore. This test uses SparkRDDWriteClient. + * Once the restore support is ready in HoodieTestTable, then rewrite this test. + */ + @ParameterizedTest @EnumSource(HoodieTableType.class) - public void testTableOperations(HoodieTableType tableType) throws Exception { + public void testTableOperationsWithRestore(HoodieTableType tableType) throws Exception { init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); @@ -391,218 +346,64 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } /** - * Test rollback of various table operations sync to Metadata Table correctly. + * Tests rollback of a commit with metadata enabled. */ @ParameterizedTest @EnumSource(HoodieTableType.class) public void testRollbackOperations(HoodieTableType tableType) throws Exception { init(tableType); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + // bootstrap w/ 2 commits + doWriteOperationsAndBootstrapMetadata(testTable); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Write 1 (Bulk insert) - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); + // trigger an upsert + testTable.doWriteOperation("003", UPSERT, emptyList(), asList("p1", "p2"), 2); + syncAndValidate(testTable); - // Write 2 (inserts) + Rollback of inserts - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateInserts(newCommitTime, 20); - writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); + // trigger a commit and rollback + testTable.doWriteOperation("004", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 3); + syncTableMetadata(writeConfig); + // rollback last commit + testTable = testTable.doRollback("004", "005"); + syncAndValidate(testTable); - // Write 3 (updates) + Rollback of updates - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUniqueUpdates(newCommitTime, 20); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); + // trigger few upserts and validate + for (int i = 6; i < 10; i++) { + testTable.doWriteOperation("00" + i, UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + } + syncAndValidate(testTable); - // Rollback of updates and inserts - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUpdates(newCommitTime, 10); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); + testTable.doWriteOperation("010", UPSERT, emptyList(), asList("p1", "p2", "p3"), 3); + syncAndValidate(testTable); - // Rollback of Compaction - if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); - client.compact(newCommitTime); - validateMetadata(client); - } + // rollback last commit. sync and validate. + testTable.doRollback("010", "011"); + syncTableMetadata(writeConfig); - // Rollback of Deletes - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - records = dataGen.generateDeletes(newCommitTime, 10); - JavaRDD deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey()); - client.startCommitWithTime(newCommitTime); - writeStatuses = client.delete(deleteKeys, newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); - - // Rollback of Clean - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.clean(newCommitTime); - validateMetadata(client); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); + // rollback of compaction + if (MERGE_ON_READ.equals(tableType)) { + testTable = testTable.doCompaction("012", asList("p1", "p2")); + syncTableMetadata(writeConfig); + testTable.doRollback("012", "013"); + syncTableMetadata(writeConfig); } - // Rollback of partial commits - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, - getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(false).build())) { - // Write updates and inserts - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); - } + // roll back of delete + testTable.doWriteOperation("014", DELETE, emptyList(), asList("p1", "p2", "p3"), 2); + syncTableMetadata(writeConfig); + testTable.doRollback("014", "015"); + syncTableMetadata(writeConfig); - // Marker based rollback of partial commits - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, - getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) { - // Write updates and inserts - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); - } - } + // rollback partial commit + writeConfig = getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).build(); + testTable.doWriteOperation("016", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + testTable.doRollback("016", "017"); + syncTableMetadata(writeConfig); - /** - * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op occurs to metadata. - * Once explicit sync is called, metadata should match. - */ - @ParameterizedTest - @EnumSource(HoodieTableType.class) - public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception { - init(tableType); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Initialize table with metadata - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - } - - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - // Commit with metadata disabled - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - client.rollback(newCommitTime); - } - - try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) { - assertFalse(metadata(client).isInSync()); - client.syncTableMetadata(); - validateMetadata(client); - } - - // If an unsynced commit is automatically rolled back during next commit, the rollback commit gets a timestamp - // greater than than the new commit which is started. Ensure that in this case the rollback is not processed - // as the earlier failed commit would not have been committed. - // - // Dataset: C1 C2 C3.inflight[failed] C4 R5[rolls back C3] - // Metadata: C1.delta C2.delta - // - // When R5 completes, C3.xxx will be deleted. When C4 completes, C4 and R5 will be committed to Metadata Table in - // that order. R5 should be neglected as C3 was never committed to metadata table. - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, false), true)) { - // Metadata disabled and no auto-commit - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - // Not committed so left in inflight state - client.syncTableMetadata(); - assertTrue(metadata(client).isInSync()); - validateMetadata(client); - } - - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true), true)) { - // Metadata enabled - // The previous commit will be rolled back automatically - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); - validateMetadata(client); - } - - // In this scenario an async operations is started and completes around the same time of the failed commit. - // Rest of the reasoning is same as above test. - // C4.clean was an asynchronous clean started along with C3. The clean completed but C3 commit failed. - // - // Dataset: C1 C2 C3.inflight[failed] C4.clean C5 R6[rolls back C3] - // Metadata: C1.delta C2.delta - // - // When R6 completes, C3.xxx will be deleted. When C5 completes, C4, C5 and R6 will be committed to Metadata Table - // in that order. R6 should be neglected as C3 was never committed to metadata table. - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, false), true)) { - // Metadata disabled and no auto-commit - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - // Not committed so left in inflight state - client.clean(); - client.syncTableMetadata(); - assertTrue(metadata(client).isInSync()); - validateMetadata(client); - } - - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true), true)) { - // Metadata enabled - // The previous commit will be rolled back automatically - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); - validateMetadata(client); - } + // marker-based rollback of partial commit + writeConfig = getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(true).build(); + testTable.doWriteOperation("018", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + testTable.doRollback("018", "019"); + syncAndValidate(testTable, true); } /** @@ -613,65 +414,50 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { @EnumSource(HoodieTableType.class) public void testManualRollbacks(HoodieTableType tableType) throws Exception { init(tableType); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + doWriteOperationsAndBootstrapMetadata(testTable); // Setting to archive more aggressively on the Metadata Table than the Dataset final int maxDeltaCommitsBeforeCompaction = 4; final int minArchiveCommitsMetadata = 2; final int minArchiveCommitsDataset = 4; - HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) + writeConfig = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) .archiveCommitsWith(minArchiveCommitsMetadata, minArchiveCommitsMetadata + 1).retainCommits(1) .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1) .retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build()) .build(); - - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) { - // Initialize table with metadata - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - - // Perform multiple commits - for (int i = 1; i < 10; ++i) { - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - if (i == 1) { - records = dataGen.generateInserts(newCommitTime, 5); - } else { - records = dataGen.generateUpdates(newCommitTime, 2); - } - client.startCommitWithTime(newCommitTime); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); + for (int i = 3; i < 10; i++) { + if (i == 3) { + testTable.doWriteOperation("00" + i, UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 2); + syncTableMetadata(writeConfig); + } else { + testTable.doWriteOperation("00" + i, UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); } - - // We can only rollback those commits whose deltacommit have not been archived yet. - int numRollbacks = 0; - boolean exceptionRaised = false; - - List allInstants = metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants() - .collect(Collectors.toList()); - for (HoodieInstant instantToRollback : allInstants) { - try { - client.rollback(instantToRollback.getTimestamp()); - client.syncTableMetadata(); - ++numRollbacks; - } catch (HoodieMetadataException e) { - exceptionRaised = true; - break; - } - } - - assertTrue(exceptionRaised, "Rollback of archived instants should fail"); - // Since each rollback also creates a deltacommit, we can only support rolling back of half of the original - // instants present before rollback started. - assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2, - "Rollbacks of non archived instants should work"); } + syncAndValidate(testTable, true); + + // We can only rollback those commits whose deltacommit have not been archived yet. + int numRollbacks = 0; + boolean exceptionRaised = false; + + List allInstants = metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants().collect(Collectors.toList()); + for (HoodieInstant instantToRollback : allInstants) { + try { + testTable.doRollback(instantToRollback.getTimestamp(), String.valueOf(Time.now())); + syncTableMetadata(writeConfig); + ++numRollbacks; + } catch (HoodieMetadataException e) { + exceptionRaised = true; + break; + } + } + + assertTrue(exceptionRaised, "Rollback of archived instants should fail"); + // Since each rollback also creates a deltacommit, we can only support rolling back of half of the original + // instants present before rollback started. + assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2, + "Rollbacks of non archived instants should work"); } /** @@ -679,148 +465,69 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { */ @ParameterizedTest @EnumSource(HoodieTableType.class) - @Disabled public void testSync(HoodieTableType tableType) throws Exception { init(tableType); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - String newCommitTime; - List records; - List writeStatuses; - // Initial commits without metadata table enabled - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - records = dataGen.generateInserts(newCommitTime, 5); - client.startCommitWithTime(newCommitTime); - writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - records = dataGen.generateInserts(newCommitTime, 5); - client.startCommitWithTime(newCommitTime); - writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - } - + writeConfig = getWriteConfigBuilder(true, false, false).build(); + testTable.doWriteOperation("001", BULK_INSERT, asList("p1", "p2"), asList("p1", "p2"), 1); + testTable.doWriteOperation("002", BULK_INSERT, asList("p1", "p2"), 1); // Enable metadata table so it initialized by listing from file system - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // inserts - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateInserts(newCommitTime, 5); - writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - - validateMetadata(client); - assertTrue(metadata(client).isInSync()); - } - + testTable.doWriteOperation("003", INSERT, asList("p1", "p2"), 1); + syncAndValidate(testTable, emptyList(), true, true, true); // Various table operations without metadata table enabled - String restoreToInstant; - String inflightActionTimestamp; - String beforeInflightActionTimestamp; - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - // updates - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUniqueUpdates(newCommitTime, 5); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); + testTable.doWriteOperation("004", UPSERT, asList("p1", "p2"), 1); + testTable.doWriteOperation("005", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 3); + syncAndValidate(testTable, emptyList(), false, true, true); - // updates and inserts - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUpdates(newCommitTime, 10); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); - - // Compaction - if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); - client.compact(newCommitTime); - assertTrue(metadata(client).isInSync()); - } - - // Savepoint - restoreToInstant = newCommitTime; - if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { - client.savepoint("hoodie", "metadata test"); - assertTrue(metadata(client).isInSync()); - } - - // Record a timestamp for creating an inflight instance for sync testing - inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime(); - beforeInflightActionTimestamp = newCommitTime; - - // Deletes - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - records = dataGen.generateDeletes(newCommitTime, 5); - JavaRDD deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey()); - client.startCommitWithTime(newCommitTime); - client.delete(deleteKeys, newCommitTime); - assertTrue(metadata(client).isInSync()); - - // Clean - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.clean(newCommitTime); - assertTrue(metadata(client).isInSync()); - - // updates - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUniqueUpdates(newCommitTime, 10); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); - - // insert overwrite to test replacecommit - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION); - records = dataGen.generateInserts(newCommitTime, 5); - HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime); - writeStatuses = replaceResult.getWriteStatuses().collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); + // trigger compaction + if (MERGE_ON_READ.equals(tableType)) { + testTable = testTable.doCompaction("006", asList("p1", "p2")); + syncAndValidate(testTable, emptyList(), false, true, true); } - // If there is an incomplete operation, the Metadata Table is not updated beyond that operations but the + // trigger an upsert + testTable.doWriteOperation("008", UPSERT, asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, emptyList(), false, true, true); + + // savepoint + if (COPY_ON_WRITE.equals(tableType)) { + testTable.doSavepoint("008"); + syncAndValidate(testTable, emptyList(), false, true, true); + } + + // trigger delete + testTable.doWriteOperation("009", DELETE, emptyList(), asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, emptyList(), false, true, true); + + // trigger clean + testTable.doCleanBasedOnCommits("010", asList("001", "002")); + syncAndValidate(testTable, emptyList(), false, true, true); + + // trigger another upsert + testTable.doWriteOperation("011", UPSERT, asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, emptyList(), false, true, true); + + // trigger clustering + testTable.doCluster("012", new HashMap<>()); + syncAndValidate(testTable, emptyList(), false, true, true); + + // If there is an inflight operation, the Metadata Table is not updated beyond that operations but the // in-memory merge should consider all the completed operations. - Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp)); - fs.create(inflightCleanPath).close(); + HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("007", UPSERT, emptyList(), + asList("p1", "p2", "p3"), 2, false, true); + // trigger upsert + testTable.doWriteOperation("013", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + // testTable validation will fetch only files pertaining to completed commits. So, validateMetadata() will skip files for 007 + // while validating against actual metadata table. + syncAndValidate(testTable, singletonList("007"), true, true, false); + // Remove the inflight instance holding back table sync + testTable.moveInflightCommitToComplete("007", inflightCommitMeta); + syncTableMetadata(writeConfig); + // A regular commit should get synced + testTable.doWriteOperation("014", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, emptyList(), true, true, true); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details - client.syncTableMetadata(); - - // Table should sync only before the inflightActionTimestamp - HoodieBackedTableMetadataWriter writer = - (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context); - assertEquals(writer.getMetadataReader().getUpdateTime().get(), beforeInflightActionTimestamp); - - // Reader should sync to all the completed instants - HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(), - client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); - assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), newCommitTime); - - // Remove the inflight instance holding back table sync - fs.delete(inflightCleanPath, false); - client.syncTableMetadata(); - - writer = - (HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context); - assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime); - - // Reader should sync to all the completed instants - metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(), - client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); - assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime); - } - - // Enable metadata table and ensure it is synced + /* TODO: Restore to savepoint, enable metadata table and ensure it is synced try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { client.restoreToInstant(restoreToInstant); assertFalse(metadata(client).isInSync()); @@ -831,47 +538,41 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { validateMetadata(client); assertTrue(metadata(client).isInSync()); - } + }*/ } /** * Instants on Metadata Table should be archived as per config but we always keep atlest the number of instants * as on the dataset. Metadata Table should be automatically compacted as per config. */ - @Test - public void testCleaningArchivingAndCompaction() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testCleaningArchivingAndCompaction(HoodieTableType tableType) throws Exception { + init(tableType); + doWriteOperationsAndBootstrapMetadata(testTable); final int maxDeltaCommitsBeforeCompaction = 4; final int minArchiveLimit = 4; final int maxArchiveLimit = 6; - HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) + writeConfig = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) .archiveCommitsWith(minArchiveLimit - 2, maxArchiveLimit - 2).retainCommits(1) .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveLimit, maxArchiveLimit) .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(true).build()) .build(); - - List records; - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) { - for (int i = 1; i < 10; ++i) { - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - if (i == 1) { - records = dataGen.generateInserts(newCommitTime, 5); - } else { - records = dataGen.generateUpdates(newCommitTime, 2); - } - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); + for (int i = 3; i < 10; i++) { + if (i == 3) { + testTable.doWriteOperation("00" + i, UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 2); + syncTableMetadata(writeConfig); + } else { + testTable.doWriteOperation("00" + i, UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); } } + syncAndValidate(testTable, true); HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); - HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath()).build(); + HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(writeConfig.getBasePath()).build(); HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline(); // check that there are compactions. assertTrue(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants() > 0); @@ -887,194 +588,109 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Test various error scenarios. */ - @Test - public void testErrorCases() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testErrorCases(HoodieTableType tableType) throws Exception { + init(tableType); // TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table // should be rolled back to last valid commit. - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateInserts(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateInserts(newCommitTime, 5); - writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - - // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed - // instant so that only the inflight is left over. - String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime); - assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME, - commitInstantFileName), false)); - } - - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - String newCommitTime = client.startCommit(); - // Next insert - List records = dataGen.generateInserts(newCommitTime, 5); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - - // Post rollback commit and metadata should be valid - validateMetadata(client); - } + testTable.doWriteOperation("001", UPSERT, asList("p1", "p2"), asList("p1", "p2"), 1); + syncAndValidate(testTable); + testTable.doWriteOperation("002", BULK_INSERT, emptyList(), asList("p1", "p2"), 1); + syncAndValidate(testTable); + // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed + // instant so that only the inflight is left over. + String commitInstantFileName = HoodieTimeline.makeCommitFileName("002"); + assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME, + commitInstantFileName), false)); + // Next upsert + testTable.doWriteOperation("003", UPSERT, emptyList(), asList("p1", "p2"), 1); + // Post rollback commit and metadata should be valid + syncTableMetadata(writeConfig); + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + HoodieActiveTimeline timeline = metadataMetaClient.getActiveTimeline(); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "003"))); } /** * Test non-partitioned datasets. */ - //@Test - public void testNonPartitioned() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - HoodieTestDataGenerator nonPartitionedGenerator = new HoodieTestDataGenerator(new String[] {""}); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Write 1 (Bulk insert) - String newCommitTime = "001"; - List records = nonPartitionedGenerator.generateInserts(newCommitTime, 10); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - validateMetadata(client); - - List metadataPartitions = metadata(client).getAllPartitionPaths(); - assertTrue(metadataPartitions.contains(""), "Must contain empty partition"); - } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testNonPartitioned(HoodieTableType tableType) throws Exception { + init(tableType); + // Non-partitioned bulk insert + testTable.doWriteOperation("001", BULK_INSERT, emptyList(), 1); + syncTableMetadata(writeConfig); + List metadataPartitions = metadata(writeConfig, context).getAllPartitionPaths(); + assertTrue(metadataPartitions.isEmpty(), "Must contain empty partition"); } /** * Test various metrics published by metadata table. */ - @Test - public void testMetadataMetrics() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, true).build())) { - // Write - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - - Registry metricsRegistry = Registry.getRegistry("HoodieMetadata"); - assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count")); - assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration")); - assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L); - assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size")); - assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size")); - assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count")); - assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count")); - } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetadataMetrics(HoodieTableType tableType) throws Exception { + init(tableType); + writeConfig = getWriteConfigBuilder(true, true, true).build(); + testTable.doWriteOperation(HoodieActiveTimeline.createNewInstantTime(), INSERT, asList("p1", "p2"), + asList("p1", "p2"), 2, true); + syncTableMetadata(writeConfig); + Registry metricsRegistry = Registry.getRegistry("HoodieMetadata"); + assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count")); + assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration")); + assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L); + assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size")); + assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size")); + assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count")); + assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count")); } /** * Test when reading from metadata table which is out of sync with dataset that results are still consistent. */ - @Test - public void testMetadataOutOfSync() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - SparkRDDWriteClient unsyncedClient = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true)); - - // Enable metadata so table is initialized - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Perform Bulk Insert - String newCommitTime = "001"; - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateInserts(newCommitTime, 20); - client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetadataOutOfSync(HoodieTableType tableType) throws Exception { + init(tableType); + testTable.doWriteOperation("001", BULK_INSERT, asList("p1", "p2"), asList("p1", "p2"), 1); + // Enable metadata so table is initialized but do not sync + syncAndValidate(testTable, emptyList(), true, false, false); + // Perform an insert and upsert + testTable.doWriteOperation("002", INSERT, asList("p1", "p2"), 1); + testTable.doWriteOperation("003", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 1); + // Run compaction for MOR table + if (MERGE_ON_READ.equals(tableType)) { + testTable = testTable.doCompaction("004", asList("p1", "p2")); } - - // Perform commit operations with metadata disabled - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - // Perform Insert - String newCommitTime = "002"; - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateInserts(newCommitTime, 20); - client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - - // Perform Upsert - newCommitTime = "003"; - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUniqueUpdates(newCommitTime, 20); - client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - - // Compaction - if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { - newCommitTime = "004"; - client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); - client.compact(newCommitTime); - } + assertFalse(metadata(writeConfig, context).isInSync()); + testTable.doWriteOperation("005", UPSERT, asList("p1", "p2", "p3"), 1); + if (MERGE_ON_READ.equals(tableType)) { + testTable = testTable.doCompaction("006", asList("p1", "p2")); } - - assertFalse(metadata(unsyncedClient).isInSync()); - validateMetadata(unsyncedClient); - - // Perform clean operation with metadata disabled - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - // One more commit needed to trigger clean so upsert and compact - String newCommitTime = "005"; - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 20); - client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - - if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { - newCommitTime = "006"; - client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); - client.compact(newCommitTime); - } - - // Clean - newCommitTime = "007"; - client.clean(newCommitTime); - } - - assertFalse(metadata(unsyncedClient).isInSync()); - validateMetadata(unsyncedClient); - - // Perform restore with metadata disabled + testTable.doCleanBasedOnCommits("007", singletonList("001")); + /* TODO: Perform restore with metadata disabled try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { client.restoreToInstant("004"); - } - - assertFalse(metadata(unsyncedClient).isInSync()); - validateMetadata(unsyncedClient); + }*/ + assertFalse(metadata(writeConfig, context).isInSync()); + syncAndValidate(testTable, emptyList(), true, true, true, true); } /** * Test that failure to perform deltacommit on the metadata table does not lead to missed sync. */ - @Test - public void testMetdataTableCommitFailure() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Write 1 - String newCommitTime = "001"; - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - - // Write 2 - newCommitTime = "002"; - client.startCommitWithTime(newCommitTime); - records = dataGen.generateInserts(newCommitTime, 20); - writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetdataTableCommitFailure(HoodieTableType tableType) throws Exception { + init(tableType); + testTable.doWriteOperation("001", INSERT, asList("p1", "p2"), asList("p1", "p2"), 2, true); + syncTableMetadata(writeConfig); + testTable.doWriteOperation("002", INSERT, asList("p1", "p2"), 2, true); + syncTableMetadata(writeConfig); // At this time both commits 001 and 002 must be synced to the metadata table HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); @@ -1089,29 +705,112 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { assertTrue(timeline.containsInstant(new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); // In this commit deltacommit "002" will be rolled back and attempted again. - String latestCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - String newCommitTime = "003"; - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - - records = dataGen.generateInserts(latestCommitTime, 20); - client.startCommitWithTime(latestCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), latestCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - } + testTable.doWriteOperation("003", BULK_INSERT, singletonList("p3"), asList("p1", "p2", "p3"), 2); + syncTableMetadata(writeConfig); timeline = metadataMetaClient.reloadActiveTimeline(); assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))); assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); - assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, latestCommitTime))); - assertTrue(timeline.getRollbackTimeline().countInstants() == 1); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "003"))); + assertEquals(1, timeline.getRollbackTimeline().countInstants()); } /** - * Validate the metadata tables contents to ensure it matches what is on the file system. + * Tests that if timeline has an inflight commit midway, metadata syncs only completed commits (including later to inflight commit). */ + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testInFlightCommit(HoodieTableType tableType) throws Exception { + init(tableType); + // bootstrap w/ 2 commits + doWriteOperationsAndBootstrapMetadata(testTable); + + // trigger an upsert + testTable.doWriteOperation("003", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 3); + syncAndValidate(testTable); + + // trigger an upsert + testTable.doWriteOperation("005", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable); + + // create an inflight commit. + HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("006", UPSERT, emptyList(), + asList("p1", "p2", "p3"), 2, false, true); + + // trigger upsert + testTable.doWriteOperation("007", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + // testTable validation will fetch only files pertaining to completed commits. So, validateMetadata() will skip files for 006 + // while validating against actual metadata table. + syncAndValidate(testTable, singletonList("006"), writeConfig.isMetadataTableEnabled(), writeConfig.getMetadataConfig().enableSync(), false); + + // Remove the inflight instance holding back table sync + testTable.moveInflightCommitToComplete("006", inflightCommitMeta); + syncTableMetadata(writeConfig); + + // A regular commit should get synced + testTable.doWriteOperation("008", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, true); + } + + private void doWriteOperationsAndBootstrapMetadata(HoodieTestTable testTable) throws Exception { + testTable.doWriteOperation("001", INSERT, asList("p1", "p2"), asList("p1", "p2"), + 2, true); + testTable.doWriteOperation("002", UPSERT, asList("p1", "p2"), + 2, true); + syncAndValidate(testTable); + } + + private void syncAndValidate(HoodieTestTable testTable) throws IOException { + syncAndValidate(testTable, emptyList(), writeConfig.isMetadataTableEnabled(), writeConfig.getMetadataConfig().enableSync(), true); + } + + private void syncAndValidate(HoodieTestTable testTable, boolean doFullValidation) throws IOException { + syncAndValidate(testTable, emptyList(), writeConfig.isMetadataTableEnabled(), writeConfig.getMetadataConfig().enableSync(), true, doFullValidation); + } + + private void syncAndValidate(HoodieTestTable testTable, List inflightCommits, boolean enableMetadata, + boolean enableMetadataSync, boolean enableValidation) throws IOException { + syncAndValidate(testTable, inflightCommits, enableMetadata, enableMetadataSync, enableValidation, false); + } + + private void syncAndValidate(HoodieTestTable testTable, List inflightCommits, boolean enableMetadata, + boolean enableMetadataSync, boolean enableValidation, boolean doFullValidation) throws IOException { + writeConfig.getMetadataConfig().setValue(HoodieMetadataConfig.ENABLE, String.valueOf(enableMetadata)); + writeConfig.getMetadataConfig().setValue(HoodieMetadataConfig.SYNC_ENABLE, String.valueOf(enableMetadataSync)); + syncTableMetadata(writeConfig); + validateMetadata(testTable, inflightCommits, writeConfig, metadataTableBasePath, doFullValidation); + } + + private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileListingMetadata) { + return getWriteConfigBuilder(autoCommit, useFileListingMetadata, false).build(); + } + + private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { + return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics); + } + + private HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2) + .withAutoCommit(autoCommit) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024) + .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1) + .withFailedWritesCleaningPolicy(policy) + .withAutoClean(false).retainCommits(1).retainFileVersions(1).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build()) + .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table") + .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() + .withEnableBackupForRemoteFileSystemView(false).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(useFileListingMetadata) + .enableMetrics(enableMetrics).build()) + .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics) + .withExecutorMetrics(true).build()) + .withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder() + .usePrefix("unit-test").build()); + } + private void validateMetadata(SparkRDDWriteClient testClient) throws IOException { HoodieWriteConfig config = testClient.getConfig(); @@ -1273,44 +972,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { clientConfig.getSpillableMapBasePath()); } - // TODO: this can be moved to TestHarness after merge from master - private void assertNoWriteErrors(List statuses) { - // Verify there are no errors - for (WriteStatus status : statuses) { - assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId()); - } - } - - private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileListingMetadata) { - return getWriteConfigBuilder(autoCommit, useFileListingMetadata, false).build(); - } - - private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { - return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics); - } - - private HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) - .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2) - .withAutoCommit(autoCommit) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024) - .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1) - .withFailedWritesCleaningPolicy(policy) - .withAutoClean(false).retainCommits(1).retainFileVersions(1).build()) - .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build()) - .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table") - .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() - .withEnableBackupForRemoteFileSystemView(false).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(useFileListingMetadata) - .enableMetrics(enableMetrics).build()) - .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics) - .withExecutorMetrics(true).build()) - .withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder() - .usePrefix("unit-test").build()); - } - @Override protected HoodieTableType getTableType() { return tableType; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 1e52e4494..c734daecd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -21,7 +21,11 @@ import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; @@ -29,15 +33,26 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.minicluster.HdfsTestService; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; +import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadStat; import org.apache.hadoop.conf.Configuration; @@ -53,19 +68,31 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; import scala.Tuple2; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + /** * The test harness for resource initialization and cleanup. */ @@ -149,7 +176,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im } /** - * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) + * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) * with a default name matching the name of the class. */ protected void initSparkContexts() { @@ -376,9 +403,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im } public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, - FileStatus[] fileStatuses) { + FileStatus[] fileStatuses) { if (tableView == null) { - tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses); + tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses); } else { tableView.init(metaClient, visibleActiveTimeline, fileStatuses); } @@ -418,4 +445,175 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im } return Pair.of(partitionPathStatMap, globalStat); } + + /** + * Validate the metadata tables contents to ensure it matches what is on the file system. + */ + public void validateMetadata(HoodieTestTable testTable, List inflightCommits, HoodieWriteConfig writeConfig, + String metadataTableBasePath, boolean doFullValidation) throws IOException { + HoodieTableMetadata tableMetadata = metadata(writeConfig, context); + assertNotNull(tableMetadata, "MetadataReader should have been initialized"); + if (!writeConfig.isMetadataTableEnabled() || !writeConfig.getMetadataConfig().validateFileListingMetadata()) { + return; + } + + assertEquals(inflightCommits, testTable.inflightCommits()); + + HoodieTimer timer = new HoodieTimer().startTimer(); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + // Partitions should match + List fsPartitionPaths = testTable.getAllPartitionPaths(); + List fsPartitions = new ArrayList<>(); + fsPartitionPaths.forEach(entry -> fsPartitions.add(entry.getFileName().toString())); + List metadataPartitions = tableMetadata.getAllPartitionPaths(); + + Collections.sort(fsPartitions); + Collections.sort(metadataPartitions); + + assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match"); + assertEquals(fsPartitions, metadataPartitions, "Partitions should match"); + + // Files within each partition should match + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext); + TableFileSystemView tableView = table.getHoodieView(); + List fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList()); + Map partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths); + assertEquals(fsPartitions.size(), partitionToFilesMap.size()); + + fsPartitions.forEach(partition -> { + try { + validateFilesPerPartition(testTable, tableMetadata, tableView, partitionToFilesMap, partition); + } catch (IOException e) { + fail("Exception should not be raised: " + e); + } + }); + if (doFullValidation) { + runFullValidation(writeConfig, metadataTableBasePath, engineContext); + } + + LOG.info("Validation time=" + timer.endTimer()); + } + + public void syncTableMetadata(HoodieWriteConfig writeConfig) { + if (!writeConfig.getMetadataConfig().enableSync()) { + return; + } + // Open up the metadata table again, for syncing + try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context)) { + LOG.info("Successfully synced to metadata table"); + } catch (Exception e) { + throw new HoodieMetadataException("Error syncing to metadata table.", e); + } + } + + public HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteConfig clientConfig) { + return (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter + .create(hadoopConf, clientConfig, new HoodieSparkEngineContext(jsc)); + } + + public HoodieTableMetadata metadata(HoodieWriteConfig clientConfig, HoodieEngineContext hoodieEngineContext) { + return HoodieTableMetadata.create(hoodieEngineContext, clientConfig.getMetadataConfig(), clientConfig.getBasePath(), + clientConfig.getSpillableMapBasePath()); + } + + private void validateFilesPerPartition(HoodieTestTable testTable, HoodieTableMetadata tableMetadata, TableFileSystemView tableView, + Map partitionToFilesMap, String partition) throws IOException { + Path partitionPath; + if (partition.equals("")) { + // Should be the non-partitioned case + partitionPath = new Path(basePath); + } else { + partitionPath = new Path(basePath, partition); + } + + FileStatus[] fsStatuses = testTable.listAllFilesInPartition(partition); + FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath); + List fsFileNames = Arrays.stream(fsStatuses) + .map(s -> s.getPath().getName()).collect(Collectors.toList()); + List metadataFilenames = Arrays.stream(metaStatuses) + .map(s -> s.getPath().getName()).collect(Collectors.toList()); + Collections.sort(fsFileNames); + Collections.sort(metadataFilenames); + + assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length); + + if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) { + LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray())); + LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray())); + + for (String fileName : fsFileNames) { + if (!metadataFilenames.contains(fileName)) { + LOG.error(partition + "FsFilename " + fileName + " not found in Meta data"); + } + } + for (String fileName : metadataFilenames) { + if (!fsFileNames.contains(fileName)) { + LOG.error(partition + "Metadata file " + fileName + " not found in original FS"); + } + } + } + + // Block sizes should be valid + Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0)); + List fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList()); + List metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList()); + assertEquals(fsBlockSizes, metadataBlockSizes); + + assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match"); + assertEquals(fsFileNames, metadataFilenames, "Files within partition " + partition + " should match"); + + // FileSystemView should expose the same data + List fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList()); + fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList())); + + fileGroups.forEach(g -> LogManager.getLogger(getClass()).info(g)); + fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(getClass()).info(b))); + fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(getClass()).info(s))); + + long numFiles = fileGroups.stream() + .mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum()) + .sum(); + assertEquals(metadataFilenames.size(), numFiles); + } + + private void runFullValidation(HoodieWriteConfig writeConfig, String metadataTableBasePath, HoodieSparkEngineContext engineContext) { + HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(writeConfig); + assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); + + // Validate write config for metadata table + HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig(); + assertFalse(metadataWriteConfig.isMetadataTableEnabled(), "No metadata table for metadata table"); + + // Metadata table should be in sync with the dataset + assertTrue(metadata(writeConfig, engineContext).isInSync()); + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + + // Metadata table is MOR + assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR"); + + // Metadata table is HFile format + assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(), HoodieFileFormat.HFILE, + "Metadata Table base file format should be HFile"); + + // Metadata table has a fixed number of partitions + // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory + // in the .hoodie folder. + List metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, HoodieTableMetadata.getMetadataTableBasePath(basePath), + false, false, false); + Assertions.assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size()); + + // Metadata table should automatically compact and clean + // versions are +1 as autoclean / compaction happens end of commits + int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1; + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline()); + metadataTablePartitions.forEach(partition -> { + List latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList()); + assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() <= 1, "Should have a single latest base file"); + assertTrue(latestSlices.size() <= 1, "Should have a single latest file slice"); + assertTrue(latestSlices.size() <= numFileVersions, "Should limit file slice to " + + numFileVersions + " but was " + latestSlices.size()); + }); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 5c439f51a..e96fcce6c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -167,6 +167,9 @@ public class FSUtils { } public static String getCommitTime(String fullFileName) { + if (isLogFile(new Path(fullFileName))) { + return fullFileName.split("_")[1].split("\\.")[0]; + } return fullFileName.split("_")[2].split("\\.")[0]; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index c345cc7af..ef8b09b51 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -50,6 +50,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG; import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -157,6 +158,9 @@ public class TestFSUtils extends HoodieCommonTestHarness { String fileName = UUID.randomUUID().toString(); String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName); assertEquals(instantTime, FSUtils.getCommitTime(fullFileName)); + // test log file name + fullFileName = FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), instantTime, 1, TEST_WRITE_TOKEN); + assertEquals(instantTime, FSUtils.getCommitTime(fullFileName)); } @Test diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index bb6c0b491..600ee1673 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -39,6 +39,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; import java.io.RandomAccessFile; @@ -48,8 +50,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.FileTime; import java.time.Instant; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanMetadata; import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanerPlan; @@ -59,6 +64,8 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serial public class FileCreateUtils { + private static final Logger LOG = LogManager.getLogger(FileCreateUtils.class); + private static final String WRITE_TOKEN = "1-0-1"; private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension(); @@ -216,6 +223,10 @@ public class FileCreateUtils { createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION); } + public static void createInflightCompaction(String basePath, String instantTime) throws IOException { + createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION); + } + public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException { Path parentPath = Paths.get(basePath, partitionPath); Files.createDirectories(parentPath); @@ -307,6 +318,13 @@ public class FileCreateUtils { .endsWith(String.format("%s.%s", HoodieTableMetaClient.MARKER_EXTN, ioType))).count(); } + public static List getPartitionPaths(Path basePath) throws IOException { + if (Files.notExists(basePath)) { + return Collections.emptyList(); + } + return Files.list(basePath).filter(entry -> !entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()); + } + /** * Find total basefiles for passed in paths. */ diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java index 76fdf18d4..95188bb0b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java @@ -75,7 +75,11 @@ public class FileSystemTestUtils { } public static List listRecursive(FileSystem fs, Path path) throws IOException { - RemoteIterator itr = fs.listFiles(path, true); + return listFiles(fs, path, true); + } + + public static List listFiles(FileSystem fs, Path path, boolean recursive) throws IOException { + RemoteIterator itr = fs.listFiles(path, recursive); List statuses = new ArrayList<>(); while (itr.hasNext()) { statuses.add(itr.next()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index e6c488e49..099fec287 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -19,52 +19,79 @@ package org.apache.hudi.common.testutils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata; +import org.apache.hudi.avro.model.HoodieSavepointMetadata; +import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata; +import org.apache.hudi.common.HoodieCleanStat; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Paths; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Random; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import static java.time.temporal.ChronoUnit.SECONDS; +import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; +import static org.apache.hudi.common.model.WriteOperationType.CLUSTER; +import static org.apache.hudi.common.model.WriteOperationType.COMPACT; +import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName; import static org.apache.hudi.common.testutils.FileCreateUtils.createCleanFile; import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCleanFile; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCompaction; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile; @@ -77,9 +104,19 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDe import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedReplaceCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createRollbackFile; import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName; +import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata; +import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap; +import static org.apache.hudi.common.util.CommitUtils.buildMetadata; +import static org.apache.hudi.common.util.CommitUtils.getCommitActionType; +import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING; public class HoodieTestTable { + private static final Logger LOG = LogManager.getLogger(HoodieTestTable.class); + private static final Random RANDOM = new Random(); + private static HoodieTestTableState testTableState; + private final List inflightCommits = new ArrayList<>(); + protected final String basePath; protected final FileSystem fs; protected HoodieTableMetaClient metaClient; @@ -94,6 +131,7 @@ public class HoodieTestTable { } public static HoodieTestTable of(HoodieTableMetaClient metaClient) { + testTableState = HoodieTestTableState.of(); return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient); } @@ -130,6 +168,7 @@ public class HoodieTestTable { public HoodieTestTable addInflightCommit(String instantTime) throws Exception { createRequestedCommit(basePath, instantTime); createInflightCommit(basePath, instantTime); + inflightCommits.add(instantTime); currentInstantTime = instantTime; metaClient = HoodieTableMetaClient.reload(metaClient); return this; @@ -144,6 +183,28 @@ public class HoodieTestTable { return this; } + public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, HoodieTestTableState testTableState) { + String actionType = getCommitActionType(operationType, metaClient.getTableType()); + return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, false, actionType); + } + + public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, + HoodieTestTableState testTableState, boolean bootstrap) { + String actionType = getCommitActionType(operationType, metaClient.getTableType()); + return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, bootstrap, actionType); + } + + public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, + Map> partitionToReplaceFileIds, + HoodieTestTableState testTableState, boolean bootstrap, String action) { + List writeStats = generateHoodieWriteStatForPartition(testTableState.getPartitionToBaseFileInfoMap(commitTime), commitTime, bootstrap); + if (MERGE_ON_READ.equals(metaClient.getTableType()) && UPSERT.equals(operationType)) { + writeStats.addAll(generateHoodieWriteStatForPartitionLogFiles(testTableState.getPartitionToLogFileInfoMap(commitTime), commitTime, bootstrap)); + } + Map extraMetadata = createImmutableMap("test", "test"); + return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, EMPTY_STRING, action); + } + public HoodieTestTable addCommit(String instantTime, HoodieCommitMetadata metadata) throws Exception { createRequestedCommit(basePath, instantTime); createInflightCommit(basePath, instantTime); @@ -153,6 +214,14 @@ public class HoodieTestTable { return this; } + public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { + createCommit(basePath, instantTime, metadata); + inflightCommits.remove(instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + public HoodieTestTable addDeltaCommit(String instantTime) throws Exception { createRequestedDeltaCommit(basePath, instantTime); createInflightDeltaCommit(basePath, instantTime); @@ -199,6 +268,31 @@ public class HoodieTestTable { return this; } + public HoodieTestTable addClean(String instantTime) throws IOException { + HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(EMPTY_STRING, EMPTY_STRING, EMPTY_STRING), EMPTY_STRING, new HashMap<>(), + CleanPlanV2MigrationHandler.VERSION, new HashMap<>()); + HoodieCleanStat cleanStats = new HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, + HoodieTestUtils.DEFAULT_PARTITION_PATHS[RANDOM.nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)], + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + instantTime); + HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats)); + return HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata); + } + + public Pair getHoodieCleanMetadata(String commitTime, HoodieTestTableState testTableState) { + HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(commitTime, CLEAN_ACTION, EMPTY_STRING), EMPTY_STRING, new HashMap<>(), + CleanPlanV2MigrationHandler.VERSION, new HashMap<>()); + List cleanStats = new ArrayList<>(); + for (Map.Entry> entry : testTableState.getPartitionToFileIdMapForCleaner(commitTime).entrySet()) { + cleanStats.add(new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, + entry.getKey(), entry.getValue(), entry.getValue(), Collections.emptyList(), commitTime)); + } + return Pair.of(cleanerPlan, convertCleanMetadata(commitTime, Option.of(0L), cleanStats)); + } + public HoodieTestTable addInflightRollback(String instantTime) throws IOException { createInflightRollbackFile(basePath, instantTime); currentInstantTime = instantTime; @@ -214,6 +308,61 @@ public class HoodieTestTable { return this; } + public HoodieRollbackMetadata getRollbackMetadata(String instantTimeToDelete, Map> partitionToFilesMeta) throws Exception { + HoodieRollbackMetadata rollbackMetadata = new HoodieRollbackMetadata(); + rollbackMetadata.setCommitsRollback(Collections.singletonList(instantTimeToDelete)); + rollbackMetadata.setStartRollbackTime(instantTimeToDelete); + Map partitionMetadataMap = new HashMap<>(); + for (Map.Entry> entry : partitionToFilesMeta.entrySet()) { + HoodieRollbackPartitionMetadata rollbackPartitionMetadata = new HoodieRollbackPartitionMetadata(); + rollbackPartitionMetadata.setPartitionPath(entry.getKey()); + rollbackPartitionMetadata.setSuccessDeleteFiles(entry.getValue()); + rollbackPartitionMetadata.setFailedDeleteFiles(new ArrayList<>()); + rollbackPartitionMetadata.setWrittenLogFiles(getWrittenLogFiles(instantTimeToDelete, entry)); + rollbackPartitionMetadata.setRollbackLogFiles(createImmutableMap(logFileName(instantTimeToDelete, UUID.randomUUID().toString(), 0), (long) (100 + RANDOM.nextInt(500)))); + partitionMetadataMap.put(entry.getKey(), rollbackPartitionMetadata); + } + rollbackMetadata.setPartitionMetadata(partitionMetadataMap); + rollbackMetadata.setInstantsRollback(Collections.singletonList(new HoodieInstantInfo(instantTimeToDelete, HoodieTimeline.ROLLBACK_ACTION))); + return rollbackMetadata; + } + + /** + * Return a map of log file name to file size that were expected to be rolled back in that partition. + */ + private Map getWrittenLogFiles(String instant, Map.Entry> entry) { + Map writtenLogFiles = new HashMap<>(); + for (String fileName : entry.getValue()) { + if (FSUtils.isLogFile(new Path(fileName))) { + if (testTableState.getPartitionToLogFileInfoMap(instant) != null + && testTableState.getPartitionToLogFileInfoMap(instant).containsKey(entry.getKey())) { + List> fileInfos = testTableState.getPartitionToLogFileInfoMap(instant).get(entry.getKey()); + for (Pair fileInfo : fileInfos) { + if (fileName.equals(logFileName(instant, fileInfo.getLeft(), fileInfo.getRight()[0]))) { + writtenLogFiles.put(fileName, Long.valueOf(fileInfo.getRight()[1])); + } + } + } + } + } + return writtenLogFiles; + } + + public HoodieSavepointMetadata getSavepointMetadata(String instant, Map> partitionToFilesMeta) { + HoodieSavepointMetadata savepointMetadata = new HoodieSavepointMetadata(); + savepointMetadata.setSavepointedAt(Long.valueOf(instant)); + Map partitionMetadataMap = new HashMap<>(); + for (Map.Entry> entry : partitionToFilesMeta.entrySet()) { + HoodieSavepointPartitionMetadata savepointPartitionMetadata = new HoodieSavepointPartitionMetadata(); + savepointPartitionMetadata.setPartitionPath(entry.getKey()); + savepointPartitionMetadata.setSavepointDataFile(entry.getValue()); + partitionMetadataMap.put(entry.getKey(), savepointPartitionMetadata); + } + savepointMetadata.setPartitionMetadata(partitionMetadataMap); + savepointMetadata.setSavepointedBy("test"); + return savepointMetadata; + } + public HoodieTestTable addRequestedCompaction(String instantTime) throws IOException { createRequestedCompaction(basePath, instantTime); currentInstantTime = instantTime; @@ -235,6 +384,13 @@ public class HoodieTestTable { return addRequestedCompaction(instantTime, plan); } + public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception { + createRequestedCompaction(basePath, instantTime); + createInflightCompaction(basePath, instantTime); + return HoodieTestTable.of(metaClient) + .addCommit(instantTime, commitMetadata); + } + public HoodieTestTable forCommit(String instantTime) { currentInstantTime = instantTime; return this; @@ -311,6 +467,13 @@ public class HoodieTestTable { return this; } + public HoodieTestTable withBaseFilesInPartition(String partition, List> fileInfos) throws Exception { + for (Pair fileInfo : fileInfos) { + FileCreateUtils.createBaseFile(basePath, partition, currentInstantTime, fileInfo.getKey(), fileInfo.getValue()); + } + return this; + } + public String getFileIdWithLogFile(String partitionPath) throws Exception { String fileId = UUID.randomUUID().toString(); withLogFile(partitionPath, fileId); @@ -328,6 +491,13 @@ public class HoodieTestTable { return this; } + public HoodieTestTable withLogFilesInPartition(String partition, List> fileInfos) throws Exception { + for (Pair fileInfo : fileInfos) { + FileCreateUtils.createLogFile(basePath, partition, currentInstantTime, fileInfo.getKey(), fileInfo.getValue()[0], fileInfo.getValue()[1]); + } + return this; + } + public boolean inflightCommitExists(String instantTime) { try { return fs.exists(getInflightCommitFilePath(instantTime)); @@ -388,6 +558,11 @@ public class HoodieTestTable { return new Path(Paths.get(basePath, partition).toUri()); } + public List getAllPartitionPaths() throws IOException { + java.nio.file.Path basePathPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).getParent().getParent(); + return FileCreateUtils.getPartitionPaths(basePathPath); + } + public Path getBaseFilePath(String partition, String fileId) { return new Path(Paths.get(basePath, partition, getBaseFileNameById(fileId)).toUri()); } @@ -396,6 +571,24 @@ public class HoodieTestTable { return baseFileName(currentInstantTime, fileId); } + public Path getLogFilePath(String partition, String fileId, int version) { + return new Path(Paths.get(basePath, partition, getLogFileNameById(fileId, version)).toString()); + } + + public String getLogFileNameById(String fileId, int version) { + return logFileName(currentInstantTime, fileId, version); + } + + public List getEarliestFilesInPartition(String partition, int count) throws IOException { + List fileStatuses = Arrays.asList(listAllFilesInPartition(partition)); + fileStatuses.sort(Comparator.comparing(FileStatus::getModificationTime)); + return fileStatuses.subList(0, count).stream().map(entry -> entry.getPath().getName()).collect(Collectors.toList()); + } + + public List inflightCommits() { + return this.inflightCommits; + } + public FileStatus[] listAllBaseFiles() throws IOException { return listAllBaseFiles(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension()); } @@ -421,16 +614,356 @@ public class HoodieTestTable { } public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOException { - return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).toArray(new FileStatus[0]); + return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).stream() + .filter(entry -> { + boolean toReturn = true; + String fileName = entry.getPath().getName(); + if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { + toReturn = false; + } else { + for (String inflight : inflightCommits) { + if (fileName.contains(inflight)) { + toReturn = false; + break; + } + } + } + return toReturn; + }).toArray(FileStatus[]::new); } public FileStatus[] listAllFilesInTempFolder() throws IOException { return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())).toArray(new FileStatus[0]); } + public void deleteFilesInPartition(String partitionPath, List filesToDelete) throws IOException { + FileStatus[] allFiles = listAllFilesInPartition(partitionPath); + Arrays.stream(allFiles).filter(entry -> filesToDelete.contains(entry.getPath().getName())).forEach(entry -> { + try { + Files.delete(Paths.get(basePath, partitionPath, entry.getPath().getName())); + } catch (IOException e) { + throw new HoodieTestTableException(e); + } + }); + } + + public HoodieTestTable doRollback(String commitTimeToRollback, String commitTime) throws Exception { + Option commitMetadata = getMetadataForInstant(commitTimeToRollback); + if (!commitMetadata.isPresent()) { + throw new IllegalArgumentException("Instant to rollback not present in timeline: " + commitTimeToRollback); + } + Map> partitionFiles = getPartitionFiles(commitMetadata.get()); + HoodieRollbackMetadata rollbackMetadata = getRollbackMetadata(commitTimeToRollback, partitionFiles); + for (Map.Entry> entry : partitionFiles.entrySet()) { + deleteFilesInPartition(entry.getKey(), entry.getValue()); + } + return addRollback(commitTime, rollbackMetadata); + } + + public HoodieTestTable doCluster(String commitTime, Map> partitionToReplaceFileIds) throws Exception { + Map>> partitionToReplaceFileIdsWithLength = new HashMap<>(); + for (Map.Entry> entry : partitionToReplaceFileIds.entrySet()) { + String partition = entry.getKey(); + partitionToReplaceFileIdsWithLength.put(entry.getKey(), new ArrayList<>()); + for (String fileId : entry.getValue()) { + int length = 100 + RANDOM.nextInt(500); + partitionToReplaceFileIdsWithLength.get(partition).add(Pair.of(fileId, length)); + } + } + List writeStats = generateHoodieWriteStatForPartition(partitionToReplaceFileIdsWithLength, commitTime, false); + HoodieReplaceCommitMetadata replaceMetadata = + (HoodieReplaceCommitMetadata) buildMetadata(writeStats, partitionToReplaceFileIds, Option.empty(), CLUSTER, EMPTY_STRING, REPLACE_COMMIT_ACTION); + return addReplaceCommit(commitTime, Option.empty(), Option.empty(), replaceMetadata); + } + + public HoodieCleanMetadata doClean(String commitTime, Map partitionFileCountsToDelete) throws IOException { + Map> partitionFilesToDelete = new HashMap<>(); + for (Map.Entry entry : partitionFileCountsToDelete.entrySet()) { + partitionFilesToDelete.put(entry.getKey(), getEarliestFilesInPartition(entry.getKey(), entry.getValue())); + } + HoodieTestTableState testTableState = new HoodieTestTableState(); + for (Map.Entry> entry : partitionFilesToDelete.entrySet()) { + testTableState = testTableState.createTestTableStateForCleaner(commitTime, entry.getKey(), entry.getValue()); + deleteFilesInPartition(entry.getKey(), entry.getValue()); + } + Pair cleanerMeta = getHoodieCleanMetadata(commitTime, testTableState); + addClean(commitTime, cleanerMeta.getKey(), cleanerMeta.getValue()); + return cleanerMeta.getValue(); + } + + public HoodieCleanMetadata doCleanBasedOnCommits(String cleanCommitTime, List commitsToClean) throws IOException { + Map partitionFileCountsToDelete = new HashMap<>(); + for (String commitTime : commitsToClean) { + Option commitMetadata = getMetadataForInstant(commitTime); + if (commitMetadata.isPresent()) { + Map> partitionFiles = getPartitionFiles(commitMetadata.get()); + for (String partition : partitionFiles.keySet()) { + partitionFileCountsToDelete.put(partition, partitionFiles.get(partition).size() + partitionFileCountsToDelete.getOrDefault(partition, 0)); + } + } + } + return doClean(cleanCommitTime, partitionFileCountsToDelete); + } + + public HoodieSavepointMetadata doSavepoint(String commitTime) throws IOException { + Option commitMetadata = getMetadataForInstant(commitTime); + if (!commitMetadata.isPresent()) { + throw new IllegalArgumentException("Instant to rollback not present in timeline: " + commitTime); + } + Map> partitionFiles = getPartitionFiles(commitMetadata.get()); + HoodieSavepointMetadata savepointMetadata = getSavepointMetadata(commitTime, partitionFiles); + for (Map.Entry> entry : partitionFiles.entrySet()) { + deleteFilesInPartition(entry.getKey(), entry.getValue()); + } + return savepointMetadata; + } + + public HoodieTestTable doCompaction(String commitTime, List partitions) throws Exception { + this.currentInstantTime = commitTime; + if (partitions.isEmpty()) { + partitions = Collections.singletonList(EMPTY_STRING); + } + HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(COMPACT, metaClient.getTableType(), commitTime, partitions, 1); + HoodieCommitMetadata commitMetadata = createCommitMetadata(COMPACT, commitTime, testTableState); + for (String partition : partitions) { + this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition)); + } + return addCompaction(commitTime, commitMetadata); + } + + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List partitions, int filesPerPartition) throws Exception { + return doWriteOperation(commitTime, operationType, Collections.emptyList(), partitions, filesPerPartition, false); + } + + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List newPartitionsToAdd, List partitions, + int filesPerPartition) throws Exception { + return doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitions, filesPerPartition, false); + } + + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List newPartitionsToAdd, List partitions, + int filesPerPartition, boolean bootstrap) throws Exception { + return doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitions, filesPerPartition, bootstrap, false); + } + + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List partitions, int filesPerPartition, boolean bootstrap) throws Exception { + return doWriteOperation(commitTime, operationType, Collections.emptyList(), partitions, filesPerPartition, bootstrap, false); + } + + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List newPartitionsToAdd, List partitions, + int filesPerPartition, boolean bootstrap, boolean createInflightCommit) throws Exception { + if (partitions.isEmpty()) { + partitions = Collections.singletonList(EMPTY_STRING); + } + HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(operationType, metaClient.getTableType(), commitTime, partitions, filesPerPartition); + HoodieCommitMetadata commitMetadata = createCommitMetadata(operationType, commitTime, testTableState, bootstrap); + for (String str : newPartitionsToAdd) { + this.withPartitionMetaFiles(str); + } + if (createInflightCommit) { + this.addInflightCommit(commitTime); + } else { + this.addCommit(commitTime, commitMetadata); + } + for (String partition : partitions) { + this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition)); + if (MERGE_ON_READ.equals(metaClient.getTableType()) && UPSERT.equals(operationType)) { + this.withLogFilesInPartition(partition, testTableState.getPartitionToLogFileInfoMap(commitTime).get(partition)); + } + } + return commitMetadata; + } + + private Option getMetadataForInstant(String instantTime) { + Option hoodieInstant = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().filter(i -> i.getTimestamp().equals(instantTime)).firstInstant(); + try { + if (hoodieInstant.isPresent()) { + switch (hoodieInstant.get().getAction()) { + case HoodieTimeline.REPLACE_COMMIT_ACTION: + HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata + .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant.get()).get(), HoodieReplaceCommitMetadata.class); + return Option.of(replaceCommitMetadata); + case HoodieTimeline.DELTA_COMMIT_ACTION: + case HoodieTimeline.COMMIT_ACTION: + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant.get()).get(), HoodieCommitMetadata.class); + return Option.of(commitMetadata); + default: + throw new IllegalArgumentException("Unknown instant action" + hoodieInstant.get().getAction()); + } + } else { + return Option.empty(); + } + } catch (IOException io) { + throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstant.get(), io); + } + } + + private static Map> getPartitionFiles(HoodieCommitMetadata commitMetadata) { + Map> partitionFilesToDelete = new HashMap<>(); + Map> partitionToWriteStats = commitMetadata.getPartitionToWriteStats(); + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + partitionFilesToDelete.put(entry.getKey(), new ArrayList<>()); + entry.getValue().forEach(writeStat -> partitionFilesToDelete.get(entry.getKey()).add(writeStat.getFileId())); + } + return partitionFilesToDelete; + } + + private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType, HoodieTableType tableType, String commitTime, + List partitions, int filesPerPartition) { + for (String partition : partitions) { + Stream fileLengths = IntStream.range(0, filesPerPartition).map(i -> 100 + RANDOM.nextInt(500)).boxed(); + if (MERGE_ON_READ.equals(tableType) && UPSERT.equals(operationType)) { + List> fileVersionAndLength = fileLengths.map(len -> Pair.of(0, len)).collect(Collectors.toList()); + testTableState = testTableState.createTestTableStateForBaseAndLogFiles(commitTime, partition, fileVersionAndLength); + } else { + testTableState = testTableState.createTestTableStateForBaseFilesOnly(commitTime, partition, fileLengths.collect(Collectors.toList())); + } + } + return testTableState; + } + + private static List generateHoodieWriteStatForPartition(Map>> partitionToFileIdMap, + String commitTime, boolean bootstrap) { + List writeStats = new ArrayList<>(); + for (Map.Entry>> entry : partitionToFileIdMap.entrySet()) { + String partition = entry.getKey(); + for (Pair fileIdInfo : entry.getValue()) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + String fileName = bootstrap ? fileIdInfo.getKey() : + FileCreateUtils.baseFileName(commitTime, fileIdInfo.getKey()); + writeStat.setFileId(fileName); + writeStat.setPartitionPath(partition); + writeStat.setPath(partition + "/" + fileName); + writeStat.setTotalWriteBytes(fileIdInfo.getValue()); + writeStats.add(writeStat); + } + } + return writeStats; + } + + /** + * Returns the write stats for log files in the partition. Since log file has version associated with it, the {@param partitionToFileIdMap} + * contains list of Pair where the Integer[] array has both file version and file size. + */ + private static List generateHoodieWriteStatForPartitionLogFiles(Map>> partitionToFileIdMap, String commitTime, boolean bootstrap) { + List writeStats = new ArrayList<>(); + if (partitionToFileIdMap == null) { + return writeStats; + } + for (Map.Entry>> entry : partitionToFileIdMap.entrySet()) { + String partition = entry.getKey(); + for (Pair fileIdInfo : entry.getValue()) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + String fileName = bootstrap ? fileIdInfo.getKey() : + FileCreateUtils.logFileName(commitTime, fileIdInfo.getKey(), fileIdInfo.getValue()[0]); + writeStat.setFileId(fileName); + writeStat.setPartitionPath(partition); + writeStat.setPath(partition + "/" + fileName); + writeStat.setTotalWriteBytes(fileIdInfo.getValue()[1]); + writeStats.add(writeStat); + } + } + return writeStats; + } + public static class HoodieTestTableException extends RuntimeException { public HoodieTestTableException(Throwable t) { super(t); } } + + static class HoodieTestTableState { + /** + * Map>> + * Used in building CLEAN metadata. + */ + Map>> commitsToPartitionToFileIdForCleaner = new HashMap<>(); + /** + * Map>>> + * Used to build commit metadata for base files for several write operations. + */ + Map>>> commitsToPartitionToBaseFileInfoStats = new HashMap<>(); + /** + * Map>>> + * Used to build commit metadata for log files for several write operations. + */ + Map>>> commitsToPartitionToLogFileInfoStats = new HashMap<>(); + + HoodieTestTableState() { + } + + static HoodieTestTableState of() { + return new HoodieTestTableState(); + } + + HoodieTestTableState createTestTableStateForCleaner(String commitTime, String partitionPath, List filesToClean) { + if (!commitsToPartitionToFileIdForCleaner.containsKey(commitTime)) { + commitsToPartitionToFileIdForCleaner.put(commitTime, new HashMap<>()); + } + if (!this.commitsToPartitionToFileIdForCleaner.get(commitTime).containsKey(partitionPath)) { + this.commitsToPartitionToFileIdForCleaner.get(commitTime).put(partitionPath, new ArrayList<>()); + } + + this.commitsToPartitionToFileIdForCleaner.get(commitTime).get(partitionPath).addAll(filesToClean); + return this; + } + + Map> getPartitionToFileIdMapForCleaner(String commitTime) { + return this.commitsToPartitionToFileIdForCleaner.get(commitTime); + } + + HoodieTestTableState createTestTableStateForBaseFilesOnly(String commitTime, String partitionPath, List lengths) { + if (!commitsToPartitionToBaseFileInfoStats.containsKey(commitTime)) { + commitsToPartitionToBaseFileInfoStats.put(commitTime, new HashMap<>()); + } + if (!this.commitsToPartitionToBaseFileInfoStats.get(commitTime).containsKey(partitionPath)) { + this.commitsToPartitionToBaseFileInfoStats.get(commitTime).put(partitionPath, new ArrayList<>()); + } + + List> fileInfos = new ArrayList<>(); + for (int length : lengths) { + fileInfos.add(Pair.of(UUID.randomUUID().toString(), length)); + } + this.commitsToPartitionToBaseFileInfoStats.get(commitTime).get(partitionPath).addAll(fileInfos); + return this; + } + + HoodieTestTableState createTestTableStateForBaseAndLogFiles(String commitTime, String partitionPath, List> versionsAndLengths) { + if (!commitsToPartitionToBaseFileInfoStats.containsKey(commitTime)) { + createTestTableStateForBaseFilesOnly(commitTime, partitionPath, versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList())); + } + if (!this.commitsToPartitionToBaseFileInfoStats.get(commitTime).containsKey(partitionPath)) { + createTestTableStateForBaseFilesOnly(commitTime, partitionPath, versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList())); + } + if (!commitsToPartitionToLogFileInfoStats.containsKey(commitTime)) { + commitsToPartitionToLogFileInfoStats.put(commitTime, new HashMap<>()); + } + if (!this.commitsToPartitionToLogFileInfoStats.get(commitTime).containsKey(partitionPath)) { + this.commitsToPartitionToLogFileInfoStats.get(commitTime).put(partitionPath, new ArrayList<>()); + } + + List> fileInfos = new ArrayList<>(); + for (int i = 0; i < versionsAndLengths.size(); i++) { + Pair versionAndLength = versionsAndLengths.get(i); + String fileId = FSUtils.getFileId(commitsToPartitionToBaseFileInfoStats.get(commitTime).get(partitionPath).get(i).getLeft()); + fileInfos.add(Pair.of(fileId, new Integer[] {versionAndLength.getLeft(), versionAndLength.getRight()})); + } + this.commitsToPartitionToLogFileInfoStats.get(commitTime).get(partitionPath).addAll(fileInfos); + return this; + } + + Map>> getPartitionToBaseFileInfoMap(String commitTime) { + return this.commitsToPartitionToBaseFileInfoStats.get(commitTime); + } + + Map>> getPartitionToLogFileInfoMap(String commitTime) { + return this.commitsToPartitionToLogFileInfoStats.get(commitTime); + } + } }