diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java index aab12be2c..4540f2f6e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -177,13 +176,11 @@ public class HBaseIndex extends HoodieIndex { } private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) { - HoodieTimeline commitTimeline = metaClient.getActiveTimeline().filterCompletedInstants(); + HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants(); // Check if the last commit ts for this row is 1) present in the timeline or // 2) is less than the first commit ts in the timeline return !commitTimeline.empty() - && (commitTimeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs)) - || HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(), HoodieTimeline.GREATER_THAN, commitTs - )); + && commitTimeline.containsOrBeforeTimelineStarts(commitTs); } /** diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java index 733985540..20406cd2c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java @@ -21,9 +21,11 @@ package org.apache.hudi.index.hbase; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieHBaseIndexConfig; @@ -116,7 +118,17 @@ public class TestHBaseIndex extends FunctionalTestHarness { } @Test - public void testSimpleTagLocationAndUpdate() throws Exception { + public void testSimpleTagLocationAndUpdateCOW() throws Exception { + testSimpleTagLocationAndUpdate(HoodieTableType.COPY_ON_WRITE); + } + + @Test void testSimpleTagLocationAndUpdateMOR() throws Exception { + testSimpleTagLocationAndUpdate(HoodieTableType.MERGE_ON_READ); + } + + public void testSimpleTagLocationAndUpdate(HoodieTableType tableType) throws Exception { + metaClient = HoodieTestUtils.init(hadoopConf, basePath(), tableType); + final String newCommitTime = "001"; final int numRecords = 10; List records = dataGen.generateInserts(newCommitTime, numRecords); @@ -138,8 +150,7 @@ public class TestHBaseIndex extends FunctionalTestHarness { JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); assertNoWriteErrors(writeStatues.collect()); - // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed - // commit + // Now tagLocation for these records, hbaseIndex should not tag them since commit never occurred JavaRDD records2 = index.tagLocation(writeRecords, jsc(), hoodieTable); assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());