From 3a2ae16961131ed22e3b09877ff00cc42d47aa7d Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Thu, 20 Aug 2020 20:46:33 -0700 Subject: [PATCH] [HUDI-781] Introduce HoodieTestTable for test preparation (#1997) --- .../commands/TestUpgradeDowngradeCommand.java | 13 +- .../apache/hudi/io/HoodieAppendHandle.java | 1 + .../apache/hudi/io/HoodieCreateHandle.java | 1 + .../org/apache/hudi/io/HoodieMergeHandle.java | 1 + .../apache/hudi/io/HoodieRowCreateHandle.java | 1 + .../org/apache/hudi/io/HoodieWriteHandle.java | 3 +- .../org/apache/hudi/table/MarkerFiles.java | 17 +- .../rollback/MarkerBasedRollbackStrategy.java | 8 +- .../upgrade/ZeroToOneUpgradeHandler.java | 2 +- .../TestHoodieClientOnCopyOnWriteStorage.java | 15 +- .../org/apache/hudi/table/TestCleaner.java | 393 ++++++------------ .../hudi/table/TestConsistencyGuard.java | 28 +- .../apache/hudi/table/TestMarkerFiles.java | 16 +- .../action/commit/TestUpsertPartitioner.java | 8 +- .../action/compact/TestHoodieCompactor.java | 7 +- .../TestMarkerBasedRollbackStrategy.java | 69 ++- .../hudi/testutils/HoodieClientTestUtils.java | 126 +----- .../org/apache/hudi/common/model}/IOType.java | 15 +- .../common/testutils/FileCreateUtils.java | 147 +++++++ .../common/testutils/HoodieTestTable.java | 232 +++++++++++ .../common/testutils/HoodieTestUtils.java | 86 ++-- 21 files changed, 665 insertions(+), 524 deletions(-) rename {hudi-client/src/main/java/org/apache/hudi/io => hudi-common/src/main/java/org/apache/hudi/common/model}/IOType.java (57%) create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java index 4c0479a58..2334f12b4 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java @@ -21,13 +21,14 @@ package org.apache.hudi.cli.commands; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -77,11 +78,11 @@ public class TestUpgradeDowngradeCommand extends AbstractShellIntegrationTest { // generate commit and marker files for inflight commit 101 for (String commitTime : Arrays.asList(commitTime2)) { HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1"); - HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1"); + FileCreateUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1", IOType.MERGE); HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2"); - HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2"); + FileCreateUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2", IOType.MERGE); HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3"); - HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3"); + FileCreateUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3", IOType.MERGE); } } @@ -98,7 +99,7 @@ public class TestUpgradeDowngradeCommand extends AbstractShellIntegrationTest { // verify marker files for inflight commit exists for (String partitionPath : HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) { - assertEquals(1, HoodieClientTestUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101")); + assertEquals(1, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE)); } SparkMain.upgradeOrDowngradeTable(jsc, tablePath, HoodieTableVersion.ZERO.name()); @@ -112,7 +113,7 @@ public class TestUpgradeDowngradeCommand extends AbstractShellIntegrationTest { // verify marker files are non existant for (String partitionPath : HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) { - assertEquals(0, HoodieClientTestUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101")); + assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE)); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 7a8e5abf3..7996a77f2 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.block.HoodieDataBlock; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 705e98d94..5a76dc746 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index f0ea284ea..8d54065a0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.Option; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java index 723d9f9ad..fa160c691 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.config.HoodieWriteConfig; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index d148b1b8a..5ea8c3802 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -33,13 +34,13 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hudi.table.MarkerFiles; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java index 8a310fd30..9577ceab9 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java @@ -18,26 +18,27 @@ package org.apache.hudi.table; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.IOType; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.io.Serializable; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java index 40b81a213..2a137b4e0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java @@ -18,10 +18,10 @@ package org.apache.hudi.table.action.rollback; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; @@ -29,19 +29,21 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; -import org.apache.hudi.io.IOType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; + +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import scala.Tuple2; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; +import scala.Tuple2; + /** * Performs rollback using marker files generated during the write.. */ diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index 4960ff527..e9c9e28bb 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -21,13 +21,13 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; -import org.apache.hudi.io.IOType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 51d8a6a43..24e538e0b 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -47,7 +48,6 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; -import org.apache.hudi.io.IOType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.commit.WriteHelper; @@ -80,6 +80,7 @@ import java.util.UUID; import java.util.stream.Collectors; import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0; +import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH; @@ -506,8 +507,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { // verify one basefile per partition String[] fullExpectedPartitionPaths = getFullPartitionPaths(expectedPartitionPathRecKeyPairs.stream().map(Pair::getLeft).toArray(String[]::new)); - Map baseFileCounts = getBaseFileCounts(fullExpectedPartitionPaths); - for (Map.Entry entry : baseFileCounts.entrySet()) { + Map baseFileCounts = getBaseFileCountsForPaths(basePath, fs, fullExpectedPartitionPaths); + for (Map.Entry entry : baseFileCounts.entrySet()) { assertEquals(1, entry.getValue()); } assertTrue(baseFileCounts.entrySet().stream().allMatch(entry -> entry.getValue() == 1)); @@ -532,9 +533,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { // verify that there are more than 1 basefiles per partition // we can't guarantee randomness in partitions where records are distributed. So, verify atleast one partition has more than 1 basefile. - baseFileCounts = getBaseFileCounts(fullPartitionPaths); + baseFileCounts = getBaseFileCountsForPaths(basePath, fs, fullPartitionPaths); assertTrue(baseFileCounts.entrySet().stream().filter(entry -> entry.getValue() > 1).count() >= 1, - "Atleast one partition should have more than 1 base file after 2nd batch of writes"); + "At least one partition should have more than 1 base file after 2nd batch of writes"); // Write 3 (upserts to records from batch 1 with diff partition path) newCommitTime = "003"; @@ -605,10 +606,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { return fullPartitionPaths; } - private Map getBaseFileCounts(String[] fullPartitionPaths) { - return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths); - } - private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set> expectedPartitionPathRecKeyPairs, List> actualPartitionPathRecKeyPairs) { // verify all partitionpath, record key matches diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index a0647882a..6dacc8169 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -40,6 +40,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -50,8 +51,8 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigra import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator; import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler; import org.apache.hudi.common.table.view.TableFileSystemView; -import org.apache.hudi.common.testutils.FileSystemTestUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CollectionUtils; @@ -64,17 +65,21 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.action.clean.CleanPlanner; import org.apache.hudi.testutils.HoodieClientTestBase; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; @@ -500,137 +505,102 @@ public class TestCleaner extends HoodieClientTestBase { return new ArrayList<>(cleanStatMap.values()); } - /** - * Test HoodieTable.clean() Cleaning by versions for COW table. - */ - @Test - public void testKeepLatestFileVersions() throws IOException { - testKeepLatestFileVersions(false); - } - - /** - * Test HoodieTable.clean() Cleaning by version logic for COW table with Bootstrap source file clean enable. - */ - @Test - public void testBootstrapSourceFileCleanWithKeepLatestFileVersions() throws IOException { - testKeepLatestFileVersions(true); - } - /** * Test HoodieTable.clean() Cleaning by versions logic. */ - public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws IOException { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .build(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; + String p1 = "2020/01/02"; + Map> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null; // make 1 commit, with 1 file per partition - HoodieTestUtils.createCommitFiles(basePath, "00000000000001"); - - Map> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null; - - String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId() + String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() : UUID.randomUUID().toString(); - String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId() + String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() : UUID.randomUUID().toString(); - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0); // insert - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0); // insert + testTable.addCommit("00000000000001").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0); List hoodieCleanStatsOne = runCleaner(config); assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", - file1P1C0)); + assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0)); // make next commit, with 1 insert & 1 update per partition - HoodieTestUtils.createCommitFiles(basePath, "00000000000002"); - metaClient = HoodieTableMetaClient.reload(metaClient); - - String file2P0C1 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); // insert - String file2P1C1 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); // insert - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file1P0C0); // update - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file1P1C0); // update + Map partitionAndFileId002 = testTable.addCommit("00000000000002") + .withUpdates(p0, file1P0C0) + .withUpdates(p1, file1P1C0) + .withInserts(p0, p1); List hoodieCleanStatsTwo = runCleaner(config); // enableBootstrapSourceClean would delete the bootstrap base file as the same time - HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0); assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); if (enableBootstrapSourceClean) { HoodieFileStatus fstatus = - bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus(); + bootstrapMapping.get(p0).get(0).getBoostrapFileStatus(); // This ensures full path is recorded in metadata. assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() + " but did not contain " + fstatus.getPath().getUri()); - assertFalse(new File(bootstrapMapping.get( - HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists()); + assertFalse(Files.exists(Paths.get(bootstrapMapping.get( + p0).get(0).getBoostrapFileStatus().getPath().getUri()))); } - cleanStat = getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH); + cleanStat = getCleanStat(hoodieCleanStatsTwo, p1); + String file2P0C1 = partitionAndFileId002.get(p0); + String file2P1C1 = partitionAndFileId002.get(p1); + assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1)); + assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0)); + assertFalse(testTable.fileExists(p1, "00000000000001", file1P1C0)); assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", - file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", - file2P1C1)); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", - file1P0C0)); if (enableBootstrapSourceClean) { HoodieFileStatus fstatus = - bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus(); + bootstrapMapping.get(p1).get(0).getBoostrapFileStatus(); // This ensures full path is recorded in metadata. assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() + " but did not contain " + fstatus.getPath().getUri()); - assertFalse(new File(bootstrapMapping.get( - HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists()); + assertFalse(Files.exists(Paths.get(bootstrapMapping.get( + p1).get(0).getBoostrapFileStatus().getPath().getUri()))); } - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, - "00000000000001", file1P1C0)); // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createCommitFiles(basePath, "00000000000003"); - metaClient = HoodieTableMetaClient.reload(metaClient); - - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file1P0C0); // update - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file2P0C1); // update - String file3P0C2 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003"); - + String file3P0C2 = testTable.addCommit("00000000000003") + .withUpdates(p0, file1P0C0, file2P0C1) + .withInserts(p0, "00000000000003").get(p0); List hoodieCleanStatsThree = runCleaner(config); assertEquals(2, - getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) + getCleanStat(hoodieCleanStatsThree, p0) .getSuccessDeleteFiles().size(), "Must clean two files"); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", - file1P0C0)); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", - file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", - file3P0C2)); + assertFalse(testTable.fileExists(p0, "00000000000002", file1P0C0)); + assertFalse(testTable.fileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2)); // No cleaning on partially written file, with no commit. - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file3P0C2); // update + testTable.forCommit("00000000000004").withUpdates(p0, file3P0C2); List hoodieCleanStatsFour = runCleaner(config); assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", - file3P0C2)); + assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2)); } /** * Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files. */ @Test - public void testKeepLatestFileVersionsMOR() throws IOException { + public void testKeepLatestFileVersionsMOR() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) @@ -638,35 +608,29 @@ public class TestCleaner extends HoodieClientTestBase { .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .build(); - HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; // Make 3 files, one base file and 2 log files associated with base file - String file1P0 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000"); - String file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath, - HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.empty()); - HoodieTestUtils.createNewLogFile(fs, basePath, - HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.of(2)); - // make 1 compaction commit - HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000"); + String file1P0 = testTable.addDeltaCommit("000").withInserts(p0).get(p0); + testTable.forDeltaCommit("000") + .withLogFile(p0, file1P0, 1) + .withLogFile(p0, file1P0, 2); - // Make 4 files, one base file and 3 log files associated with base file - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0); - file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, - "001", file1P0, Option.of(3)); - // make 1 compaction commit - HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001"); + // Make 2 files, one base file and 1 log files associated with base file + testTable.addDeltaCommit("001") + .withUpdates(p0, file1P0) + .withLogFile(p0, file1P0, 3); List hoodieCleanStats = runCleaner(config); assertEquals(3, - getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() + getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() .size(), "Must clean three files, one parquet and 2 log files"); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file1P0)); - assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file2P0L0, Option.empty())); - assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file2P0L0, Option.of(2))); + assertFalse(testTable.fileExists(p0, "000", file1P0)); + assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); + assertTrue(testTable.fileExists(p0, "001", file1P0)); + assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); } @Test @@ -835,43 +799,21 @@ public class TestCleaner extends HoodieClientTestBase { } } - /** - * Test HoodieTable.clean() Cleaning by commit logic for COW table. - */ - @Test - public void testKeepLatestCommits() throws IOException { - testKeepLatestCommits(false, false, false); - } - - /** - * Test HoodieTable.clean() Cleaning by commit logic for COW table. Here the operations are simulated - * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. - */ - @Test - public void testKeepLatestCommitsWithFailureRetry() throws IOException { - testKeepLatestCommits(true, false, false); + private static Stream argumentsForTestKeepLatestCommits() { + return Stream.of( + Arguments.of(false, false, false), + Arguments.of(true, false, false), + Arguments.of(false, true, false), + Arguments.of(false, false, true) + ); } /** * Test HoodieTable.clean() Cleaning by commit logic for COW table. */ - @Test - public void testKeepLatestCommitsIncrMode() throws IOException { - testKeepLatestCommits(false, true, false); - } - - /** - * Test HoodieTable.clean() Cleaning by commit logic for COW table with Bootstrap source file clean enable. - */ - @Test - public void testBootstrapSourceFileCleanWithKeepLatestCommits() throws IOException { - testKeepLatestCommits(false, false, true); - } - - /** - * Test HoodieTable.clean() Cleaning by commit logic for COW table. - */ - private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws IOException { + @ParameterizedTest + @MethodSource("argumentsForTestKeepLatestCommits") + public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withIncrementalCleaningMode(enableIncrementalClean) @@ -879,25 +821,23 @@ public class TestCleaner extends HoodieClientTestBase { .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) .build(); - Map> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null; + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; + String p1 = "2020/01/02"; + Map> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null; // make 1 commit, with 1 file per partition - HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000001"); - - String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId() + String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId() : UUID.randomUUID().toString(); - String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId() + String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() : UUID.randomUUID().toString(); - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0); // insert - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0); // insert + testTable.addInflightCommit("00000000000001").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0); HoodieCommitMetadata commitMetadata = generateCommitMetadata( Collections.unmodifiableMap(new HashMap>() { { - put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0)); - put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0)); + put(p0, CollectionUtils.createImmutableList(file1P0C0)); + put(p1, CollectionUtils.createImmutableList(file1P1C0)); } }) ); @@ -909,29 +849,18 @@ public class TestCleaner extends HoodieClientTestBase { List hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", - file1P1C0)); + assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0)); // make next commit, with 1 insert & 1 update per partition - HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000002"); - metaClient = HoodieTableMetaClient.reload(metaClient); - - String file2P0C1 = - HoodieTestUtils - .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); // insert - String file2P1C1 = - HoodieTestUtils - .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); // insert - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file1P0C0); // update - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file1P1C0); // update + Map partitionAndFileId002 = testTable.addInflightCommit("00000000000002").withInserts(p0, p1); + String file2P0C1 = partitionAndFileId002.get(p0); + String file2P1C1 = partitionAndFileId002.get(p1); + testTable.forCommit("00000000000002").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0); commitMetadata = generateCommitMetadata(new HashMap>() { { - put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); - put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0, file2P1C1)); + put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); + put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1)); } }); metaClient.getActiveTimeline().saveAsComplete( @@ -939,28 +868,18 @@ public class TestCleaner extends HoodieClientTestBase { Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", - file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", - file2P1C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", - file1P1C0)); + assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1)); + assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0)); // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000003"); - metaClient = HoodieTableMetaClient.reload(metaClient); - - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file1P0C0); // update - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file2P0C1); // update - String file3P0C2 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003"); - + String file3P0C2 = testTable.addInflightCommit("00000000000003") + .withUpdates(p0, file1P0C0) + .withUpdates(p0, file2P0C1) + .withInserts(p0).get(p0); commitMetadata = generateCommitMetadata(CollectionUtils - .createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, + .createImmutableMap(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2))); metaClient.getActiveTimeline().saveAsComplete( new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"), @@ -969,57 +888,41 @@ public class TestCleaner extends HoodieClientTestBase { List hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsThree.size(), "Must not clean any file. We have to keep 1 version before the latest commit time to keep"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", - file1P0C0)); + assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0)); // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000004"); - metaClient = HoodieTableMetaClient.reload(metaClient); - - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file1P0C0); // update - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file2P0C1); // update - String file4P0C3 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004"); + String file4P0C3 = testTable.addInflightCommit("00000000000004") + .withUpdates(p0, file1P0C0) + .withUpdates(p0, file2P0C1) + .withInserts(p0).get(p0); commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap( - HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); + p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); metaClient.getActiveTimeline().saveAsComplete( new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000004"), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry); // enableBootstrapSourceClean would delete the bootstrap base file as the same time - HoodieCleanStat partitionCleanStat = - getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour, p0); assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size() + (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 : partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file"); - - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", - file1P0C0)); + assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0)); + assertTrue(testTable.fileExists(p0, "00000000000003", file1P0C0)); + assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.fileExists(p0, "00000000000003", file2P0C1)); + assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2)); + assertTrue(testTable.fileExists(p0, "00000000000004", file4P0C3)); if (enableBootstrapSourceClean) { - assertFalse(new File(bootstrapMapping.get( - HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists()); + assertFalse(Files.exists(Paths.get(bootstrapMapping.get( + p0).get(0).getBoostrapFileStatus().getPath().getUri()))); } - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", - file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", - file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", - file3P0C2)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", - file4P0C3)); // No cleaning on partially written file, with no commit. - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000005", file3P0C2); // update - commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, + testTable.forCommit("00000000000005").withUpdates(p0, file3P0C2); + commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(p0, CollectionUtils.createImmutableList(file3P0C2))); metaClient.getActiveTimeline().createNewInstant( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005")); @@ -1027,13 +930,11 @@ public class TestCleaner extends HoodieClientTestBase { new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry); - HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); - assertEquals(0, - cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0, "Must not clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", - file2P0C1)); + HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0); + assertNull(cleanStat, "Must not clean any files"); + assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0)); + assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.fileExists(p0, "00000000000005", file3P0C2)); } /** @@ -1041,7 +942,7 @@ public class TestCleaner extends HoodieClientTestBase { * @return Partition to BootstrapFileMapping Map * @throws IOException */ - private Map> generateBootstrapIndexAndSourceData() throws IOException { + private Map> generateBootstrapIndexAndSourceData(String... partitions) throws IOException { // create bootstrap source data path java.nio.file.Path sourcePath = tempDir.resolve("data"); java.nio.file.Files.createDirectories(sourcePath); @@ -1052,7 +953,7 @@ public class TestCleaner extends HoodieClientTestBase { // generate bootstrap index Map> bootstrapMapping = TestBootstrapIndex.generateBootstrapIndex(metaClient, sourcePath.toString(), - new String[] {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH}, 1); + partitions, 1); for (Map.Entry> entry : bootstrapMapping.entrySet()) { new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs(); @@ -1065,28 +966,29 @@ public class TestCleaner extends HoodieClientTestBase { * Test Cleaning functionality of table.rollback() API. */ @Test - public void testCleanMarkerDataFilesOnRollback() throws IOException { - List markerFiles = createMarkerFiles("000", 10); - assertEquals(10, markerFiles.size(), "Some marker files are created."); - assertEquals(markerFiles.size(), getTotalTempFiles(), "Some marker files are created."); + public void testCleanMarkerDataFilesOnRollback() throws Exception { + HoodieTestTable testTable = HoodieTestTable.of(metaClient) + .addRequestedCommit("000") + .withMarkerFiles("default", 10, IOType.MERGE); + final int numTempFilesBefore = testTable.listAllFilesInTempFolder().size(); + assertEquals(10, numTempFilesBefore, "Some marker files are created."); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); - table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, - HoodieTimeline.COMMIT_ACTION, "000")); table.getActiveTimeline().transitionRequestedToInflight( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty()); metaClient.reloadActiveTimeline(); table.rollback(jsc, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true); - assertEquals(0, getTotalTempFiles(), "All temp files are deleted."); + final int numTempFilesAfter = testTable.listAllFilesInTempFolder().size(); + assertEquals(0, numTempFilesAfter, "All temp files are deleted."); } /** * Test CLeaner Stat when there are no partition paths. */ @Test - public void testCleaningWithZeroPartitionPaths() throws IOException { + public void testCleaningWithZeroPartitionPaths() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) @@ -1095,7 +997,7 @@ public class TestCleaner extends HoodieClientTestBase { // Make a commit, although there are no partitionPaths. // Example use-case of this is when a client wants to create a table // with just some commit metadata, but no data/partitionPaths. - HoodieTestUtils.createCommitFiles(basePath, "000"); + HoodieTestTable.of(metaClient).addCommit("000"); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -1294,33 +1196,6 @@ public class TestCleaner extends HoodieClientTestBase { "Correct number of files under compaction deleted"); } - /** - * Utility method to create temporary data files. - * - * @param instantTime Commit Timestamp - * @param numFiles Number for files to be generated - * @return generated files - * @throws IOException in case of error - */ - private List createMarkerFiles(String instantTime, int numFiles) throws IOException { - List files = new ArrayList<>(); - for (int i = 0; i < numFiles; i++) { - files.add(HoodieClientTestUtils.createNewMarkerFile(basePath, "2019/03/29", instantTime)); - } - return files; - } - - /*** - * Helper method to return temporary files count. - * - * @return Number of temporary files found - * @throws IOException in case of error - */ - private int getTotalTempFiles() throws IOException { - return FileSystemTestUtils.listRecursive(fs, new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)) - .size(); - } - private Stream> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient, List paths) { Predicate roFilePredicate = diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java index da4224a14..1f638c39f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java @@ -22,8 +22,8 @@ import org.apache.hudi.common.fs.ConsistencyGuard; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FailSafeConsistencyGuard; import org.apache.hudi.common.fs.OptimisticConsistencyGuard; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; @@ -66,9 +66,9 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @ParameterizedTest @MethodSource("consistencyGuardType") public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2"); - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f2"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f3"); ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000); ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName()) @@ -88,7 +88,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearFailSafe() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays @@ -98,7 +98,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearTimedWait() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); @@ -106,7 +106,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearsFailSafe() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); @@ -115,14 +115,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearsTimedWait() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); } @Test public void testCheckFailingDisappearFailSafe() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays @@ -132,7 +132,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearTimedWait() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); @@ -140,8 +140,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearsFailSafe() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); @@ -150,8 +150,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearsTimedWait() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java index af679cee8..55b7b50af 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java @@ -18,17 +18,17 @@ package org.apache.hudi.table; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.testutils.FileSystemTestUtils; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.testutils.HoodieClientTestUtils; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.testutils.FileSystemTestUtils; -import org.apache.hudi.common.testutils.HoodieCommonTestHarness; - -import org.apache.hudi.common.util.CollectionUtils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.IOType; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index d8bb946da..f49d6d5c4 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; @@ -34,7 +35,6 @@ import org.apache.hudi.table.HoodieCopyOnWriteTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.testutils.HoodieClientTestBase; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.avro.Schema; import org.apache.log4j.LogManager; @@ -73,8 +73,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build(); - HoodieClientTestUtils.fakeCommit(basePath, "001"); - HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize); + FileCreateUtils.createCommit(basePath, "001"); + FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", "file1", fileSize); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); @@ -193,7 +193,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0) .insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build(); - HoodieClientTestUtils.fakeCommit(basePath, "001"); + FileCreateUtils.createCommit(basePath, "001"); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java index 1529d7993..c044beea6 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java @@ -53,6 +53,9 @@ import org.junit.jupiter.api.Test; import java.util.List; import java.util.stream.Collectors; +import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -166,7 +169,9 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file"); } } - HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime); + createDeltaCommit(basePath, newCommitTime); + createRequestedDeltaCommit(basePath, newCommitTime); + createInflightDeltaCommit(basePath, newCommitTime); // Do a compaction table = HoodieTable.create(config, hadoopConf); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java index c6652ed79..83e7ea082 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java @@ -20,16 +20,14 @@ package org.apache.hudi.table.action.rollback; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.testutils.FileSystemTestUtils; -import org.apache.hudi.io.IOType; +import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestBase; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -55,38 +53,20 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { cleanupResources(); } - private void givenCommit0(boolean isDeltaCommit) throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partA", "000", "f2"); - if (isDeltaCommit) { - HoodieClientTestUtils.fakeDeltaCommit(basePath, "000"); - } else { - HoodieClientTestUtils.fakeCommit(basePath, "000"); - } - } - - private void givenInflightCommit1(boolean isDeltaCommit) throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partB", "001", "f1"); - HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f1", IOType.CREATE); - - HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f3", IOType.CREATE); - - if (isDeltaCommit) { - HoodieClientTestUtils.fakeLogFile(basePath, "partA", "001", "f2", 0); - HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.APPEND); - HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f4", IOType.APPEND); - HoodieClientTestUtils.fakeInflightDeltaCommit(basePath, "001"); - } else { - HoodieClientTestUtils.fakeDataFile(basePath, "partA", "001", "f2"); - HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.MERGE); - HoodieClientTestUtils.fakeInFlightCommit(basePath, "001"); - } - } - @Test public void testCopyOnWriteRollback() throws Exception { // given: wrote some base files and corresponding markers - givenCommit0(false); - givenInflightCommit1(false); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String f0 = testTable.addRequestedCommit("000") + .withInserts("partA").get("partA"); + String f1 = testTable.addCommit("001") + .withUpdates("partA", f0) + .withInserts("partB").get("partB"); + String f2 = "f2"; + testTable.forCommit("001") + .withMarkerFile("partA", f0, IOType.MERGE) + .withMarkerFile("partB", f1, IOType.CREATE) + .withMarkerFile("partA", f2, IOType.CREATE); // when List stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002") @@ -95,8 +75,8 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { // then: ensure files are deleted correctly, non-existent files reported as failed deletes assertEquals(2, stats.size()); - List partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA")); - List partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB")); + List partAFiles = testTable.listAllFiles("partA"); + List partBFiles = testTable.listAllFiles("partB"); assertEquals(0, partBFiles.size()); assertEquals(1, partAFiles.size()); @@ -107,8 +87,19 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { @Test public void testMergeOnReadRollback() throws Exception { // given: wrote some base + log files and corresponding markers - givenCommit0(true); - givenInflightCommit1(true); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String f2 = testTable.addRequestedDeltaCommit("000") + .withInserts("partA").get("partA"); + String f1 = testTable.addDeltaCommit("001") + .withLogFile("partA", f2) + .withInserts("partB").get("partB"); + String f3 = "f3"; + String f4 = "f4"; + testTable.forDeltaCommit("001") + .withMarkerFile("partB", f1, IOType.CREATE) + .withMarkerFile("partA", f3, IOType.CREATE) + .withMarkerFile("partA", f2, IOType.APPEND) + .withMarkerFile("partB", f4, IOType.APPEND); // when List stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002") @@ -117,12 +108,12 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { // then: ensure files are deleted, rollback block is appended (even if append does not exist) assertEquals(2, stats.size()); // will have the log file - List partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB")); + List partBFiles = testTable.listAllFiles("partB"); assertEquals(1, partBFiles.size()); assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())); assertTrue(partBFiles.get(0).getLen() > 0); - List partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA")); + List partAFiles = testTable.listAllFiles("partA"); assertEquals(3, partAFiles.size()); assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count()); assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count()); diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 4aaf58514..353b34c10 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -38,7 +38,6 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.IOType; import org.apache.hudi.io.storage.HoodieAvroParquetConfig; import org.apache.hudi.io.storage.HoodieParquetWriter; @@ -57,14 +56,11 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; -import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; @@ -74,57 +70,6 @@ import java.util.stream.Collectors; public class HoodieClientTestUtils { private static final Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class); - public static final String DEFAULT_WRITE_TOKEN = "1-0-1"; - - private static void fakeMetaFile(String basePath, String instantTime, String suffix) throws IOException { - String parentPath = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME; - new File(parentPath).mkdirs(); - new File(parentPath + "/" + instantTime + suffix).createNewFile(); - } - - public static void fakeCommit(String basePath, String instantTime) throws IOException { - fakeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION); - } - - public static void fakeDeltaCommit(String basePath, String instantTime) throws IOException { - fakeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION); - } - - public static void fakeInflightDeltaCommit(String basePath, String instantTime) throws IOException { - fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION); - } - - public static void fakeInFlightCommit(String basePath, String instantTime) throws IOException { - fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_EXTENSION); - } - - public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId) - throws Exception { - fakeDataFile(basePath, partitionPath, instantTime, fileId, 0); - } - - public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length) - throws Exception { - String parentPath = String.format("%s/%s", basePath, partitionPath); - new File(parentPath).mkdirs(); - String path = String.format("%s/%s", parentPath, FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)); - new File(path).createNewFile(); - new RandomAccessFile(path, "rw").setLength(length); - } - - public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version) - throws Exception { - fakeLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0); - } - - public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length) - throws Exception { - String parentPath = String.format("%s/%s", basePath, partitionPath); - new File(parentPath).mkdirs(); - String path = String.format("%s/%s", parentPath, FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1")); - new File(path).createNewFile(); - new RandomAccessFile(path, "rw").setLength(length); - } /** * Returns a Spark config for this test. @@ -153,8 +98,8 @@ public class HoodieClientTestUtils { return HoodieReadClient.addHoodieSupport(sparkConf); } - public static HashMap getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline, - List commitsToReturn) throws IOException { + private static HashMap getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline, + List commitsToReturn) throws IOException { HashMap fileIdToFullPath = new HashMap<>(); for (HoodieInstant commit : commitsToReturn) { HoodieCommitMetadata metadata = @@ -226,25 +171,8 @@ public class HoodieClientTestUtils { } /** - * Find total basefiles for passed in paths. + * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}. */ - public static Map getBaseFileCountForPaths(String basePath, FileSystem fs, - String... paths) { - Map toReturn = new HashMap<>(); - try { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); - for (String path : paths) { - BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient, - metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path))); - List latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList()); - toReturn.put(path, latestFiles.size()); - } - return toReturn; - } catch (Exception e) { - throw new HoodieException("Error reading hoodie table as a dataframe", e); - } - } - public static String writeParquetFile(String basePath, String partitionPath, String filename, List records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException { @@ -278,6 +206,9 @@ public class HoodieClientTestUtils { return filename; } + /** + * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}. + */ public static String writeParquetFile(String basePath, String partitionPath, List records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException { Thread.sleep(1000); @@ -289,49 +220,4 @@ public class HoodieClientTestUtils { createCommitTime); } - public static String createNewMarkerFile(String basePath, String partitionPath, String instantTime) - throws IOException { - return createMarkerFile(basePath, partitionPath, instantTime); - } - - public static String createMarkerFile(String basePath, String partitionPath, String instantTime) - throws IOException { - return createMarkerFile(basePath, partitionPath, instantTime, UUID.randomUUID().toString(), IOType.MERGE); - } - - public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType) - throws IOException { - String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + instantTime + "/" + partitionPath + "/"; - new File(folderPath).mkdirs(); - String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, DEFAULT_WRITE_TOKEN, instantTime, - HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType); - File f = new File(folderPath + markerFileName); - f.createNewFile(); - return f.getAbsolutePath(); - } - - public static void createMarkerFile(String basePath, String partitionPath, String instantTime, String dataFileName) throws IOException { - createTempFolderForMarkerFiles(basePath); - String folderPath = getTempFolderName(basePath); - // create dir for this instant - new File(folderPath + "/" + instantTime + "/" + partitionPath).mkdirs(); - new File(folderPath + "/" + instantTime + "/" + partitionPath + "/" + dataFileName + ".marker.MERGE").createNewFile(); - } - - public static int getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime) { - String folderPath = getTempFolderName(basePath); - File markerDir = new File(folderPath + "/" + instantTime + "/" + partitionPath); - if (markerDir.exists()) { - return markerDir.listFiles((dir, name) -> name.contains(".marker.MERGE")).length; - } - return 0; - } - - public static void createTempFolderForMarkerFiles(String basePath) { - new File(basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME).mkdirs(); - } - - public static String getTempFolderName(String basePath) { - return basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME; - } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/IOType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java similarity index 57% rename from hudi-client/src/main/java/org/apache/hudi/io/IOType.java rename to hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java index aa6660e72..bd29ff091 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/IOType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io; +package org.apache.hudi.common.model; /** * Types of lower level I/O operations done on each file slice. diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java new file mode 100644 index 000000000..987b5679d --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.testutils; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.exception.HoodieException; + +import org.apache.hadoop.fs.FileSystem; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +public class FileCreateUtils { + + private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException { + Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME); + Files.createDirectories(parentPath); + Path metaFilePath = parentPath.resolve(instantTime + suffix); + if (Files.notExists(metaFilePath)) { + Files.createFile(metaFilePath); + } + } + + public static void createCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION); + } + + public static void createRequestedCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMMIT_EXTENSION); + } + + public static void createInflightCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION); + } + + public static void createDeltaCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION); + } + + public static void createRequestedDeltaCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION); + } + + public static void createInflightDeltaCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION); + } + + public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId) + throws Exception { + createDataFile(basePath, partitionPath, instantTime, fileId, 0); + } + + public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length) + throws Exception { + Path parentPath = Paths.get(basePath, partitionPath); + Files.createDirectories(parentPath); + Path dataFilePath = parentPath.resolve(FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)); + if (Files.notExists(dataFilePath)) { + Files.createFile(dataFilePath); + } + new RandomAccessFile(dataFilePath.toFile(), "rw").setLength(length); + } + + public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version) + throws Exception { + createLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0); + } + + public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length) + throws Exception { + Path parentPath = Paths.get(basePath, partitionPath); + Files.createDirectories(parentPath); + Path logFilePath = parentPath.resolve(FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1")); + if (Files.notExists(logFilePath)) { + Files.createFile(logFilePath); + } + new RandomAccessFile(logFilePath.toFile(), "rw").setLength(length); + } + + public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType) + throws IOException { + Path folderPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); + Files.createDirectories(folderPath); + String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, "1-0-1", instantTime, + HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType); + Path markerFilePath = folderPath.resolve(markerFileName); + if (Files.notExists(markerFilePath)) { + Files.createFile(markerFilePath); + } + return markerFilePath.toAbsolutePath().toString(); + } + + public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException { + Path markerDir = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); + if (Files.notExists(markerDir)) { + return 0; + } + return Files.list(markerDir).filter(p -> p.getFileName().toString() + .endsWith(String.format("%s.%s", HoodieTableMetaClient.MARKER_EXTN, ioType))).count(); + } + + /** + * Find total basefiles for passed in paths. + */ + public static Map getBaseFileCountsForPaths(String basePath, FileSystem fs, String... paths) { + Map toReturn = new HashMap<>(); + try { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); + for (String path : paths) { + TableFileSystemView.BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient, + metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new org.apache.hadoop.fs.Path(path))); + toReturn.put(path, fileSystemView.getLatestBaseFiles().count()); + } + return toReturn; + } catch (Exception e) { + throw new HoodieException("Error reading hoodie table as a dataframe", e); + } + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java new file mode 100644 index 000000000..32f2d4580 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.testutils; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.IntStream; + +import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile; +import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit; + +public class HoodieTestTable { + + private final String basePath; + private final FileSystem fs; + private HoodieTableMetaClient metaClient; + private String currentInstantTime; + + private HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) { + ValidationUtils.checkArgument(Objects.equals(basePath, metaClient.getBasePath())); + ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs())); + this.basePath = basePath; + this.fs = fs; + this.metaClient = metaClient; + } + + public static HoodieTestTable of(HoodieTableMetaClient metaClient) { + return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient); + } + + public HoodieTestTable addRequestedCommit(String instantTime) throws Exception { + createRequestedCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addRequestedDeltaCommit(String instantTime) throws Exception { + createRequestedDeltaCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addInflightCommit(String instantTime) throws Exception { + createRequestedCommit(basePath, instantTime); + createInflightCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addInflightDeltaCommit(String instantTime) throws Exception { + createRequestedDeltaCommit(basePath, instantTime); + createInflightDeltaCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addCommit(String instantTime) throws Exception { + createRequestedCommit(basePath, instantTime); + createInflightCommit(basePath, instantTime); + createCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addDeltaCommit(String instantTime) throws Exception { + createRequestedDeltaCommit(basePath, instantTime); + createInflightDeltaCommit(basePath, instantTime); + createDeltaCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable forCommit(String instantTime) { + currentInstantTime = instantTime; + return this; + } + + public HoodieTestTable forDeltaCommit(String instantTime) { + currentInstantTime = instantTime; + return this; + } + + public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) throws IOException { + return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType); + } + + public HoodieTestTable withMarkerFile(String partitionPath, String fileId, IOType ioType) throws IOException { + createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType); + return this; + } + + public HoodieTestTable withMarkerFiles(String partitionPath, int num, IOType ioType) throws IOException { + String[] fileIds = IntStream.range(0, num).mapToObj(i -> UUID.randomUUID().toString()).toArray(String[]::new); + return withMarkerFiles(partitionPath, fileIds, ioType); + } + + public HoodieTestTable withMarkerFiles(String partitionPath, String[] fileIds, IOType ioType) throws IOException { + for (String fileId : fileIds) { + createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType); + } + return this; + } + + /** + * Insert one base file to each of the given distinct partitions. + * + * @return A {@link Map} of partition and its newly inserted file's id. + */ + public Map withInserts(String... partitions) throws Exception { + Map partitionFileIdMap = new HashMap<>(); + for (String p : partitions) { + String fileId = UUID.randomUUID().toString(); + FileCreateUtils.createDataFile(basePath, p, currentInstantTime, fileId); + partitionFileIdMap.put(p, fileId); + } + return partitionFileIdMap; + } + + public HoodieTestTable withUpdates(String partition, String... fileIds) throws Exception { + for (String f : fileIds) { + FileCreateUtils.createDataFile(basePath, partition, currentInstantTime, f); + } + return this; + } + + public String withLogFile(String partitionPath) throws Exception { + String fileId = UUID.randomUUID().toString(); + withLogFile(partitionPath, fileId); + return fileId; + } + + public HoodieTestTable withLogFile(String partitionPath, String fileId) throws Exception { + return withLogFile(partitionPath, fileId, 0); + } + + public HoodieTestTable withLogFile(String partitionPath, String fileId, int version) throws Exception { + FileCreateUtils.createLogFile(basePath, partitionPath, currentInstantTime, fileId, version); + return this; + } + + public boolean filesExist(Map partitionAndFileId, String instantTime) { + return partitionAndFileId.entrySet().stream().allMatch(entry -> { + String partition = entry.getKey(); + String fileId = entry.getValue(); + return fileExists(partition, instantTime, fileId); + }); + } + + public boolean filesExist(String partition, String instantTime, String... fileIds) { + return Arrays.stream(fileIds).allMatch(f -> fileExists(partition, instantTime, f)); + } + + public boolean fileExists(String partition, String instantTime, String fileId) { + try { + return fs.exists(new Path(Paths.get(basePath, partition, + FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)).toString())); + } catch (IOException e) { + throw new HoodieTestTableException(e); + } + } + + public boolean logFilesExist(String partition, String instantTime, String fileId, int... versions) { + return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v)); + } + + public boolean logFileExists(String partition, String instantTime, String fileId, int version) { + try { + return fs.exists(new Path(Paths.get(basePath, partition, + FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, version, "1-0-1")).toString())); + } catch (IOException e) { + throw new HoodieTestTableException(e); + } + } + + public List listAllFiles(String partitionPath) throws IOException { + return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())); + } + + public List listAllFilesInTempFolder() throws IOException { + return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())); + } + + public static class HoodieTestTableException extends RuntimeException { + public HoodieTestTableException(Throwable t) { + super(t); + } + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index c88ea5129..783bda4fc 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -100,7 +100,6 @@ import static org.junit.jupiter.api.Assertions.fail; */ public class HoodieTestUtils { - public static final String TEST_EXTENSION = ".test"; public static final String RAW_TRIPS_TEST_NAME = "raw_trips"; public static final String DEFAULT_WRITE_TOKEN = "1-0-1"; public static final int DEFAULT_LOG_VERSION = 1; @@ -167,6 +166,9 @@ public class HoodieTestUtils { return COMMIT_FORMATTER.format(new Date()); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static void createCommitFiles(String basePath, String... instantTimes) throws IOException { for (String instantTime : instantTimes) { new File( @@ -177,20 +179,6 @@ public class HoodieTestUtils { + HoodieTimeline.makeInflightCommitFileName(instantTime)).createNewFile(); new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(instantTime)) - .createNewFile(); - } - } - - public static void createDeltaCommitFiles(String basePath, String... instantTimes) throws IOException { - for (String instantTime : instantTimes) { - new File( - basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" - + HoodieTimeline.makeRequestedDeltaFileName(instantTime)).createNewFile(); - new File( - basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" - + HoodieTimeline.makeInflightDeltaFileName(instantTime)).createNewFile(); - new File( - basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(instantTime)) .createNewFile(); } } @@ -199,6 +187,9 @@ public class HoodieTestUtils { new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs(); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static void createInflightCommitFiles(String basePath, String... instantTimes) throws IOException { for (String instantTime : instantTimes) { @@ -212,11 +203,12 @@ public class HoodieTestUtils { public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... instantTimes) { for (String instantTime : instantTimes) { Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime), - HoodieTimeline.makeInflightCleanerFileName(instantTime)).forEach(f -> { + HoodieTimeline.makeInflightCleanerFileName(instantTime)) + .forEach(f -> { FSDataOutputStream os = null; try { - Path commitFile = new Path( - metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); + Path commitFile = new Path(Paths + .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); os = metaClient.getFs().create(commitFile, true); // Write empty clean metadata os.write(TimelineMetadataUtils.serializeCleanerPlan( @@ -239,11 +231,12 @@ public class HoodieTestUtils { public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient metaClient, String commitTime) { Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime), - HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> { + HoodieTimeline.makeInflightCleanerFileName(commitTime)) + .forEach(f -> { FSDataOutputStream os = null; try { - Path commitFile = new Path( - metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); + Path commitFile = new Path(Paths + .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); os = metaClient.getFs().create(commitFile, true); // Write empty clean metadata os.write(new byte[0]); @@ -261,18 +254,18 @@ public class HoodieTestUtils { }); } - public static String createNewDataFile(String basePath, String partitionPath, String instantTime) - throws IOException { - String fileID = UUID.randomUUID().toString(); - return createDataFile(basePath, partitionPath, instantTime, fileID); - } - + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String createNewDataFile(String basePath, String partitionPath, String instantTime, long length) throws IOException { String fileID = UUID.randomUUID().toString(); return createDataFileFixLength(basePath, partitionPath, instantTime, fileID, length); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String createDataFile(String basePath, String partitionPath, String instantTime, String fileID) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; @@ -281,7 +274,7 @@ public class HoodieTestUtils { return fileID; } - public static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID, + private static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID, long length) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; Files.createDirectories(Paths.get(folderPath)); @@ -293,6 +286,9 @@ public class HoodieTestUtils { return fileID; } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String instantTime, String fileID, Option version) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; @@ -309,17 +305,6 @@ public class HoodieTestUtils { return fileID; } - public static void createCompactionCommitFiles(FileSystem fs, String basePath, String... instantTimes) - throws IOException { - for (String instantTime : instantTimes) { - boolean createFile = fs.createNewFile(new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" - + HoodieTimeline.makeCommitFileName(instantTime))); - if (!createFile) { - throw new IOException("cannot create commit file for commit " + instantTime); - } - } - } - public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant, List> fileSliceList) throws IOException { HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty()); @@ -328,10 +313,16 @@ public class HoodieTestUtils { TimelineMetadataUtils.serializeCompactionPlan(plan)); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String getDataFilePath(String basePath, String partitionPath, String instantTime, String fileID) { return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String getLogFilePath(String basePath, String partitionPath, String instantTime, String fileID, Option version) { return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", instantTime, @@ -342,32 +333,39 @@ public class HoodieTestUtils { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION; } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String getInflightCommitFilePath(String basePath, String instantTime) { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String getRequestedCompactionFilePath(String basePath, String instantTime) { return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static boolean doesDataFileExist(String basePath, String partitionPath, String instantTime, String fileID) { return new File(getDataFilePath(basePath, partitionPath, instantTime, fileID)).exists(); } - public static boolean doesLogFileExist(String basePath, String partitionPath, String instantTime, String fileID, - Option version) { - return new File(getLogFilePath(basePath, partitionPath, instantTime, fileID, version)).exists(); - } - public static boolean doesCommitExist(String basePath, String instantTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION) .exists(); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static boolean doesInflightExist(String basePath, String instantTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_EXTENSION)