From b2e703d4427abca02b053fa4444cd5058aa256ef Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Mon, 10 Aug 2020 18:44:03 -0700 Subject: [PATCH] [HUDI-781] Introduce HoodieTestTable for test preparation (#1871) --- .../apache/hudi/io/HoodieAppendHandle.java | 1 + .../apache/hudi/io/HoodieCreateHandle.java | 1 + .../org/apache/hudi/io/HoodieMergeHandle.java | 1 + .../org/apache/hudi/io/HoodieWriteHandle.java | 3 +- .../org/apache/hudi/table/MarkerFiles.java | 17 +- .../rollback/MarkerBasedRollbackStrategy.java | 8 +- .../upgrade/ZeroToOneUpgradeHandler.java | 2 +- .../TestHoodieClientOnCopyOnWriteStorage.java | 2 +- .../org/apache/hudi/table/TestCleaner.java | 391 +++++++----------- .../hudi/table/TestConsistencyGuard.java | 28 +- .../apache/hudi/table/TestMarkerFiles.java | 16 +- .../action/commit/TestUpsertPartitioner.java | 8 +- .../action/compact/TestHoodieCompactor.java | 7 +- .../TestMarkerBasedRollbackStrategy.java | 69 ++-- .../hudi/testutils/HoodieClientTestUtils.java | 99 +---- .../org/apache/hudi/common/model}/IOType.java | 15 +- .../common/testutils/FileCreateUtils.java | 113 +++++ .../common/testutils/HoodieTestTable.java | 232 +++++++++++ .../common/testutils/HoodieTestUtils.java | 102 +++-- 19 files changed, 645 insertions(+), 470 deletions(-) rename {hudi-client/src/main/java/org/apache/hudi/io => hudi-common/src/main/java/org/apache/hudi/common/model}/IOType.java (57%) create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 7a8e5abf3..7996a77f2 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.block.HoodieDataBlock; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 705e98d94..5a76dc746 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index f0ea284ea..8d54065a0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ 
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.Option; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index d148b1b8a..5ea8c3802 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -33,13 +34,13 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hudi.table.MarkerFiles; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java index 8a310fd30..9577ceab9 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java @@ -18,26 +18,27 @@ package org.apache.hudi.table; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.IOType; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.io.Serializable; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java index 40b81a213..2a137b4e0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java @@ -18,10 +18,10 @@ package org.apache.hudi.table.action.rollback; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; @@ -29,19 +29,21 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; -import org.apache.hudi.io.IOType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; + +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import scala.Tuple2; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; +import scala.Tuple2; + /** * Performs rollback using marker files generated during the write.. */ diff --git a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java index 4960ff527..e9c9e28bb 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java @@ -21,13 +21,13 @@ package org.apache.hudi.table.upgrade; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; -import org.apache.hudi.io.IOType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 51d8a6a43..1c4988a52 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.IOType; import 
org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -47,7 +48,6 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; -import org.apache.hudi.io.IOType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.commit.WriteHelper; diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 0376ec36a..55fbab2ec 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -43,8 +44,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator; import org.apache.hudi.common.table.view.TableFileSystemView; -import org.apache.hudi.common.testutils.FileSystemTestUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CollectionUtils; @@ -56,13 +57,16 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.testutils.HoodieClientTestBase; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -482,125 +486,97 @@ public class TestCleaner extends HoodieClientTestBase { * Test HoodieTable.clean() Cleaning by versions logic. 
*/ @Test - public void testKeepLatestFileVersions() throws IOException { + public void testKeepLatestFileVersions() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .build(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; + String p1 = "2020/01/02"; + // make 1 commit, with 1 file per partition - HoodieTestUtils.createCommitFiles(basePath, "000"); - - String file1P0C0 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000"); - String file1P1C0 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000"); - metaClient = HoodieTableMetaClient.reload(metaClient); - + Map partitionAndFileId000 = testTable.addCommit("000").withInserts(p0, p1); List hoodieCleanStatsOne = runCleaner(config); assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", - file1P1C0)); + assertTrue(testTable.filesExist(partitionAndFileId000, "000")); // make next commit, with 1 insert & 1 update per partition - HoodieTestUtils.createCommitFiles(basePath, "001"); - metaClient = HoodieTableMetaClient.reload(metaClient); - - String file2P0C1 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert - String file2P1C1 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update - + String file1P0C0 = partitionAndFileId000.get(p0); + String file1P1C0 = partitionAndFileId000.get(p1); + Map partitionAndFileId001 = testTable.addCommit("001") + .withUpdates(p0, file1P0C0) + .withUpdates(p1, file1P1C0) + .withInserts(p0, p1); List hoodieCleanStatsTwo = runCleaner(config); assertEquals(1, - getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() + getCleanStat(hoodieCleanStatsTwo, p0).getSuccessDeleteFiles() .size(), "Must clean 1 file"); assertEquals(1, - getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles() + getCleanStat(hoodieCleanStatsTwo, p1).getSuccessDeleteFiles() .size(), "Must clean 1 file"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", - file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", - file2P1C1)); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file1P0C0)); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, - "000", file1P1C0)); + String file2P0C1 = partitionAndFileId001.get(p0); + String file2P1C1 = 
partitionAndFileId001.get(p1); + assertTrue(testTable.fileExists(p0, "001", file2P0C1)); + assertTrue(testTable.fileExists(p1, "001", file2P1C1)); + assertFalse(testTable.fileExists(p0, "000", file1P0C0)); + assertFalse(testTable.fileExists(p1, "000", file1P1C0)); // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createCommitFiles(basePath, "002"); - metaClient = HoodieTableMetaClient.reload(metaClient); - - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update - String file3P0C2 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); - + String file3P0C2 = testTable.addCommit("002") + .withUpdates(p0, file1P0C0, file2P0C1) + .withInserts(p0, "002").get(p0); List hoodieCleanStatsThree = runCleaner(config); assertEquals(2, - getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) + getCleanStat(hoodieCleanStatsThree, p0) .getSuccessDeleteFiles().size(), "Must clean two files"); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", - file1P0C0)); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", - file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", - file3P0C2)); + assertFalse(testTable.fileExists(p0, "001", file1P0C0)); + assertFalse(testTable.fileExists(p0, "001", file2P0C1)); + assertTrue(testTable.fileExists(p0, "002", file3P0C2)); // No cleaning on partially written file, with no commit. - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update + testTable.forCommit("003").withUpdates(p0, file3P0C2); List hoodieCleanStatsFour = runCleaner(config); assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", - file3P0C2)); + assertTrue(testTable.fileExists(p0, "003", file3P0C2)); } /** * Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files. 
*/ @Test - public void testKeepLatestFileVersionsMOR() throws IOException { - + public void testKeepLatestFileVersionsMOR() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .build(); - HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; // Make 3 files, one base file and 2 log files associated with base file - String file1P0 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000"); - String file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath, - HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.empty()); - HoodieTestUtils.createNewLogFile(fs, basePath, - HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.of(2)); - // make 1 compaction commit - HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000"); + String file1P0 = testTable.addDeltaCommit("000").withInserts(p0).get(p0); + testTable.forDeltaCommit("000") + .withLogFile(p0, file1P0, 1) + .withLogFile(p0, file1P0, 2); - // Make 4 files, one base file and 3 log files associated with base file - HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0); - file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, - "001", file1P0, Option.of(3)); - // make 1 compaction commit - HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001"); + // Make 2 files, one base file and 1 log files associated with base file + testTable.addDeltaCommit("001") + .withUpdates(p0, file1P0) + .withLogFile(p0, file1P0, 3); List hoodieCleanStats = runCleaner(config); assertEquals(3, - getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() + getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() .size(), "Must clean three files, one parquet and 2 log files"); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file1P0)); - assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file2P0L0, Option.empty())); - assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file2P0L0, Option.of(2))); + assertFalse(testTable.fileExists(p0, "000", file1P0)); + assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); + assertTrue(testTable.fileExists(p0, "001", file1P0)); + assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); } @Test @@ -652,33 +628,33 @@ public class TestCleaner extends HoodieClientTestBase { ); metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1); - // NOw upgrade and check + // Now upgrade and check CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient); metadata = metadataMigrator.upgradeToLatest(metadata, metadata.getVersion()); - testCleanMetadataPathEquality(metadata, newExpected); + assertCleanMetadataPathEquals(newExpected, metadata); CleanMetadataMigrator migrator = new 
CleanMetadataMigrator(metaClient); HoodieCleanMetadata oldMetadata = migrator.migrateToVersion(metadata, metadata.getVersion(), CleanerUtils.CLEAN_METADATA_VERSION_1); assertEquals(CleanerUtils.CLEAN_METADATA_VERSION_1, oldMetadata.getVersion()); - testCleanMetadataEquality(metadata, oldMetadata); - testCleanMetadataPathEquality(oldMetadata, oldExpected); + assertCleanMetadataEquals(metadata, oldMetadata); + assertCleanMetadataPathEquals(oldExpected, oldMetadata); HoodieCleanMetadata newMetadata = migrator.upgradeToLatest(oldMetadata, oldMetadata.getVersion()); assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, newMetadata.getVersion()); - testCleanMetadataEquality(oldMetadata, newMetadata); - testCleanMetadataPathEquality(newMetadata, newExpected); - testCleanMetadataPathEquality(oldMetadata, oldExpected); + assertCleanMetadataEquals(oldMetadata, newMetadata); + assertCleanMetadataPathEquals(newExpected, newMetadata); + assertCleanMetadataPathEquals(oldExpected, oldMetadata); } - public void testCleanMetadataEquality(HoodieCleanMetadata input1, HoodieCleanMetadata input2) { - assertEquals(input1.getEarliestCommitToRetain(), input2.getEarliestCommitToRetain()); - assertEquals(input1.getStartCleanTime(), input2.getStartCleanTime()); - assertEquals(input1.getTimeTakenInMillis(), input2.getTimeTakenInMillis()); - assertEquals(input1.getTotalFilesDeleted(), input2.getTotalFilesDeleted()); + private static void assertCleanMetadataEquals(HoodieCleanMetadata expected, HoodieCleanMetadata actual) { + assertEquals(expected.getEarliestCommitToRetain(), actual.getEarliestCommitToRetain()); + assertEquals(expected.getStartCleanTime(), actual.getStartCleanTime()); + assertEquals(expected.getTimeTakenInMillis(), actual.getTimeTakenInMillis()); + assertEquals(expected.getTotalFilesDeleted(), actual.getTotalFilesDeleted()); - Map map1 = input1.getPartitionMetadata(); - Map map2 = input2.getPartitionMetadata(); + Map map1 = expected.getPartitionMetadata(); + Map map2 = actual.getPartitionMetadata(); assertEquals(map1.keySet(), map2.keySet()); @@ -693,7 +669,7 @@ public class TestCleaner extends HoodieClientTestBase { assertEquals(policies1, policies2); } - private void testCleanMetadataPathEquality(HoodieCleanMetadata metadata, Map expected) { + private static void assertCleanMetadataPathEquals(Map expected, HoodieCleanMetadata metadata) { Map partitionMetadataMap = metadata.getPartitionMetadata(); @@ -707,54 +683,40 @@ public class TestCleaner extends HoodieClientTestBase { } } - /** - * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. - */ - @Test - public void testKeepLatestCommits() throws IOException { - testKeepLatestCommits(false, false); + private static Stream argumentsForTestKeepLatestCommits() { + return Stream.of( + Arguments.of(false, false), + Arguments.of(true, false), + Arguments.of(false, true) + ); } /** * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. */ - @Test - public void testKeepLatestCommitsWithFailureRetry() throws IOException { - testKeepLatestCommits(true, false); - } - - /** - * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. - */ - @Test - public void testKeepLatestCommitsIncrMode() throws IOException { - testKeepLatestCommits(false, true); - } - - /** - * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. 
- */ - private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws IOException { + @ParameterizedTest + @MethodSource("argumentsForTestKeepLatestCommits") + public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withIncrementalCleaningMode(enableIncrementalClean) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) .build(); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; + String p1 = "2020/01/02"; + // make 1 commit, with 1 file per partition - HoodieTestUtils.createInflightCommitFiles(basePath, "000"); - - String file1P0C0 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000"); - String file1P1C0 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000"); - + Map partitionAndFileId000 = testTable.addInflightCommit("000").withInserts(p0, p1); + String file1P0C0 = partitionAndFileId000.get(p0); + String file1P1C0 = partitionAndFileId000.get(p1); HoodieCommitMetadata commitMetadata = generateCommitMetadata( Collections.unmodifiableMap(new HashMap>() { { - put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0)); - put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0)); + put(p0, CollectionUtils.createImmutableList(file1P0C0)); + put(p1, CollectionUtils.createImmutableList(file1P1C0)); } }) ); @@ -766,29 +728,20 @@ public class TestCleaner extends HoodieClientTestBase { List hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", - file1P1C0)); + assertTrue(testTable.fileExists(p0, "000", file1P0C0)); + assertTrue(testTable.fileExists(p1, "000", file1P1C0)); // make next commit, with 1 insert & 1 update per partition - HoodieTestUtils.createInflightCommitFiles(basePath, "001"); - metaClient = HoodieTableMetaClient.reload(metaClient); - - String file2P0C1 = - HoodieTestUtils - .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert - String file2P1C1 = - HoodieTestUtils - .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update + Map partitionAndFileId001 = testTable.addInflightCommit("001").withInserts(p0, p1); + String file2P0C1 = partitionAndFileId001.get(p0); + String file2P1C1 = partitionAndFileId001.get(p1); + testTable.forCommit("001") + .withUpdates(p0, file1P0C0) + .withUpdates(p1, file1P1C0); commitMetadata = generateCommitMetadata(new HashMap>() { { - put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); - put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0, file2P1C1)); + put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); + put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1)); } }); metaClient.getActiveTimeline().saveAsComplete( @@ -796,28 +749,18 @@ public class TestCleaner extends HoodieClientTestBase { Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", - file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", - file2P1C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000", - file1P1C0)); + assertTrue(testTable.fileExists(p0, "001", file2P0C1)); + assertTrue(testTable.fileExists(p1, "001", file2P1C1)); + assertTrue(testTable.fileExists(p0, "000", file1P0C0)); + assertTrue(testTable.fileExists(p1, "000", file1P1C0)); // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createInflightCommitFiles(basePath, "002"); - metaClient = HoodieTableMetaClient.reload(metaClient); - - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update - String file3P0C2 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002"); - + String file3P0C2 = testTable.addInflightCommit("002") + .withUpdates(p0, file1P0C0) + .withUpdates(p0, file2P0C1) + .withInserts(p0).get(p0); commitMetadata = generateCommitMetadata(CollectionUtils - .createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, + .createImmutableMap(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2))); metaClient.getActiveTimeline().saveAsComplete( new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "002"), @@ -826,49 +769,35 @@ public class TestCleaner extends HoodieClientTestBase { List hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsThree.size(), "Must not clean any file. 
We have to keep 1 version before the latest commit time to keep"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file1P0C0)); + assertTrue(testTable.fileExists(p0, "000", file1P0C0)); // make next commit, with 2 updates to existing files, and 1 insert - HoodieTestUtils.createInflightCommitFiles(basePath, "003"); - metaClient = HoodieTableMetaClient.reload(metaClient); - - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update - HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update - String file4P0C3 = - HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003"); + String file4P0C3 = testTable.addInflightCommit("003") + .withUpdates(p0, file1P0C0) + .withUpdates(p0, file2P0C1) + .withInserts(p0).get(p0); commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap( - HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); + p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); metaClient.getActiveTimeline().saveAsComplete( new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "003"), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry); assertEquals(1, - getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles() + getCleanStat(hoodieCleanStatsFour, p0).getSuccessDeleteFiles() .size(), "Must not clean one old file"); - assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", - file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", - file2P0C1)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", - file3P0C2)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", - file4P0C3)); + assertFalse(testTable.fileExists(p0, "000", file1P0C0)); + assertTrue(testTable.fileExists(p0, "001", file1P0C0)); + assertTrue(testTable.fileExists(p0, "002", file1P0C0)); + assertTrue(testTable.fileExists(p0, "001", file2P0C1)); + assertTrue(testTable.fileExists(p0, "002", file2P0C1)); + assertTrue(testTable.fileExists(p0, "002", file3P0C2)); + assertTrue(testTable.fileExists(p0, "003", file4P0C3)); // No cleaning on partially written file, with no commit. 
- HoodieTestUtils - .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update - commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, + testTable.forCommit("004").withUpdates(p0, file3P0C2); + commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(p0, CollectionUtils.createImmutableList(file3P0C2))); metaClient.getActiveTimeline().createNewInstant( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004")); @@ -876,41 +805,40 @@ public class TestCleaner extends HoodieClientTestBase { new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry); - HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); - assertEquals(0, - cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0, "Must not clean any files"); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", - file1P0C0)); - assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", - file2P0C1)); + HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0); + assertNull(cleanStat, "Must not clean any files"); + assertTrue(testTable.fileExists(p0, "001", file1P0C0)); + assertTrue(testTable.fileExists(p0, "001", file2P0C1)); + assertTrue(testTable.fileExists(p0, "004", file3P0C2)); } /** * Test Cleaning functionality of table.rollback() API. */ @Test - public void testCleanMarkerDataFilesOnRollback() throws IOException { - List markerFiles = createMarkerFiles("000", 10); - assertEquals(10, markerFiles.size(), "Some marker files are created."); - assertEquals(markerFiles.size(), getTotalTempFiles(), "Some marker files are created."); + public void testCleanMarkerDataFilesOnRollback() throws Exception { + HoodieTestTable testTable = HoodieTestTable.of(metaClient) + .addRequestedCommit("000") + .withMarkerFiles("default", 10, IOType.MERGE); + final int numTempFilesBefore = testTable.listAllFilesInTempFolder().size(); + assertEquals(10, numTempFilesBefore, "Some marker files are created."); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); - table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, - HoodieTimeline.COMMIT_ACTION, "000")); table.getActiveTimeline().transitionRequestedToInflight( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty()); metaClient.reloadActiveTimeline(); table.rollback(jsc, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true); - assertEquals(0, getTotalTempFiles(), "All temp files are deleted."); + final int numTempFilesAfter = testTable.listAllFilesInTempFolder().size(); + assertEquals(0, numTempFilesAfter, "All temp files are deleted."); } /** * Test CLeaner Stat when there are no partition paths. 
*/ @Test - public void testCleaningWithZeroPartitionPaths() throws IOException { + public void testCleaningWithZeroPartitionPaths() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) @@ -919,9 +847,7 @@ public class TestCleaner extends HoodieClientTestBase { // Make a commit, although there are no partitionPaths. // Example use-case of this is when a client wants to create a table // with just some commit metadata, but no data/partitionPaths. - HoodieTestUtils.createCommitFiles(basePath, "000"); - - metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTestTable.of(metaClient).addCommit("000"); List hoodieCleanStatsOne = runCleaner(config); assertTrue(hoodieCleanStatsOne.isEmpty(), "HoodieCleanStats should be empty for a table with empty partitionPaths"); @@ -952,21 +878,9 @@ public class TestCleaner extends HoodieClientTestBase { * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. */ - @Test - public void testKeepLatestVersionsWithPendingCompactions() throws IOException { - testKeepLatestVersionsWithPendingCompactions(false); - } - - - /** - * Test Keep Latest Versions when there are pending compactions. - */ - @Test - public void testKeepLatestVersionsWithPendingCompactionsAndFailureRetry() throws IOException { - testKeepLatestVersionsWithPendingCompactions(true); - } - - private void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) .withCompactionConfig(HoodieCompactionConfig.newBuilder() @@ -1118,33 +1032,6 @@ public class TestCleaner extends HoodieClientTestBase { "Correct number of files under compaction deleted"); } - /** - * Utility method to create temporary data files. - * - * @param instantTime Commit Timestamp - * @param numFiles Number for files to be generated - * @return generated files - * @throws IOException in case of error - */ - private List createMarkerFiles(String instantTime, int numFiles) throws IOException { - List files = new ArrayList<>(); - for (int i = 0; i < numFiles; i++) { - files.add(HoodieClientTestUtils.createNewMarkerFile(basePath, "2019/03/29", instantTime)); - } - return files; - } - - /*** - * Helper method to return temporary files count. 
- * - * @return Number of temporary files found - * @throws IOException in case of error - */ - private int getTotalTempFiles() throws IOException { - return FileSystemTestUtils.listRecursive(fs, new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)) - .size(); - } - private Stream> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient, List paths) { Predicate roFilePredicate = diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java index da4224a14..1f638c39f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java @@ -22,8 +22,8 @@ import org.apache.hudi.common.fs.ConsistencyGuard; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FailSafeConsistencyGuard; import org.apache.hudi.common.fs.OptimisticConsistencyGuard; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; @@ -66,9 +66,9 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @ParameterizedTest @MethodSource("consistencyGuardType") public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2"); - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f2"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f3"); ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000); ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName()) @@ -88,7 +88,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearFailSafe() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays @@ -98,7 +98,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearTimedWait() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); @@ -106,7 +106,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearsFailSafe() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard 
passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); @@ -115,14 +115,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearsTimedWait() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); } @Test public void testCheckFailingDisappearFailSafe() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays @@ -132,7 +132,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearTimedWait() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); @@ -140,8 +140,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearsFailSafe() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); @@ -150,8 +150,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearsTimedWait() throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); - HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java index af679cee8..55b7b50af 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java @@ -18,17 +18,17 @@ package org.apache.hudi.table; +import org.apache.hudi.common.fs.FSUtils; +import 
org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.testutils.FileSystemTestUtils; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.testutils.HoodieClientTestUtils; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.testutils.FileSystemTestUtils; -import org.apache.hudi.common.testutils.HoodieCommonTestHarness; - -import org.apache.hudi.common.util.CollectionUtils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.IOType; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index d8bb946da..f49d6d5c4 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; @@ -34,7 +35,6 @@ import org.apache.hudi.table.HoodieCopyOnWriteTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.testutils.HoodieClientTestBase; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.avro.Schema; import org.apache.log4j.LogManager; @@ -73,8 +73,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build(); - HoodieClientTestUtils.fakeCommit(basePath, "001"); - HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize); + FileCreateUtils.createCommit(basePath, "001"); + FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", "file1", fileSize); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); @@ -193,7 +193,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0) .insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build(); - HoodieClientTestUtils.fakeCommit(basePath, "001"); + FileCreateUtils.createCommit(basePath, "001"); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); diff --git 
a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java index 1529d7993..c044beea6 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java @@ -53,6 +53,9 @@ import org.junit.jupiter.api.Test; import java.util.List; import java.util.stream.Collectors; +import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -166,7 +169,9 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file"); } } - HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime); + createDeltaCommit(basePath, newCommitTime); + createRequestedDeltaCommit(basePath, newCommitTime); + createInflightDeltaCommit(basePath, newCommitTime); // Do a compaction table = HoodieTable.create(config, hadoopConf); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java index c6652ed79..83e7ea082 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java @@ -20,16 +20,14 @@ package org.apache.hudi.table.action.rollback; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.testutils.FileSystemTestUtils; -import org.apache.hudi.io.IOType; +import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestBase; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -55,38 +53,20 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { cleanupResources(); } - private void givenCommit0(boolean isDeltaCommit) throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partA", "000", "f2"); - if (isDeltaCommit) { - HoodieClientTestUtils.fakeDeltaCommit(basePath, "000"); - } else { - HoodieClientTestUtils.fakeCommit(basePath, "000"); - } - } - - private void givenInflightCommit1(boolean isDeltaCommit) throws Exception { - HoodieClientTestUtils.fakeDataFile(basePath, "partB", "001", "f1"); - HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f1", IOType.CREATE); - - HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f3", IOType.CREATE); - - if (isDeltaCommit) { - 
HoodieClientTestUtils.fakeLogFile(basePath, "partA", "001", "f2", 0); - HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.APPEND); - HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f4", IOType.APPEND); - HoodieClientTestUtils.fakeInflightDeltaCommit(basePath, "001"); - } else { - HoodieClientTestUtils.fakeDataFile(basePath, "partA", "001", "f2"); - HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.MERGE); - HoodieClientTestUtils.fakeInFlightCommit(basePath, "001"); - } - } - @Test public void testCopyOnWriteRollback() throws Exception { // given: wrote some base files and corresponding markers - givenCommit0(false); - givenInflightCommit1(false); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String f0 = testTable.addRequestedCommit("000") + .withInserts("partA").get("partA"); + String f1 = testTable.addCommit("001") + .withUpdates("partA", f0) + .withInserts("partB").get("partB"); + String f2 = "f2"; + testTable.forCommit("001") + .withMarkerFile("partA", f0, IOType.MERGE) + .withMarkerFile("partB", f1, IOType.CREATE) + .withMarkerFile("partA", f2, IOType.CREATE); // when List stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002") @@ -95,8 +75,8 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { // then: ensure files are deleted correctly, non-existent files reported as failed deletes assertEquals(2, stats.size()); - List partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA")); - List partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB")); + List partAFiles = testTable.listAllFiles("partA"); + List partBFiles = testTable.listAllFiles("partB"); assertEquals(0, partBFiles.size()); assertEquals(1, partAFiles.size()); @@ -107,8 +87,19 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { @Test public void testMergeOnReadRollback() throws Exception { // given: wrote some base + log files and corresponding markers - givenCommit0(true); - givenInflightCommit1(true); + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String f2 = testTable.addRequestedDeltaCommit("000") + .withInserts("partA").get("partA"); + String f1 = testTable.addDeltaCommit("001") + .withLogFile("partA", f2) + .withInserts("partB").get("partB"); + String f3 = "f3"; + String f4 = "f4"; + testTable.forDeltaCommit("001") + .withMarkerFile("partB", f1, IOType.CREATE) + .withMarkerFile("partA", f3, IOType.CREATE) + .withMarkerFile("partA", f2, IOType.APPEND) + .withMarkerFile("partB", f4, IOType.APPEND); // when List stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002") @@ -117,12 +108,12 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { // then: ensure files are deleted, rollback block is appended (even if append does not exist) assertEquals(2, stats.size()); // will have the log file - List partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB")); + List partBFiles = testTable.listAllFiles("partB"); assertEquals(1, partBFiles.size()); assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())); assertTrue(partBFiles.get(0).getLen() > 0); - List partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA")); + List partAFiles = testTable.listAllFiles("partA"); 
assertEquals(3, partAFiles.size()); assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count()); assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count()); diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 6db6529f2..b61abb049 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -35,10 +35,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.IOType; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.io.storage.HoodieParquetWriter; @@ -59,7 +59,6 @@ import org.apache.spark.sql.SQLContext; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -74,57 +73,6 @@ import java.util.stream.Collectors; public class HoodieClientTestUtils { private static final Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class); - public static final String DEFAULT_WRITE_TOKEN = "1-0-1"; - - private static void fakeMetaFile(String basePath, String instantTime, String suffix) throws IOException { - String parentPath = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME; - new File(parentPath).mkdirs(); - new File(parentPath + "/" + instantTime + suffix).createNewFile(); - } - - public static void fakeCommit(String basePath, String instantTime) throws IOException { - fakeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION); - } - - public static void fakeDeltaCommit(String basePath, String instantTime) throws IOException { - fakeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION); - } - - public static void fakeInflightDeltaCommit(String basePath, String instantTime) throws IOException { - fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION); - } - - public static void fakeInFlightCommit(String basePath, String instantTime) throws IOException { - fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_EXTENSION); - } - - public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId) - throws Exception { - fakeDataFile(basePath, partitionPath, instantTime, fileId, 0); - } - - public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length) - throws Exception { - String parentPath = String.format("%s/%s", basePath, partitionPath); - new File(parentPath).mkdirs(); - String path = String.format("%s/%s", parentPath, FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)); - new File(path).createNewFile(); - new RandomAccessFile(path, "rw").setLength(length); - } - - public static void fakeLogFile(String basePath, String 
partitionPath, String baseInstantTime, String fileId, int version) - throws Exception { - fakeLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0); - } - - public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length) - throws Exception { - String parentPath = String.format("%s/%s", basePath, partitionPath); - new File(parentPath).mkdirs(); - String path = String.format("%s/%s", parentPath, FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1")); - new File(path).createNewFile(); - new RandomAccessFile(path, "rw").setLength(length); - } /** * Returns a Spark config for this test. @@ -153,8 +101,8 @@ public class HoodieClientTestUtils { return HoodieReadClient.addHoodieSupport(sparkConf); } - public static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline, - List<HoodieInstant> commitsToReturn) throws IOException { + private static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline, + List<HoodieInstant> commitsToReturn) throws IOException { HashMap<String, String> fileIdToFullPath = new HashMap<>(); for (HoodieInstant commit : commitsToReturn) { HoodieCommitMetadata metadata = @@ -227,6 +175,8 @@ public class HoodieClientTestUtils { /** * Find total basefiles for passed in paths. + *

+ * TODO move to {@link FileCreateUtils}. */ public static Map<String, Long> getBaseFileCountForPaths(String basePath, FileSystem fs, String... paths) { @@ -245,6 +195,9 @@ } } + /** + * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}. + */ public static String writeParquetFile(String basePath, String partitionPath, String filename, List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException { @@ -260,7 +213,7 @@ HoodieTestUtils.getDefaultHadoopConf(), Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO)); HoodieParquetWriter writer = new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config, - schema, new SparkTaskContextSupplier()); + schema, new SparkTaskContextSupplier()); int seqId = 1; for (HoodieRecord record : records) { GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); @@ -278,6 +231,9 @@ return filename; } + /** + * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}. + */ public static String writeParquetFile(String basePath, String partitionPath, List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException { Thread.sleep(1000); @@ -289,27 +245,9 @@ createCommitTime); } - public static String createNewMarkerFile(String basePath, String partitionPath, String instantTime) - throws IOException { - return createMarkerFile(basePath, partitionPath, instantTime); - } - - public static String createMarkerFile(String basePath, String partitionPath, String instantTime) - throws IOException { - return createMarkerFile(basePath, partitionPath, instantTime, UUID.randomUUID().toString(), IOType.MERGE); - } - - public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType) - throws IOException { - String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + instantTime + "/" + partitionPath + "/"; - new File(folderPath).mkdirs(); - String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, DEFAULT_WRITE_TOKEN, instantTime, - HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType); - File f = new File(folderPath + markerFileName); - f.createNewFile(); - return f.getAbsolutePath(); - } - + /** + * TODO move to {@link FileCreateUtils}. + */ public static void createMarkerFile(String basePath, String partitionPath, String instantTime, String dataFileName) throws IOException { createTempFolderForMarkerFiles(basePath); String folderPath = getTempFolderName(basePath); @@ -318,6 +256,9 @@ new File(folderPath + "/" + instantTime + "/" + partitionPath + "/" + dataFileName + ".marker.MERGE").createNewFile(); } + /** + * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
+ */ public static int getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime) { String folderPath = getTempFolderName(basePath); File markerDir = new File(folderPath + "/" + instantTime + "/" + partitionPath); @@ -327,11 +268,11 @@ public class HoodieClientTestUtils { return 0; } - public static void createTempFolderForMarkerFiles(String basePath) { + private static void createTempFolderForMarkerFiles(String basePath) { new File(basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME).mkdirs(); } - public static String getTempFolderName(String basePath) { + private static String getTempFolderName(String basePath) { return basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME; } } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/IOType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java similarity index 57% rename from hudi-client/src/main/java/org/apache/hudi/io/IOType.java rename to hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java index aa6660e72..bd29ff091 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/IOType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/IOType.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io; +package org.apache.hudi.common.model; /** * Types of lower level I/O operations done on each file slice. diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java new file mode 100644 index 000000000..2da8e2966 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.testutils; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class FileCreateUtils { + + private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException { + Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME); + Files.createDirectories(parentPath); + Path metaFilePath = parentPath.resolve(instantTime + suffix); + if (Files.notExists(metaFilePath)) { + Files.createFile(metaFilePath); + } + } + + public static void createCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION); + } + + public static void createRequestedCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMMIT_EXTENSION); + } + + public static void createInflightCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION); + } + + public static void createDeltaCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION); + } + + public static void createRequestedDeltaCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION); + } + + public static void createInflightDeltaCommit(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION); + } + + public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId) + throws Exception { + createDataFile(basePath, partitionPath, instantTime, fileId, 0); + } + + public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length) + throws Exception { + Path parentPath = Paths.get(basePath, partitionPath); + Files.createDirectories(parentPath); + Path dataFilePath = parentPath.resolve(FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)); + if (Files.notExists(dataFilePath)) { + Files.createFile(dataFilePath); + } + new RandomAccessFile(dataFilePath.toFile(), "rw").setLength(length); + } + + public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version) + throws Exception { + createLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0); + } + + public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length) + throws Exception { + Path parentPath = Paths.get(basePath, partitionPath); + Files.createDirectories(parentPath); + Path logFilePath = parentPath.resolve(FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1")); + if (Files.notExists(logFilePath)) { + Files.createFile(logFilePath); + } + new RandomAccessFile(logFilePath.toFile(), "rw").setLength(length); + } + + public static String createMarkerFile(String 
basePath, String partitionPath, String instantTime, String fileID, IOType ioType) + throws IOException { + Path folderPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); + Files.createDirectories(folderPath); + String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, "1-0-1", instantTime, + HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType); + Path markerFilePath = folderPath.resolve(markerFileName); + if (Files.notExists(markerFilePath)) { + Files.createFile(markerFilePath); + } + return markerFilePath.toAbsolutePath().toString(); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java new file mode 100644 index 000000000..32f2d4580 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.testutils; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.IntStream; + +import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile; +import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit; + +public class HoodieTestTable { + + private final String basePath; + private final FileSystem fs; + private HoodieTableMetaClient metaClient; + private String currentInstantTime; + + private HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) { + ValidationUtils.checkArgument(Objects.equals(basePath, metaClient.getBasePath())); + ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs())); + this.basePath = basePath; + this.fs = fs; + this.metaClient = metaClient; + } + + public static HoodieTestTable of(HoodieTableMetaClient metaClient) { + return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient); + } + + public HoodieTestTable addRequestedCommit(String instantTime) throws Exception { + createRequestedCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addRequestedDeltaCommit(String instantTime) throws Exception { + createRequestedDeltaCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addInflightCommit(String instantTime) throws Exception { + createRequestedCommit(basePath, instantTime); + createInflightCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addInflightDeltaCommit(String instantTime) throws Exception { + createRequestedDeltaCommit(basePath, instantTime); + createInflightDeltaCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addCommit(String instantTime) throws Exception { + createRequestedCommit(basePath, instantTime); + createInflightCommit(basePath, instantTime); + createCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addDeltaCommit(String instantTime) throws Exception { + createRequestedDeltaCommit(basePath, instantTime); + createInflightDeltaCommit(basePath, 
instantTime); + createDeltaCommit(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable forCommit(String instantTime) { + currentInstantTime = instantTime; + return this; + } + + public HoodieTestTable forDeltaCommit(String instantTime) { + currentInstantTime = instantTime; + return this; + } + + public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) throws IOException { + return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType); + } + + public HoodieTestTable withMarkerFile(String partitionPath, String fileId, IOType ioType) throws IOException { + createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType); + return this; + } + + public HoodieTestTable withMarkerFiles(String partitionPath, int num, IOType ioType) throws IOException { + String[] fileIds = IntStream.range(0, num).mapToObj(i -> UUID.randomUUID().toString()).toArray(String[]::new); + return withMarkerFiles(partitionPath, fileIds, ioType); + } + + public HoodieTestTable withMarkerFiles(String partitionPath, String[] fileIds, IOType ioType) throws IOException { + for (String fileId : fileIds) { + createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType); + } + return this; + } + + /** + * Insert one base file to each of the given distinct partitions. + * + * @return A {@link Map} of partition and its newly inserted file's id. + */ + public Map<String, String> withInserts(String... partitions) throws Exception { + Map<String, String> partitionFileIdMap = new HashMap<>(); + for (String p : partitions) { + String fileId = UUID.randomUUID().toString(); + FileCreateUtils.createDataFile(basePath, p, currentInstantTime, fileId); + partitionFileIdMap.put(p, fileId); + } + return partitionFileIdMap; + } + + public HoodieTestTable withUpdates(String partition, String... fileIds) throws Exception { + for (String f : fileIds) { + FileCreateUtils.createDataFile(basePath, partition, currentInstantTime, f); + } + return this; + } + + public String withLogFile(String partitionPath) throws Exception { + String fileId = UUID.randomUUID().toString(); + withLogFile(partitionPath, fileId); + return fileId; + } + + public HoodieTestTable withLogFile(String partitionPath, String fileId) throws Exception { + return withLogFile(partitionPath, fileId, 0); + } + + public HoodieTestTable withLogFile(String partitionPath, String fileId, int version) throws Exception { + FileCreateUtils.createLogFile(basePath, partitionPath, currentInstantTime, fileId, version); + return this; + } + + public boolean filesExist(Map<String, String> partitionAndFileId, String instantTime) { + return partitionAndFileId.entrySet().stream().allMatch(entry -> { + String partition = entry.getKey(); + String fileId = entry.getValue(); + return fileExists(partition, instantTime, fileId); + }); + } + + public boolean filesExist(String partition, String instantTime, String... fileIds) { + return Arrays.stream(fileIds).allMatch(f -> fileExists(partition, instantTime, f)); + } + + public boolean fileExists(String partition, String instantTime, String fileId) { + try { + return fs.exists(new Path(Paths.get(basePath, partition, + FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)).toString())); + } catch (IOException e) { + throw new HoodieTestTableException(e); + } + } + + public boolean logFilesExist(String partition, String instantTime, String fileId, int...
versions) { + return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v)); + } + + public boolean logFileExists(String partition, String instantTime, String fileId, int version) { + try { + return fs.exists(new Path(Paths.get(basePath, partition, + FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, version, "1-0-1")).toString())); + } catch (IOException e) { + throw new HoodieTestTableException(e); + } + } + + public List<FileStatus> listAllFiles(String partitionPath) throws IOException { + return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())); + } + + public List<FileStatus> listAllFilesInTempFolder() throws IOException { + return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())); + } + + public static class HoodieTestTableException extends RuntimeException { + public HoodieTestTableException(Throwable t) { + super(t); + } + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 92d431c2d..8b38b25ab 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -99,7 +99,6 @@ import static org.junit.jupiter.api.Assertions.fail; */ public class HoodieTestUtils { - public static final String TEST_EXTENSION = ".test"; public static final String RAW_TRIPS_TEST_NAME = "raw_trips"; public static final String DEFAULT_WRITE_TOKEN = "1-0-1"; public static final int DEFAULT_LOG_VERSION = 1; @@ -138,7 +137,7 @@ public class HoodieTestUtils { } public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, - String tableName) + String tableName) throws IOException { Properties properties = new Properties(); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName); @@ -146,7 +145,7 @@ public class HoodieTestUtils { } public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, - HoodieFileFormat baseFileFormat) + HoodieFileFormat baseFileFormat) throws IOException { Properties properties = new Properties(); properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, baseFileFormat.toString()); @@ -154,7 +153,7 @@ public class HoodieTestUtils { } public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, - Properties properties) + Properties properties) throws IOException { properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME); properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); @@ -166,6 +165,9 @@ return COMMIT_FORMATTER.format(new Date()); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static void createCommitFiles(String basePath, String... instantTimes) throws IOException { for (String instantTime : instantTimes) { new File( @@ -176,20 +178,6 @@ + HoodieTimeline.makeInflightCommitFileName(instantTime)).createNewFile(); new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(instantTime)) - .createNewFile(); - } - } - - public static void createDeltaCommitFiles(String basePath, String...
instantTimes) throws IOException { - for (String instantTime : instantTimes) { - new File( - basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" - + HoodieTimeline.makeRequestedDeltaFileName(instantTime)).createNewFile(); - new File( - basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" - + HoodieTimeline.makeInflightDeltaFileName(instantTime)).createNewFile(); - new File( - basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(instantTime)) .createNewFile(); } } @@ -198,6 +186,9 @@ public class HoodieTestUtils { new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs(); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static void createInflightCommitFiles(String basePath, String... instantTimes) throws IOException { for (String instantTime : instantTimes) { @@ -211,11 +202,12 @@ public class HoodieTestUtils { public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... instantTimes) { for (String instantTime : instantTimes) { Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime), - HoodieTimeline.makeInflightCleanerFileName(instantTime)).forEach(f -> { + HoodieTimeline.makeInflightCleanerFileName(instantTime)) + .forEach(f -> { FSDataOutputStream os = null; try { - Path commitFile = new Path( - metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); + Path commitFile = new Path(Paths + .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); os = metaClient.getFs().create(commitFile, true); // Write empty clean metadata os.write(TimelineMetadataUtils.serializeCleanerPlan( @@ -237,11 +229,12 @@ public class HoodieTestUtils { public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient metaClient, String commitTime) { Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime), - HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> { + HoodieTimeline.makeInflightCleanerFileName(commitTime)) + .forEach(f -> { FSDataOutputStream os = null; try { - Path commitFile = new Path( - metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f); + Path commitFile = new Path(Paths + .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); os = metaClient.getFs().create(commitFile, true); // Write empty clean metadata os.write(new byte[0]); @@ -259,18 +252,18 @@ public class HoodieTestUtils { }); } - public static String createNewDataFile(String basePath, String partitionPath, String instantTime) - throws IOException { - String fileID = UUID.randomUUID().toString(); - return createDataFile(basePath, partitionPath, instantTime, fileID); - } - + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String createNewDataFile(String basePath, String partitionPath, String instantTime, long length) throws IOException { String fileID = UUID.randomUUID().toString(); return createDataFileFixLength(basePath, partitionPath, instantTime, fileID, length); } + /** + * @deprecated Use {@link HoodieTestTable} instead. 
+ */ public static String createDataFile(String basePath, String partitionPath, String instantTime, String fileID) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; @@ -279,7 +272,7 @@ return fileID; } - public static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID, + private static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID, long length) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; Files.createDirectories(Paths.get(folderPath)); @@ -291,6 +284,9 @@ return fileID; } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String instantTime, String fileID, Option<Integer> version) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; @@ -307,17 +303,6 @@ return fileID; } - public static void createCompactionCommitFiles(FileSystem fs, String basePath, String... instantTimes) - throws IOException { - for (String instantTime : instantTimes) { - boolean createFile = fs.createNewFile(new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" - + HoodieTimeline.makeCommitFileName(instantTime))); - if (!createFile) { - throw new IOException("cannot create commit file for commit " + instantTime); - } - } - } - public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant, List<Pair<String, FileSlice>> fileSliceList) throws IOException { HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty()); @@ -326,10 +311,16 @@ TimelineMetadataUtils.serializeCompactionPlan(plan)); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String getDataFilePath(String basePath, String partitionPath, String instantTime, String fileID) { return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String getLogFilePath(String basePath, String partitionPath, String instantTime, String fileID, Option<Integer> version) { return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", instantTime, @@ -340,36 +331,43 @@ return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION; } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String getInflightCommitFilePath(String basePath, String instantTime) { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static String getRequestedCompactionFilePath(String basePath, String instantTime) { return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; } + /** + * @deprecated Use {@link HoodieTestTable} instead.
+ */ public static boolean doesDataFileExist(String basePath, String partitionPath, String instantTime, String fileID) { return new File(getDataFilePath(basePath, partitionPath, instantTime, fileID)).exists(); } - public static boolean doesLogFileExist(String basePath, String partitionPath, String instantTime, String fileID, - Option<Integer> version) { - return new File(getLogFilePath(basePath, partitionPath, instantTime, fileID, version)).exists(); - } - public static boolean doesCommitExist(String basePath, String instantTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION) - .exists(); + .exists(); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static boolean doesInflightExist(String basePath, String instantTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_EXTENSION) - .exists(); + .exists(); } public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath, @@ -419,8 +417,8 @@ Writer logWriter; try { logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath)) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId()) - .overBaseCommit(location.getInstantTime()).withFs(fs).build(); + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId()) + .overBaseCommit(location.getInstantTime()).withFs(fs).build(); Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime()); @@ -474,7 +472,7 @@ public static FileStatus[] listAllDataFilesAndLogFilesInPath(FileSystem fs, String basePath) throws IOException { return Stream.concat(Arrays.stream(listAllDataFilesInPath(fs, basePath)), Arrays.stream(listAllLogFilesInPath(fs, basePath))) - .toArray(FileStatus[]::new); + .toArray(FileStatus[]::new); } public static List<String> monotonicIncreasingCommitTimestamps(int numTimestamps, int startSecsDelta) {