diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index 9de36c6e0..d3b586724 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -281,9 +281,9 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
     // We create three parquet file, each having one record. (two different partitions)
     HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
-    String fileId1 = testTable.addCommit("001").withInserts(p1, record1);
-    String fileId2 = testTable.addCommit("002").withInserts(p1, record2);
-    String fileId3 = testTable.addCommit("003").withInserts(p2, record4);
+    String fileId1 = testTable.addCommit("001").getFileIdWithInserts(p1, record1);
+    String fileId2 = testTable.addCommit("002").getFileIdWithInserts(p1, record2);
+    String fileId3 = testTable.addCommit("003").getFileIdWithInserts(p2, record4);
 
     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -375,7 +375,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
         incomingPayloadSamePartition);
 
     // We have some records to be tagged (two different partitions)
-    testTable.addCommit("1000").withInserts(p1, originalRecord);
+    testTable.addCommit("1000").getFileIdWithInserts(p1, originalRecord);
 
     // test against incoming record with a different partition
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
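The rename above separates the two flavors of insert helpers on HoodieWriteableTestTable: getFileIdWithInserts(...) generates a random file id, writes the records under it, and returns the id, while the remaining withInserts(partition, fileId, ...) overloads keep returning the table for fluent chaining. A minimal usage sketch, assuming records and metaClient are set up as in the tests above:

    HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA);
    // generates a random UUID file id, writes record1 under it, and returns the id
    String fileId = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
    // the fluent variant reuses an explicit file id and returns the table for chaining
    testTable.addCommit("002").withInserts("2016/01/31", fileId, record2);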
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 2d091a0ea..39d9b6408 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -223,7 +223,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
     filter.add(record3.getRecordKey());
     HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA, filter);
-    String fileId = testTable.addCommit("000").withInserts(partition, record1, record2);
+    String fileId = testTable.addCommit("000").getFileIdWithInserts(partition, record1, record2);
     String filename = testTable.getBaseFileNameById(fileId);
 
     // The bloom filter contains 3 records
@@ -310,9 +310,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     }
 
     // We create three parquet file, each having one record. (two different partitions)
-    String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", record1);
-    String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", record2);
-    String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", record4);
+    String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
+    String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
+    String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);
 
     // We do the tag again
     taggedRecordRDD = bloomIndex.tagLocation(recordRDD, context, HoodieSparkTable.create(config, context, metaClient));
@@ -380,9 +380,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     }
 
     // We create three parquet file, each having one record. (two different partitions)
-    String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", record1);
-    String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", record2);
-    String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", record4);
+    String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
+    String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
+    String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);
 
     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -433,7 +433,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
         BloomFilterTypeCode.SIMPLE.name());
     filter.add(record2.getRecordKey());
     HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA, filter);
-    String fileId = testTable.addCommit("000").withInserts("2016/01/31", record1);
+    String fileId = testTable.addCommit("000").getFileIdWithInserts("2016/01/31", record1);
 
     assertTrue(filter.mightContain(record1.getRecordKey()));
     assertTrue(filter.mightContain(record2.getRecordKey()));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 2f68a032e..e6fc3be5d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -217,10 +217,10 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
 
     // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
-    String fileId1 = testTable.addCommit("1000").withInserts("2016/04/01", record1);
-    String fileId2 = testTable.addCommit("2000").withInserts("2015/03/12");
-    String fileId3 = testTable.addCommit("3000").withInserts("2015/03/12", record2);
-    String fileId4 = testTable.addCommit("4000").withInserts("2015/03/12", record4);
+    String fileId1 = testTable.addCommit("1000").getFileIdWithInserts("2016/04/01", record1);
+    String fileId2 = testTable.addCommit("2000").getFileIdWithInserts("2015/03/12");
+    String fileId3 = testTable.addCommit("3000").getFileIdWithInserts("2015/03/12", record2);
+    String fileId4 = testTable.addCommit("4000").getFileIdWithInserts("2015/03/12", record4);
 
     // partitions will NOT be respected by this loadInvolvedFiles(...) call
     JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, context, hoodieTable);
@@ -299,7 +299,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
         new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()),
         incomingPayloadSamePartition);
 
-    testTable.addCommit("1000").withInserts(p1, originalRecord);
+    testTable.addCommit("1000").getFileIdWithInserts(p1, originalRecord);
 
     // test against incoming record with a different partition
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index 38cd19c67..3a7d468e7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -130,7 +130,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
       for (List<HoodieRecord> recordsPerSlice : recordsForFileSlices) {
         String instantTime = makeNewCommitTime();
-        String fileId = testTable.addCommit(instantTime).withInserts(entry.getKey(), recordsPerSlice.toArray(new HoodieRecord[0]));
+        String fileId = testTable.addCommit(instantTime).getFileIdWithInserts(entry.getKey(), recordsPerSlice.toArray(new HoodieRecord[0]));
         Tuple2<String, String> fileIdInstantTimePair = new Tuple2<>(fileId, instantTime);
         List<Tuple2<HoodieKey, Tuple2<String, String>>> expectedEntries = new ArrayList<>();
         for (HoodieRecord record : recordsPerSlice) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 88f755a44..f2427cd9a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -18,11 +18,13 @@
 
 package org.apache.hudi.io;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -32,15 +34,21 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -48,11 +56,14 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -146,13 +157,14 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
 
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "100", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "101", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "102", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "103", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "104", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "105", wrapperFs.getConf());
-    HoodieTestUtils.createPendingCleanFiles(metaClient, "106", "107");
+    createCleanMetadata("100", false);
+    createCleanMetadata("101", false);
+    createCleanMetadata("102", false);
+    createCleanMetadata("103", false);
+    createCleanMetadata("104", false);
+    createCleanMetadata("105", false);
+    createCleanMetadata("106", true);
+    createCleanMetadata("107", true);
 
     // reload the timeline and get all the commmits before archive
     timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -227,7 +239,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     int numCommits = 4;
     int commitInstant = 100;
     for (int i = 0; i < numCommits; i++) {
-      createReplaceMetadata(commitInstant);
+      createReplaceMetadata(String.valueOf(commitInstant));
       commitInstant += 100;
     }
 
@@ -478,17 +490,34 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());
   }
 
-  private void createReplaceMetadata(int commitInstant) throws Exception {
-    String commitTime = "" + commitInstant;
-    String fileId1 = "file-" + commitInstant + "-1";
-    String fileId2 = "file-" + commitInstant + "-2";
+  private void createReplaceMetadata(String instantTime) throws Exception {
+    String fileId1 = "file-" + instantTime + "-1";
+    String fileId2 = "file-" + instantTime + "-2";
 
     // create replace instant to mark fileId1 as deleted
     HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
     replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1);
     replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
-    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    testTable.addReplaceCommit(commitTime, replaceMetadata);
-    testTable.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+    HoodieTestTable.of(metaClient)
+        .addReplaceCommit(instantTime, replaceMetadata)
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+
+  private void createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException {
+    HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
+        CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
+    if (inflightOnly) {
+      HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
+    } else {
+      HoodieCleanStat cleanStats = new HoodieCleanStat(
+          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
+          HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
+          Collections.emptyList(),
+          Collections.emptyList(),
+          Collections.emptyList(),
+          instantTime);
+      HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
+      HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
+    }
+  }
 }
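The new createCleanMetadata(...) helper replaces the old HoodieTestUtils.createCleanFiles/createPendingCleanFiles pair with HoodieTestTable's addClean/addInflightClean: a completed clean lays down all three timeline states, while inflightOnly stops after the inflight file. A sketch of the equivalent direct calls, using the same empty-plan arguments as the helper above (the meta-file names follow Hudi's <instant>.clean[.requested|.inflight] convention):

    HoodieCleanerPlan plan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "",
        new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
    HoodieCleanStat stat = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
        "2016/03/15", Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), "100");
    HoodieCleanMetadata metadata = convertCleanMetadata("100", Option.of(0L), Collections.singletonList(stat));
    // writes 100.clean.requested, 100.clean.inflight and 100.clean under .hoodie/
    HoodieTestTable.of(metaClient).addClean("100", plan, metadata);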
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 730e1ef01..152a9814a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -982,7 +982,7 @@ public class TestCleaner extends HoodieClientTestBase {
     HoodieTestTable testTable = HoodieTestTable.of(metaClient)
         .addRequestedCommit("000")
         .withMarkerFiles("default", 10, IOType.MERGE);
-    final int numTempFilesBefore = testTable.listAllFilesInTempFolder().size();
+    final int numTempFilesBefore = testTable.listAllFilesInTempFolder().length;
     assertEquals(10, numTempFilesBefore, "Some marker files are created.");
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
@@ -992,7 +992,7 @@ public class TestCleaner extends HoodieClientTestBase {
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
     metaClient.reloadActiveTimeline();
     table.rollback(context, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
-    final int numTempFilesAfter = testTable.listAllFilesInTempFolder().size();
+    final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
     assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index e7db7ada4..8b47fa3d4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.Transformations;
 import org.apache.hudi.common.util.Option;
@@ -63,6 +64,7 @@ import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExec
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
 import org.apache.avro.generic.GenericRecord;
@@ -159,7 +161,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     client.compact(compactionCommitTime);
 
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
@@ -207,7 +209,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     client.compact(compactionCommitTime);
 
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
    tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
@@ -377,7 +379,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
 
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
     assertFalse(dataFilesToRead.findAny().isPresent());
@@ -418,7 +420,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
 
-    allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     dataFilesToRead = tableView.getLatestBaseFiles();
     assertTrue(dataFilesToRead.findAny().isPresent());
@@ -476,7 +478,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
 
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
 
     final String absentCommit = newCommitTime;
@@ -524,7 +526,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
 
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
@@ -558,7 +560,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       // Test failed delta commit rollback
       secondClient.rollback(commitTime1);
-      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      allFiles = listAllBaseFilesInPath(hoodieTable);
       // After rollback, there should be no base file with the failed commit time
       List<String> remainingFiles = Arrays.stream(allFiles).filter(file -> file.getPath().getName()
           .contains(commitTime1)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());
@@ -593,7 +595,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       // Test successful delta commit rollback
       thirdClient.rollback(commitTime2);
-      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      allFiles = listAllBaseFilesInPath(hoodieTable);
       // After rollback, there should be no parquet file with the failed commit time
       assertEquals(0, Arrays.stream(allFiles)
           .filter(file -> file.getPath().getName().contains(commitTime2)).count());
@@ -624,16 +626,16 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
       thirdClient.compact(compactionInstantTime);
 
-      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      allFiles = listAllBaseFilesInPath(hoodieTable);
       metaClient = HoodieTableMetaClient.reload(metaClient);
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
       final String compactedCommitTime = metaClient.getActiveTimeline().reload().lastInstant().get().getTimestamp();
-      assertTrue(Arrays.stream(listAllDataFilesInPath(hoodieTable, cfg.getBasePath()))
+      assertTrue(Arrays.stream(listAllBaseFilesInPath(hoodieTable))
          .anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime())));
 
       thirdClient.rollbackInflightCompaction(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime), hoodieTable);
-      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      allFiles = listAllBaseFilesInPath(hoodieTable);
       metaClient = HoodieTableMetaClient.reload(metaClient);
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -680,7 +682,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
 
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
     assertFalse(dataFilesToRead.findAny().isPresent());
@@ -759,7 +761,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
       client.commitCompaction(compactionInstantTime, ws, Option.empty());
 
-      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      allFiles = listAllBaseFilesInPath(hoodieTable);
       metaClient = HoodieTableMetaClient.reload(metaClient);
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -787,7 +789,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       client.restoreToInstant("000");
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      allFiles = listAllBaseFilesInPath(hoodieTable);
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
@@ -842,7 +844,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
 
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     BaseFileOnlyView roView =
         getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -876,7 +878,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
 
-    allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    allFiles = listAllBaseFilesInPath(hoodieTable);
     roView = getHoodieTableFileSystemView(metaClient,
         hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
     dataFilesToRead = roView.getLatestBaseFiles();
@@ -919,15 +921,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
 
     // Write them to corresponding avro logfiles
-    HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
-        HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
-
-    // Verify that all data file has one log file
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+    HoodieWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
+        .withLogAppends(updatedRecords);
     // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
     ((SyncableFileSystemView) (table.getSliceView())).reset();
+
+    // Verify that all data file has one log file
     for (String partitionPath : dataGen.getPartitionPaths()) {
       List<FileSlice> groupedLogFiles = table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
@@ -1400,7 +1401,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
 
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     BaseFileOnlyView roView =
         getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -1499,7 +1500,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().lastInstant();
     assertFalse(commit.isPresent());
 
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     BaseFileOnlyView roView =
         getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -1533,7 +1534,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-    return listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    return listAllBaseFilesInPath(hoodieTable);
   }
 
   private FileStatus[] getROSnapshotFiles(String partitionPath)
@@ -1598,8 +1599,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     assertEquals(expectedCommitsSet, actualCommits);
   }
 
-  private FileStatus[] listAllDataFilesInPath(HoodieTable table, String basePath) throws IOException {
-    return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), basePath, table.getBaseFileExtension());
+  private FileStatus[] listAllBaseFilesInPath(HoodieTable table) throws IOException {
+    return HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles(table.getBaseFileExtension());
   }
 
   private FileStatus[] listStatus(JobConf jobConf, boolean realtime) throws IOException {
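The listAllDataFilesInPath(table, basePath) helper is gone across this file; listing now goes through HoodieTestTable, which filters a recursive listing by the table's base-file extension. The equivalent call, as a sketch (getHoodieTableFileSystemView is the harness helper already used above):

    FileStatus[] allFiles = HoodieTestTable.of(table.getMetaClient())
        .listAllBaseFiles(table.getBaseFileExtension());
    // feed the listing into a file-system view exactly as before
    tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);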
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index d1d31f824..6992a824d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -35,7 +35,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -130,7 +130,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
     assertNoWriteErrors(statusList);
     metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
     HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
-    List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
+    List<HoodieBaseFile> dataFilesToRead = getCurrentLatestBaseFiles(hoodieTable);
     assertTrue(dataFilesToRead.stream().findAny().isPresent(),
         "should list the parquet files we wrote in the delta commit");
     validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
@@ -225,10 +225,10 @@ public class CompactionTestBase extends HoodieClientTestBase {
     return statusList;
   }
 
-  protected List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
-    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
+  protected List<HoodieBaseFile> getCurrentLatestBaseFiles(HoodieTable table) throws IOException {
+    FileStatus[] allBaseFiles = HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles();
     HoodieTableFileSystemView view =
-        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allBaseFiles);
     return view.getLatestBaseFiles().collect(Collectors.toList());
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 7655a75ce..2e6cea70a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -44,6 +44,7 @@ import org.apache.hudi.index.bloom.SparkHoodieBloomIndex;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
@@ -154,8 +155,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
       updatedRecords = ((JavaRDD<HoodieRecord>) index.tagLocation(updatedRecordsRDD, context, table)).collect();
 
       // Write them to corresponding avro logfiles. Also, set the state transition properly.
-      HoodieTestUtils.writeRecordsToLogFiles(fs, metaClient.getBasePath(),
-          HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+      HoodieWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
+          .withLogAppends(updatedRecords);
       metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
           HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
       writeClient.commit(newCommitTime, jsc.emptyRDD(), Option.empty());
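withLogAppends(...) is the instance-method replacement for the static HoodieTestUtils.writeRecordsToLogFiles used here. One caveat carries over from the old helper: the records must already be tagged, because the implementation groups them by getCurrentLocation(). A minimal sketch, assuming taggedRecords came from index.tagLocation(...) as in the test above:

    HoodieWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
        .withLogAppends(taggedRecords);
    // records sharing a (fileId, instantTime) location land in the same log file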
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
index 4ab189adf..191e90fa7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -75,11 +76,11 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     // then: ensure files are deleted correctly, non-existent files reported as failed deletes
     assertEquals(2, stats.size());
 
-    List<FileStatus> partAFiles = testTable.listAllFiles("partA");
-    List<FileStatus> partBFiles = testTable.listAllFiles("partB");
+    FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
+    FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
 
-    assertEquals(0, partBFiles.size());
-    assertEquals(1, partAFiles.size());
+    assertEquals(0, partBFiles.length);
+    assertEquals(1, partAFiles.length);
     assertEquals(2, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
     assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
   }
@@ -108,15 +109,15 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     // then: ensure files are deleted, rollback block is appended (even if append does not exist)
     assertEquals(2, stats.size());
 
     // will have the log file
-    List<FileStatus> partBFiles = testTable.listAllFiles("partB");
-    assertEquals(1, partBFiles.size());
-    assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
-    assertTrue(partBFiles.get(0).getLen() > 0);
+    FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
+    assertEquals(1, partBFiles.length);
+    assertTrue(partBFiles[0].getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
+    assertTrue(partBFiles[0].getLen() > 0);
 
-    List<FileStatus> partAFiles = testTable.listAllFiles("partA");
-    assertEquals(3, partAFiles.size());
-    assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
-    assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
+    FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
+    assertEquals(3, partAFiles.length);
+    assertEquals(2, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
+    assertEquals(1, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
 
     // only partB/f1_001 will be deleted
     assertEquals(1, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index c2faa83c8..e167a0f4b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -25,8 +25,13 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.config.HoodieStorageConfig;
@@ -36,19 +41,29 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
+import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
 
 public class HoodieWriteableTestTable extends HoodieTestTable {
+  private static final Logger LOG = LogManager.getLogger(HoodieWriteableTestTable.class);
 
   private final Schema schema;
   private final BloomFilter filter;
@@ -89,11 +104,15 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
     return (HoodieWriteableTestTable) super.forCommit(instantTime);
   }
 
-  public String withInserts(String partition) throws Exception {
-    return withInserts(partition, new HoodieRecord[0]);
+  public String getFileIdWithInserts(String partition) throws Exception {
+    return getFileIdWithInserts(partition, new HoodieRecord[0]);
   }
 
-  public String withInserts(String partition, HoodieRecord... records) throws Exception {
+  public String getFileIdWithInserts(String partition, HoodieRecord... records) throws Exception {
+    return getFileIdWithInserts(partition, Arrays.asList(records));
+  }
+
+  public String getFileIdWithInserts(String partition, List<HoodieRecord> records) throws Exception {
     String fileId = UUID.randomUUID().toString();
     withInserts(partition, fileId, records);
     return fileId;
@@ -104,6 +123,10 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
   }
 
   public HoodieWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... records) throws Exception {
+    return withInserts(partition, fileId, Arrays.asList(records));
+  }
+
+  public HoodieWriteableTestTable withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     FileCreateUtils.createPartitionMetaFile(basePath, partition);
     String fileName = baseFileName(currentInstantTime, fileId);
 
@@ -128,4 +151,38 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
 
     return this;
   }
+
+  public HoodieWriteableTestTable withLogAppends(HoodieRecord... records) throws Exception {
+    return withLogAppends(Arrays.asList(records));
+  }
+
+  public HoodieWriteableTestTable withLogAppends(List<HoodieRecord> records) throws Exception {
+    for (List<HoodieRecord> groupedRecords: records.stream()
+        .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
+      appendRecordsToLogFile(groupedRecords);
+    }
+    return this;
+  }
+
+  private void appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
+    String partitionPath = groupedRecords.get(0).getPartitionPath();
+    HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
+    try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
+        .overBaseCommit(location.getInstantTime()).withFs(fs).build()) {
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+      logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
+        try {
+          GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
+          HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
+          return (IndexedRecord) val;
+        } catch (IOException e) {
+          LOG.warn("Failed to convert record " + r.toString(), e);
+          return null;
+        }
+      }).collect(Collectors.toList()), header));
+    }
+  }
 }
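Note how appendRecordsToLogFile writes one HoodieAvroDataBlock per location group, with the block header carrying the base instant time and the Avro schema. Combined with the insert helpers, a test can now fake a complete MOR file slice; a hedged sketch, assuming record1 is tagged between the two calls so getCurrentLocation() resolves to the base file written at "001":

    HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA);
    String fileId = testTable.addCommit("001").getFileIdWithInserts("2016/03/15", record1);
    // tag record1 against the new base file first (e.g. via index.tagLocation(...))
    testTable.withLogAppends(record1); // appends an Avro data block in a new .log file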
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 16d1ff950..bca91f800 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -19,7 +19,8 @@
 
 package org.apache.hudi.common.testutils;
 
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
@@ -31,6 +32,8 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hadoop.fs.FileSystem;
+
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.charset.StandardCharsets;
@@ -40,6 +43,9 @@ import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanMetadata;
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanerPlan;
+
 public class FileCreateUtils {
 
   private static final String WRITE_TOKEN = "1-0-1";
@@ -122,6 +128,18 @@ public class FileCreateUtils {
     createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_REPLACE_COMMIT_EXTENSION);
   }
 
+  public static void createCleanFile(String basePath, String instantTime, HoodieCleanMetadata metadata) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.CLEAN_EXTENSION, serializeCleanMetadata(metadata).get());
+  }
+
+  public static void createRequestedCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_CLEAN_EXTENSION, serializeCleanerPlan(cleanerPlan).get());
+  }
+
+  public static void createInflightCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION, serializeCleanerPlan(cleanerPlan).get());
+  }
+
   private static void createAuxiliaryMetaFile(String basePath, String instantTime, String suffix) throws IOException {
     Path parentPath = Paths.get(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
     Files.createDirectories(parentPath);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 0aad0c2b3..96a00da6f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -23,8 +23,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
-
 import org.apache.hudi.exception.HoodieIOException;
+
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
@@ -88,7 +88,7 @@ public class HoodieCommonTestHarness {
     try {
       return new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(),
-          HoodieTestUtils.listAllDataFilesAndLogFilesInPath(metaClient.getFs(), metaClient.getBasePath())
+          HoodieTestTable.of(metaClient).listAllBaseAndLogFiles()
       );
     } catch (IOException ioe) {
       throw new HoodieIOException("Error getting file system view", ioe);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 06a4aa4b2..9cacf1faa 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -19,6 +19,9 @@
 
 package org.apache.hudi.common.testutils;
 
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -41,18 +44,22 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static java.time.temporal.ChronoUnit.SECONDS;
 import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
 import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createCleanFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCleanFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCompaction;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createReplaceCommit;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCleanFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCompaction;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
@@ -158,6 +165,23 @@ public class HoodieTestTable {
     return this;
   }
 
+  public HoodieTestTable addInflightClean(String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
+    createRequestedCleanFile(basePath, instantTime, cleanerPlan);
+    createInflightCleanFile(basePath, instantTime, cleanerPlan);
+    currentInstantTime = instantTime;
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    return this;
+  }
+
+  public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPlan, HoodieCleanMetadata metadata) throws IOException {
+    createRequestedCleanFile(basePath, instantTime, cleanerPlan);
+    createInflightCleanFile(basePath, instantTime, cleanerPlan);
+    createCleanFile(basePath, instantTime, metadata);
+    currentInstantTime = instantTime;
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    return this;
+  }
+
   public HoodieTestTable addRequestedCompaction(String instantTime) throws IOException {
     createRequestedCompaction(basePath, instantTime);
     currentInstantTime = instantTime;
@@ -348,12 +372,36 @@ public class HoodieTestTable {
     return baseFileName(currentInstantTime, fileId);
   }
 
-  public List<FileStatus> listAllFiles(String partitionPath) throws IOException {
-    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString()));
+  public FileStatus[] listAllBaseFiles() throws IOException {
+    return listAllBaseFiles(HoodieFileFormat.PARQUET.getFileExtension());
   }
 
-  public List<FileStatus> listAllFilesInTempFolder() throws IOException {
-    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString()));
+  public FileStatus[] listAllBaseFiles(String fileExtension) throws IOException {
+    return FileSystemTestUtils.listRecursive(fs, new Path(basePath)).stream()
+        .filter(status -> status.getPath().getName().endsWith(fileExtension))
+        .toArray(FileStatus[]::new);
+  }
+
+  public FileStatus[] listAllLogFiles() throws IOException {
+    return listAllLogFiles(HoodieFileFormat.HOODIE_LOG.getFileExtension());
+  }
+
+  public FileStatus[] listAllLogFiles(String fileExtension) throws IOException {
+    return FileSystemTestUtils.listRecursive(fs, new Path(basePath)).stream()
+        .filter(status -> status.getPath().getName().contains(fileExtension))
+        .toArray(FileStatus[]::new);
+  }
+
+  public FileStatus[] listAllBaseAndLogFiles() throws IOException {
+    return Stream.concat(Stream.of(listAllBaseFiles()), Stream.of(listAllLogFiles())).toArray(FileStatus[]::new);
+  }
+
+  public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOException {
+    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).toArray(new FileStatus[0]);
+  }
+
+  public FileStatus[] listAllFilesInTempFolder() throws IOException {
+    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())).toArray(new FileStatus[0]);
   }
 
   public static class HoodieTestTableException extends RuntimeException {
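The List-returning listAllFiles/listAllFilesInTempFolder helpers are replaced by FileStatus[]-returning ones, plus the base/log listings consumed by the client tests above. A small usage sketch (assertEquals from JUnit; metaClient set up as usual):

    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
    FileStatus[] baseFiles = testTable.listAllBaseFiles();   // *.parquet by default
    FileStatus[] logFiles = testTable.listAllLogFiles();     // Hudi log files
    FileStatus[] all = testTable.listAllBaseAndLogFiles();
    assertEquals(baseFiles.length + logFiles.length, all.length);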
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 6547f807c..1f86a5925 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -18,74 +18,43 @@
 
 package org.apache.hudi.common.testutils;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
-import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
-import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * A utility class for testing.
@@ -96,7 +65,6 @@ public class HoodieTestUtils {
   public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
   public static final int DEFAULT_LOG_VERSION = 1;
   public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"};
-  private static Random rand = new Random(46474747);
 
   public static Configuration getDefaultHadoopConf() {
     return new Configuration();
@@ -171,35 +139,6 @@ public class HoodieTestUtils {
     }
   }
 
-  public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... instantTimes) {
-    for (String instantTime : instantTimes) {
-      Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime),
-          HoodieTimeline.makeInflightCleanerFileName(instantTime))
-          .forEach(f -> {
-            FSDataOutputStream os = null;
-            try {
-              Path commitFile = new Path(Paths
-                  .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString());
-              os = metaClient.getFs().create(commitFile, true);
-              // Write empty clean metadata
-              os.write(TimelineMetadataUtils.serializeCleanerPlan(
-                  new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
-                      CleanPlanV2MigrationHandler.VERSION, new HashMap<>())).get());
-            } catch (IOException ioe) {
-              throw new HoodieIOException(ioe.getMessage(), ioe);
-            } finally {
-              if (null != os) {
-                try {
-                  os.close();
-                } catch (IOException e) {
-                  throw new HoodieIOException(e.getMessage(), e);
-                }
-              }
-            }
-          });
-    }
-  }
-
   /**
    * @deprecated Use {@link HoodieTestTable} instead.
   */
@@ -243,26 +182,6 @@ public class HoodieTestUtils {
         TimelineMetadataUtils.serializeCompactionPlan(plan));
   }
 
-  public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath,
-      String instantTime, Configuration configuration)
-      throws IOException {
-    createPendingCleanFiles(metaClient, instantTime);
-    Path commitFile = new Path(
-        basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(instantTime));
-    FileSystem fs = FSUtils.getFs(basePath, configuration);
-    try (FSDataOutputStream os = fs.create(commitFile, true)) {
-      HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
-          DEFAULT_PARTITION_PATHS[rand.nextInt(DEFAULT_PARTITION_PATHS.length)], new ArrayList<>(), new ArrayList<>(),
-          new ArrayList<>(), instantTime);
-      // Create the clean metadata
-
-      HoodieCleanMetadata cleanMetadata =
-          CleanerUtils.convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
-      // Write empty clean metadata
-      os.write(TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata).get());
-    }
-  }
-
   public static <T extends Serializable> T serializeDeserialize(T object, Class<T> clazz) {
     // Using Kyro as the default serializer in Spark Jobs
     Kryo kryo = new Kryo();
@@ -279,78 +198,6 @@ public class HoodieTestUtils {
     return deseralizedObject;
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
-  public static void writeRecordsToLogFiles(FileSystem fs, String basePath, Schema schema,
-      List<HoodieRecord> updatedRecords) {
-    Map<HoodieRecordLocation, List<HoodieRecord>> groupedUpdated =
-        updatedRecords.stream().collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation));
-
-    groupedUpdated.forEach((location, value) -> {
-      String partitionPath = value.get(0).getPartitionPath();
-
-      Writer logWriter;
-      try {
-        logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
-            .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-            .overBaseCommit(location.getInstantTime()).withFs(fs).build();
-
-        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
-        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-        logWriter.appendBlock(new HoodieAvroDataBlock(value.stream().map(r -> {
-          try {
-            GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
-            HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
-            return (IndexedRecord) val;
-          } catch (IOException e) {
-            return null;
-          }
-        }).collect(Collectors.toList()), header));
-        logWriter.close();
-      } catch (Exception e) {
-        fail(e.toString());
-      }
-    });
-  }
-
-  // TODO: should be removed
-  public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath) throws IOException {
-    return listAllDataFilesInPath(fs, basePath, HoodieFileFormat.PARQUET.getFileExtension());
-  }
-
-  public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath, String datafileExtension)
-      throws IOException {
-    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
-    List<FileStatus> returns = new ArrayList<>();
-    while (itr.hasNext()) {
-      LocatedFileStatus status = itr.next();
-      if (status.getPath().getName().contains(datafileExtension) && !status.getPath().getName().contains(".marker")) {
-        returns.add(status);
-      }
-    }
-    return returns.toArray(new FileStatus[returns.size()]);
-  }
-
-  public static FileStatus[] listAllLogFilesInPath(FileSystem fs, String basePath)
-      throws IOException {
-    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
-    List<FileStatus> returns = new ArrayList<>();
-    while (itr.hasNext()) {
-      LocatedFileStatus status = itr.next();
-      if (status.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
-        returns.add(status);
-      }
-    }
-    return returns.toArray(new FileStatus[returns.size()]);
-  }
-
-  public static FileStatus[] listAllDataFilesAndLogFilesInPath(FileSystem fs, String basePath) throws IOException {
-    return Stream.concat(Arrays.stream(listAllDataFilesInPath(fs, basePath)), Arrays.stream(listAllLogFilesInPath(fs, basePath)))
-        .toArray(FileStatus[]::new);
-  }
-
   public static List<HoodieWriteStat> generateFakeHoodieWriteStat(int limit) {
     List<HoodieWriteStat> writeStatList = new ArrayList<>();
     for (int i = 0; i < limit; i++) {