1
0

[HUDI-3455] Fixing checkpoint management in hoodie incr source (#4850)

This commit is contained in:
Sivabalan Narayanan
2022-02-21 08:19:57 -05:00
committed by GitHub
parent 17cb5cb433
commit d36fe24c9e
2 changed files with 16 additions and 4 deletions

View File

@@ -126,8 +126,8 @@ public class HoodieIncrSource extends RowSource {
numInstantsPerFetch, beginInstant, missingCheckpointStrategy);
if (queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue())) {
LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getKey());
return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getKey());
LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getValue().getKey());
return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getValue().getKey());
}
Dataset<Row> source = null;

View File

@@ -48,6 +48,7 @@ import java.util.Properties;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class TestHoodieIncrSource extends HoodieClientTestHarness {
@@ -77,7 +78,6 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {
Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, true, null, "400");
Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, true, null, "500");
// read everything upto latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey());
@@ -89,6 +89,14 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {
// read just the latest
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.empty(), 100, inserts5.getKey());
// ensure checkpoint does not move
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 0, inserts5.getKey());
Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient, true, null, "600");
// insert new batch and ensure the checkpoint moves
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 100, inserts6.getKey());
}
private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, String expectedCheckpoint) {
@@ -102,7 +110,11 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {
// read everything until latest
Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(checkpointToPull, 500);
Assertions.assertNotNull(batchCheckPoint.getValue());
assertEquals(batchCheckPoint.getKey().get().count(), expectedCount);
if (expectedCount == 0) {
assertFalse(batchCheckPoint.getKey().isPresent());
} else {
assertEquals(batchCheckPoint.getKey().get().count(), expectedCount);
}
Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint);
}