[HUDI-988] Fix Unit Test Flakiness : Ensure all instantiations of HoodieWriteClient is closed properly. Fix bug in TestRollbacks. Make CLI unit tests for Hudi CLI check skip redering strings
This commit is contained in:
committed by
Balaji Varadarajan
parent
a9a97d6af4
commit
a68180b179
@@ -68,10 +68,6 @@ public class TestMultiFS extends HoodieClientTestHarness {
|
||||
cleanupTestDataGenerator();
|
||||
}
|
||||
|
||||
private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) {
|
||||
return new HoodieWriteClient(jsc, config);
|
||||
}
|
||||
|
||||
protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
|
||||
return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
|
||||
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
@@ -59,8 +60,9 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
public void tearDown() throws IOException {
|
||||
cleanupSparkContexts();
|
||||
cleanupFileSystem();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -112,7 +112,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
|
||||
public void tearDown() throws IOException {
|
||||
cleanupSparkContexts();
|
||||
cleanupFileSystem();
|
||||
cleanupMetaClient();
|
||||
cleanupClients();
|
||||
cleanupTestDataGenerator();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@@ -544,10 +545,6 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
|
||||
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
|
||||
}
|
||||
|
||||
private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
|
||||
return new HoodieWriteClient(jsc, cfg, false);
|
||||
}
|
||||
|
||||
private void instantiateIndex() {
|
||||
config = getConfigBuilder()
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
|
||||
|
||||
@@ -99,7 +99,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
|
||||
public void tearDown() throws Exception {
|
||||
cleanupSparkContexts();
|
||||
cleanupFileSystem();
|
||||
cleanupMetaClient();
|
||||
cleanupClients();
|
||||
}
|
||||
|
||||
private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
|
||||
|
||||
@@ -82,7 +82,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
cleanupSparkContexts();
|
||||
cleanupMetaClient();
|
||||
cleanupClients();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -91,6 +91,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
@AfterAll
|
||||
public static void clean() throws Exception {
|
||||
if (utility != null) {
|
||||
utility.deleteTable(tableName);
|
||||
utility.shutdownMiniCluster();
|
||||
}
|
||||
}
|
||||
@@ -123,11 +124,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
public void tearDown() throws Exception {
|
||||
cleanupSparkContexts();
|
||||
cleanupTestDataGenerator();
|
||||
cleanupMetaClient();
|
||||
}
|
||||
|
||||
private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
|
||||
return new HoodieWriteClient(jsc, config);
|
||||
cleanupClients();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -140,7 +137,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
// Load to memory
|
||||
HoodieWriteConfig config = getConfig();
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
|
||||
@@ -180,7 +177,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
// Load to memory
|
||||
HoodieWriteConfig config = getConfig();
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
|
||||
HoodieWriteClient writeClient = getHoodieWriteClient(config);
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
@@ -214,7 +211,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
// Load to memory
|
||||
HoodieWriteConfig config = getConfig();
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
HoodieWriteClient writeClient = getWriteClient(config);
|
||||
HoodieWriteClient writeClient = getHoodieWriteClient(config);
|
||||
|
||||
String newCommitTime = writeClient.startCommit();
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||
@@ -264,7 +261,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
// only for test, set the hbaseConnection to mocked object
|
||||
index.setHbaseConnection(hbaseConnection);
|
||||
|
||||
HoodieWriteClient writeClient = getWriteClient(config);
|
||||
HoodieWriteClient writeClient = getHoodieWriteClient(config);
|
||||
|
||||
// start a commit and generate test data
|
||||
String newCommitTime = writeClient.startCommit();
|
||||
@@ -289,7 +286,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
public void testTotalPutsBatching() throws Exception {
|
||||
HoodieWriteConfig config = getConfig();
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
HoodieWriteClient writeClient = getWriteClient(config);
|
||||
HoodieWriteClient writeClient = getHoodieWriteClient(config);
|
||||
|
||||
// start a commit and generate test data
|
||||
String newCommitTime = writeClient.startCommit();
|
||||
@@ -406,7 +403,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
// Load to memory
|
||||
HoodieWriteConfig config = getConfig(2);
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
|
||||
@@ -446,7 +443,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
// Load to memory
|
||||
HoodieWriteConfig config = getConfig();
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
|
||||
@AfterEach
|
||||
public void tearDown() throws Exception {
|
||||
cleanupSparkContexts();
|
||||
cleanupMetaClient();
|
||||
cleanupClients();
|
||||
if (utility != null) {
|
||||
utility.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
|
||||
public void tearDown() throws IOException {
|
||||
cleanupSparkContexts();
|
||||
cleanupFileSystem();
|
||||
cleanupMetaClient();
|
||||
cleanupClients();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -69,11 +69,8 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
|
||||
cleanupFileSystem();
|
||||
cleanupTestDataGenerator();
|
||||
cleanupSparkContexts();
|
||||
cleanupMetaClient();
|
||||
}
|
||||
|
||||
private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
|
||||
return new HoodieWriteClient(jsc, config);
|
||||
cleanupClients();
|
||||
cleanupFileSystem();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -84,7 +81,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
|
||||
|
||||
// Build a write config with bulkinsertparallelism set
|
||||
HoodieWriteConfig cfg = getConfigBuilder().build();
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
|
||||
|
||||
/**
|
||||
@@ -226,7 +223,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
|
||||
public void testHoodieMergeHandleWriteStatMetrics() throws Exception {
|
||||
// insert 100 records
|
||||
HoodieWriteConfig config = getConfigBuilder().build();
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||
String newCommitTime = "100";
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
|
||||
|
||||
@@ -121,16 +121,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
cleanupDFS();
|
||||
cleanupSparkContexts();
|
||||
cleanupTestDataGenerator();
|
||||
}
|
||||
|
||||
private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
|
||||
return new HoodieWriteClient(jsc, config);
|
||||
cleanupClients();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleInsertAndUpdate() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfig(true);
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts)
|
||||
@@ -178,7 +175,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
String partitionPath = "2020/02/20"; // use only one partition for this test
|
||||
dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
|
||||
HoodieWriteConfig cfg = getConfig(true);
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts)
|
||||
@@ -270,7 +267,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
@Test
|
||||
public void testMetadataAggregateFromWriteStatus() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
String newCommitTime = "001";
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||
@@ -293,7 +290,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
@Test
|
||||
public void testSimpleInsertUpdateAndDelete() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfig(true);
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts, written as parquet file)
|
||||
@@ -378,7 +375,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
|
||||
|
||||
HoodieWriteConfig cfg = getConfig(true);
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts)
|
||||
@@ -431,7 +428,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
|
||||
|
||||
HoodieWriteConfig cfg = getConfig(false);
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
// Test delta commit rollback
|
||||
/**
|
||||
@@ -474,7 +471,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
*/
|
||||
final String commitTime1 = "002";
|
||||
// WriteClient with custom config (disable small file handling)
|
||||
try (HoodieWriteClient secondClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
|
||||
try (HoodieWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
|
||||
secondClient.startCommitWithTime(commitTime1);
|
||||
|
||||
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
|
||||
@@ -504,7 +501,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
* Write 3 (inserts + updates - testing successful delta commit)
|
||||
*/
|
||||
final String commitTime2 = "002";
|
||||
try (HoodieWriteClient thirdClient = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient thirdClient = getHoodieWriteClient(cfg);) {
|
||||
thirdClient.startCommitWithTime(commitTime2);
|
||||
|
||||
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
|
||||
@@ -580,7 +577,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
|
||||
|
||||
HoodieWriteConfig cfg = getConfig(false);
|
||||
try (final HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (final HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
/**
|
||||
* Write 1 (only inserts)
|
||||
*/
|
||||
@@ -621,7 +618,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
*/
|
||||
newCommitTime = "002";
|
||||
// WriteClient with custom config (disable small file handling)
|
||||
HoodieWriteClient nClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
|
||||
HoodieWriteClient nClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
|
||||
nClient.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
|
||||
@@ -744,7 +741,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
@Test
|
||||
public void testUpsertPartitioner() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfig(true);
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts, written as parquet file)
|
||||
@@ -823,7 +820,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
public void testLogFileCountsAfterCompaction() throws Exception {
|
||||
// insert 100 records
|
||||
HoodieWriteConfig config = getConfig(true);
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||
String newCommitTime = "100";
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
|
||||
@@ -897,7 +894,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
// insert 100 records
|
||||
// Setting IndexType to be InMemory to simulate Global Index nature
|
||||
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||
String newCommitTime = "100";
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
|
||||
@@ -934,7 +931,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
// insert 100 records
|
||||
// Setting IndexType to be InMemory to simulate Global Index nature
|
||||
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||
String newCommitTime = "100";
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
|
||||
@@ -1005,7 +1002,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
// insert 100 records
|
||||
// Setting IndexType to be InMemory to simulate Global Index nature
|
||||
HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||
String newCommitTime = "100";
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
|
||||
@@ -1057,7 +1054,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
public void testRollingStatsInMetadata() throws Exception {
|
||||
|
||||
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
|
||||
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
|
||||
|
||||
@@ -1156,7 +1153,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
@Test
|
||||
public void testRollingStatsWithSmallFileHandling() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
|
||||
Map<String, Long> fileIdToInsertsMap = new HashMap<>();
|
||||
Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
|
||||
@@ -1290,7 +1287,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
||||
@Test
|
||||
public void testHandleUpdateWithMultiplePartitions() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfig(true);
|
||||
try (HoodieWriteClient client = getWriteClient(cfg);) {
|
||||
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts, written as parquet file)
|
||||
|
||||
@@ -85,7 +85,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
|
||||
@AfterEach
|
||||
public void tearDown() throws Exception {
|
||||
cleanupSparkContexts();
|
||||
cleanupMetaClient();
|
||||
cleanupClients();
|
||||
cleanupFileSystem();
|
||||
cleanupTestDataGenerator();
|
||||
}
|
||||
@@ -129,7 +129,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
|
||||
// Prepare the AvroParquetIO
|
||||
HoodieWriteConfig config = makeHoodieClientConfig();
|
||||
String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
|
||||
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
|
||||
HoodieWriteClient writeClient = getHoodieWriteClient(config);
|
||||
writeClient.startCommitWithTime(firstCommitTime);
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ public class TestUpsertPartitioner extends HoodieClientTestHarness {
|
||||
@AfterEach
|
||||
public void tearDown() throws Exception {
|
||||
cleanupSparkContexts();
|
||||
cleanupMetaClient();
|
||||
cleanupClients();
|
||||
cleanupFileSystem();
|
||||
cleanupTestDataGenerator();
|
||||
}
|
||||
|
||||
@@ -81,10 +81,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
|
||||
cleanupFileSystem();
|
||||
cleanupTestDataGenerator();
|
||||
cleanupSparkContexts();
|
||||
}
|
||||
|
||||
private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
|
||||
return new HoodieWriteClient(jsc, config);
|
||||
cleanupClients();
|
||||
}
|
||||
|
||||
private HoodieWriteConfig getConfig() {
|
||||
@@ -120,7 +117,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
|
||||
HoodieWriteConfig config = getConfig();
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||
|
||||
String newCommitTime = writeClient.startCommit();
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
|
||||
@@ -137,7 +134,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
|
||||
public void testWriteStatusContentsAfterCompaction() throws Exception {
|
||||
// insert 100 records
|
||||
HoodieWriteConfig config = getConfig();
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config)) {
|
||||
try (HoodieWriteClient writeClient = getHoodieWriteClient(config)) {
|
||||
String newCommitTime = "100";
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hudi.testutils;
|
||||
|
||||
import org.apache.hudi.client.HoodieReadClient;
|
||||
import org.apache.hudi.client.HoodieWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.HoodieCleanStat;
|
||||
@@ -49,7 +48,6 @@ import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
@@ -83,23 +81,6 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
|
||||
cleanupResources();
|
||||
}
|
||||
|
||||
public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
|
||||
return getHoodieWriteClient(cfg, false);
|
||||
}
|
||||
|
||||
public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
|
||||
return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg));
|
||||
}
|
||||
|
||||
public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
|
||||
HoodieIndex index) {
|
||||
return new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
|
||||
}
|
||||
|
||||
public HoodieReadClient getHoodieReadClient(String basePath) {
|
||||
return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Default HoodieWriteConfig for tests.
|
||||
*
|
||||
|
||||
@@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.hudi.testutils;
|
||||
|
||||
import org.apache.hudi.client.HoodieReadClient;
|
||||
import org.apache.hudi.client.HoodieWriteClient;
|
||||
import org.apache.hudi.client.SparkTaskContextSupplier;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
@@ -30,6 +32,8 @@ import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.slf4j.Logger;
|
||||
@@ -56,6 +60,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
|
||||
protected transient ExecutorService executorService;
|
||||
protected transient HoodieTableMetaClient metaClient;
|
||||
private static AtomicInteger instantGen = new AtomicInteger(1);
|
||||
protected transient HoodieWriteClient client;
|
||||
|
||||
protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
|
||||
|
||||
@@ -84,7 +89,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
|
||||
* Cleanups resource group for the subclasses of {@link HoodieClientTestBase}.
|
||||
*/
|
||||
public void cleanupResources() throws IOException {
|
||||
cleanupMetaClient();
|
||||
cleanupClients();
|
||||
cleanupSparkContexts();
|
||||
cleanupTestDataGenerator();
|
||||
cleanupFileSystem();
|
||||
@@ -182,8 +187,12 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
|
||||
/**
|
||||
* Cleanups table type.
|
||||
*/
|
||||
protected void cleanupMetaClient() {
|
||||
protected void cleanupClients() {
|
||||
metaClient = null;
|
||||
if (null != client) {
|
||||
client.close();
|
||||
client = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -227,6 +236,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
|
||||
if (hdfsTestService != null) {
|
||||
hdfsTestService.stop();
|
||||
dfsCluster.shutdown();
|
||||
hdfsTestService = null;
|
||||
dfsCluster = null;
|
||||
dfs = null;
|
||||
}
|
||||
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
|
||||
// same JVM
|
||||
@@ -267,4 +279,25 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
|
||||
}
|
||||
}
|
||||
|
||||
public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
|
||||
return getHoodieWriteClient(cfg, false);
|
||||
}
|
||||
|
||||
public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
|
||||
return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg));
|
||||
}
|
||||
|
||||
public HoodieReadClient getHoodieReadClient(String basePath) {
|
||||
return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
|
||||
}
|
||||
|
||||
public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
|
||||
HoodieIndex index) {
|
||||
if (null != client) {
|
||||
client.close();
|
||||
client = null;
|
||||
}
|
||||
client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user