From e9cab67b8095b30205af27498dc0b279d188a454 Mon Sep 17 00:00:00 2001
From: garyli1019
Date: Fri, 5 Jun 2020 17:25:59 -0700
Subject: [PATCH] [HUDI-988] Fix More Unit Test Flakiness

---
 .../client/TestCompactionAdminClient.java     |   8 --
 .../org/apache/hudi/client/TestMultiFS.java   |   4 +-
 .../hudi/client/TestTableSchemaEvolution.java |  12 --
 .../client/TestUpdateSchemaEvolution.java     |   3 +-
 .../execution/TestBoundedInMemoryQueue.java   |   3 +-
 .../TestSparkBoundedInMemoryExecutor.java     |   2 +-
 .../apache/hudi/index/TestHoodieIndex.java    |  13 +-
 .../index/bloom/TestHoodieBloomIndex.java     |   4 +-
 .../bloom/TestHoodieGlobalBloomIndex.java     |   5 +-
 .../hudi/io/TestHoodieCommitArchiveLog.java   |   3 +-
 .../io/TestHoodieKeyLocationFetchHandle.java  |   4 +-
 .../apache/hudi/io/TestHoodieMergeHandle.java |   6 +-
 .../hudi/table/TestConsistencyGuard.java      |   2 +-
 .../table/TestHoodieMergeOnReadTable.java     | 123 ++++++++----
 .../commit/TestCopyOnWriteActionExecutor.java |  32 +----
 .../action/commit/TestUpsertPartitioner.java  |  23 +---
 .../action/compact/TestAsyncCompaction.java   |   2 +-
 .../action/compact/TestHoodieCompactor.java   |   5 +-
 .../testutils/HoodieClientTestHarness.java    |  67 ++++++++--
 .../table/view/HoodieTableFileSystemView.java |   6 +
 .../service/FileSystemViewHandler.java        |   2 +-
 pom.xml                                       |   2 +-
 22 files changed, 138 insertions(+), 193 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 2d691563e..1200f67cc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -37,7 +37,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase;

 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

@@ -71,13 +70,6 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
     client = new CompactionAdminClient(jsc, basePath);
   }

-  @AfterEach
-  public void tearDown() {
-    client.close();
-    metaClient = null;
-    cleanupSparkContexts();
-  }
-
   @Test
   public void testUnscheduleCompactionPlan() throws Exception {
     int numEntriesPerInstant = 10;
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 02efe8e51..6a78bc57d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -63,9 +63,7 @@ public class TestMultiFS extends HoodieClientTestHarness {

   @AfterEach
   public void tearDown() throws Exception {
-    cleanupSparkContexts();
-    cleanupDFS();
-    cleanupTestDataGenerator();
+    cleanupResources();
   }

   protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 0148bcaeb..25e97c928 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -38,8 +38,6 @@ import org.apache.hudi.testutils.TestRawTripPayload;

 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

 import java.io.IOException;
@@ -76,16 +74,6 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
   public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA
       + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;

-  @BeforeEach
-  public void setUp() throws IOException {
-    initResources();
-  }
-
-  @AfterEach
-  public void tearDown() throws IOException {
-    cleanupResources();
-  }
-
   @Test
   public void testSchemaCompatibilityBasic() throws Exception {
     assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA),
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index d20b9fe61..2c985f348 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -61,8 +61,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {

   @AfterEach
   public void tearDown() throws IOException {
-    cleanupSparkContexts();
-    cleanupFileSystem();
+    cleanupResources();
   }

   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index d80c86d21..4b529267f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -72,8 +72,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {

   @AfterEach
   public void tearDown() throws Exception {
-    cleanupTestDataGenerator();
-    cleanupExecutorService();
+    cleanupResources();
   }

   // Test to ensure that we are reading all records from queue iterator in the same order
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java
index 2deea67a8..c55f27540 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java
@@ -53,7 +53,7 @@ public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness {

   @AfterEach
   public void tearDown() throws Exception {
-    cleanupTestDataGenerator();
+    cleanupResources();
   }

   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index ea5f85137..67451f17f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.index;

 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -85,7 +84,6 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
   private IndexType indexType;
   private HoodieIndex index;
   private HoodieWriteConfig config;
-  private HoodieWriteClient writeClient;
   private String schemaStr;
   private Schema schema;

@@ -95,14 +93,10 @@ public class TestHoodieIndex extends HoodieClientTestHarness {

   private void setUp(IndexType indexType, boolean initializeIndex) throws Exception {
     this.indexType = indexType;
-    initSparkContexts("TestHoodieIndex");
-    initPath();
-    initTestDataGenerator();
-    initFileSystem();
+    initResources();
     // We have some records to be tagged (two different partitions)
     schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
     schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
-    initMetaClient();
     if (initializeIndex) {
       instantiateIndex();
     }
@@ -110,10 +104,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {

   @AfterEach
   public void tearDown() throws IOException {
-    cleanupSparkContexts();
-    cleanupFileSystem();
-    cleanupClients();
-    cleanupTestDataGenerator();
+    cleanupResources();
   }

   @ParameterizedTest
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index bfbfa978e..97acf032f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -97,9 +97,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {

   @AfterEach
   public void tearDown() throws Exception {
-    cleanupSparkContexts();
-    cleanupFileSystem();
-    cleanupClients();
+    cleanupResources();
   }

   private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 3847047c5..6aab654c5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -80,9 +80,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
   }

   @AfterEach
-  public void tearDown() {
-    cleanupSparkContexts();
-    cleanupClients();
+  public void tearDown() throws IOException {
+    cleanupResources();
   }

   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 3bee3e877..9cd3b3f26 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -69,8 +69,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {

   @AfterEach
   public void clean() throws IOException {
-    cleanupDFS();
-    cleanupSparkContexts();
+    cleanupResources();
   }

   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index af0e728b1..5c3c5ad15 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -82,9 +82,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {

   @AfterEach
   public void tearDown() throws IOException {
-    cleanupSparkContexts();
-    cleanupFileSystem();
-    cleanupClients();
+    cleanupResources();
   }

   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index fa6f41a09..75acf68d3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -66,11 +66,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {

   @AfterEach
   public void tearDown() throws Exception {
-    cleanupFileSystem();
-    cleanupTestDataGenerator();
-    cleanupSparkContexts();
-    cleanupClients();
-    cleanupFileSystem();
+    cleanupResources();
   }

   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
index 5021f5e74..2406d8534 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
@@ -44,7 +44,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {

   @AfterEach
   public void tearDown() throws Exception {
-    cleanupFileSystem();
+    cleanupResources();
   }

   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index a26b80b4f..57c0d8d27 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -119,10 +119,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {

   @AfterEach
   public void clean() throws IOException {
-    cleanupDFS();
-    cleanupSparkContexts();
-    cleanupTestDataGenerator();
-    cleanupClients();
+    cleanupResources();
   }

   @Test
@@ -151,9 +148,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       client.compact(compactionCommitTime);

       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
-      HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());

       // verify that there is a commit
@@ -305,7 +302,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
       assertNoWriteErrors(statuses);

-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);

       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -316,13 +313,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());

       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());

-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent(),
           "should list the parquet files we wrote in the delta commit");
@@ -358,11 +354,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());

       allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());

-      List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+      List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
       List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
       // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
       assertEquals(0, recordsRead.size(), "Must contain 0 records");
@@ -391,7 +387,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       // verify there are no errors
       assertNoWriteErrors(statuses);

-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertTrue(commit.isPresent());
       assertEquals("001", commit.get().getTimestamp(), "commit should be 001");
@@ -417,11 +413,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      HoodieTableFileSystemView roView =
-          new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);

       final String absentCommit = newCommitTime;
-      assertFalse(roView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
+      assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
     }
   }
@@ -446,7 +441,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = writeStatusJavaRDD.collect();
       assertNoWriteErrors(statuses);

-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);

       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -457,13 +452,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());

       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      tableView =
+          getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(!dataFilesToRead.findAny().isPresent());

-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent(),
           "should list the parquet files we wrote in the delta commit");
@@ -479,7 +474,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
       copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));

-      List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+      List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
       List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
       assertEquals(recordsRead.size(), 200);
@@ -493,7 +488,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       // After rollback, there should be no parquet file with the failed commit time
       assertEquals(0, Arrays.stream(allFiles)
           .filter(file -> file.getPath().getName().contains(commitTime1)).count());
-      dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+      dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
       recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
       assertEquals(200, recordsRead.size());
     }
@@ -509,7 +504,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
       copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));

-      List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+      List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
       List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
       assertEquals(200, recordsRead.size());
@@ -529,8 +524,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       metaClient = HoodieTableMetaClient.reload(metaClient);
       hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
       recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
       // check that the number of records read is still correct after rollback operation
       assertEquals(200, recordsRead.size());
@@ -556,20 +551,20 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

       final String compactedCommitTime =
           metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();

-      assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+      assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));

       thirdClient.rollback(compactedCommitTime);

       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

-      assertFalse(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+      assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
     }
   }
 }
@@ -593,7 +588,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = writeStatusJavaRDD.collect();
       assertNoWriteErrors(statuses);

-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);

       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -604,13 +599,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());

       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());

-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent(),
           "Should list the parquet files we wrote in the delta commit");
@@ -626,7 +620,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
       copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));

-      List<String> dataFiles = roView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+      List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
       List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
       assertEquals(200, recordsRead.size());
@@ -684,12 +678,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

       final String compactedCommitTime =
           metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();

-      assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+      assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));

       /**
        * Write 5 (updates)
@@ -711,12 +705,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       metaClient = HoodieTableMetaClient.reload(metaClient);
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
-      dataFilesToRead = roView.getLatestBaseFiles();
+      tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
-      SliceView rtView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+      SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       List<HoodieFileGroup> fileGroups = ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList());
       assertTrue(fileGroups.isEmpty());
@@ -756,7 +748,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
       assertNoWriteErrors(statuses);

-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);

       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -767,13 +759,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());

       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      BaseFileOnlyView roView = new HoodieTableFileSystemView(metaClient,
+      BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
           metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
       Map<String, Long> parquetFileIdToSize =
           dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));

-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       dataFilesToRead = roView.getLatestBaseFiles();
       List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
       assertTrue(dataFilesList.size() > 0,
@@ -801,7 +793,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       assertFalse(commit.isPresent());

       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      roView = new HoodieTableFileSystemView(metaClient,
+      roView = getHoodieTableFileSystemView(metaClient,
           hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
       dataFilesToRead = roView.getLatestBaseFiles();
       List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
@@ -830,7 +822,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       writeClient.insert(recordsRDD, newCommitTime).collect();

       // Update all the 100 records
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+      metaClient = getHoodieMetaClient(hadoopConf, basePath);

       newCommitTime = "101";
       writeClient.startCommitWithTime(newCommitTime);
@@ -905,7 +897,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       writeClient.commit(newCommitTime, statuses);

       HoodieTable table =
-          HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf);
+          HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
       SliceView tableRTFileSystemView = table.getSliceView();

       long numLogFiles = 0;
@@ -966,7 +958,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
       // and calling rollback twice
       final String lastCommitTime = newCommitTime;
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+      metaClient = getHoodieMetaClient(hadoopConf, basePath);
       HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
           .filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
       String fileName = last.getFileName();
@@ -1015,7 +1007,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       statuses.collect();

       HoodieTable table =
-          HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf);
+          HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
       SliceView tableRTFileSystemView = table.getSliceView();

       long numLogFiles = 0;
@@ -1036,7 +1028,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
       // Trigger a rollback of compaction
       writeClient.rollback(newCommitTime);
-      table = HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf);
+      table = HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
       tableRTFileSystemView = table.getSliceView();
       ((SyncableFileSystemView) tableRTFileSystemView).reset();
       Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();
@@ -1056,7 +1048,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+      metaClient = getHoodieMetaClient(hadoopConf, basePath);
       HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);

       // Create a commit without rolling stats in metadata to test backwards compatibility
@@ -1155,7 +1147,6 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   public void testRollingStatsWithSmallFileHandling() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);

       Map fileIdToInsertsMap = new HashMap<>();
       Map fileIdToUpsertsMap = new HashMap<>();
@@ -1302,7 +1293,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
       assertNoWriteErrors(statuses);

-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
       HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, hadoopConf);

       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -1314,11 +1305,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+          getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());

-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       dataFilesToRead = roView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent(),
           "should list the parquet files we wrote in the delta commit");
@@ -1398,7 +1389,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
       assertNoWriteErrors(statuses);

-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
       HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);

       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
@@ -1410,11 +1401,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       BaseFileOnlyView roView =
-          new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+          getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
       assertTrue(!dataFilesToRead.findAny().isPresent());

-      roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       dataFilesToRead = roView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent(),
           "should list the parquet files we wrote in the delta commit");
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 25b64ae29..63a04ec48 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -37,7 +37,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.table.HoodieCopyOnWriteTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.TestRawTripPayload;
 import org.apache.hudi.testutils.TestRawTripPayload.MetadataMergeWriteStatus;
@@ -52,8 +52,6 @@ import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.spark.TaskContext;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

 import java.io.File;
@@ -69,27 +67,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

-public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
+public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {

   private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class);

-  @BeforeEach
-  public void setUp() throws Exception {
-    initSparkContexts("TestCopyOnWriteActionExecutor");
-    initPath();
-    initMetaClient();
-    initTestDataGenerator();
-    initFileSystem();
-  }
-
-  @AfterEach
-  public void tearDown() throws Exception {
-    cleanupSparkContexts();
-    cleanupClients();
-    cleanupFileSystem();
-    cleanupTestDataGenerator();
-  }
-
   @Test
   public void testMakeNewPath() throws Exception {
     String fileName = UUID.randomUUID().toString();
@@ -173,7 +154,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
     GenericRecord newRecord;
     int index = 0;
     for (GenericRecord record : fileRecords) {
-      System.out.println("Got :" + record.get("_row_key").toString() + ", Exp :" + records.get(index).getRecordKey());
+      //System.out.println("Got :" + record.get("_row_key").toString() + ", Exp :" + records.get(index).getRecordKey());
       assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString());
       index++;
     }
@@ -427,11 +408,4 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
     }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
     assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
   }
-
-  @AfterEach
-  public void cleanup() {
-    if (jsc != null) {
-      jsc.stop();
-    }
-  }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 7f15379f8..0926a376c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -29,14 +29,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieCopyOnWriteTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.HoodieTestDataGenerator;

 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;

 import java.util.ArrayList;
@@ -46,27 +44,10 @@ import scala.Tuple2;

 import static org.junit.jupiter.api.Assertions.assertEquals;

-public class TestUpsertPartitioner extends HoodieClientTestHarness {
+public class TestUpsertPartitioner extends HoodieClientTestBase {

   private static final Logger LOG = LogManager.getLogger(TestUpsertPartitioner.class);

-  @BeforeEach
-  public void setUp() throws Exception {
-    initSparkContexts("TestUpsertPartitioner");
-    initPath();
-    initMetaClient();
-    initTestDataGenerator();
-    initFileSystem();
-  }
-
-  @AfterEach
-  public void tearDown() throws Exception {
-    cleanupSparkContexts();
-    cleanupClients();
-    cleanupFileSystem();
-    cleanupTestDataGenerator();
-  }
-
   private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize,
       String testPartitionPath, boolean autoSplitInserts) throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder()
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 0178dba50..553be74db 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -523,7 +523,7 @@ public class TestAsyncCompaction extends HoodieClientTestBase {
   private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
     FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
     HoodieTableFileSystemView view =
-        new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
     return view.getLatestBaseFiles().collect(Collectors.toList());
   }

diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 67860d76a..7fa64a591 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -78,10 +78,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {

   @AfterEach
   public void tearDown() throws Exception {
-    cleanupFileSystem();
-    cleanupTestDataGenerator();
-    cleanupSparkContexts();
-    cleanupClients();
+    cleanupResources();
   }

   private HoodieWriteConfig getConfig() {
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 70bd59197..69e177601 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -22,11 +22,14 @@ import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,6 +37,8 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.slf4j.Logger;
@@ -60,7 +65,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
   protected transient ExecutorService executorService;
   protected transient HoodieTableMetaClient metaClient;
   private static AtomicInteger instantGen = new AtomicInteger(1);
-  protected transient HoodieWriteClient client;
+  protected transient HoodieWriteClient writeClient;
+  protected transient HoodieReadClient readClient;
+  protected transient HoodieTableFileSystemView tableView;
+  protected transient HoodieTable hoodieTable;

   protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();

@@ -93,6 +101,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
     cleanupSparkContexts();
     cleanupTestDataGenerator();
     cleanupFileSystem();
+    cleanupDFS();
+    cleanupExecutorService();
+    System.gc();
   }

   /**
@@ -163,6 +174,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
     if (fs != null) {
       LOG.warn("Closing file-system instance used in previous test-run");
       fs.close();
+      fs = null;
     }
   }

@@ -185,13 +197,22 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
   }

   /**
-   * Cleanups table type.
+   * Cleanups hoodie clients.
    */
-  protected void cleanupClients() {
-    metaClient = null;
-    if (null != client) {
-      client.close();
-      client = null;
+  protected void cleanupClients() throws IOException {
+    if (metaClient != null) {
+      metaClient = null;
+    }
+    if (readClient != null) {
+      readClient = null;
+    }
+    if (writeClient != null) {
+      writeClient.close();
+      writeClient = null;
+    }
+    if (tableView != null) {
+      tableView.close();
+      tableView = null;
     }
   }

@@ -208,7 +229,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
    *
    */
   protected void cleanupTestDataGenerator() {
-    dataGen = null;
+    if (dataGen != null) {
+      dataGen = null;
+    }
   }

   /**
@@ -288,16 +311,32 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
   }

   public HoodieReadClient getHoodieReadClient(String basePath) {
-    return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+    readClient = new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+    return readClient;
   }

   public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
       HoodieIndex index) {
-    if (null != client) {
-      client.close();
-      client = null;
+    if (null != writeClient) {
+      writeClient.close();
+      writeClient = null;
     }
-    client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
-    return client;
+    writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+    return writeClient;
+  }
+
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) {
+    metaClient = new HoodieTableMetaClient(conf, basePath);
+    return metaClient;
+  }
+
+  public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
+      FileStatus[] fileStatuses) {
+    if (tableView == null) {
+      tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
+    } else {
+      tableView.init(metaClient, visibleActiveTimeline, fileStatuses);
+    }
+    return tableView;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index 4d877fab2..56ae22d5c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -89,6 +89,12 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystemView {
     super.init(metaClient, visibleActiveTimeline);
   }

+  public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
+      FileStatus[] fileStatuses) {
+    init(metaClient, visibleActiveTimeline);
+    addFilesToView(fileStatuses);
+  }
+
   @Override
   protected void resetViewState() {
     this.fgIdToPendingCompaction = null;
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
index 22c4cf9eb..683eb0658 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
@@ -117,7 +117,7 @@ public class FileSystemViewHandler {
     synchronized (view) {
       if (isLocalViewBehind(ctx)) {
         HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline();
-        LOG.warn("Syncing view as client passed last known instant " + lastKnownInstantFromClient
+        LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient
             + " as last known instant but server has the folling timeline :"
             + localTimeline.getInstants().collect(Collectors.toList()));
         view.sync();
diff --git a/pom.xml b/pom.xml
index e218cff02..cc39f4136 100644
--- a/pom.xml
+++ b/pom.xml
@@ -246,7 +246,7 @@
         ${maven-surefire-plugin.version}
         ${skipUTs}
-        -Xmx4g
+        -Xmx2g
         120
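
Reviewer note (not part of the patch): the sketch below illustrates, under stated assumptions, how a test interacts with the consolidated harness after this change. initResources()/cleanupResources() and the getHoodieMetaClient/getHoodieTableFileSystemView helpers come from HoodieClientTestHarness above; the test class name and the final assertion are hypothetical.

import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

// Hypothetical example: every resource is created through the harness so that
// cleanupResources() can close Spark contexts, DFS, clients, and the cached
// file-system view in one place, instead of each test keeping its own
// partial teardown sequence.
public class TestSomeHoodieFeature extends HoodieClientTestHarness {

  @BeforeEach
  public void setUp() throws Exception {
    initResources();
  }

  @AfterEach
  public void tearDown() throws Exception {
    cleanupResources(); // replaces the ad-hoc cleanupSparkContexts()/cleanupDFS()/... chains removed above
  }

  @Test
  public void testSomething() throws Exception {
    // The getters assign the harness fields (metaClient, tableView), so teardown can see them.
    metaClient = getHoodieMetaClient(hadoopConf, basePath);
    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), basePath);
    tableView = getHoodieTableFileSystemView(metaClient,
        metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
    assertTrue(tableView.getLatestBaseFiles().count() >= 0);
  }
}

Design note: getHoodieTableFileSystemView() keeps a single HoodieTableFileSystemView per test and re-runs the new init(metaClient, timeline, fileStatuses) overload on it rather than constructing a fresh view each time, which is what lets cleanupClients() reliably close the one tracked instance.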