1
0

[HUDI-988] Fix More Unit Test Flakiness

This commit is contained in:
garyli1019
2020-06-05 17:25:59 -07:00
committed by Balaji Varadarajan
parent fb283934a3
commit e9cab67b80
22 changed files with 138 additions and 193 deletions

View File

@@ -37,7 +37,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -71,13 +70,6 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
client = new CompactionAdminClient(jsc, basePath); client = new CompactionAdminClient(jsc, basePath);
} }
@AfterEach
public void tearDown() {
client.close();
metaClient = null;
cleanupSparkContexts();
}
@Test @Test
public void testUnscheduleCompactionPlan() throws Exception { public void testUnscheduleCompactionPlan() throws Exception {
int numEntriesPerInstant = 10; int numEntriesPerInstant = 10;

View File

@@ -63,9 +63,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
cleanupSparkContexts(); cleanupResources();
cleanupDFS();
cleanupTestDataGenerator();
} }
protected HoodieWriteConfig getHoodieWriteConfig(String basePath) { protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {

View File

@@ -38,8 +38,6 @@ import org.apache.hudi.testutils.TestRawTripPayload;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
@@ -76,16 +74,6 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
+ TRIP_SCHEMA_SUFFIX; + TRIP_SCHEMA_SUFFIX;
@BeforeEach
public void setUp() throws IOException {
initResources();
}
@AfterEach
public void tearDown() throws IOException {
cleanupResources();
}
@Test @Test
public void testSchemaCompatibilityBasic() throws Exception { public void testSchemaCompatibilityBasic() throws Exception {
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA), assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA),

View File

@@ -61,8 +61,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void tearDown() throws IOException { public void tearDown() throws IOException {
cleanupSparkContexts(); cleanupResources();
cleanupFileSystem();
} }
@Test @Test

View File

@@ -72,8 +72,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
cleanupTestDataGenerator(); cleanupResources();
cleanupExecutorService();
} }
// Test to ensure that we are reading all records from queue iterator in the same order // Test to ensure that we are reading all records from queue iterator in the same order

View File

@@ -53,7 +53,7 @@ public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
cleanupTestDataGenerator(); cleanupResources();
} }
@Test @Test

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.index; package org.apache.hudi.index;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
@@ -85,7 +84,6 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
private IndexType indexType; private IndexType indexType;
private HoodieIndex index; private HoodieIndex index;
private HoodieWriteConfig config; private HoodieWriteConfig config;
private HoodieWriteClient writeClient;
private String schemaStr; private String schemaStr;
private Schema schema; private Schema schema;
@@ -95,14 +93,10 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
private void setUp(IndexType indexType, boolean initializeIndex) throws Exception { private void setUp(IndexType indexType, boolean initializeIndex) throws Exception {
this.indexType = indexType; this.indexType = indexType;
initSparkContexts("TestHoodieIndex"); initResources();
initPath();
initTestDataGenerator();
initFileSystem();
// We have some records to be tagged (two different partitions) // We have some records to be tagged (two different partitions)
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
initMetaClient();
if (initializeIndex) { if (initializeIndex) {
instantiateIndex(); instantiateIndex();
} }
@@ -110,10 +104,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void tearDown() throws IOException { public void tearDown() throws IOException {
cleanupSparkContexts(); cleanupResources();
cleanupFileSystem();
cleanupClients();
cleanupTestDataGenerator();
} }
@ParameterizedTest @ParameterizedTest

View File

@@ -97,9 +97,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
cleanupSparkContexts(); cleanupResources();
cleanupFileSystem();
cleanupClients();
} }
private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) { private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {

View File

@@ -80,9 +80,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
} }
@AfterEach @AfterEach
public void tearDown() { public void tearDown() throws IOException {
cleanupSparkContexts(); cleanupResources();
cleanupClients();
} }
@Test @Test

View File

@@ -69,8 +69,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void clean() throws IOException { public void clean() throws IOException {
cleanupDFS(); cleanupResources();
cleanupSparkContexts();
} }
@Test @Test

View File

@@ -82,9 +82,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void tearDown() throws IOException { public void tearDown() throws IOException {
cleanupSparkContexts(); cleanupResources();
cleanupFileSystem();
cleanupClients();
} }
@Test @Test

