Fix the filter condition is missing in the judgment condition of compaction instance (#3025)
This commit is contained in:
@@ -63,7 +63,7 @@ public abstract class BaseScheduleCompactionActionExecutor<T extends HoodieRecor
|
|||||||
+ ", Compaction scheduled at " + instantTime));
|
+ ", Compaction scheduled at " + instantTime));
|
||||||
// Committed and pending compaction instants should have strictly lower timestamps
|
// Committed and pending compaction instants should have strictly lower timestamps
|
||||||
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
|
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
|
||||||
.getWriteTimeline().getInstants()
|
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
|
||||||
.filter(instant -> HoodieTimeline.compareTimestamps(
|
.filter(instant -> HoodieTimeline.compareTimestamps(
|
||||||
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
|
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|||||||
@@ -132,6 +132,30 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduleCompactionWithInflightInstant() {
|
||||||
|
HoodieWriteConfig config = getConfig();
|
||||||
|
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
|
||||||
|
// insert 100 records.
|
||||||
|
String newCommitTime = "100";
|
||||||
|
writeClient.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
|
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
|
||||||
|
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
|
||||||
|
writeClient.insert(recordsRDD, newCommitTime).collect();
|
||||||
|
|
||||||
|
// create one inflight instance.
|
||||||
|
newCommitTime = "102";
|
||||||
|
writeClient.startCommitWithTime(newCommitTime);
|
||||||
|
metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
|
||||||
|
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
|
||||||
|
|
||||||
|
// create one compaction instance before exist inflight instance.
|
||||||
|
String compactionTime = "101";
|
||||||
|
writeClient.scheduleCompactionAtInstant(compactionTime, Option.empty());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteStatusContentsAfterCompaction() throws Exception {
|
public void testWriteStatusContentsAfterCompaction() throws Exception {
|
||||||
// insert 100 records
|
// insert 100 records
|
||||||
|
|||||||
Reference in New Issue
Block a user