[HUDI-566] Added new test cases for class HoodieTimeline, HoodieDefaultTimeline and HoodieActiveTimeline.
This commit is contained in:
4
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
Normal file → Executable file
4
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
Normal file → Executable file
@@ -56,6 +56,10 @@ public interface HoodieTimeline extends Serializable {
|
||||
String REQUESTED_EXTENSION = ".requested";
|
||||
String RESTORE_ACTION = "restore";
|
||||
|
||||
String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
|
||||
CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
|
||||
COMPACTION_ACTION};
|
||||
|
||||
String COMMIT_EXTENSION = "." + COMMIT_ACTION;
|
||||
String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION;
|
||||
String CLEAN_EXTENSION = "." + CLEAN_ACTION;
|
||||
|
||||
276
hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
Normal file → Executable file
276
hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
Normal file → Executable file
@@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hudi.common.table.string;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.HoodieCommonTestHarness;
|
||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||
import org.apache.hudi.common.model.TimelineLayoutVersion;
|
||||
@@ -29,12 +28,22 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.common.model.TimelineLayoutVersion.VERSION_0;
|
||||
@@ -164,4 +173,269 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
|
||||
assertFalse("", activeCommitTimeline.isBeforeTimelineStarts("02"));
|
||||
assertTrue("", activeCommitTimeline.isBeforeTimelineStarts("00"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimelineGetOperations() {
|
||||
List<HoodieInstant> allInstants = getAllInstants();
|
||||
Supplier<Stream<HoodieInstant>> sup = () -> allInstants.stream();
|
||||
timeline = new HoodieActiveTimeline(metaClient, true);
|
||||
timeline.setInstants(allInstants);
|
||||
|
||||
/**
|
||||
* Helper function to check HoodieTimeline only contains some type of Instant actions.
|
||||
* @param timeline The HoodieTimeline to check
|
||||
* @param actions The actions that should be present in the timeline being checked
|
||||
*/
|
||||
BiConsumer<HoodieTimeline, Set<String>> checkTimeline = (HoodieTimeline timeline, Set<String> actions) -> {
|
||||
sup.get().filter(i -> actions.contains(i.getAction())).forEach(i -> assertTrue(timeline.containsInstant(i)));
|
||||
sup.get().filter(i -> !actions.contains(i.getAction())).forEach(i -> assertFalse(timeline.containsInstant(i)));
|
||||
};
|
||||
|
||||
// Test that various types of getXXX operations from HoodieActiveTimeline
|
||||
// return the correct set of Instant
|
||||
checkTimeline.accept(timeline.getCommitsTimeline(),
|
||||
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION));
|
||||
checkTimeline.accept(timeline.getCommitsAndCompactionTimeline(),
|
||||
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION));
|
||||
checkTimeline.accept(timeline.getCommitTimeline(), Sets.newHashSet(HoodieTimeline.COMMIT_ACTION));
|
||||
|
||||
checkTimeline.accept(timeline.getDeltaCommitTimeline(), Sets.newHashSet(HoodieTimeline.DELTA_COMMIT_ACTION));
|
||||
checkTimeline.accept(timeline.getCleanerTimeline(), Sets.newHashSet(HoodieTimeline.CLEAN_ACTION));
|
||||
checkTimeline.accept(timeline.getRollbackTimeline(), Sets.newHashSet(HoodieTimeline.ROLLBACK_ACTION));
|
||||
checkTimeline.accept(timeline.getRestoreTimeline(), Sets.newHashSet(HoodieTimeline.RESTORE_ACTION));
|
||||
checkTimeline.accept(timeline.getSavePointTimeline(), Sets.newHashSet(HoodieTimeline.SAVEPOINT_ACTION));
|
||||
checkTimeline.accept(timeline.getAllCommitsTimeline(),
|
||||
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION,
|
||||
HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION,
|
||||
HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION));
|
||||
|
||||
// Get some random Instants
|
||||
Random rand = new Random();
|
||||
Set<String> randomInstants = sup.get().filter(i -> rand.nextBoolean())
|
||||
.map(i -> i.getAction())
|
||||
.collect(Collectors.toSet());
|
||||
checkTimeline.accept(timeline.getTimelineOfActions(randomInstants), randomInstants);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimelineInstantOperations() {
|
||||
timeline = new HoodieActiveTimeline(metaClient, true);
|
||||
assertEquals("No instant present", timeline.countInstants(), 0);
|
||||
|
||||
// revertToInflight
|
||||
HoodieInstant commit = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
|
||||
timeline.createNewInstant(commit);
|
||||
timeline = timeline.reload();
|
||||
assertEquals(timeline.countInstants(), 1);
|
||||
assertTrue(timeline.containsInstant(commit));
|
||||
HoodieInstant inflight = timeline.revertToInflight(commit);
|
||||
// revert creates the .requested file
|
||||
timeline = timeline.reload();
|
||||
assertEquals(timeline.countInstants(), 1);
|
||||
assertTrue(timeline.containsInstant(inflight));
|
||||
assertFalse(timeline.containsInstant(commit));
|
||||
|
||||
// deleteInflight
|
||||
timeline.deleteInflight(inflight);
|
||||
timeline = timeline.reload();
|
||||
assertEquals(timeline.countInstants(), 1);
|
||||
assertFalse(timeline.containsInstant(inflight));
|
||||
assertFalse(timeline.containsInstant(commit));
|
||||
|
||||
// deletePending
|
||||
timeline.createNewInstant(commit);
|
||||
timeline.createNewInstant(inflight);
|
||||
timeline = timeline.reload();
|
||||
assertEquals(timeline.countInstants(), 1);
|
||||
timeline.deletePending(inflight);
|
||||
timeline = timeline.reload();
|
||||
assertEquals(timeline.countInstants(), 1);
|
||||
assertFalse(timeline.containsInstant(inflight));
|
||||
assertTrue(timeline.containsInstant(commit));
|
||||
|
||||
// deleteCompactionRequested
|
||||
HoodieInstant compaction = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "2");
|
||||
timeline.createNewInstant(compaction);
|
||||
timeline = timeline.reload();
|
||||
assertEquals(timeline.countInstants(), 2);
|
||||
timeline.deleteCompactionRequested(compaction);
|
||||
timeline = timeline.reload();
|
||||
assertEquals(timeline.countInstants(), 1);
|
||||
assertFalse(timeline.containsInstant(inflight));
|
||||
assertFalse(timeline.containsInstant(compaction));
|
||||
assertTrue(timeline.containsInstant(commit));
|
||||
|
||||
// transitionCompactionXXXtoYYY and revertCompactionXXXtoYYY
|
||||
compaction = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "3");
|
||||
timeline.createNewInstant(compaction);
|
||||
timeline = timeline.reload();
|
||||
assertTrue(timeline.containsInstant(compaction));
|
||||
inflight = timeline.transitionCompactionRequestedToInflight(compaction);
|
||||
timeline = timeline.reload();
|
||||
assertFalse(timeline.containsInstant(compaction));
|
||||
assertTrue(timeline.containsInstant(inflight));
|
||||
compaction = timeline.revertCompactionInflightToRequested(inflight);
|
||||
timeline = timeline.reload();
|
||||
assertTrue(timeline.containsInstant(compaction));
|
||||
assertFalse(timeline.containsInstant(inflight));
|
||||
inflight = timeline.transitionCompactionRequestedToInflight(compaction);
|
||||
compaction = timeline.transitionCompactionInflightToComplete(inflight, Option.empty());
|
||||
timeline = timeline.reload();
|
||||
assertTrue(timeline.containsInstant(compaction));
|
||||
assertFalse(timeline.containsInstant(inflight));
|
||||
|
||||
// transitionCleanXXXtoYYY
|
||||
HoodieInstant clean = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "4");
|
||||
timeline.saveToCleanRequested(clean, Option.empty());
|
||||
timeline = timeline.reload();
|
||||
assertTrue(timeline.containsInstant(clean));
|
||||
inflight = timeline.transitionCleanRequestedToInflight(clean, Option.empty());
|
||||
timeline = timeline.reload();
|
||||
assertFalse(timeline.containsInstant(clean));
|
||||
assertTrue(timeline.containsInstant(inflight));
|
||||
clean = timeline.transitionCleanInflightToComplete(inflight, Option.empty());
|
||||
timeline = timeline.reload();
|
||||
assertTrue(timeline.containsInstant(clean));
|
||||
assertFalse(timeline.containsInstant(inflight));
|
||||
|
||||
// Various states of Instants
|
||||
HoodieInstant srcInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.RESTORE_ACTION, "5");
|
||||
HoodieInstant otherInstant = HoodieTimeline.getRequestedInstant(srcInstant);
|
||||
assertEquals(otherInstant, new HoodieInstant(State.REQUESTED, HoodieTimeline.RESTORE_ACTION, "5"));
|
||||
otherInstant = HoodieTimeline.getCleanRequestedInstant("5");
|
||||
assertEquals(otherInstant, new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "5"));
|
||||
otherInstant = HoodieTimeline.getCleanInflightInstant("5");
|
||||
assertEquals(otherInstant, new HoodieInstant(State.INFLIGHT, HoodieTimeline.CLEAN_ACTION, "5"));
|
||||
otherInstant = HoodieTimeline.getCompactionRequestedInstant("5");
|
||||
assertEquals(otherInstant, new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "5"));
|
||||
otherInstant = HoodieTimeline.getCompactionInflightInstant("5");
|
||||
assertEquals(otherInstant, new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "5"));
|
||||
|
||||
// containsOrBeforeTimelineStarts
|
||||
List<HoodieInstant> allInstants = getAllInstants();
|
||||
timeline = new HoodieActiveTimeline(metaClient, true);
|
||||
timeline.setInstants(allInstants);
|
||||
|
||||
timeline.setInstants(allInstants);
|
||||
timeline.createNewInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "2"));
|
||||
allInstants.stream().map(i -> i.getTimestamp()).forEach(s -> assertTrue(timeline.containsOrBeforeTimelineStarts(s)));
|
||||
assertTrue(timeline.containsOrBeforeTimelineStarts("0"));
|
||||
assertFalse(timeline.containsOrBeforeTimelineStarts(String.valueOf(System.currentTimeMillis() + 1000)));
|
||||
assertFalse(timeline.getTimelineHash().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateInstants() {
|
||||
List<HoodieInstant> allInstants = getAllInstants();
|
||||
for (HoodieInstant instant : allInstants) {
|
||||
timeline.createNewInstant(instant);
|
||||
}
|
||||
|
||||
timeline = timeline.reload();
|
||||
for (HoodieInstant instant : allInstants) {
|
||||
assertTrue(timeline.containsInstant(instant));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInstantFilenameOperations() {
|
||||
HoodieInstant instantRequested = new HoodieInstant(State.REQUESTED, HoodieTimeline.RESTORE_ACTION, "5");
|
||||
HoodieInstant instantInflight = new HoodieInstant(State.INFLIGHT, HoodieTimeline.RESTORE_ACTION, "5");
|
||||
HoodieInstant instantComplete = new HoodieInstant(State.COMPLETED, HoodieTimeline.RESTORE_ACTION, "5");
|
||||
assertEquals(HoodieTimeline.getCommitFromCommitFile(instantRequested.getFileName()), "5");
|
||||
assertEquals(HoodieTimeline.getCommitFromCommitFile(instantInflight.getFileName()), "5");
|
||||
assertEquals(HoodieTimeline.getCommitFromCommitFile(instantComplete.getFileName()), "5");
|
||||
|
||||
assertEquals(HoodieTimeline.makeFileNameAsComplete(instantInflight.getFileName()),
|
||||
instantComplete.getFileName());
|
||||
|
||||
assertEquals(HoodieTimeline.makeFileNameAsInflight(instantComplete.getFileName()),
|
||||
instantInflight.getFileName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFiltering() {
|
||||
List<HoodieInstant> allInstants = getAllInstants();
|
||||
Supplier<Stream<HoodieInstant>> sup = () -> allInstants.stream();
|
||||
|
||||
timeline = new HoodieActiveTimeline(metaClient);
|
||||
timeline.setInstants(allInstants);
|
||||
|
||||
// getReverseOrderedInstants
|
||||
Stream<HoodieInstant> instants = timeline.getReverseOrderedInstants();
|
||||
List<HoodieInstant> v1 = instants.collect(Collectors.toList());
|
||||
List<HoodieInstant> v2 = sup.get().collect(Collectors.toList());
|
||||
Collections.reverse(v2);
|
||||
assertEquals(v1, v2);
|
||||
|
||||
/**
|
||||
* Helper function to check HoodieTimeline only contains some type of Instant states.
|
||||
* @param timeline The HoodieTimeline to check
|
||||
* @param states The states that should be present in the timeline being checked
|
||||
*/
|
||||
BiConsumer<HoodieTimeline, Set<State>> checkFilter = (HoodieTimeline timeline, Set<State> states) -> {
|
||||
sup.get().filter(i -> states.contains(i.getState())).forEach(i -> assertTrue(timeline.containsInstant(i)));
|
||||
sup.get().filter(i -> !states.contains(i.getState())).forEach(i -> assertFalse(timeline.containsInstant(i)));
|
||||
};
|
||||
|
||||
checkFilter.accept(timeline.filter(i -> false), Sets.newHashSet());
|
||||
checkFilter.accept(timeline.filterInflights(), Sets.newHashSet(State.INFLIGHT));
|
||||
checkFilter.accept(timeline.filterInflightsAndRequested(),
|
||||
Sets.newHashSet(State.INFLIGHT, State.REQUESTED));
|
||||
|
||||
// filterCompletedAndCompactionInstants
|
||||
// This cannot be done using checkFilter as it involves both states and actions
|
||||
final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
|
||||
final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED);
|
||||
final Set<String> actions = Sets.newHashSet(HoodieTimeline.COMPACTION_ACTION);
|
||||
sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
|
||||
.forEach(i -> assertTrue(t1.containsInstant(i)));
|
||||
sup.get().filter(i -> !(states.contains(i.getState()) || actions.contains(i.getAction())))
|
||||
.forEach(i -> assertFalse(t1.containsInstant(i)));
|
||||
|
||||
// filterPendingCompactionTimeline
|
||||
final HoodieTimeline t2 = timeline.filterPendingCompactionTimeline();
|
||||
sup.get().filter(i -> i.getAction() == HoodieTimeline.COMPACTION_ACTION)
|
||||
.forEach(i -> assertTrue(t2.containsInstant(i)));
|
||||
sup.get().filter(i -> i.getAction() != HoodieTimeline.COMPACTION_ACTION)
|
||||
.forEach(i -> assertFalse(t2.containsInstant(i)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an exhaustive list of all possible HoodieInstant.
|
||||
* @return list of HoodieInstant
|
||||
*/
|
||||
private List<HoodieInstant> getAllInstants() {
|
||||
timeline = new HoodieActiveTimeline(metaClient);
|
||||
List<HoodieInstant> allInstants = new ArrayList<HoodieInstant>();
|
||||
long commitTime = 1;
|
||||
for (State state : State.values()) {
|
||||
if (state == State.INVALID) {
|
||||
continue;
|
||||
}
|
||||
for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {
|
||||
// Following are not valid combinations of actions and state so we should
|
||||
// not be generating them.
|
||||
if (state == State.REQUESTED) {
|
||||
if (action == HoodieTimeline.SAVEPOINT_ACTION || action == HoodieTimeline.RESTORE_ACTION
|
||||
|| action == HoodieTimeline.ROLLBACK_ACTION) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (state == State.INFLIGHT && action == HoodieTimeline.ROLLBACK_ACTION) {
|
||||
continue;
|
||||
}
|
||||
if (state == State.COMPLETED && action == HoodieTimeline.ROLLBACK_ACTION) {
|
||||
continue;
|
||||
}
|
||||
// Compaction complete is called commit complete
|
||||
if (state == State.COMPLETED && action == HoodieTimeline.COMPACTION_ACTION) {
|
||||
action = HoodieTimeline.COMMIT_ACTION;
|
||||
}
|
||||
|
||||
allInstants.add(new HoodieInstant(state, action, String.format("%03d", commitTime++)));
|
||||
}
|
||||
}
|
||||
return allInstants;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user