[minor] following 4270, add unit tests for the keys lost case (#5918)
This commit is contained in:
@@ -38,6 +38,31 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
|
||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIndexStateBootstrapWithMultiFilesInOneSlice() throws Exception {
|
||||
// open the function and ingest data
|
||||
preparePipeline(conf)
|
||||
.consume(TestData.filterOddRows(TestData.DATA_SET_INSERT))
|
||||
.assertEmptyDataFiles()
|
||||
.checkpoint(1)
|
||||
.assertNextEvent()
|
||||
.checkpointComplete(1)
|
||||
.consume(TestData.filterEvenRows(TestData.DATA_SET_INSERT))
|
||||
.checkpoint(2)
|
||||
.assertNextEvent()
|
||||
.checkpointComplete(2)
|
||||
.checkWrittenData(EXPECTED1, 4)
|
||||
// write another commit but does not complete it
|
||||
.consume(TestData.filterEvenRows(TestData.DATA_SET_INSERT))
|
||||
.checkpoint(3)
|
||||
.assertNextEvent()
|
||||
.end();
|
||||
|
||||
// reset the config option
|
||||
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
|
||||
validateIndexLoaded();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIndexStateBootstrapWithCompactionScheduled() throws Exception {
|
||||
// sets up the delta commits as 1 to generate a new compaction plan.
|
||||
|
||||
@@ -64,6 +64,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
@@ -304,6 +305,24 @@ public class TestData {
|
||||
return inserts;
|
||||
}
|
||||
|
||||
public static List<RowData> filterOddRows(List<RowData> rows) {
|
||||
return filterRowsByIndexPredicate(rows, i -> i % 2 != 0);
|
||||
}
|
||||
|
||||
public static List<RowData> filterEvenRows(List<RowData> rows) {
|
||||
return filterRowsByIndexPredicate(rows, i -> i % 2 == 0);
|
||||
}
|
||||
|
||||
private static List<RowData> filterRowsByIndexPredicate(List<RowData> rows, Predicate<Integer> predicate) {
|
||||
List<RowData> filtered = new ArrayList<>();
|
||||
for (int i = 0; i < rows.size(); i++) {
|
||||
if (predicate.test(i)) {
|
||||
filtered.add(rows.get(i));
|
||||
}
|
||||
}
|
||||
return filtered;
|
||||
}
|
||||
|
||||
private static Integer toIdSafely(Object id) {
|
||||
if (id == null) {
|
||||
return -1;
|
||||
|
||||
Reference in New Issue
Block a user