diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index b091359b2..803c5b9d8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -18,7 +18,6 @@ package org.apache.hudi.client.functional; -import org.apache.hudi.client.HoodieWriteResult; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -27,6 +26,7 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroup; @@ -52,10 +52,8 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; import org.apache.hudi.exception.HoodieMetadataException; -import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; -import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; import org.apache.hudi.metadata.HoodieMetadataMetrics; import org.apache.hudi.metadata.HoodieTableMetadata; @@ -67,32 +65,42 @@ import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Time; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; +import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; +import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT; +import static org.apache.hudi.common.model.WriteOperationType.DELETE; +import static org.apache.hudi.common.model.WriteOperationType.INSERT; +import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("functional") @@ -100,12 +108,10 @@ public class TestHoodieBackedMetadata 
extends HoodieClientTestHarness { private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class); - @TempDir - public java.nio.file.Path tempFolder; - + private static HoodieTestTable testTable; private String metadataTableBasePath; - private HoodieTableType tableType; + private HoodieWriteConfig writeConfig; public void init(HoodieTableType tableType) throws IOException { this.tableType = tableType; @@ -116,7 +122,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { initMetaClient(tableType); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); - + writeConfig = getWriteConfig(true, true); + testTable = HoodieTestTable.of(metaClient); } @AfterEach @@ -124,136 +131,51 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { cleanupResources(); } - /** - * Metadata Table bootstrap scenarios. - */ - @Test - public void testMetadataTableBootstrap() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - // Metadata table should not exist until created for the first time - assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); - assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build()); - - // Metadata table is not created if disabled by config - String firstCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - client.startCommitWithTime(firstCommitTime); - client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 5)), firstCommitTime); - assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created"); - assertThrows(TableNotFoundException.class, () -> 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build()); - } - - // Metadata table should not be created if any non-complete instants are present - String secondCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, true), true)) { - client.startCommitWithTime(secondCommitTime); - client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime); - // AutoCommit is false so no bootstrap - client.syncTableMetadata(); - assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created"); - assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build()); - // rollback this commit - client.rollback(secondCommitTime); - } - - // Metadata table created when enabled by config & sync is called - secondCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - client.startCommitWithTime(secondCommitTime); - client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime); - client.syncTableMetadata(); - assertTrue(fs.exists(new Path(metadataTableBasePath))); - validateMetadata(client); - } - - // Delete all existing instants on dataset to simulate archiving. This should trigger a re-bootstrap of the metadata - // table as last synched instant has been "archived". 
- final String metadataTableMetaPath = metadataTableBasePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME; - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime)))); - - Arrays.stream(fs.listStatus(new Path(metaClient.getMetaPath()))).filter(status -> status.getPath().getName().matches("^\\d+\\..*")) - .forEach(status -> { - try { - fs.delete(status.getPath(), false); - } catch (IOException e) { - LOG.warn("Error when deleting instant " + status + ": " + e); - } - }); - - String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - client.startCommitWithTime(thirdCommitTime); - client.insert(jsc.parallelize(dataGen.generateUpdates(thirdCommitTime, 2)), thirdCommitTime); - client.syncTableMetadata(); - assertTrue(fs.exists(new Path(metadataTableBasePath))); - validateMetadata(client); - - // Metadata Table should not have previous delta-commits as it was re-bootstrapped - assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime)))); - assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime)))); - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(thirdCommitTime)))); - } + public static List bootstrapAndTableOperationTestArgs() { + return asList( + Arguments.of(COPY_ON_WRITE, true), + Arguments.of(COPY_ON_WRITE, false), + Arguments.of(MERGE_ON_READ, true), + Arguments.of(MERGE_ON_READ, false) + ); } /** - * Test enable/disable sync via the config. + * Metadata Table bootstrap scenarios. 
*/ - @Test - public void testSyncConfig() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + @ParameterizedTest + @MethodSource("bootstrapAndTableOperationTestArgs") + public void testMetadataTableBootstrap(HoodieTableType tableType, boolean addRollback) throws Exception { + init(tableType); + // bootstrap with few commits + doWriteOperationsAndBootstrapMetadata(testTable); - // Create the metadata table - String firstCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - client.startCommitWithTime(firstCommitTime); - client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 2)), firstCommitTime); - client.syncTableMetadata(); - assertTrue(fs.exists(new Path(metadataTableBasePath))); - validateMetadata(client); + if (addRollback) { + // trigger an UPSERT that will be rolled back + testTable.doWriteOperation("003", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 2); + syncTableMetadata(writeConfig); + // rollback last commit + testTable = testTable.doRollback("003", "004"); + syncAndValidate(testTable); } - // If sync is disabled, the table will not sync - HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(true).enableMetrics(false).enableSync(false).build()).build(); - final String metadataTableMetaPath = metadataTableBasePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME; - String secondCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config, true)) { - client.startCommitWithTime(secondCommitTime); - client.insert(jsc.parallelize(dataGen.generateInserts(secondCommitTime, 2)), secondCommitTime); - client.syncTableMetadata(); + testTable.doWriteOperation("005", INSERT, 
asList("p1", "p2"), 4); + syncAndValidate(testTable); - // Metadata Table should not have synced - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime)))); - assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime)))); - } - - // If sync is enabled, the table will sync - String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - client.startCommitWithTime(thirdCommitTime); - client.insert(jsc.parallelize(dataGen.generateInserts(thirdCommitTime, 2)), thirdCommitTime); - client.syncTableMetadata(); - - // Metadata Table should have synced - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime)))); - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime)))); - assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(thirdCommitTime)))); - } + // trigger an upsert and validate + testTable.doWriteOperation("006", UPSERT, singletonList("p3"), + asList("p1", "p2", "p3"), 4); + syncAndValidate(testTable, true); } /** * Only valid partition directories are added to the metadata. 
*/ - @Test - public void testOnlyValidPartitionsAdded() throws Exception { + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Exception { // This test requires local file system - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - + init(tableType); // Create an empty directory which is not a partition directory (lacks partition metadata) final String nonPartitionDirectory = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0] + "-nonpartition"; Files.createDirectories(Paths.get(basePath, nonPartitionDirectory)); @@ -265,49 +187,82 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { final String filteredDirectoryThree = ".backups"; // Create some commits - HoodieTestTable testTable = HoodieTestTable.of(metaClient); testTable.withPartitionMetaFiles("p1", "p2", filteredDirectoryOne, filteredDirectoryTwo, filteredDirectoryThree) .addCommit("001").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10) .addCommit("002").withBaseFilesInPartition("p1", 10).withBaseFilesInPartition("p2", 10, 10, 10); - final HoodieWriteConfig writeConfig = - getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false) + writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.NEVER, true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { - client.startCommitWithTime("005"); - client.insert(jsc.emptyRDD(), "005"); + testTable.doWriteOperation("003", UPSERT, emptyList(), asList("p1", "p2"), 1, true); + syncTableMetadata(writeConfig); - List partitions = metadataWriter(client).metadata().getAllPartitionPaths(); - assertFalse(partitions.contains(nonPartitionDirectory), - "Must not contain the 
non-partition " + nonPartitionDirectory); - assertTrue(partitions.contains("p1"), "Must contain partition p1"); - assertTrue(partitions.contains("p2"), "Must contain partition p2"); + List partitions = metadataWriter(writeConfig).metadata().getAllPartitionPaths(); + assertFalse(partitions.contains(nonPartitionDirectory), + "Must not contain the non-partition " + nonPartitionDirectory); + assertTrue(partitions.contains("p1"), "Must contain partition p1"); + assertTrue(partitions.contains("p2"), "Must contain partition p2"); - assertFalse(partitions.contains(filteredDirectoryOne), - "Must not contain the filtered directory " + filteredDirectoryOne); - assertFalse(partitions.contains(filteredDirectoryTwo), - "Must not contain the filtered directory " + filteredDirectoryTwo); - assertFalse(partitions.contains(filteredDirectoryThree), - "Must not contain the filtered directory " + filteredDirectoryThree); + assertFalse(partitions.contains(filteredDirectoryOne), + "Must not contain the filtered directory " + filteredDirectoryOne); + assertFalse(partitions.contains(filteredDirectoryTwo), + "Must not contain the filtered directory " + filteredDirectoryTwo); + assertFalse(partitions.contains(filteredDirectoryThree), + "Must not contain the filtered directory " + filteredDirectoryThree); - FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1")); - assertEquals(2, statuses.length); - statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2")); - assertEquals(5, statuses.length); - Map partitionsToFilesMap = metadata(client).getAllFilesInPartitions( - Arrays.asList(basePath + "/p1", basePath + "/p2")); - assertEquals(2, partitionsToFilesMap.size()); - assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length); - assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length); - } + FileStatus[] statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p1")); + assertEquals(tableType == 
COPY_ON_WRITE ? 3 : 4, statuses.length); + statuses = metadata(writeConfig, context).getAllFilesInPartition(new Path(basePath, "p2")); + assertEquals(tableType == COPY_ON_WRITE ? 6 : 7, statuses.length); + Map partitionsToFilesMap = metadata(writeConfig, context).getAllFilesInPartitions(asList(basePath + "/p1", basePath + "/p2")); + assertEquals(2, partitionsToFilesMap.size()); + assertEquals(tableType == COPY_ON_WRITE ? 3 : 4, partitionsToFilesMap.get(basePath + "/p1").length); + assertEquals(tableType == COPY_ON_WRITE ? 6 : 7, partitionsToFilesMap.get(basePath + "/p2").length); } /** * Test various table operations sync to Metadata Table correctly. */ @ParameterizedTest + @MethodSource("bootstrapAndTableOperationTestArgs") + public void testTableOperations(HoodieTableType tableType, boolean doNotSyncFewCommits) throws Exception { + init(tableType); + // bootstrap w/ 2 commits + doWriteOperationsAndBootstrapMetadata(testTable); + + // trigger an upsert + testTable.doWriteOperation("003", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 3); + syncAndValidate(testTable); + + // trigger compaction + if (MERGE_ON_READ.equals(tableType)) { + testTable = testTable.doCompaction("004", asList("p1", "p2")); + syncAndValidate(testTable); + } + + // trigger an upsert + testTable.doWriteOperation("005", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + if (doNotSyncFewCommits) { + syncAndValidate(testTable, emptyList(), true, false, true); + } + + // trigger clean + testTable.doCleanBasedOnCommits("006", singletonList("001")); + if (doNotSyncFewCommits) { + syncAndValidate(testTable, emptyList(), true, false, false); + } + + // trigger delete + testTable.doWriteOperation("007", DELETE, emptyList(), asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, emptyList(), true, true, false); + } + + /** + * Test several table operations with restore. This test uses SparkRDDWriteClient. + * Once the restore support is ready in HoodieTestTable, then rewrite this test. 
+ */ + @ParameterizedTest @EnumSource(HoodieTableType.class) - public void testTableOperations(HoodieTableType tableType) throws Exception { + public void testTableOperationsWithRestore(HoodieTableType tableType) throws Exception { init(tableType); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); @@ -391,218 +346,64 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } /** - * Test rollback of various table operations sync to Metadata Table correctly. + * Tests rollback of a commit with metadata enabled. */ @ParameterizedTest @EnumSource(HoodieTableType.class) public void testRollbackOperations(HoodieTableType tableType) throws Exception { init(tableType); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + // bootstrap w/ 2 commits + doWriteOperationsAndBootstrapMetadata(testTable); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Write 1 (Bulk insert) - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); + // trigger an upsert + testTable.doWriteOperation("003", UPSERT, emptyList(), asList("p1", "p2"), 2); + syncAndValidate(testTable); - // Write 2 (inserts) + Rollback of inserts - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateInserts(newCommitTime, 20); - writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); + // trigger a commit and rollback + 
testTable.doWriteOperation("004", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 3); + syncTableMetadata(writeConfig); + // rollback last commit + testTable = testTable.doRollback("004", "005"); + syncAndValidate(testTable); - // Write 3 (updates) + Rollback of updates - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUniqueUpdates(newCommitTime, 20); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); + // trigger few upserts and validate + for (int i = 6; i < 10; i++) { + testTable.doWriteOperation("00" + i, UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + } + syncAndValidate(testTable); - // Rollback of updates and inserts - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUpdates(newCommitTime, 10); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); + testTable.doWriteOperation("010", UPSERT, emptyList(), asList("p1", "p2", "p3"), 3); + syncAndValidate(testTable); - // Rollback of Compaction - if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); - client.compact(newCommitTime); - validateMetadata(client); - } + // rollback last commit. sync and validate. 
+ testTable.doRollback("010", "011"); + syncTableMetadata(writeConfig); - // Rollback of Deletes - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - records = dataGen.generateDeletes(newCommitTime, 10); - JavaRDD deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey()); - client.startCommitWithTime(newCommitTime); - writeStatuses = client.delete(deleteKeys, newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); - - // Rollback of Clean - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.clean(newCommitTime); - validateMetadata(client); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); + // rollback of compaction + if (MERGE_ON_READ.equals(tableType)) { + testTable = testTable.doCompaction("012", asList("p1", "p2")); + syncTableMetadata(writeConfig); + testTable.doRollback("012", "013"); + syncTableMetadata(writeConfig); } - // Rollback of partial commits - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, - getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(false).build())) { - // Write updates and inserts - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); - } + // roll back of delete + testTable.doWriteOperation("014", DELETE, emptyList(), asList("p1", "p2", "p3"), 2); + syncTableMetadata(writeConfig); + testTable.doRollback("014", "015"); + syncTableMetadata(writeConfig); - // Marker based rollback of partial commits - try (SparkRDDWriteClient client = new 
SparkRDDWriteClient(engineContext, - getWriteConfigBuilder(false, true, false).withRollbackUsingMarkers(true).build())) { - // Write updates and inserts - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - client.rollback(newCommitTime); - client.syncTableMetadata(); - validateMetadata(client); - } - } + // rollback partial commit + writeConfig = getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).build(); + testTable.doWriteOperation("016", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + testTable.doRollback("016", "017"); + syncTableMetadata(writeConfig); - /** - * Test when syncing rollback to metadata if the commit being rolled back has not been synced that essentially a no-op occurs to metadata. - * Once explicit sync is called, metadata should match. 
- */ - @ParameterizedTest - @EnumSource(HoodieTableType.class) - public void testRollbackUnsyncedCommit(HoodieTableType tableType) throws Exception { - init(tableType); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Initialize table with metadata - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - } - - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - // Commit with metadata disabled - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - client.rollback(newCommitTime); - } - - try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true))) { - assertFalse(metadata(client).isInSync()); - client.syncTableMetadata(); - validateMetadata(client); - } - - // If an unsynced commit is automatically rolled back during next commit, the rollback commit gets a timestamp - // greater than than the new commit which is started. Ensure that in this case the rollback is not processed - // as the earlier failed commit would not have been committed. - // - // Dataset: C1 C2 C3.inflight[failed] C4 R5[rolls back C3] - // Metadata: C1.delta C2.delta - // - // When R5 completes, C3.xxx will be deleted. When C4 completes, C4 and R5 will be committed to Metadata Table in - // that order. 
R5 should be neglected as C3 was never committed to metadata table. - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, false), true)) { - // Metadata disabled and no auto-commit - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - // Not committed so left in inflight state - client.syncTableMetadata(); - assertTrue(metadata(client).isInSync()); - validateMetadata(client); - } - - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true), true)) { - // Metadata enabled - // The previous commit will be rolled back automatically - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); - validateMetadata(client); - } - - // In this scenario an async operations is started and completes around the same time of the failed commit. - // Rest of the reasoning is same as above test. - // C4.clean was an asynchronous clean started along with C3. The clean completed but C3 commit failed. - // - // Dataset: C1 C2 C3.inflight[failed] C4.clean C5 R6[rolls back C3] - // Metadata: C1.delta C2.delta - // - // When R6 completes, C3.xxx will be deleted. When C5 completes, C4, C5 and R6 will be committed to Metadata Table - // in that order. R6 should be neglected as C3 was never committed to metadata table. 
- newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, false), true)) { - // Metadata disabled and no auto-commit - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - // Not committed so left in inflight state - client.clean(); - client.syncTableMetadata(); - assertTrue(metadata(client).isInSync()); - validateMetadata(client); - } - - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true), true)) { - // Metadata enabled - // The previous commit will be rolled back automatically - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); - validateMetadata(client); - } + // marker-based rollback of partial commit + writeConfig = getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(true).build(); + testTable.doWriteOperation("018", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + testTable.doRollback("018", "019"); + syncAndValidate(testTable, true); } /** @@ -613,65 +414,50 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { @EnumSource(HoodieTableType.class) public void testManualRollbacks(HoodieTableType tableType) throws Exception { init(tableType); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + doWriteOperationsAndBootstrapMetadata(testTable); // Setting to archive more aggressively on the Metadata Table than the Dataset final int maxDeltaCommitsBeforeCompaction = 4; final int 
minArchiveCommitsMetadata = 2; final int minArchiveCommitsDataset = 4; - HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) + writeConfig = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) .archiveCommitsWith(minArchiveCommitsMetadata, minArchiveCommitsMetadata + 1).retainCommits(1) .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1) .retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build()) .build(); - - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) { - // Initialize table with metadata - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - - // Perform multiple commits - for (int i = 1; i < 10; ++i) { - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - if (i == 1) { - records = dataGen.generateInserts(newCommitTime, 5); - } else { - records = dataGen.generateUpdates(newCommitTime, 2); - } - client.startCommitWithTime(newCommitTime); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); + for (int i = 3; i < 10; i++) { + if (i == 3) { + testTable.doWriteOperation("00" + i, UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 2); + syncTableMetadata(writeConfig); + } else { + testTable.doWriteOperation("00" + i, UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); } - - // We can only rollback those commits whose deltacommit have not been archived yet. 
- int numRollbacks = 0; - boolean exceptionRaised = false; - - List allInstants = metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants() - .collect(Collectors.toList()); - for (HoodieInstant instantToRollback : allInstants) { - try { - client.rollback(instantToRollback.getTimestamp()); - client.syncTableMetadata(); - ++numRollbacks; - } catch (HoodieMetadataException e) { - exceptionRaised = true; - break; - } - } - - assertTrue(exceptionRaised, "Rollback of archived instants should fail"); - // Since each rollback also creates a deltacommit, we can only support rolling back of half of the original - // instants present before rollback started. - assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2, - "Rollbacks of non archived instants should work"); } + syncAndValidate(testTable, true); + + // We can only rollback those commits whose deltacommit have not been archived yet. + int numRollbacks = 0; + boolean exceptionRaised = false; + + List allInstants = metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants().collect(Collectors.toList()); + for (HoodieInstant instantToRollback : allInstants) { + try { + testTable.doRollback(instantToRollback.getTimestamp(), String.valueOf(Time.now())); + syncTableMetadata(writeConfig); + ++numRollbacks; + } catch (HoodieMetadataException e) { + exceptionRaised = true; + break; + } + } + + assertTrue(exceptionRaised, "Rollback of archived instants should fail"); + // Since each rollback also creates a deltacommit, we can only support rolling back of half of the original + // instants present before rollback started. 
+ assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2, + "Rollbacks of non archived instants should work"); } /** @@ -679,148 +465,69 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { */ @ParameterizedTest @EnumSource(HoodieTableType.class) - @Disabled public void testSync(HoodieTableType tableType) throws Exception { init(tableType); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - String newCommitTime; - List records; - List writeStatuses; - // Initial commits without metadata table enabled - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - records = dataGen.generateInserts(newCommitTime, 5); - client.startCommitWithTime(newCommitTime); - writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - records = dataGen.generateInserts(newCommitTime, 5); - client.startCommitWithTime(newCommitTime); - writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - } - + writeConfig = getWriteConfigBuilder(true, false, false).build(); + testTable.doWriteOperation("001", BULK_INSERT, asList("p1", "p2"), asList("p1", "p2"), 1); + testTable.doWriteOperation("002", BULK_INSERT, asList("p1", "p2"), 1); // Enable metadata table so it initialized by listing from file system - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // inserts - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateInserts(newCommitTime, 5); - writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - 
assertNoWriteErrors(writeStatuses); - - validateMetadata(client); - assertTrue(metadata(client).isInSync()); - } - + testTable.doWriteOperation("003", INSERT, asList("p1", "p2"), 1); + syncAndValidate(testTable, emptyList(), true, true, true); // Various table operations without metadata table enabled - String restoreToInstant; - String inflightActionTimestamp; - String beforeInflightActionTimestamp; - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - // updates - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUniqueUpdates(newCommitTime, 5); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); + testTable.doWriteOperation("004", UPSERT, asList("p1", "p2"), 1); + testTable.doWriteOperation("005", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 3); + syncAndValidate(testTable, emptyList(), false, true, true); - // updates and inserts - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUpdates(newCommitTime, 10); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); - - // Compaction - if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); - client.compact(newCommitTime); - assertTrue(metadata(client).isInSync()); - } - - // Savepoint - restoreToInstant = newCommitTime; - if (metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { - client.savepoint("hoodie", "metadata test"); - assertTrue(metadata(client).isInSync()); - } - - // Record a timestamp for creating an 
inflight instance for sync testing - inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime(); - beforeInflightActionTimestamp = newCommitTime; - - // Deletes - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - records = dataGen.generateDeletes(newCommitTime, 5); - JavaRDD deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey()); - client.startCommitWithTime(newCommitTime); - client.delete(deleteKeys, newCommitTime); - assertTrue(metadata(client).isInSync()); - - // Clean - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.clean(newCommitTime); - assertTrue(metadata(client).isInSync()); - - // updates - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUniqueUpdates(newCommitTime, 10); - writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); - - // insert overwrite to test replacecommit - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION); - records = dataGen.generateInserts(newCommitTime, 5); - HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime); - writeStatuses = replaceResult.getWriteStatuses().collect(); - assertNoWriteErrors(writeStatuses); - assertTrue(metadata(client).isInSync()); + // trigger compaction + if (MERGE_ON_READ.equals(tableType)) { + testTable = testTable.doCompaction("006", asList("p1", "p2")); + syncAndValidate(testTable, emptyList(), false, true, true); } - // If there is an incomplete operation, the Metadata Table is not updated beyond that operations but the + // trigger an upsert + testTable.doWriteOperation("008", UPSERT, asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, emptyList(), false, true, true); + + // savepoint + if 
(COPY_ON_WRITE.equals(tableType)) { + testTable.doSavepoint("008"); + syncAndValidate(testTable, emptyList(), false, true, true); + } + + // trigger delete + testTable.doWriteOperation("009", DELETE, emptyList(), asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, emptyList(), false, true, true); + + // trigger clean + testTable.doCleanBasedOnCommits("010", asList("001", "002")); + syncAndValidate(testTable, emptyList(), false, true, true); + + // trigger another upsert + testTable.doWriteOperation("011", UPSERT, asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, emptyList(), false, true, true); + + // trigger clustering + testTable.doCluster("012", new HashMap<>()); + syncAndValidate(testTable, emptyList(), false, true, true); + + // If there is an inflight operation, the Metadata Table is not updated beyond that operations but the // in-memory merge should consider all the completed operations. - Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp)); - fs.create(inflightCleanPath).close(); + HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("007", UPSERT, emptyList(), + asList("p1", "p2", "p3"), 2, false, true); + // trigger upsert + testTable.doWriteOperation("013", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + // testTable validation will fetch only files pertaining to completed commits. So, validateMetadata() will skip files for 007 + // while validating against actual metadata table. 
+ syncAndValidate(testTable, singletonList("007"), true, true, false); + // Remove the inflight instance holding back table sync + testTable.moveInflightCommitToComplete("007", inflightCommitMeta); + syncTableMetadata(writeConfig); + // A regular commit should get synced + testTable.doWriteOperation("014", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, emptyList(), true, true, true); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details - client.syncTableMetadata(); - - // Table should sync only before the inflightActionTimestamp - HoodieBackedTableMetadataWriter writer = - (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context); - assertEquals(writer.getMetadataReader().getUpdateTime().get(), beforeInflightActionTimestamp); - - // Reader should sync to all the completed instants - HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(), - client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); - assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), newCommitTime); - - // Remove the inflight instance holding back table sync - fs.delete(inflightCleanPath, false); - client.syncTableMetadata(); - - writer = - (HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context); - assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime); - - // Reader should sync to all the completed instants - metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(), - client.getConfig().getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); - assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime); - } - - 
// Enable metadata table and ensure it is synced + /* TODO: Restore to savepoint, enable metadata table and ensure it is synced try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { client.restoreToInstant(restoreToInstant); assertFalse(metadata(client).isInSync()); @@ -831,47 +538,41 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { validateMetadata(client); assertTrue(metadata(client).isInSync()); - } + }*/ } /** * Instants on Metadata Table should be archived as per config but we always keep atlest the number of instants * as on the dataset. Metadata Table should be automatically compacted as per config. */ - @Test - public void testCleaningArchivingAndCompaction() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testCleaningArchivingAndCompaction(HoodieTableType tableType) throws Exception { + init(tableType); + doWriteOperationsAndBootstrapMetadata(testTable); final int maxDeltaCommitsBeforeCompaction = 4; final int minArchiveLimit = 4; final int maxArchiveLimit = 6; - HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) + writeConfig = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) .archiveCommitsWith(minArchiveLimit - 2, maxArchiveLimit - 2).retainCommits(1) .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveLimit, maxArchiveLimit) .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(true).build()) .build(); - - List records; - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) { - for (int i = 1; i < 10; ++i) { - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - if 
(i == 1) { - records = dataGen.generateInserts(newCommitTime, 5); - } else { - records = dataGen.generateUpdates(newCommitTime, 2); - } - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); + for (int i = 3; i < 10; i++) { + if (i == 3) { + testTable.doWriteOperation("00" + i, UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 2); + syncTableMetadata(writeConfig); + } else { + testTable.doWriteOperation("00" + i, UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); } } + syncAndValidate(testTable, true); HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); - HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath()).build(); + HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(writeConfig.getBasePath()).build(); HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline(); // check that there are compactions. assertTrue(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants() > 0); @@ -887,194 +588,109 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { /** * Test various error scenarios. */ - @Test - public void testErrorCases() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testErrorCases(HoodieTableType tableType) throws Exception { + init(tableType); // TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table // should be rolled back to last valid commit. 
- try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateInserts(newCommitTime, 10); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - - newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - client.startCommitWithTime(newCommitTime); - records = dataGen.generateInserts(newCommitTime, 5); - writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - - // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed - // instant so that only the inflight is left over. - String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime); - assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME, - commitInstantFileName), false)); - } - - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { - String newCommitTime = client.startCommit(); - // Next insert - List records = dataGen.generateInserts(newCommitTime, 5); - List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - - // Post rollback commit and metadata should be valid - validateMetadata(client); - } + testTable.doWriteOperation("001", UPSERT, asList("p1", "p2"), asList("p1", "p2"), 1); + syncAndValidate(testTable); + testTable.doWriteOperation("002", BULK_INSERT, emptyList(), asList("p1", "p2"), 1); + syncAndValidate(testTable); + // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed + // instant so that only the inflight is left over. 
+ String commitInstantFileName = HoodieTimeline.makeCommitFileName("002"); + assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME, + commitInstantFileName), false)); + // Next upsert + testTable.doWriteOperation("003", UPSERT, emptyList(), asList("p1", "p2"), 1); + // Post rollback commit and metadata should be valid + syncTableMetadata(writeConfig); + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + HoodieActiveTimeline timeline = metadataMetaClient.getActiveTimeline(); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "003"))); } /** * Test non-partitioned datasets. */ - //@Test - public void testNonPartitioned() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - HoodieTestDataGenerator nonPartitionedGenerator = new HoodieTestDataGenerator(new String[] {""}); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Write 1 (Bulk insert) - String newCommitTime = "001"; - List records = nonPartitionedGenerator.generateInserts(newCommitTime, 10); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - validateMetadata(client); - - List metadataPartitions = metadata(client).getAllPartitionPaths(); - assertTrue(metadataPartitions.contains(""), "Must contain empty partition"); - } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testNonPartitioned(HoodieTableType tableType) throws Exception { + init(tableType); + // 
Non-partitioned bulk insert + testTable.doWriteOperation("001", BULK_INSERT, emptyList(), 1); + syncTableMetadata(writeConfig); + List metadataPartitions = metadata(writeConfig, context).getAllPartitionPaths(); + assertTrue(metadataPartitions.isEmpty(), "Must contain empty partition"); } /** * Test various metrics published by metadata table. */ - @Test - public void testMetadataMetrics() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfigBuilder(true, true, true).build())) { - // Write - String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - validateMetadata(client); - - Registry metricsRegistry = Registry.getRegistry("HoodieMetadata"); - assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count")); - assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration")); - assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L); - assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size")); - assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size")); - assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count")); - assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count")); - } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetadataMetrics(HoodieTableType tableType) throws Exception { + init(tableType); + writeConfig = getWriteConfigBuilder(true, true, true).build(); + testTable.doWriteOperation(HoodieActiveTimeline.createNewInstantTime(), 
INSERT, asList("p1", "p2"), + asList("p1", "p2"), 2, true); + syncTableMetadata(writeConfig); + Registry metricsRegistry = Registry.getRegistry("HoodieMetadata"); + assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".count")); + assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR + ".totalDuration")); + assertTrue(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR + ".count") >= 1L); + assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size")); + assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size")); + assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count")); + assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.count")); } /** * Test when reading from metadata table which is out of sync with dataset that results are still consistent. */ - @Test - public void testMetadataOutOfSync() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - SparkRDDWriteClient unsyncedClient = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true)); - - // Enable metadata so table is initialized - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Perform Bulk Insert - String newCommitTime = "001"; - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateInserts(newCommitTime, 20); - client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetadataOutOfSync(HoodieTableType tableType) throws Exception { + init(tableType); + testTable.doWriteOperation("001", BULK_INSERT, asList("p1", "p2"), asList("p1", "p2"), 1); + // Enable metadata so table is initialized but do not sync + syncAndValidate(testTable, emptyList(), true, false, false); + // Perform an insert and upsert 
+ testTable.doWriteOperation("002", INSERT, asList("p1", "p2"), 1); + testTable.doWriteOperation("003", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 1); + // Run compaction for MOR table + if (MERGE_ON_READ.equals(tableType)) { + testTable = testTable.doCompaction("004", asList("p1", "p2")); } - - // Perform commit operations with metadata disabled - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - // Perform Insert - String newCommitTime = "002"; - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateInserts(newCommitTime, 20); - client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - - // Perform Upsert - newCommitTime = "003"; - client.startCommitWithTime(newCommitTime); - records = dataGen.generateUniqueUpdates(newCommitTime, 20); - client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - - // Compaction - if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { - newCommitTime = "004"; - client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); - client.compact(newCommitTime); - } + assertFalse(metadata(writeConfig, context).isInSync()); + testTable.doWriteOperation("005", UPSERT, asList("p1", "p2", "p3"), 1); + if (MERGE_ON_READ.equals(tableType)) { + testTable = testTable.doCompaction("006", asList("p1", "p2")); } - - assertFalse(metadata(unsyncedClient).isInSync()); - validateMetadata(unsyncedClient); - - // Perform clean operation with metadata disabled - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { - // One more commit needed to trigger clean so upsert and compact - String newCommitTime = "005"; - client.startCommitWithTime(newCommitTime); - List records = dataGen.generateUpdates(newCommitTime, 20); - client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - - if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { - newCommitTime = 
"006"; - client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); - client.compact(newCommitTime); - } - - // Clean - newCommitTime = "007"; - client.clean(newCommitTime); - } - - assertFalse(metadata(unsyncedClient).isInSync()); - validateMetadata(unsyncedClient); - - // Perform restore with metadata disabled + testTable.doCleanBasedOnCommits("007", singletonList("001")); + /* TODO: Perform restore with metadata disabled try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { client.restoreToInstant("004"); - } - - assertFalse(metadata(unsyncedClient).isInSync()); - validateMetadata(unsyncedClient); + }*/ + assertFalse(metadata(writeConfig, context).isInSync()); + syncAndValidate(testTable, emptyList(), true, true, true, true); } /** * Test that failure to perform deltacommit on the metadata table does not lead to missed sync. */ - @Test - public void testMetdataTableCommitFailure() throws Exception { - init(HoodieTableType.COPY_ON_WRITE); - HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); - - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - // Write 1 - String newCommitTime = "001"; - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - - // Write 2 - newCommitTime = "002"; - client.startCommitWithTime(newCommitTime); - records = dataGen.generateInserts(newCommitTime, 20); - writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - } + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetdataTableCommitFailure(HoodieTableType tableType) throws Exception { + init(tableType); + testTable.doWriteOperation("001", INSERT, asList("p1", "p2"), asList("p1", 
"p2"), 2, true); + syncTableMetadata(writeConfig); + testTable.doWriteOperation("002", INSERT, asList("p1", "p2"), 2, true); + syncTableMetadata(writeConfig); // At this time both commits 001 and 002 must be synced to the metadata table HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); @@ -1089,29 +705,112 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { assertTrue(timeline.containsInstant(new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); // In this commit deltacommit "002" will be rolled back and attempted again. - String latestCommitTime = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - String newCommitTime = "003"; - List records = dataGen.generateInserts(newCommitTime, 20); - client.startCommitWithTime(newCommitTime); - client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); - - records = dataGen.generateInserts(latestCommitTime, 20); - client.startCommitWithTime(latestCommitTime); - List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), latestCommitTime).collect(); - assertNoWriteErrors(writeStatuses); - } + testTable.doWriteOperation("003", BULK_INSERT, singletonList("p3"), asList("p1", "p2", "p3"), 2); + syncTableMetadata(writeConfig); timeline = metadataMetaClient.reloadActiveTimeline(); assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))); assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); - assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, latestCommitTime))); - assertTrue(timeline.getRollbackTimeline().countInstants() == 1); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "003"))); + 
assertEquals(1, timeline.getRollbackTimeline().countInstants()); } /** - * Validate the metadata tables contents to ensure it matches what is on the file system. + * Tests that if timeline has an inflight commit midway, metadata syncs only completed commits (including later to inflight commit). */ + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testInFlightCommit(HoodieTableType tableType) throws Exception { + init(tableType); + // bootstrap w/ 2 commits + doWriteOperationsAndBootstrapMetadata(testTable); + + // trigger an upsert + testTable.doWriteOperation("003", UPSERT, singletonList("p3"), asList("p1", "p2", "p3"), 3); + syncAndValidate(testTable); + + // trigger an upsert + testTable.doWriteOperation("005", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable); + + // create an inflight commit. + HoodieCommitMetadata inflightCommitMeta = testTable.doWriteOperation("006", UPSERT, emptyList(), + asList("p1", "p2", "p3"), 2, false, true); + + // trigger upsert + testTable.doWriteOperation("007", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + // testTable validation will fetch only files pertaining to completed commits. So, validateMetadata() will skip files for 006 + // while validating against actual metadata table. 
+ syncAndValidate(testTable, singletonList("006"), writeConfig.isMetadataTableEnabled(), writeConfig.getMetadataConfig().enableSync(), false); + + // Remove the inflight instance holding back table sync + testTable.moveInflightCommitToComplete("006", inflightCommitMeta); + syncTableMetadata(writeConfig); + + // A regular commit should get synced + testTable.doWriteOperation("008", UPSERT, emptyList(), asList("p1", "p2", "p3"), 2); + syncAndValidate(testTable, true); + } + + private void doWriteOperationsAndBootstrapMetadata(HoodieTestTable testTable) throws Exception { + testTable.doWriteOperation("001", INSERT, asList("p1", "p2"), asList("p1", "p2"), + 2, true); + testTable.doWriteOperation("002", UPSERT, asList("p1", "p2"), + 2, true); + syncAndValidate(testTable); + } + + private void syncAndValidate(HoodieTestTable testTable) throws IOException { + syncAndValidate(testTable, emptyList(), writeConfig.isMetadataTableEnabled(), writeConfig.getMetadataConfig().enableSync(), true); + } + + private void syncAndValidate(HoodieTestTable testTable, boolean doFullValidation) throws IOException { + syncAndValidate(testTable, emptyList(), writeConfig.isMetadataTableEnabled(), writeConfig.getMetadataConfig().enableSync(), true, doFullValidation); + } + + private void syncAndValidate(HoodieTestTable testTable, List inflightCommits, boolean enableMetadata, + boolean enableMetadataSync, boolean enableValidation) throws IOException { + syncAndValidate(testTable, inflightCommits, enableMetadata, enableMetadataSync, enableValidation, false); + } + + private void syncAndValidate(HoodieTestTable testTable, List inflightCommits, boolean enableMetadata, + boolean enableMetadataSync, boolean enableValidation, boolean doFullValidation) throws IOException { + writeConfig.getMetadataConfig().setValue(HoodieMetadataConfig.ENABLE, String.valueOf(enableMetadata)); + writeConfig.getMetadataConfig().setValue(HoodieMetadataConfig.SYNC_ENABLE, String.valueOf(enableMetadataSync)); + 
syncTableMetadata(writeConfig); + validateMetadata(testTable, inflightCommits, writeConfig, metadataTableBasePath, doFullValidation); + } + + private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileListingMetadata) { + return getWriteConfigBuilder(autoCommit, useFileListingMetadata, false).build(); + } + + private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { + return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics); + } + + private HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2) + .withAutoCommit(autoCommit) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024) + .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1) + .withFailedWritesCleaningPolicy(policy) + .withAutoClean(false).retainCommits(1).retainFileVersions(1).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build()) + .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table") + .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() + .withEnableBackupForRemoteFileSystemView(false).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(useFileListingMetadata) + .enableMetrics(enableMetrics).build()) + .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics) + .withExecutorMetrics(true).build()) + .withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder() 
+ .usePrefix("unit-test").build()); + } + private void validateMetadata(SparkRDDWriteClient testClient) throws IOException { HoodieWriteConfig config = testClient.getConfig(); @@ -1273,44 +972,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { clientConfig.getSpillableMapBasePath()); } - // TODO: this can be moved to TestHarness after merge from master - private void assertNoWriteErrors(List statuses) { - // Verify there are no errors - for (WriteStatus status : statuses) { - assertFalse(status.hasErrors(), "Errors found in write of " + status.getFileId()); - } - } - - private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileListingMetadata) { - return getWriteConfigBuilder(autoCommit, useFileListingMetadata, false).build(); - } - - private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { - return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics); - } - - private HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) - .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2) - .withAutoCommit(autoCommit) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024) - .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1) - .withFailedWritesCleaningPolicy(policy) - .withAutoClean(false).retainCommits(1).retainFileVersions(1).build()) - .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build()) - .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table") - .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() - 
.withEnableBackupForRemoteFileSystemView(false).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(useFileListingMetadata) - .enableMetrics(enableMetrics).build()) - .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics) - .withExecutorMetrics(true).build()) - .withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder() - .usePrefix("unit-test").build()); - } - @Override protected HoodieTableType getTableType() { return tableType; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 1e52e4494..c734daecd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -21,7 +21,11 @@ import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; @@ -29,15 +33,26 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import 
org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.minicluster.HdfsTestService; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; +import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadStat; import org.apache.hadoop.conf.Configuration; @@ -53,19 +68,31 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; import scala.Tuple2; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + /** * The test harness for resource initialization and cleanup. */ @@ -149,7 +176,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im } /** - * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) + * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) * with a default name matching the name of the class. */ protected void initSparkContexts() { @@ -376,9 +403,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im } public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, - FileStatus[] fileStatuses) { + FileStatus[] fileStatuses) { if (tableView == null) { - tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses); + tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses); } else { tableView.init(metaClient, visibleActiveTimeline, fileStatuses); } @@ -418,4 +445,175 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im } return Pair.of(partitionPathStatMap, globalStat); } + + /** + * Validate the metadata tables contents to ensure it matches what is on the file system. 
+ */ + public void validateMetadata(HoodieTestTable testTable, List inflightCommits, HoodieWriteConfig writeConfig, + String metadataTableBasePath, boolean doFullValidation) throws IOException { + HoodieTableMetadata tableMetadata = metadata(writeConfig, context); + assertNotNull(tableMetadata, "MetadataReader should have been initialized"); + if (!writeConfig.isMetadataTableEnabled() || !writeConfig.getMetadataConfig().validateFileListingMetadata()) { + return; + } + + assertEquals(inflightCommits, testTable.inflightCommits()); + + HoodieTimer timer = new HoodieTimer().startTimer(); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + // Partitions should match + List fsPartitionPaths = testTable.getAllPartitionPaths(); + List fsPartitions = new ArrayList<>(); + fsPartitionPaths.forEach(entry -> fsPartitions.add(entry.getFileName().toString())); + List metadataPartitions = tableMetadata.getAllPartitionPaths(); + + Collections.sort(fsPartitions); + Collections.sort(metadataPartitions); + + assertEquals(fsPartitions.size(), metadataPartitions.size(), "Partitions should match"); + assertEquals(fsPartitions, metadataPartitions, "Partitions should match"); + + // Files within each partition should match + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext); + TableFileSystemView tableView = table.getHoodieView(); + List fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList()); + Map partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths); + assertEquals(fsPartitions.size(), partitionToFilesMap.size()); + + fsPartitions.forEach(partition -> { + try { + validateFilesPerPartition(testTable, tableMetadata, tableView, partitionToFilesMap, partition); + } catch (IOException e) { + fail("Exception should not be raised: " + e); + } + }); + if (doFullValidation) { + 
runFullValidation(writeConfig, metadataTableBasePath, engineContext); + } + + LOG.info("Validation time=" + timer.endTimer()); + } + + public void syncTableMetadata(HoodieWriteConfig writeConfig) { + if (!writeConfig.getMetadataConfig().enableSync()) { + return; + } + // Open up the metadata table again, for syncing + try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, writeConfig, context)) { + LOG.info("Successfully synced to metadata table"); + } catch (Exception e) { + throw new HoodieMetadataException("Error syncing to metadata table.", e); + } + } + + public HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteConfig clientConfig) { + return (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter + .create(hadoopConf, clientConfig, new HoodieSparkEngineContext(jsc)); + } + + public HoodieTableMetadata metadata(HoodieWriteConfig clientConfig, HoodieEngineContext hoodieEngineContext) { + return HoodieTableMetadata.create(hoodieEngineContext, clientConfig.getMetadataConfig(), clientConfig.getBasePath(), + clientConfig.getSpillableMapBasePath()); + } + + private void validateFilesPerPartition(HoodieTestTable testTable, HoodieTableMetadata tableMetadata, TableFileSystemView tableView, + Map partitionToFilesMap, String partition) throws IOException { + Path partitionPath; + if (partition.equals("")) { + // Should be the non-partitioned case + partitionPath = new Path(basePath); + } else { + partitionPath = new Path(basePath, partition); + } + + FileStatus[] fsStatuses = testTable.listAllFilesInPartition(partition); + FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath); + List fsFileNames = Arrays.stream(fsStatuses) + .map(s -> s.getPath().getName()).collect(Collectors.toList()); + List metadataFilenames = Arrays.stream(metaStatuses) + .map(s -> s.getPath().getName()).collect(Collectors.toList()); + Collections.sort(fsFileNames); + Collections.sort(metadataFilenames); + + 
assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length); + + if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) { + LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray())); + LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray())); + + for (String fileName : fsFileNames) { + if (!metadataFilenames.contains(fileName)) { + LOG.error(partition + "FsFilename " + fileName + " not found in Meta data"); + } + } + for (String fileName : metadataFilenames) { + if (!fsFileNames.contains(fileName)) { + LOG.error(partition + "Metadata file " + fileName + " not found in original FS"); + } + } + } + + // Block sizes should be valid + Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0)); + List fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList()); + List metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).sorted().collect(Collectors.toList()); + assertEquals(fsBlockSizes, metadataBlockSizes); + + assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match"); + assertEquals(fsFileNames, metadataFilenames, "Files within partition " + partition + " should match"); + + // FileSystemView should expose the same data + List fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList()); + fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList())); + + fileGroups.forEach(g -> LogManager.getLogger(getClass()).info(g)); + fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(getClass()).info(b))); + fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> LogManager.getLogger(getClass()).info(s))); + + long numFiles = fileGroups.stream() + .mapToLong(g -> g.getAllBaseFiles().count() + g.getAllFileSlices().mapToLong(s -> 
s.getLogFiles().count()).sum()) + .sum(); + assertEquals(metadataFilenames.size(), numFiles); + } + + private void runFullValidation(HoodieWriteConfig writeConfig, String metadataTableBasePath, HoodieSparkEngineContext engineContext) { + HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(writeConfig); + assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); + + // Validate write config for metadata table + HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig(); + assertFalse(metadataWriteConfig.isMetadataTableEnabled(), "No metadata table for metadata table"); + + // Metadata table should be in sync with the dataset + assertTrue(metadata(writeConfig, engineContext).isInSync()); + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + + // Metadata table is MOR + assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR"); + + // Metadata table is HFile format + assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(), HoodieFileFormat.HFILE, + "Metadata Table base file format should be HFile"); + + // Metadata table has a fixed number of partitions + // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory + // in the .hoodie folder. 
+ List metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, HoodieTableMetadata.getMetadataTableBasePath(basePath), + false, false, false); + Assertions.assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size()); + + // Metadata table should automatically compact and clean + // versions are +1 as autoclean / compaction happens end of commits + int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1; + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline()); + metadataTablePartitions.forEach(partition -> { + List latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList()); + assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() <= 1, "Should have a single latest base file"); + assertTrue(latestSlices.size() <= 1, "Should have a single latest file slice"); + assertTrue(latestSlices.size() <= numFileVersions, "Should limit file slice to " + + numFileVersions + " but was " + latestSlices.size()); + }); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 5c439f51a..e96fcce6c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -167,6 +167,9 @@ public class FSUtils { } public static String getCommitTime(String fullFileName) { + if (isLogFile(new Path(fullFileName))) { + return fullFileName.split("_")[1].split("\\.")[0]; + } return fullFileName.split("_")[2].split("\\.")[0]; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index c345cc7af..ef8b09b51 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ 
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -50,6 +50,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG; import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -157,6 +158,9 @@ public class TestFSUtils extends HoodieCommonTestHarness { String fileName = UUID.randomUUID().toString(); String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName); assertEquals(instantTime, FSUtils.getCommitTime(fullFileName)); + // test log file name + fullFileName = FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), instantTime, 1, TEST_WRITE_TOKEN); + assertEquals(instantTime, FSUtils.getCommitTime(fullFileName)); } @Test diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index bb6c0b491..600ee1673 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -39,6 +39,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; import java.io.RandomAccessFile; @@ -48,8 +50,11 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.FileTime; import java.time.Instant; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanMetadata; import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanerPlan; @@ -59,6 +64,8 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serial public class FileCreateUtils { + private static final Logger LOG = LogManager.getLogger(FileCreateUtils.class); + private static final String WRITE_TOKEN = "1-0-1"; private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension(); @@ -216,6 +223,10 @@ public class FileCreateUtils { createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION); } + public static void createInflightCompaction(String basePath, String instantTime) throws IOException { + createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION); + } + public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException { Path parentPath = Paths.get(basePath, partitionPath); Files.createDirectories(parentPath); @@ -307,6 +318,16 @@ public class FileCreateUtils { .endsWith(String.format("%s.%s", HoodieTableMetaClient.MARKER_EXTN, ioType))).count(); } + public static List<Path> getPartitionPaths(Path basePath) throws IOException { + if (Files.notExists(basePath)) { + return Collections.emptyList(); + } + // Files.list opens a directory stream that must be closed; use try-with-resources to avoid leaking a file handle. + try (java.util.stream.Stream<Path> entries = Files.list(basePath)) { + return entries.filter(entry -> !entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()); + } + } + /** * Find total basefiles for passed in paths. 
*/ diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java index 76fdf18d4..95188bb0b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileSystemTestUtils.java @@ -75,7 +75,11 @@ public class FileSystemTestUtils { } public static List listRecursive(FileSystem fs, Path path) throws IOException { - RemoteIterator itr = fs.listFiles(path, true); + return listFiles(fs, path, true); + } + + public static List listFiles(FileSystem fs, Path path, boolean recursive) throws IOException { + RemoteIterator itr = fs.listFiles(path, recursive); List statuses = new ArrayList<>(); while (itr.hasNext()) { statuses.add(itr.next()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index e6c488e49..099fec287 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -19,52 +19,79 @@ package org.apache.hudi.common.testutils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata; +import org.apache.hudi.avro.model.HoodieSavepointMetadata; +import 
org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata; +import org.apache.hudi.common.HoodieCleanStat; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Paths; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Random; import java.util.UUID; import java.util.stream.Collectors; import 
java.util.stream.IntStream; import java.util.stream.Stream; import static java.time.temporal.ChronoUnit.SECONDS; +import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ; +import static org.apache.hudi.common.model.WriteOperationType.CLUSTER; +import static org.apache.hudi.common.model.WriteOperationType.COMPACT; +import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName; import static org.apache.hudi.common.testutils.FileCreateUtils.createCleanFile; import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCleanFile; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCompaction; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile; @@ -77,9 +104,19 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDe import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedReplaceCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createRollbackFile; import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName; +import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata; +import static 
org.apache.hudi.common.util.CollectionUtils.createImmutableMap; +import static org.apache.hudi.common.util.CommitUtils.buildMetadata; +import static org.apache.hudi.common.util.CommitUtils.getCommitActionType; +import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING; public class HoodieTestTable { + private static final Logger LOG = LogManager.getLogger(HoodieTestTable.class); + private static final Random RANDOM = new Random(); + private static HoodieTestTableState testTableState; + private final List inflightCommits = new ArrayList<>(); + protected final String basePath; protected final FileSystem fs; protected HoodieTableMetaClient metaClient; @@ -94,6 +131,7 @@ public class HoodieTestTable { } public static HoodieTestTable of(HoodieTableMetaClient metaClient) { + testTableState = HoodieTestTableState.of(); return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient); } @@ -130,6 +168,7 @@ public class HoodieTestTable { public HoodieTestTable addInflightCommit(String instantTime) throws Exception { createRequestedCommit(basePath, instantTime); createInflightCommit(basePath, instantTime); + inflightCommits.add(instantTime); currentInstantTime = instantTime; metaClient = HoodieTableMetaClient.reload(metaClient); return this; @@ -144,6 +183,28 @@ public class HoodieTestTable { return this; } + public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, HoodieTestTableState testTableState) { + String actionType = getCommitActionType(operationType, metaClient.getTableType()); + return createCommitMetadata(operationType, commitTime, Collections.emptyMap(), testTableState, false, actionType); + } + + public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, + HoodieTestTableState testTableState, boolean bootstrap) { + String actionType = getCommitActionType(operationType, metaClient.getTableType()); + return createCommitMetadata(operationType, 
commitTime, Collections.emptyMap(), testTableState, bootstrap, actionType); + } + + public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, + Map> partitionToReplaceFileIds, + HoodieTestTableState testTableState, boolean bootstrap, String action) { + List writeStats = generateHoodieWriteStatForPartition(testTableState.getPartitionToBaseFileInfoMap(commitTime), commitTime, bootstrap); + if (MERGE_ON_READ.equals(metaClient.getTableType()) && UPSERT.equals(operationType)) { + writeStats.addAll(generateHoodieWriteStatForPartitionLogFiles(testTableState.getPartitionToLogFileInfoMap(commitTime), commitTime, bootstrap)); + } + Map extraMetadata = createImmutableMap("test", "test"); + return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, EMPTY_STRING, action); + } + public HoodieTestTable addCommit(String instantTime, HoodieCommitMetadata metadata) throws Exception { createRequestedCommit(basePath, instantTime); createInflightCommit(basePath, instantTime); @@ -153,6 +214,14 @@ public class HoodieTestTable { return this; } + public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { + createCommit(basePath, instantTime, metadata); + inflightCommits.remove(instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + public HoodieTestTable addDeltaCommit(String instantTime) throws Exception { createRequestedDeltaCommit(basePath, instantTime); createInflightDeltaCommit(basePath, instantTime); @@ -199,6 +268,31 @@ public class HoodieTestTable { return this; } + public HoodieTestTable addClean(String instantTime) throws IOException { + HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(EMPTY_STRING, EMPTY_STRING, EMPTY_STRING), EMPTY_STRING, new HashMap<>(), + CleanPlanV2MigrationHandler.VERSION, new HashMap<>()); + HoodieCleanStat 
cleanStats = new HoodieCleanStat( + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, + HoodieTestUtils.DEFAULT_PARTITION_PATHS[RANDOM.nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)], + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + instantTime); + HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats)); + return HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata); + } + + public Pair getHoodieCleanMetadata(String commitTime, HoodieTestTableState testTableState) { + HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(commitTime, CLEAN_ACTION, EMPTY_STRING), EMPTY_STRING, new HashMap<>(), + CleanPlanV2MigrationHandler.VERSION, new HashMap<>()); + List cleanStats = new ArrayList<>(); + for (Map.Entry> entry : testTableState.getPartitionToFileIdMapForCleaner(commitTime).entrySet()) { + cleanStats.add(new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, + entry.getKey(), entry.getValue(), entry.getValue(), Collections.emptyList(), commitTime)); + } + return Pair.of(cleanerPlan, convertCleanMetadata(commitTime, Option.of(0L), cleanStats)); + } + public HoodieTestTable addInflightRollback(String instantTime) throws IOException { createInflightRollbackFile(basePath, instantTime); currentInstantTime = instantTime; @@ -214,6 +308,61 @@ public class HoodieTestTable { return this; } + public HoodieRollbackMetadata getRollbackMetadata(String instantTimeToDelete, Map> partitionToFilesMeta) throws Exception { + HoodieRollbackMetadata rollbackMetadata = new HoodieRollbackMetadata(); + rollbackMetadata.setCommitsRollback(Collections.singletonList(instantTimeToDelete)); + rollbackMetadata.setStartRollbackTime(instantTimeToDelete); + Map partitionMetadataMap = new HashMap<>(); + for (Map.Entry> entry : partitionToFilesMeta.entrySet()) { + HoodieRollbackPartitionMetadata rollbackPartitionMetadata = new 
HoodieRollbackPartitionMetadata(); + rollbackPartitionMetadata.setPartitionPath(entry.getKey()); + rollbackPartitionMetadata.setSuccessDeleteFiles(entry.getValue()); + rollbackPartitionMetadata.setFailedDeleteFiles(new ArrayList<>()); + rollbackPartitionMetadata.setWrittenLogFiles(getWrittenLogFiles(instantTimeToDelete, entry)); + rollbackPartitionMetadata.setRollbackLogFiles(createImmutableMap(logFileName(instantTimeToDelete, UUID.randomUUID().toString(), 0), (long) (100 + RANDOM.nextInt(500)))); + partitionMetadataMap.put(entry.getKey(), rollbackPartitionMetadata); + } + rollbackMetadata.setPartitionMetadata(partitionMetadataMap); + rollbackMetadata.setInstantsRollback(Collections.singletonList(new HoodieInstantInfo(instantTimeToDelete, HoodieTimeline.ROLLBACK_ACTION))); + return rollbackMetadata; + } + + /** + * Return a map of log file name to file size that were expected to be rolled back in that partition. + */ + private Map getWrittenLogFiles(String instant, Map.Entry> entry) { + Map writtenLogFiles = new HashMap<>(); + for (String fileName : entry.getValue()) { + if (FSUtils.isLogFile(new Path(fileName))) { + if (testTableState.getPartitionToLogFileInfoMap(instant) != null + && testTableState.getPartitionToLogFileInfoMap(instant).containsKey(entry.getKey())) { + List> fileInfos = testTableState.getPartitionToLogFileInfoMap(instant).get(entry.getKey()); + for (Pair fileInfo : fileInfos) { + if (fileName.equals(logFileName(instant, fileInfo.getLeft(), fileInfo.getRight()[0]))) { + writtenLogFiles.put(fileName, Long.valueOf(fileInfo.getRight()[1])); + } + } + } + } + } + return writtenLogFiles; + } + + public HoodieSavepointMetadata getSavepointMetadata(String instant, Map> partitionToFilesMeta) { + HoodieSavepointMetadata savepointMetadata = new HoodieSavepointMetadata(); + savepointMetadata.setSavepointedAt(Long.valueOf(instant)); + Map partitionMetadataMap = new HashMap<>(); + for (Map.Entry> entry : partitionToFilesMeta.entrySet()) { + 
HoodieSavepointPartitionMetadata savepointPartitionMetadata = new HoodieSavepointPartitionMetadata(); + savepointPartitionMetadata.setPartitionPath(entry.getKey()); + savepointPartitionMetadata.setSavepointDataFile(entry.getValue()); + partitionMetadataMap.put(entry.getKey(), savepointPartitionMetadata); + } + savepointMetadata.setPartitionMetadata(partitionMetadataMap); + savepointMetadata.setSavepointedBy("test"); + return savepointMetadata; + } + public HoodieTestTable addRequestedCompaction(String instantTime) throws IOException { createRequestedCompaction(basePath, instantTime); currentInstantTime = instantTime; @@ -235,6 +384,13 @@ public class HoodieTestTable { return addRequestedCompaction(instantTime, plan); } + public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception { + createRequestedCompaction(basePath, instantTime); + createInflightCompaction(basePath, instantTime); + return HoodieTestTable.of(metaClient) + .addCommit(instantTime, commitMetadata); + } + public HoodieTestTable forCommit(String instantTime) { currentInstantTime = instantTime; return this; @@ -311,6 +467,13 @@ public class HoodieTestTable { return this; } + public HoodieTestTable withBaseFilesInPartition(String partition, List> fileInfos) throws Exception { + for (Pair fileInfo : fileInfos) { + FileCreateUtils.createBaseFile(basePath, partition, currentInstantTime, fileInfo.getKey(), fileInfo.getValue()); + } + return this; + } + public String getFileIdWithLogFile(String partitionPath) throws Exception { String fileId = UUID.randomUUID().toString(); withLogFile(partitionPath, fileId); @@ -328,6 +491,13 @@ public class HoodieTestTable { return this; } + public HoodieTestTable withLogFilesInPartition(String partition, List> fileInfos) throws Exception { + for (Pair fileInfo : fileInfos) { + FileCreateUtils.createLogFile(basePath, partition, currentInstantTime, fileInfo.getKey(), fileInfo.getValue()[0], fileInfo.getValue()[1]); + } + 
return this; + } + public boolean inflightCommitExists(String instantTime) { try { return fs.exists(getInflightCommitFilePath(instantTime)); @@ -388,6 +558,11 @@ public class HoodieTestTable { return new Path(Paths.get(basePath, partition).toUri()); } + public List getAllPartitionPaths() throws IOException { + java.nio.file.Path basePathPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).getParent().getParent(); + return FileCreateUtils.getPartitionPaths(basePathPath); + } + public Path getBaseFilePath(String partition, String fileId) { return new Path(Paths.get(basePath, partition, getBaseFileNameById(fileId)).toUri()); } @@ -396,6 +571,24 @@ public class HoodieTestTable { return baseFileName(currentInstantTime, fileId); } + public Path getLogFilePath(String partition, String fileId, int version) { + return new Path(Paths.get(basePath, partition, getLogFileNameById(fileId, version)).toString()); + } + + public String getLogFileNameById(String fileId, int version) { + return logFileName(currentInstantTime, fileId, version); + } + + public List getEarliestFilesInPartition(String partition, int count) throws IOException { + List fileStatuses = Arrays.asList(listAllFilesInPartition(partition)); + fileStatuses.sort(Comparator.comparing(FileStatus::getModificationTime)); + return fileStatuses.subList(0, count).stream().map(entry -> entry.getPath().getName()).collect(Collectors.toList()); + } + + public List inflightCommits() { + return this.inflightCommits; + } + public FileStatus[] listAllBaseFiles() throws IOException { return listAllBaseFiles(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension()); } @@ -421,16 +614,356 @@ public class HoodieTestTable { } public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOException { - return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).toArray(new FileStatus[0]); + return FileSystemTestUtils.listRecursive(fs, new 
Path(Paths.get(basePath, partitionPath).toString())).stream() + .filter(entry -> { + boolean toReturn = true; + String fileName = entry.getPath().getName(); + if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { + toReturn = false; + } else { + for (String inflight : inflightCommits) { + if (fileName.contains(inflight)) { + toReturn = false; + break; + } + } + } + return toReturn; + }).toArray(FileStatus[]::new); } public FileStatus[] listAllFilesInTempFolder() throws IOException { return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())).toArray(new FileStatus[0]); } + public void deleteFilesInPartition(String partitionPath, List filesToDelete) throws IOException { + FileStatus[] allFiles = listAllFilesInPartition(partitionPath); + Arrays.stream(allFiles).filter(entry -> filesToDelete.contains(entry.getPath().getName())).forEach(entry -> { + try { + Files.delete(Paths.get(basePath, partitionPath, entry.getPath().getName())); + } catch (IOException e) { + throw new HoodieTestTableException(e); + } + }); + } + + public HoodieTestTable doRollback(String commitTimeToRollback, String commitTime) throws Exception { + Option commitMetadata = getMetadataForInstant(commitTimeToRollback); + if (!commitMetadata.isPresent()) { + throw new IllegalArgumentException("Instant to rollback not present in timeline: " + commitTimeToRollback); + } + Map> partitionFiles = getPartitionFiles(commitMetadata.get()); + HoodieRollbackMetadata rollbackMetadata = getRollbackMetadata(commitTimeToRollback, partitionFiles); + for (Map.Entry> entry : partitionFiles.entrySet()) { + deleteFilesInPartition(entry.getKey(), entry.getValue()); + } + return addRollback(commitTime, rollbackMetadata); + } + + public HoodieTestTable doCluster(String commitTime, Map> partitionToReplaceFileIds) throws Exception { + Map>> partitionToReplaceFileIdsWithLength = new HashMap<>(); + for (Map.Entry> entry : 
partitionToReplaceFileIds.entrySet()) { + String partition = entry.getKey(); + partitionToReplaceFileIdsWithLength.put(entry.getKey(), new ArrayList<>()); + for (String fileId : entry.getValue()) { + int length = 100 + RANDOM.nextInt(500); + partitionToReplaceFileIdsWithLength.get(partition).add(Pair.of(fileId, length)); + } + } + List writeStats = generateHoodieWriteStatForPartition(partitionToReplaceFileIdsWithLength, commitTime, false); + HoodieReplaceCommitMetadata replaceMetadata = + (HoodieReplaceCommitMetadata) buildMetadata(writeStats, partitionToReplaceFileIds, Option.empty(), CLUSTER, EMPTY_STRING, REPLACE_COMMIT_ACTION); + return addReplaceCommit(commitTime, Option.empty(), Option.empty(), replaceMetadata); + } + + public HoodieCleanMetadata doClean(String commitTime, Map partitionFileCountsToDelete) throws IOException { + Map> partitionFilesToDelete = new HashMap<>(); + for (Map.Entry entry : partitionFileCountsToDelete.entrySet()) { + partitionFilesToDelete.put(entry.getKey(), getEarliestFilesInPartition(entry.getKey(), entry.getValue())); + } + HoodieTestTableState testTableState = new HoodieTestTableState(); + for (Map.Entry> entry : partitionFilesToDelete.entrySet()) { + testTableState = testTableState.createTestTableStateForCleaner(commitTime, entry.getKey(), entry.getValue()); + deleteFilesInPartition(entry.getKey(), entry.getValue()); + } + Pair cleanerMeta = getHoodieCleanMetadata(commitTime, testTableState); + addClean(commitTime, cleanerMeta.getKey(), cleanerMeta.getValue()); + return cleanerMeta.getValue(); + } + + public HoodieCleanMetadata doCleanBasedOnCommits(String cleanCommitTime, List commitsToClean) throws IOException { + Map partitionFileCountsToDelete = new HashMap<>(); + for (String commitTime : commitsToClean) { + Option commitMetadata = getMetadataForInstant(commitTime); + if (commitMetadata.isPresent()) { + Map> partitionFiles = getPartitionFiles(commitMetadata.get()); + for (String partition : partitionFiles.keySet()) { + 
partitionFileCountsToDelete.put(partition, partitionFiles.get(partition).size() + partitionFileCountsToDelete.getOrDefault(partition, 0)); + } + } + } + return doClean(cleanCommitTime, partitionFileCountsToDelete); + } + + public HoodieSavepointMetadata doSavepoint(String commitTime) throws IOException { + Option commitMetadata = getMetadataForInstant(commitTime); + if (!commitMetadata.isPresent()) { + throw new IllegalArgumentException("Instant to rollback not present in timeline: " + commitTime); + } + Map> partitionFiles = getPartitionFiles(commitMetadata.get()); + HoodieSavepointMetadata savepointMetadata = getSavepointMetadata(commitTime, partitionFiles); + for (Map.Entry> entry : partitionFiles.entrySet()) { + deleteFilesInPartition(entry.getKey(), entry.getValue()); + } + return savepointMetadata; + } + + public HoodieTestTable doCompaction(String commitTime, List partitions) throws Exception { + this.currentInstantTime = commitTime; + if (partitions.isEmpty()) { + partitions = Collections.singletonList(EMPTY_STRING); + } + HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(COMPACT, metaClient.getTableType(), commitTime, partitions, 1); + HoodieCommitMetadata commitMetadata = createCommitMetadata(COMPACT, commitTime, testTableState); + for (String partition : partitions) { + this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition)); + } + return addCompaction(commitTime, commitMetadata); + } + + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List partitions, int filesPerPartition) throws Exception { + return doWriteOperation(commitTime, operationType, Collections.emptyList(), partitions, filesPerPartition, false); + } + + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List newPartitionsToAdd, List partitions, + int filesPerPartition) throws Exception { + return 
doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitions, filesPerPartition, false); + } + + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List newPartitionsToAdd, List partitions, + int filesPerPartition, boolean bootstrap) throws Exception { + return doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitions, filesPerPartition, bootstrap, false); + } + + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List partitions, int filesPerPartition, boolean bootstrap) throws Exception { + return doWriteOperation(commitTime, operationType, Collections.emptyList(), partitions, filesPerPartition, bootstrap, false); + } + + public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationType operationType, + List newPartitionsToAdd, List partitions, + int filesPerPartition, boolean bootstrap, boolean createInflightCommit) throws Exception { + if (partitions.isEmpty()) { + partitions = Collections.singletonList(EMPTY_STRING); + } + HoodieTestTableState testTableState = getTestTableStateWithPartitionFileInfo(operationType, metaClient.getTableType(), commitTime, partitions, filesPerPartition); + HoodieCommitMetadata commitMetadata = createCommitMetadata(operationType, commitTime, testTableState, bootstrap); + for (String str : newPartitionsToAdd) { + this.withPartitionMetaFiles(str); + } + if (createInflightCommit) { + this.addInflightCommit(commitTime); + } else { + this.addCommit(commitTime, commitMetadata); + } + for (String partition : partitions) { + this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition)); + if (MERGE_ON_READ.equals(metaClient.getTableType()) && UPSERT.equals(operationType)) { + this.withLogFilesInPartition(partition, testTableState.getPartitionToLogFileInfoMap(commitTime).get(partition)); + } + } + return commitMetadata; + } + + private Option 
getMetadataForInstant(String instantTime) { + Option hoodieInstant = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().filter(i -> i.getTimestamp().equals(instantTime)).firstInstant(); + try { + if (hoodieInstant.isPresent()) { + switch (hoodieInstant.get().getAction()) { + case HoodieTimeline.REPLACE_COMMIT_ACTION: + HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata + .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant.get()).get(), HoodieReplaceCommitMetadata.class); + return Option.of(replaceCommitMetadata); + case HoodieTimeline.DELTA_COMMIT_ACTION: + case HoodieTimeline.COMMIT_ACTION: + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant.get()).get(), HoodieCommitMetadata.class); + return Option.of(commitMetadata); + default: + throw new IllegalArgumentException("Unknown instant action" + hoodieInstant.get().getAction()); + } + } else { + return Option.empty(); + } + } catch (IOException io) { + throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstant.get(), io); + } + } + + private static Map> getPartitionFiles(HoodieCommitMetadata commitMetadata) { + Map> partitionFilesToDelete = new HashMap<>(); + Map> partitionToWriteStats = commitMetadata.getPartitionToWriteStats(); + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + partitionFilesToDelete.put(entry.getKey(), new ArrayList<>()); + entry.getValue().forEach(writeStat -> partitionFilesToDelete.get(entry.getKey()).add(writeStat.getFileId())); + } + return partitionFilesToDelete; + } + + private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType, HoodieTableType tableType, String commitTime, + List partitions, int filesPerPartition) { + for (String partition : partitions) { + Stream fileLengths = IntStream.range(0, filesPerPartition).map(i -> 100 + 
RANDOM.nextInt(500)).boxed(); + if (MERGE_ON_READ.equals(tableType) && UPSERT.equals(operationType)) { + List> fileVersionAndLength = fileLengths.map(len -> Pair.of(0, len)).collect(Collectors.toList()); + testTableState = testTableState.createTestTableStateForBaseAndLogFiles(commitTime, partition, fileVersionAndLength); + } else { + testTableState = testTableState.createTestTableStateForBaseFilesOnly(commitTime, partition, fileLengths.collect(Collectors.toList())); + } + } + return testTableState; + } + + private static List generateHoodieWriteStatForPartition(Map>> partitionToFileIdMap, + String commitTime, boolean bootstrap) { + List writeStats = new ArrayList<>(); + for (Map.Entry>> entry : partitionToFileIdMap.entrySet()) { + String partition = entry.getKey(); + for (Pair fileIdInfo : entry.getValue()) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + String fileName = bootstrap ? fileIdInfo.getKey() : + FileCreateUtils.baseFileName(commitTime, fileIdInfo.getKey()); + writeStat.setFileId(fileName); + writeStat.setPartitionPath(partition); + writeStat.setPath(partition + "/" + fileName); + writeStat.setTotalWriteBytes(fileIdInfo.getValue()); + writeStats.add(writeStat); + } + } + return writeStats; + } + + /** + * Returns the write stats for log files in the partition. Since log file has version associated with it, the {@param partitionToFileIdMap} + * contains list of Pair where the Integer[] array has both file version and file size. + */ + private static List generateHoodieWriteStatForPartitionLogFiles(Map>> partitionToFileIdMap, String commitTime, boolean bootstrap) { + List writeStats = new ArrayList<>(); + if (partitionToFileIdMap == null) { + return writeStats; + } + for (Map.Entry>> entry : partitionToFileIdMap.entrySet()) { + String partition = entry.getKey(); + for (Pair fileIdInfo : entry.getValue()) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + String fileName = bootstrap ? 
fileIdInfo.getKey() : + FileCreateUtils.logFileName(commitTime, fileIdInfo.getKey(), fileIdInfo.getValue()[0]); + writeStat.setFileId(fileName); + writeStat.setPartitionPath(partition); + writeStat.setPath(partition + "/" + fileName); + writeStat.setTotalWriteBytes(fileIdInfo.getValue()[1]); + writeStats.add(writeStat); + } + } + return writeStats; + } + public static class HoodieTestTableException extends RuntimeException { public HoodieTestTableException(Throwable t) { super(t); } } + + static class HoodieTestTableState { + /** + * Map>> + * Used in building CLEAN metadata. + */ + Map>> commitsToPartitionToFileIdForCleaner = new HashMap<>(); + /** + * Map>>> + * Used to build commit metadata for base files for several write operations. + */ + Map>>> commitsToPartitionToBaseFileInfoStats = new HashMap<>(); + /** + * Map>>> + * Used to build commit metadata for log files for several write operations. + */ + Map>>> commitsToPartitionToLogFileInfoStats = new HashMap<>(); + + HoodieTestTableState() { + } + + static HoodieTestTableState of() { + return new HoodieTestTableState(); + } + + HoodieTestTableState createTestTableStateForCleaner(String commitTime, String partitionPath, List filesToClean) { + if (!commitsToPartitionToFileIdForCleaner.containsKey(commitTime)) { + commitsToPartitionToFileIdForCleaner.put(commitTime, new HashMap<>()); + } + if (!this.commitsToPartitionToFileIdForCleaner.get(commitTime).containsKey(partitionPath)) { + this.commitsToPartitionToFileIdForCleaner.get(commitTime).put(partitionPath, new ArrayList<>()); + } + + this.commitsToPartitionToFileIdForCleaner.get(commitTime).get(partitionPath).addAll(filesToClean); + return this; + } + + Map> getPartitionToFileIdMapForCleaner(String commitTime) { + return this.commitsToPartitionToFileIdForCleaner.get(commitTime); + } + + HoodieTestTableState createTestTableStateForBaseFilesOnly(String commitTime, String partitionPath, List lengths) { + if 
(!commitsToPartitionToBaseFileInfoStats.containsKey(commitTime)) { + commitsToPartitionToBaseFileInfoStats.put(commitTime, new HashMap<>()); + } + if (!this.commitsToPartitionToBaseFileInfoStats.get(commitTime).containsKey(partitionPath)) { + this.commitsToPartitionToBaseFileInfoStats.get(commitTime).put(partitionPath, new ArrayList<>()); + } + + List> fileInfos = new ArrayList<>(); + for (int length : lengths) { + fileInfos.add(Pair.of(UUID.randomUUID().toString(), length)); + } + this.commitsToPartitionToBaseFileInfoStats.get(commitTime).get(partitionPath).addAll(fileInfos); + return this; + } + + HoodieTestTableState createTestTableStateForBaseAndLogFiles(String commitTime, String partitionPath, List> versionsAndLengths) { + if (!commitsToPartitionToBaseFileInfoStats.containsKey(commitTime)) { + createTestTableStateForBaseFilesOnly(commitTime, partitionPath, versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList())); + } + if (!this.commitsToPartitionToBaseFileInfoStats.get(commitTime).containsKey(partitionPath)) { + createTestTableStateForBaseFilesOnly(commitTime, partitionPath, versionsAndLengths.stream().map(Pair::getRight).collect(Collectors.toList())); + } + if (!commitsToPartitionToLogFileInfoStats.containsKey(commitTime)) { + commitsToPartitionToLogFileInfoStats.put(commitTime, new HashMap<>()); + } + if (!this.commitsToPartitionToLogFileInfoStats.get(commitTime).containsKey(partitionPath)) { + this.commitsToPartitionToLogFileInfoStats.get(commitTime).put(partitionPath, new ArrayList<>()); + } + + List> fileInfos = new ArrayList<>(); + for (int i = 0; i < versionsAndLengths.size(); i++) { + Pair versionAndLength = versionsAndLengths.get(i); + String fileId = FSUtils.getFileId(commitsToPartitionToBaseFileInfoStats.get(commitTime).get(partitionPath).get(i).getLeft()); + fileInfos.add(Pair.of(fileId, new Integer[] {versionAndLength.getLeft(), versionAndLength.getRight()})); + } + 
this.commitsToPartitionToLogFileInfoStats.get(commitTime).get(partitionPath).addAll(fileInfos); + return this; + } + + Map>> getPartitionToBaseFileInfoMap(String commitTime) { + return this.commitsToPartitionToBaseFileInfoStats.get(commitTime); + } + + Map>> getPartitionToLogFileInfoMap(String commitTime) { + return this.commitsToPartitionToLogFileInfoStats.get(commitTime); + } + } }