View File

@@ -66,11 +66,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
cleanupFileSystem(); cleanupResources();
cleanupTestDataGenerator();
cleanupSparkContexts();
cleanupClients();
cleanupFileSystem();
} }
@Test @Test

View File

@@ -44,7 +44,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
cleanupFileSystem(); cleanupResources();
} }
@Test @Test

View File

@@ -119,10 +119,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void clean() throws IOException { public void clean() throws IOException {
cleanupDFS(); cleanupResources();
cleanupSparkContexts();
cleanupTestDataGenerator();
cleanupClients();
} }
@Test @Test
@@ -151,9 +148,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
client.compact(compactionCommitTime); client.compact(compactionCommitTime);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles(); Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent()); assertTrue(dataFilesToRead.findAny().isPresent());
// verify that there is a commit // verify that there is a commit
@@ -305,7 +302,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect(); List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -316,13 +313,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent()); assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
BaseFileOnlyView roView = tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent()); assertFalse(dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles(); dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(), assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the parquet files we wrote in the delta commit"); "should list the parquet files we wrote in the delta commit");
@@ -358,11 +354,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent()); assertFalse(commit.isPresent());
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles(); dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent()); assertTrue(dataFilesToRead.findAny().isPresent());
List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
// Wrote 20 records and deleted 20 records, so remaining 20-20 = 0 // Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
assertEquals(0, recordsRead.size(), "Must contain 0 records"); assertEquals(0, recordsRead.size(), "Must contain 0 records");
@@ -391,7 +387,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// verify there are no errors // verify there are no errors
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertTrue(commit.isPresent()); assertTrue(commit.isPresent());
assertEquals("001", commit.get().getTimestamp(), "commit should be 001"); assertEquals("001", commit.get().getTimestamp(), "commit should be 001");
@@ -417,11 +413,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
HoodieTableFileSystemView roView = tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
final String absentCommit = newCommitTime; final String absentCommit = newCommitTime;
assertFalse(roView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime()))); assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
} }
} }
@@ -446,7 +441,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = writeStatusJavaRDD.collect(); List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -457,13 +452,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent()); assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
BaseFileOnlyView roView = tableView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles(); Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent()); assertTrue(!dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles(); dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(), assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the parquet files we wrote in the delta commit"); "should list the parquet files we wrote in the delta commit");
@@ -479,7 +474,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords); copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200)); copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200); assertEquals(recordsRead.size(), 200);
@@ -493,7 +488,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// After rollback, there should be no parquet file with the failed commit time // After rollback, there should be no parquet file with the failed commit time
assertEquals(0, Arrays.stream(allFiles) assertEquals(0, Arrays.stream(allFiles)
.filter(file -> file.getPath().getName().contains(commitTime1)).count()); .filter(file -> file.getPath().getName().contains(commitTime1)).count());
dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(200, recordsRead.size()); assertEquals(200, recordsRead.size());
} }
@@ -509,7 +504,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords); copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200)); copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(200, recordsRead.size()); assertEquals(200, recordsRead.size());
@@ -529,8 +524,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
// check that the number of records read is still correct after rollback operation // check that the number of records read is still correct after rollback operation
assertEquals(200, recordsRead.size()); assertEquals(200, recordsRead.size());
@@ -556,20 +551,20 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
final String compactedCommitTime = final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp(); metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
thirdClient.rollback(compactedCommitTime); thirdClient.rollback(compactedCommitTime);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
assertFalse(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
} }
} }
} }
@@ -593,7 +588,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = writeStatusJavaRDD.collect(); List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -604,13 +599,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent()); assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
BaseFileOnlyView roView = tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent()); assertFalse(dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles(); dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(), assertTrue(dataFilesToRead.findAny().isPresent(),
"Should list the parquet files we wrote in the delta commit"); "Should list the parquet files we wrote in the delta commit");
@@ -626,7 +620,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords); copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200)); copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
List<String> dataFiles = roView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(200, recordsRead.size()); assertEquals(200, recordsRead.size());
@@ -684,12 +678,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
final String compactedCommitTime = final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp(); metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime()))); assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
/** /**
* Write 5 (updates) * Write 5 (updates)
@@ -711,12 +705,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
roView = tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); dataFilesToRead = tableView.getLatestBaseFiles();
dataFilesToRead = roView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent()); assertFalse(dataFilesToRead.findAny().isPresent());
SliceView rtView = SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
List<HoodieFileGroup> fileGroups = List<HoodieFileGroup> fileGroups =
((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList()); ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList());
assertTrue(fileGroups.isEmpty()); assertTrue(fileGroups.isEmpty());
@@ -756,7 +748,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect(); List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -767,13 +759,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent()); assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
BaseFileOnlyView roView = new HoodieTableFileSystemView(metaClient, BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles); metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles(); Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
Map<String, Long> parquetFileIdToSize = Map<String, Long> parquetFileIdToSize =
dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize)); dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles(); dataFilesToRead = roView.getLatestBaseFiles();
List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList()); List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
assertTrue(dataFilesList.size() > 0, assertTrue(dataFilesList.size() > 0,
@@ -801,7 +793,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent()); assertFalse(commit.isPresent());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, roView = getHoodieTableFileSystemView(metaClient,
hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles); hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles(); dataFilesToRead = roView.getLatestBaseFiles();
List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList()); List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
@@ -830,7 +822,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
writeClient.insert(recordsRDD, newCommitTime).collect(); writeClient.insert(recordsRDD, newCommitTime).collect();
// Update all the 100 records // Update all the 100 records
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); metaClient = getHoodieMetaClient(hadoopConf, basePath);
newCommitTime = "101"; newCommitTime = "101";
writeClient.startCommitWithTime(newCommitTime); writeClient.startCommitWithTime(newCommitTime);
@@ -905,7 +897,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
writeClient.commit(newCommitTime, statuses); writeClient.commit(newCommitTime, statuses);
HoodieTable table = HoodieTable table =
HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf); HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
SliceView tableRTFileSystemView = table.getSliceView(); SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0; long numLogFiles = 0;
@@ -966,7 +958,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
// and calling rollback twice // and calling rollback twice
final String lastCommitTime = newCommitTime; final String lastCommitTime = newCommitTime;
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); metaClient = getHoodieMetaClient(hadoopConf, basePath);
HoodieInstant last = metaClient.getCommitsTimeline().getInstants() HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
.filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get(); .filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
String fileName = last.getFileName(); String fileName = last.getFileName();
@@ -1015,7 +1007,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
statuses.collect(); statuses.collect();
HoodieTable table = HoodieTable table =
HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf); HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
SliceView tableRTFileSystemView = table.getSliceView(); SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0; long numLogFiles = 0;
@@ -1036,7 +1028,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
writeClient.commitCompaction(newCommitTime, statuses, Option.empty()); writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
// Trigger a rollback of compaction // Trigger a rollback of compaction
writeClient.rollback(newCommitTime); writeClient.rollback(newCommitTime);
table = HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf); table = HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
tableRTFileSystemView = table.getSliceView(); tableRTFileSystemView = table.getSliceView();
((SyncableFileSystemView) tableRTFileSystemView).reset(); ((SyncableFileSystemView) tableRTFileSystemView).reset();
Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant(); Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();
@@ -1056,7 +1048,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath); metaClient = getHoodieMetaClient(hadoopConf, basePath);
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
// Create a commit without rolling stats in metadata to test backwards compatibility // Create a commit without rolling stats in metadata to test backwards compatibility
@@ -1155,7 +1147,6 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
public void testRollingStatsWithSmallFileHandling() throws Exception { public void testRollingStatsWithSmallFileHandling() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
Map<String, Long> fileIdToInsertsMap = new HashMap<>(); Map<String, Long> fileIdToInsertsMap = new HashMap<>();
Map<String, Long> fileIdToUpsertsMap = new HashMap<>(); Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
@@ -1302,7 +1293,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect(); List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, hadoopConf); HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -1314,11 +1305,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
BaseFileOnlyView roView = BaseFileOnlyView roView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles(); Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent()); assertFalse(dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles(); dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(), assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the parquet files we wrote in the delta commit"); "should list the parquet files we wrote in the delta commit");
@@ -1398,7 +1389,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect(); List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath()); metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
@@ -1410,11 +1401,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
BaseFileOnlyView roView = BaseFileOnlyView roView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles(); Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent()); assertTrue(!dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles(); dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(), assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the parquet files we wrote in the delta commit"); "should list the parquet files we wrote in the delta commit");

View File

@@ -37,7 +37,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.table.HoodieCopyOnWriteTable; import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.TestRawTripPayload; import org.apache.hudi.testutils.TestRawTripPayload;
import org.apache.hudi.testutils.TestRawTripPayload.MetadataMergeWriteStatus; import org.apache.hudi.testutils.TestRawTripPayload.MetadataMergeWriteStatus;
@@ -52,8 +52,6 @@ import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.TaskContext; import org.apache.spark.TaskContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.File; import java.io.File;
@@ -69,27 +67,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness { public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class); private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class);
@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestCopyOnWriteActionExecutor");
initPath();
initMetaClient();
initTestDataGenerator();
initFileSystem();
}
@AfterEach
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupClients();
cleanupFileSystem();
cleanupTestDataGenerator();
}
@Test @Test
public void testMakeNewPath() throws Exception { public void testMakeNewPath() throws Exception {
String fileName = UUID.randomUUID().toString(); String fileName = UUID.randomUUID().toString();
@@ -173,7 +154,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
GenericRecord newRecord; GenericRecord newRecord;
int index = 0; int index = 0;
for (GenericRecord record : fileRecords) { for (GenericRecord record : fileRecords) {
System.out.println("Got :" + record.get("_row_key").toString() + ", Exp :" + records.get(index).getRecordKey()); //System.out.println("Got :" + record.get("_row_key").toString() + ", Exp :" + records.get(index).getRecordKey());
assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString()); assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString());
index++; index++;
} }
@@ -427,11 +408,4 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
}).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect(); }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords()); assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
} }
@AfterEach
public void cleanup() {
if (jsc != null) {
jsc.stop();
}
}
} }

View File

@@ -29,14 +29,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieCopyOnWriteTable; import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieTestDataGenerator; import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.ArrayList; import java.util.ArrayList;
@@ -46,27 +44,10 @@ import scala.Tuple2;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestUpsertPartitioner extends HoodieClientTestHarness { public class TestUpsertPartitioner extends HoodieClientTestBase {
private static final Logger LOG = LogManager.getLogger(TestUpsertPartitioner.class); private static final Logger LOG = LogManager.getLogger(TestUpsertPartitioner.class);
@BeforeEach
public void setUp() throws Exception {
initSparkContexts("TestUpsertPartitioner");
initPath();
initMetaClient();
initTestDataGenerator();
initFileSystem();
}
@AfterEach
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupClients();
cleanupFileSystem();
cleanupTestDataGenerator();
}
private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize, private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize,
String testPartitionPath, boolean autoSplitInserts) throws Exception { String testPartitionPath, boolean autoSplitInserts) throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder() HoodieWriteConfig config = makeHoodieClientConfigBuilder()

View File

@@ -523,7 +523,7 @@ public class TestAsyncCompaction extends HoodieClientTestBase {
private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException { private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
HoodieTableFileSystemView view = HoodieTableFileSystemView view =
new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles); getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
return view.getLatestBaseFiles().collect(Collectors.toList()); return view.getLatestBaseFiles().collect(Collectors.toList());
} }

View File

@@ -78,10 +78,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
cleanupFileSystem(); cleanupResources();
cleanupTestDataGenerator();
cleanupSparkContexts();
cleanupClients();
} }
private HoodieWriteConfig getConfig() { private HoodieWriteConfig getConfig() {

View File

@@ -22,11 +22,14 @@ import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@@ -34,6 +37,8 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -60,7 +65,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
protected transient ExecutorService executorService; protected transient ExecutorService executorService;
protected transient HoodieTableMetaClient metaClient; protected transient HoodieTableMetaClient metaClient;
private static AtomicInteger instantGen = new AtomicInteger(1); private static AtomicInteger instantGen = new AtomicInteger(1);
protected transient HoodieWriteClient client; protected transient HoodieWriteClient writeClient;
protected transient HoodieReadClient readClient;
protected transient HoodieTableFileSystemView tableView;
protected transient HoodieTable hoodieTable;
protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
@@ -93,6 +101,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
cleanupSparkContexts(); cleanupSparkContexts();
cleanupTestDataGenerator(); cleanupTestDataGenerator();
cleanupFileSystem(); cleanupFileSystem();
cleanupDFS();
cleanupExecutorService();
System.gc();
} }
/** /**
@@ -163,6 +174,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
if (fs != null) { if (fs != null) {
LOG.warn("Closing file-system instance used in previous test-run"); LOG.warn("Closing file-system instance used in previous test-run");
fs.close(); fs.close();
fs = null;
} }
} }
@@ -185,13 +197,22 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
} }
/** /**
* Cleanups table type. * Cleanups hoodie clients.
*/ */
protected void cleanupClients() { protected void cleanupClients() throws IOException {
metaClient = null; if (metaClient != null) {
if (null != client) { metaClient = null;
client.close(); }
client = null; if (readClient != null) {
readClient = null;
}
if (writeClient != null) {
writeClient.close();
writeClient = null;
}
if (tableView != null) {
tableView.close();
tableView = null;
} }
} }
@@ -208,7 +229,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
* *
*/ */
protected void cleanupTestDataGenerator() { protected void cleanupTestDataGenerator() {
dataGen = null; if (dataGen != null) {
dataGen = null;
}
} }
/** /**
@@ -288,16 +311,32 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
} }
public HoodieReadClient getHoodieReadClient(String basePath) { public HoodieReadClient getHoodieReadClient(String basePath) {
return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc())); readClient = new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
return readClient;
} }
public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit, public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
HoodieIndex index) { HoodieIndex index) {
if (null != client) { if (null != writeClient) {
client.close(); writeClient.close();
client = null; writeClient = null;
} }
client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index); writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
return client; return writeClient;
}
public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) {
metaClient = new HoodieTableMetaClient(conf, basePath);
return metaClient;
}
public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
FileStatus[] fileStatuses) {
if (tableView == null) {
tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
} else {
tableView.init(metaClient, visibleActiveTimeline, fileStatuses);
}
return tableView;
} }
} }

View File

@@ -89,6 +89,12 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
super.init(metaClient, visibleActiveTimeline); super.init(metaClient, visibleActiveTimeline);
} }
public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
FileStatus[] fileStatuses) {
init(metaClient, visibleActiveTimeline);
addFilesToView(fileStatuses);
}
@Override @Override
protected void resetViewState() { protected void resetViewState() {
this.fgIdToPendingCompaction = null; this.fgIdToPendingCompaction = null;

View File

@@ -117,7 +117,7 @@ public class FileSystemViewHandler {
synchronized (view) { synchronized (view) {
if (isLocalViewBehind(ctx)) { if (isLocalViewBehind(ctx)) {
HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline(); HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline();
LOG.warn("Syncing view as client passed last known instant " + lastKnownInstantFromClient LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient
+ " as last known instant but server has the folling timeline :" + " as last known instant but server has the folling timeline :"
+ localTimeline.getInstants().collect(Collectors.toList())); + localTimeline.getInstants().collect(Collectors.toList()));
view.sync(); view.sync();

View File

@@ -246,7 +246,7 @@
<version>${maven-surefire-plugin.version}</version> <version>${maven-surefire-plugin.version}</version>
<configuration> <configuration>
<skip>${skipUTs}</skip> <skip>${skipUTs}</skip>
<argLine>-Xmx4g</argLine> <argLine>-Xmx2g</argLine>
<forkedProcessExitTimeoutInSeconds>120</forkedProcessExitTimeoutInSeconds> <forkedProcessExitTimeoutInSeconds>120</forkedProcessExitTimeoutInSeconds>
<systemPropertyVariables> <systemPropertyVariables>
<log4j.configuration> <log4j.configuration>