From 3201665295122a892c8ade92aca5f72426a403fd Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Thu, 17 Sep 2020 09:29:07 -0700 Subject: [PATCH] [HUDI-995] Use HoodieTestTable in more classes (#2079) * [HUDI-995] Use HoodieTestTable in more classes Migrate test data prep logic in - TestStatsCommand - TestHoodieROTablePathFilter Re-implement methods for create new commit times in HoodieTestUtils and HoodieClientTestHarness - Move relevant APIs to HoodieTestTable - Migrate usages After changing to HoodieTestTable APIs, removed unused deprecated APIs in HoodieTestUtils --- .../hudi/cli/commands/TestStatsCommand.java | 21 ++-- .../io/TestHoodieKeyLocationFetchHandle.java | 2 +- .../org/apache/hudi/table/TestCleaner.java | 40 ++++++-- .../commit/TestCopyOnWriteActionExecutor.java | 15 +-- .../testutils/HoodieClientTestHarness.java | 10 +- .../common/testutils/FileCreateUtils.java | 17 ++++ .../common/testutils/HoodieTestTable.java | 79 +++++++++++++++ .../common/testutils/HoodieTestUtils.java | 99 +++---------------- .../hadoop/TestHoodieROTablePathFilter.java | 92 ++++++++--------- 9 files changed, 204 insertions(+), 171 deletions(-) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestStatsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestStatsCommand.java index 72a86b577..35e5b47da 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestStatsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestStatsCommand.java @@ -27,7 +27,7 @@ import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.util.Option; import com.codahale.metrics.Histogram; @@ -66,6 +66,7 @@ public class TestStatsCommand extends AbstractShellIntegrationTest { new TableCommand().createTable( tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + metaClient = HoodieCLI.getTableMetaClient(); } /** @@ -112,7 +113,7 @@ public class TestStatsCommand extends AbstractShellIntegrationTest { * Test case for command 'stats filesizes'. */ @Test - public void testFileSizeStats() throws IOException { + public void testFileSizeStats() throws Exception { String commit1 = "100"; String commit2 = "101"; Map data = new LinkedHashMap<>(); @@ -124,18 +125,20 @@ public class TestStatsCommand extends AbstractShellIntegrationTest { String partition2 = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; String partition3 = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH; + HoodieTestTable testTable = HoodieTestTable.of(metaClient); Integer[] data1 = data.get(commit1); assertTrue(3 <= data1.length); - HoodieTestUtils.createNewDataFile(tablePath, partition1, commit1, data1[0]); - HoodieTestUtils.createNewDataFile(tablePath, partition2, commit1, data1[1]); - HoodieTestUtils.createNewDataFile(tablePath, partition3, commit1, data1[2]); + testTable.addCommit(commit1) + .withBaseFilesInPartition(partition1, data1[0]) + .withBaseFilesInPartition(partition2, data1[1]) + .withBaseFilesInPartition(partition3, data1[2]); Integer[] data2 = data.get(commit2); assertTrue(4 <= data2.length); - HoodieTestUtils.createNewDataFile(tablePath, partition1, commit2, data2[0]); - HoodieTestUtils.createNewDataFile(tablePath, partition2, commit2, data2[1]); - HoodieTestUtils.createNewDataFile(tablePath, partition2, commit2, data2[2]); - HoodieTestUtils.createNewDataFile(tablePath, partition3, commit2, data2[3]); + testTable.addCommit(commit2) + .withBaseFilesInPartition(partition1, data2[0]) + .withBaseFilesInPartition(partition2, data2[1], data2[2]) + .withBaseFilesInPartition(partition3, data2[3]); CommandResult cr = getShell().executeCommand("stats filesizes"); assertTrue(cr.isSuccess()); diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java index ba20f5d10..a7cd50325 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java @@ -53,7 +53,7 @@ import scala.Tuple2; import static java.util.stream.Collectors.toList; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; -import static org.apache.hudi.common.testutils.HoodieTestUtils.makeNewCommitTime; +import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; import static org.apache.hudi.common.testutils.Transformations.recordsToPartitionRecordsMap; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 8016acc02..c1b5296b5 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -66,6 +66,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.action.clean.CleanPlanner; import org.apache.hudi.testutils.HoodieClientTestBase; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -97,6 +98,8 @@ import java.util.stream.Stream; import scala.Tuple3; +import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes; +import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -241,7 +244,7 @@ public class TestCleaner extends HoodieClientTestBase { .map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList()); HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Option.empty(), Option.empty()); - List instantTimes = HoodieTestUtils.monotonicIncreasingCommitTimestamps(9, 1); + List instantTimes = makeIncrementalCommitTimes(9); String compactionTime = instantTimes.get(0); table.getActiveTimeline().saveToCompactionRequested( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime), @@ -383,7 +386,7 @@ public class TestCleaner extends HoodieClientTestBase { HoodieCleaningPolicy.KEEP_LATEST_COMMITS); // Keep doing some writes and clean inline. Make sure we have expected number of files remaining. - HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).forEach(newCommitTime -> { + makeIncrementalCommitTimes(8).forEach(newCommitTime -> { try { client.startCommitWithTime(newCommitTime); List records = recordUpsertGenWrappedFunction.apply(newCommitTime, 100); @@ -432,7 +435,15 @@ public class TestCleaner extends HoodieClientTestBase { * @param config HoodieWriteConfig */ private List runCleaner(HoodieWriteConfig config) throws IOException { - return runCleaner(config, false); + return runCleaner(config, false, 1); + } + + private List runCleaner(HoodieWriteConfig config, int firstCommitSequence) throws IOException { + return runCleaner(config, false, firstCommitSequence); + } + + private List runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException { + return runCleaner(config, simulateRetryFailure, 1); } /** @@ -440,9 +451,9 @@ public class TestCleaner extends HoodieClientTestBase { * * @param config HoodieWriteConfig */ - private List runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException { + private List runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure, int firstCommitSequence) throws IOException { HoodieWriteClient writeClient = getHoodieWriteClient(config); - String cleanInstantTs = getNextInstant(); + String cleanInstantTs = makeNewCommitTime(firstCommitSequence); HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs); if (null == cleanMetadata1) { @@ -463,7 +474,7 @@ public class TestCleaner extends HoodieClientTestBase { }); }); metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant); - HoodieCleanMetadata newCleanMetadata = writeClient.clean(getNextInstant()); + HoodieCleanMetadata newCleanMetadata = writeClient.clean(makeNewCommitTime(firstCommitSequence + 1)); // No new clean metadata would be created. Only the previous one will be retried assertNull(newCleanMetadata); HoodieCleanMetadata cleanMetadata2 = CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant); @@ -540,7 +551,7 @@ public class TestCleaner extends HoodieClientTestBase { .withBaseFilesInPartition(p1, file1P1C0) .withBaseFilesInPartitions(p0, p1); - List hoodieCleanStatsTwo = runCleaner(config); + List hoodieCleanStatsTwo = runCleaner(config, 1); // enableBootstrapSourceClean would delete the bootstrap base file as the same time HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0); assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() @@ -581,7 +592,7 @@ public class TestCleaner extends HoodieClientTestBase { String file3P0C2 = testTable.addCommit("00000000000003") .withBaseFilesInPartition(p0, file1P0C0, file2P0C1) .withBaseFilesInPartitions(p0).get(p0); - List hoodieCleanStatsThree = runCleaner(config); + List hoodieCleanStatsThree = runCleaner(config, 3); assertEquals(2, getCleanStat(hoodieCleanStatsThree, p0) .getSuccessDeleteFiles().size(), "Must clean two files"); @@ -1061,7 +1072,18 @@ public class TestCleaner extends HoodieClientTestBase { .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .build(); - HoodieTestUtils.createCorruptedPendingCleanFiles(metaClient, getNextInstant()); + String commitTime = makeNewCommitTime(1); + List cleanerFileNames = Arrays.asList( + HoodieTimeline.makeRequestedCleanerFileName(commitTime), + HoodieTimeline.makeInflightCleanerFileName(commitTime)); + for (String f : cleanerFileNames) { + Path commitFile = new Path(Paths + .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); + try (FSDataOutputStream os = metaClient.getFs().create(commitFile, true)) { + // Write empty clean metadata + os.write(new byte[0]); + } + } metaClient = HoodieTableMetaClient.reload(metaClient); List cleanStats = runCleaner(config); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 0e115d0b1..89fd592f8 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -68,6 +68,7 @@ import java.util.Map; import java.util.UUID; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords; import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert; @@ -86,7 +87,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { String fileName = UUID.randomUUID().toString(); String partitionPath = "2016/05/04"; - String instantTime = HoodieTestUtils.makeNewCommitTime(); + String instantTime = makeNewCommitTime(); HoodieWriteConfig config = makeHoodieClientConfig(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); @@ -118,7 +119,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { public void testUpdateRecords() throws Exception { // Prepare the AvroParquetIO HoodieWriteConfig config = makeHoodieClientConfig(); - String firstCommitTime = HoodieTestUtils.makeNewCommitTime(); + String firstCommitTime = makeNewCommitTime(); HoodieWriteClient writeClient = getHoodieWriteClient(config); writeClient.startCommitWithTime(firstCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -182,7 +183,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { List updatedRecords = Arrays.asList(updatedRecord1, insertedRecord1); Thread.sleep(1000); - String newCommitTime = HoodieTestUtils.makeNewCommitTime(); + String newCommitTime = makeNewCommitTime(); metaClient = HoodieTableMetaClient.reload(metaClient); writeClient.startCommitWithTime(newCommitTime); List statuses = writeClient.upsert(jsc.parallelize(updatedRecords), newCommitTime).collect(); @@ -263,7 +264,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { // Prepare the AvroParquetIO HoodieWriteConfig config = makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build(); - String firstCommitTime = HoodieTestUtils.makeNewCommitTime(); + String firstCommitTime = makeNewCommitTime(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); @@ -317,7 +318,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { @Test public void testInsertRecords() throws Exception { HoodieWriteConfig config = makeHoodieClientConfig(); - String instantTime = HoodieTestUtils.makeNewCommitTime(); + String instantTime = makeNewCommitTime(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); @@ -368,7 +369,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(HoodieStorageConfig.newBuilder() .parquetMaxFileSize(64 * 1024).hfileMaxFileSize(64 * 1024) .parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build(); - String instantTime = HoodieTestUtils.makeNewCommitTime(); + String instantTime = makeNewCommitTime(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); @@ -435,7 +436,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { HoodieWriteConfig config = HoodieWriteConfig.newBuilder() .withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) .withBulkInsertParallelism(2).withBulkInsertSortMode(bulkInsertMode).build(); - String instantTime = HoodieTestUtils.makeNewCommitTime(); + String instantTime = makeNewCommitTime(); HoodieWriteClient writeClient = getHoodieWriteClient(config); writeClient.startCommitWithTime(instantTime); metaClient = HoodieTableMetaClient.reload(metaClient); diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 736959875..2a5f47692 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -38,18 +38,17 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; -import org.apache.log4j.Logger; -import org.apache.log4j.LogManager; import java.io.IOException; import java.io.Serializable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; /** * The test harness for resource initialization and cleanup. @@ -66,17 +65,12 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im protected transient HoodieTestDataGenerator dataGen = null; protected transient ExecutorService executorService; protected transient HoodieTableMetaClient metaClient; - private static AtomicInteger instantGen = new AtomicInteger(1); protected transient HoodieWriteClient writeClient; protected transient HoodieReadClient readClient; protected transient HoodieTableFileSystemView tableView; protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); - public String getNextInstant() { - return String.format("%09d", instantGen.getAndIncrement()); - } - // dfs protected String dfsBasePath; protected transient HdfsTestService hdfsTestService; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index f2a1144d7..5da81f3cc 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -100,6 +100,23 @@ public class FileCreateUtils { createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION); } + private static void createAuxiliaryMetaFile(String basePath, String instantTime, String suffix) throws IOException { + Path parentPath = Paths.get(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME); + Files.createDirectories(parentPath); + Path metaFilePath = parentPath.resolve(instantTime + suffix); + if (Files.notExists(metaFilePath)) { + Files.createFile(metaFilePath); + } + } + + public static void createRequestedCompaction(String basePath, String instantTime) throws IOException { + createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION); + } + + public static void createInflightCompaction(String basePath, String instantTime) throws IOException { + createAuxiliaryMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION); + } + public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException { Path parentPath = Paths.get(basePath, partitionPath); Files.createDirectories(parentPath); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index c99fe1d0e..b80b73228 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -21,6 +21,7 @@ package org.apache.hudi.common.testutils; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hadoop.fs.FileStatus; @@ -29,21 +30,28 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; import java.nio.file.Paths; +import java.time.Instant; import java.util.Arrays; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.time.temporal.ChronoUnit.SECONDS; +import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER; import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName; import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCompaction; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile; import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCompaction; import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName; @@ -66,6 +74,29 @@ public class HoodieTestTable { return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient); } + public static String makeNewCommitTime(int sequence) { + return String.format("%09d", sequence); + } + + public static String makeNewCommitTime() { + return makeNewCommitTime(Instant.now()); + } + + public static String makeNewCommitTime(Instant dateTime) { + return COMMIT_FORMATTER.format(Date.from(dateTime)); + } + + public static List makeIncrementalCommitTimes(int num) { + return makeIncrementalCommitTimes(num, 1); + } + + public static List makeIncrementalCommitTimes(int num, int firstOffsetSeconds) { + final Instant now = Instant.now(); + return IntStream.range(0, num) + .mapToObj(i -> makeNewCommitTime(now.plus(firstOffsetSeconds + i, SECONDS))) + .collect(Collectors.toList()); + } + public HoodieTestTable addRequestedCommit(String instantTime) throws Exception { createRequestedCommit(basePath, instantTime); currentInstantTime = instantTime; @@ -114,6 +145,21 @@ public class HoodieTestTable { return this; } + public HoodieTestTable addRequestedCompaction(String instantTime) throws IOException { + createRequestedCompaction(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + + public HoodieTestTable addCompaction(String instantTime) throws IOException { + createRequestedCompaction(basePath, instantTime); + createInflightCompaction(basePath, instantTime); + currentInstantTime = instantTime; + metaClient = HoodieTableMetaClient.reload(metaClient); + return this; + } + public HoodieTestTable forCommit(String instantTime) { currentInstantTime = instantTime; return this; @@ -124,6 +170,11 @@ public class HoodieTestTable { return this; } + public HoodieTestTable forCompaction(String instantTime) { + currentInstantTime = instantTime; + return this; + } + public HoodieTestTable withPartitionMetaFiles(String... partitionPaths) throws IOException { for (String partitionPath : partitionPaths) { FileCreateUtils.createPartitionMetaFile(basePath, partitionPath); @@ -174,6 +225,14 @@ public class HoodieTestTable { return this; } + public HoodieTestTable withBaseFilesInPartition(String partition, int... lengths) throws Exception { + for (int l : lengths) { + String fileId = UUID.randomUUID().toString(); + FileCreateUtils.createBaseFile(basePath, partition, currentInstantTime, fileId, l); + } + return this; + } + public String withLogFile(String partitionPath) throws Exception { String fileId = UUID.randomUUID().toString(); withLogFile(partitionPath, fileId); @@ -209,10 +268,30 @@ public class HoodieTestTable { } } + public Path getPartitionPath(String partition) { + return new Path(Paths.get(basePath, partition).toUri()); + } + public String getBaseFileNameById(String fileId) { return baseFileName(currentInstantTime, fileId); } + public Path getBaseFilePath(String partition, String fileId) { + return new Path(Paths.get(basePath, partition, getBaseFileNameById(fileId)).toUri()); + } + + public Path getInflightCommitFilePath(String instantTime) { + return new Path(Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME, instantTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION).toUri()); + } + + public Path getCommitFilePath(String instantTime) { + return new Path(Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME, instantTime + HoodieTimeline.COMMIT_EXTENSION).toUri()); + } + + public Path getRequestedCompactionFilePath(String instantTime) { + return new Path(Paths.get(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME, instantTime + HoodieTimeline.REQUESTED_COMPACTION_EXTENSION).toUri()); + } + public boolean logFilesExist(String partition, String instantTime, String fileId, int... versions) { return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v)); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 783bda4fc..84bfe228e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -71,18 +71,12 @@ import org.apache.hadoop.util.StringUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.io.Serializable; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Calendar; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -92,7 +86,6 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER; import static org.junit.jupiter.api.Assertions.fail; /** @@ -162,10 +155,6 @@ public class HoodieTestUtils { return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties); } - public static String makeNewCommitTime() { - return COMMIT_FORMATTER.format(new Date()); - } - /** * @deprecated Use {@link HoodieTestTable} instead. */ @@ -183,10 +172,6 @@ public class HoodieTestUtils { } } - public static void createMetadataFolder(String basePath) { - new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs(); - } - /** * @deprecated Use {@link HoodieTestTable} instead. */ @@ -229,40 +214,6 @@ public class HoodieTestUtils { } } - public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient metaClient, String commitTime) { - Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime), - HoodieTimeline.makeInflightCleanerFileName(commitTime)) - .forEach(f -> { - FSDataOutputStream os = null; - try { - Path commitFile = new Path(Paths - .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); - os = metaClient.getFs().create(commitFile, true); - // Write empty clean metadata - os.write(new byte[0]); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); - } finally { - if (null != os) { - try { - os.close(); - } catch (IOException e) { - throw new HoodieIOException(e.getMessage(), e); - } - } - } - }); - } - - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ - public static String createNewDataFile(String basePath, String partitionPath, String instantTime, long length) - throws IOException { - String fileID = UUID.randomUUID().toString(); - return createDataFileFixLength(basePath, partitionPath, instantTime, fileID, length); - } - /** * @deprecated Use {@link HoodieTestTable} instead. */ @@ -274,18 +225,6 @@ public class HoodieTestUtils { return fileID; } - private static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID, - long length) throws IOException { - String folderPath = basePath + "/" + partitionPath + "/"; - Files.createDirectories(Paths.get(folderPath)); - String filePath = folderPath + FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID); - Files.createFile(Paths.get(filePath)); - try (FileChannel output = new FileOutputStream(new File(filePath)).getChannel()) { - output.write(ByteBuffer.allocate(1), length - 1); - } - return fileID; - } - /** * @deprecated Use {@link HoodieTestTable} instead. */ @@ -305,6 +244,11 @@ public class HoodieTestUtils { return fileID; } + /** + * TODO: incorporate into {@link HoodieTestTable}. + * + * @deprecated Use {@link HoodieTestTable} instead. + */ public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant, List> fileSliceList) throws IOException { HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty()); @@ -333,22 +277,6 @@ public class HoodieTestUtils { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION; } - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ - public static String getInflightCommitFilePath(String basePath, String instantTime) { - return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime - + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; - } - - /** - * @deprecated Use {@link HoodieTestTable} instead. - */ - public static String getRequestedCompactionFilePath(String basePath, String instantTime) { - return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instantTime - + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; - } - /** * @deprecated Use {@link HoodieTestTable} instead. */ @@ -357,6 +285,9 @@ public class HoodieTestUtils { return new File(getDataFilePath(basePath, partitionPath, instantTime, fileID)).exists(); } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static boolean doesCommitExist(String basePath, String instantTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION) @@ -408,6 +339,9 @@ public class HoodieTestUtils { return deseralizedObject; } + /** + * @deprecated Use {@link HoodieTestTable} instead. + */ public static void writeRecordsToLogFiles(FileSystem fs, String basePath, Schema schema, List updatedRecords) { Map> groupedUpdated = @@ -477,17 +411,6 @@ public class HoodieTestUtils { .toArray(FileStatus[]::new); } - public static List monotonicIncreasingCommitTimestamps(int numTimestamps, int startSecsDelta) { - Calendar cal = Calendar.getInstance(); - cal.add(Calendar.SECOND, startSecsDelta); - List commits = new ArrayList<>(); - for (int i = 0; i < numTimestamps; i++) { - commits.add(COMMIT_FORMATTER.format(cal.getTime())); - cal.add(Calendar.SECOND, 1); - } - return commits; - } - public static List generateFakeHoodieWriteStat(int limit) { List writeStatList = new ArrayList<>(); for (int i = 0; i < limit; i++) { diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java index f96f6cb29..5a6070fde 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java @@ -20,84 +20,78 @@ package org.apache.hudi.hadoop; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; -import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; -import java.io.File; import java.io.IOException; -import java.util.ArrayList; +import java.nio.file.Files; +import java.nio.file.Paths; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -/** - * - */ public class TestHoodieROTablePathFilter extends HoodieCommonTestHarness { + private HoodieROTablePathFilter pathFilter; + private HoodieTestTable testTable; + @BeforeEach public void setUp() throws Exception { initMetaClient(); + pathFilter = new HoodieROTablePathFilter(metaClient.getHadoopConf()); + testTable = HoodieTestTable.of(metaClient); } @Test - public void testHoodiePaths() throws IOException { - // Create a temp folder as the base path - String basePath = metaClient.getBasePath(); + public void testHoodiePaths() throws Exception { + final String p1 = "2017/01/01"; + final String p2 = "2017/01/02"; + testTable.addCommit("001") + .withBaseFilesInPartition(p1, "f1", "f2") + .withBaseFilesInPartition(p2, "f3") + .addCommit("002") + .withBaseFilesInPartition(p1, "f2") + .addInflightCommit("003") + .withBaseFilesInPartition(p2, "f3") + .addRequestedCompaction("004"); - HoodieTestUtils.createCommitFiles(basePath, "001", "002"); - HoodieTestUtils.createInflightCommitFiles(basePath, "003"); - HoodieTestUtils.createCompactionRequest(metaClient, "004", new ArrayList<>()); + assertTrue(pathFilter.accept(testTable.forCommit("002").getBaseFilePath(p1, "f2"))); + assertFalse(pathFilter.accept(testTable.forCommit("003").getBaseFilePath(p2, "f3"))); + assertFalse(pathFilter.accept(testTable.forCommit("003").getBaseFilePath(p1, "f3"))); - HoodieTestUtils.createDataFile(basePath, "2017/01/01", "001", "f1"); - HoodieTestUtils.createDataFile(basePath, "2017/01/01", "001", "f2"); - HoodieTestUtils.createDataFile(basePath, "2017/01/02", "001", "f3"); - HoodieTestUtils.createDataFile(basePath, "2017/01/01", "002", "f2"); - HoodieTestUtils.createDataFile(basePath, "2017/01/02", "003", "f3"); - - HoodieROTablePathFilter pathFilter = new HoodieROTablePathFilter(); - Path partitionPath = new Path("file://" + basePath + File.separator + "2017/01/01"); - assertTrue(pathFilter.accept(partitionPath), "Directories should be accepted"); - - assertTrue( - pathFilter.accept(new Path("file:///" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/01", "001", "f1")))); - assertFalse( - pathFilter.accept(new Path("file:///" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/01", "001", "f2")))); - assertTrue( - pathFilter.accept(new Path("file:///" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/02", "001", "f3")))); - assertTrue( - pathFilter.accept(new Path("file:///" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/01", "002", "f2")))); - assertFalse( - pathFilter.accept(new Path("file:///" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/02", "003", "f3")))); - assertFalse(pathFilter.accept(new Path("file:///" + HoodieTestUtils.getCommitFilePath(basePath, "001")))); - assertFalse(pathFilter.accept(new Path("file:///" + HoodieTestUtils.getCommitFilePath(basePath, "002")))); - assertFalse(pathFilter.accept(new Path("file:///" + HoodieTestUtils.getInflightCommitFilePath(basePath, "003")))); - assertFalse( - pathFilter.accept(new Path("file:///" + HoodieTestUtils.getRequestedCompactionFilePath(basePath, "004")))); + assertFalse(pathFilter.accept(testTable.getCommitFilePath("001"))); + assertFalse(pathFilter.accept(testTable.getCommitFilePath("002"))); + assertFalse(pathFilter.accept(testTable.getInflightCommitFilePath("003"))); + assertFalse(pathFilter.accept(testTable.getRequestedCompactionFilePath("004"))); assertFalse(pathFilter.accept(new Path("file:///" + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"))); assertFalse(pathFilter.accept(new Path("file:///" + basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME))); - assertFalse( - pathFilter.accept(new Path("file:///" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/01", "003", "f3")))); assertEquals(1, pathFilter.metaClientCache.size()); } @Test - public void testNonHoodiePaths(@TempDir java.nio.file.Path tempDir) throws IOException { - String basePath = tempDir.toAbsolutePath().toString(); - HoodieROTablePathFilter pathFilter = new HoodieROTablePathFilter(); + public void testNonHoodiePaths() throws IOException { + java.nio.file.Path path1 = Paths.get(basePath, "nonhoodiefolder"); + Files.createDirectories(path1); + assertTrue(pathFilter.accept(new Path(path1.toUri()))); - String path = basePath + File.separator + "nonhoodiefolder"; - new File(path).mkdirs(); - assertTrue(pathFilter.accept(new Path("file:///" + path))); + java.nio.file.Path path2 = Paths.get(basePath, "nonhoodiefolder/somefile"); + Files.createFile(path2); + assertTrue(pathFilter.accept(new Path(path2.toUri()))); + } - path = basePath + File.separator + "nonhoodiefolder/somefile"; - new File(path).createNewFile(); - assertTrue(pathFilter.accept(new Path("file:///" + path))); + @Test + public void testPartitionPathsAsNonHoodiePaths() throws Exception { + final String p1 = "2017/01/01"; + final String p2 = "2017/01/02"; + testTable.addCommit("001").withBaseFilesInPartitions(p1, p2); + Path partitionPath1 = testTable.getPartitionPath(p1).getParent(); + Path partitionPath2 = testTable.getPartitionPath(p2).getParent(); + assertTrue(pathFilter.accept(partitionPath1), "Directories should be accepted"); + assertTrue(pathFilter.accept(partitionPath2), "Directories should be accepted"); } }