From d36fe24c9e5cbd5a20d14818671c28eca6f364ff Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 21 Feb 2022 08:19:57 -0500 Subject: [PATCH] [HUDI-3455] Fixing checkpoint management in hoodie incr source (#4850) --- .../hudi/utilities/sources/HoodieIncrSource.java | 4 ++-- .../utilities/sources/TestHoodieIncrSource.java | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 8d310223d..aa1e261c2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -126,8 +126,8 @@ public class HoodieIncrSource extends RowSource { numInstantsPerFetch, beginInstant, missingCheckpointStrategy); if (queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue())) { - LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getKey()); - return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getKey()); + LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getValue().getKey()); + return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getValue().getKey()); } Dataset source = null; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java index 708d45477..1f15cc309 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java @@ -48,6 +48,7 @@ import java.util.Properties; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; public class TestHoodieIncrSource extends HoodieClientTestHarness { @@ -77,7 +78,6 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness { Pair> inserts4 = writeRecords(writeClient, true, null, "400"); Pair> inserts5 = writeRecords(writeClient, true, null, "500"); - // read everything upto latest readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey()); @@ -89,6 +89,14 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness { // read just the latest readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.empty(), 100, inserts5.getKey()); + + // ensure checkpoint does not move + readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 0, inserts5.getKey()); + + Pair> inserts6 = writeRecords(writeClient, true, null, "600"); + + // insert new batch and ensure the checkpoint moves + readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 100, inserts6.getKey()); } private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option checkpointToPull, int expectedCount, String expectedCheckpoint) { @@ -102,7 +110,11 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness { // read everything until latest Pair>, String> batchCheckPoint = incrSource.fetchNextBatch(checkpointToPull, 500); Assertions.assertNotNull(batchCheckPoint.getValue()); - assertEquals(batchCheckPoint.getKey().get().count(), expectedCount); + if (expectedCount == 0) { + assertFalse(batchCheckPoint.getKey().isPresent()); + } else { + assertEquals(batchCheckPoint.getKey().get().count(), expectedCount); + } Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint); }