1
0

[HUDI-368] code clean up in TestAsyncCompaction class (#1050)

This commit is contained in:
Pratyaksh Sharma
2019-12-11 03:22:41 +05:30
committed by leesf
parent 24a09c775f
commit 3790b75e05

View File

@@ -35,7 +35,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
@@ -108,17 +107,15 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
HoodieInstant pendingCompactionInstant = HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertTrue("Pending Compaction instant has expected instant time", assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); compactionInstantTime);
assertTrue("Pending Compaction instant has expected state", assertEquals("Pending Compaction instant has expected state", pendingCompactionInstant.getState(), State.REQUESTED);
pendingCompactionInstant.getState().equals(State.REQUESTED));
moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg); moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
// Reload and rollback inflight compaction // Reload and rollback inflight compaction
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
hoodieTable.rollback(jsc, compactionInstantTime, false);
client.rollbackInflightCompaction( client.rollbackInflightCompaction(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable); new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
@@ -139,11 +136,6 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
} }
} }
private Path getInstantPath(HoodieTableMetaClient metaClient, String timestamp, String action, State state) {
HoodieInstant instant = new HoodieInstant(state, action, timestamp);
return new Path(metaClient.getMetaPath(), instant.getFileName());
}
@Test @Test
public void testRollbackInflightIngestionWithPendingCompaction() throws Exception { public void testRollbackInflightIngestionWithPendingCompaction() throws Exception {
// Rollback inflight ingestion when there is pending compaction // Rollback inflight ingestion when there is pending compaction
@@ -171,12 +163,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieInstant pendingCompactionInstant = HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertTrue("Pending Compaction instant has expected instant time", assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); compactionInstantTime);
HoodieInstant inflightInstant = HoodieInstant inflightInstant =
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get(); metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
assertTrue("inflight instant has expected instant time", assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), inflightInstantTime);
inflightInstant.getTimestamp().equals(inflightInstantTime));
// This should rollback // This should rollback
client.startCommitWithTime(nextInflightInstantTime); client.startCommitWithTime(nextInflightInstantTime);
@@ -184,14 +175,13 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
// Validate // Validate
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
inflightInstant = metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get(); inflightInstant = metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
assertTrue("inflight instant has expected instant time", assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), nextInflightInstantTime);
inflightInstant.getTimestamp().equals(nextInflightInstantTime)); assertEquals("Expect only one inflight instant", 1, metaClient.getActiveTimeline()
assertTrue("Expect only one inflight instant", .filterInflightsExcludingCompaction().getInstants().count());
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().getInstants().count() == 1);
// Expect pending Compaction to be present // Expect pending Compaction to be present
pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); pendingCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertTrue("Pending Compaction instant has expected instant time", assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(),
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); compactionInstantTime);
} }
} }
@@ -217,7 +207,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg); scheduleCompaction(compactionInstantTime, client, cfg);
moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg); moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
// Complete ingestions // Complete ingestions
runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false, runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), records, cfg, false,
@@ -245,13 +235,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
new ArrayList<>()); new ArrayList<>());
// Schedule compaction but do not run them // Schedule compaction but do not run them
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
scheduleCompaction(compactionInstantTime, client, cfg); scheduleCompaction(compactionInstantTime, client, cfg);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieInstant pendingCompactionInstant = HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertTrue("Pending Compaction instant has expected instant time", assertEquals("Pending Compaction instant has expected instant time", pendingCompactionInstant.getTimestamp(), compactionInstantTime);
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
boolean gotException = false; boolean gotException = false;
try { try {
@@ -287,8 +275,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieInstant inflightInstant = HoodieInstant inflightInstant =
metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get(); metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
assertTrue("inflight instant has expected instant time", assertEquals("inflight instant has expected instant time", inflightInstant.getTimestamp(), inflightInstantTime);
inflightInstant.getTimestamp().equals(inflightInstantTime));
boolean gotException = false; boolean gotException = false;
try { try {
@@ -314,10 +301,9 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000; int numRecs = 2000;
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>()); new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
boolean gotException = false; boolean gotException = false;
try { try {
// Schedule compaction but do not run them // Schedule compaction but do not run them
@@ -329,10 +315,9 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
// Schedule with timestamp same as that of committed instant // Schedule with timestamp same as that of committed instant
gotException = false; gotException = false;
String dupCompactionInstantTime = secondInstantTime;
try { try {
// Schedule compaction but do not run them // Schedule compaction but do not run them
scheduleCompaction(dupCompactionInstantTime, client, cfg); scheduleCompaction(secondInstantTime, client, cfg);
} catch (IllegalArgumentException iex) { } catch (IllegalArgumentException iex) {
gotException = true; gotException = true;
} }
@@ -343,7 +328,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
gotException = false; gotException = false;
try { try {
// Schedule compaction with the same times as a pending compaction // Schedule compaction with the same times as a pending compaction
scheduleCompaction(dupCompactionInstantTime, client, cfg); scheduleCompaction(secondInstantTime, client, cfg);
} catch (IllegalArgumentException iex) { } catch (IllegalArgumentException iex) {
gotException = true; gotException = true;
} }
@@ -354,7 +339,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testCompactionAfterTwoDeltaCommits() throws Exception { public void testCompactionAfterTwoDeltaCommits() throws Exception {
// No Delta Commits after compaction request // No Delta Commits after compaction request
HoodieWriteConfig cfg = getConfig(true); HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) { try (HoodieWriteClient client = getHoodieWriteClient(cfg, true)) {
String firstInstantTime = "001"; String firstInstantTime = "001";
String secondInstantTime = "004"; String secondInstantTime = "004";
@@ -362,7 +347,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000; int numRecs = 2000;
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>()); new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
@@ -405,15 +390,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
private void validateDeltaCommit(String latestDeltaCommit, private void validateDeltaCommit(String latestDeltaCommit,
final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
HoodieWriteConfig cfg) throws IOException { HoodieWriteConfig cfg) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable table = getHoodieTable(metaClient, cfg); HoodieTable table = getHoodieTable(metaClient, cfg);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg); List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
fileSliceList.forEach(fileSlice -> { fileSliceList.forEach(fileSlice -> {
Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId()); Pair<String, HoodieCompactionOperation> opPair = fgIdToCompactionOperation.get(fileSlice.getFileGroupId());
if (opPair != null) { if (opPair != null) {
assertTrue("Expect baseInstant to match compaction Instant", assertEquals("Expect baseInstant to match compaction Instant", fileSlice.getBaseInstantTime(), opPair.getKey());
fileSlice.getBaseInstantTime().equals(opPair.getKey()));
assertTrue("Expect atleast one log file to be present where the latest delta commit was written", assertTrue("Expect atleast one log file to be present where the latest delta commit was written",
fileSlice.getLogFiles().count() > 0); fileSlice.getLogFiles().count() > 0);
assertFalse("Expect no data-file to be present", fileSlice.getDataFile().isPresent()); assertFalse("Expect no data-file to be present", fileSlice.getDataFile().isPresent());
@@ -469,12 +453,9 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
return records; return records;
} }
private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteClient client, private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
HoodieWriteConfig cfg) throws IOException {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
HoodieCompactionPlan workload = AvroUtils
.deserializeCompactionPlan(metaClient.getActiveTimeline().getInstantAuxiliaryDetails(compactionInstant).get());
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant); metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants() HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
.filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get(); .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
@@ -499,19 +480,19 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException { HoodieWriteConfig cfg, int expectedNumRecs, boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
client.compact(compactionInstantTime); client.compact(compactionInstantTime);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg); List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
assertTrue("Ensure latest file-slices are not empty", fileSliceList.stream().findAny().isPresent()); assertTrue("Ensure latest file-slices are not empty", fileSliceList.stream().findAny().isPresent());
assertFalse("Verify all file-slices have base-instant same as compaction instant", fileSliceList.stream() assertFalse("Verify all file-slices have base-instant same as compaction instant", fileSliceList.stream()
.filter(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)).findAny().isPresent()); .anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)));
assertFalse("Verify all file-slices have data-files", assertFalse("Verify all file-slices have data-files",
fileSliceList.stream().filter(fs -> !fs.getDataFile().isPresent()).findAny().isPresent()); fileSliceList.stream().anyMatch(fs -> !fs.getDataFile().isPresent()));
if (hasDeltaCommitAfterPendingCompaction) { if (hasDeltaCommitAfterPendingCompaction) {
assertFalse("Verify all file-slices have atleast one log-file", assertFalse("Verify all file-slices have atleast one log-file",
fileSliceList.stream().filter(fs -> fs.getLogFiles().count() == 0).findAny().isPresent()); fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() == 0));
} else { } else {
assertFalse("Verify all file-slices have no log-files", assertFalse("Verify all file-slices have no log-files",
fileSliceList.stream().filter(fs -> fs.getLogFiles().count() > 0).findAny().isPresent()); fileSliceList.stream().anyMatch(fs -> fs.getLogFiles().count() > 0));
} }
// verify that there is a commit // verify that there is a commit
@@ -554,16 +535,14 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
HoodieTableFileSystemView view = HoodieTableFileSystemView view =
new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles); new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
List<HoodieDataFile> dataFilesToRead = view.getLatestDataFiles().collect(Collectors.toList()); return view.getLatestDataFiles().collect(Collectors.toList());
return dataFilesToRead;
} }
private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table, HoodieWriteConfig cfg) throws IOException { private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(), HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline()); table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
List<FileSlice> fileSliceList = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).stream() return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
.flatMap(partition -> view.getLatestFileSlices(partition)).collect(Collectors.toList()); .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
return fileSliceList;
} }
protected HoodieTableType getTableType() { protected HoodieTableType getTableType() {