Fix HBase index on MOR tables not treating record index entries as valid
This commit is contained in:
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord;
|
|||||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.ReflectionUtils;
|
import org.apache.hudi.common.util.ReflectionUtils;
|
||||||
@@ -177,13 +176,11 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
|
private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
|
||||||
HoodieTimeline commitTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
|
HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
|
||||||
// Check if the last commit ts for this row is 1) present in the timeline or
|
// Check if the last commit ts for this row is 1) present in the timeline or
|
||||||
// 2) is less than the first commit ts in the timeline
|
// 2) is less than the first commit ts in the timeline
|
||||||
return !commitTimeline.empty()
|
return !commitTimeline.empty()
|
||||||
&& (commitTimeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs))
|
&& commitTimeline.containsOrBeforeTimelineStarts(commitTs);
|
||||||
|| HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(), HoodieTimeline.GREATER_THAN, commitTs
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -21,9 +21,11 @@ package org.apache.hudi.index.hbase;
|
|||||||
import org.apache.hudi.client.HoodieWriteClient;
|
import org.apache.hudi.client.HoodieWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||||
|
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||||
import org.apache.hudi.config.HoodieHBaseIndexConfig;
|
import org.apache.hudi.config.HoodieHBaseIndexConfig;
|
||||||
@@ -116,7 +118,17 @@ public class TestHBaseIndex extends FunctionalTestHarness {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleTagLocationAndUpdate() throws Exception {
|
public void testSimpleTagLocationAndUpdateCOW() throws Exception {
|
||||||
|
testSimpleTagLocationAndUpdate(HoodieTableType.COPY_ON_WRITE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test void testSimpleTagLocationAndUpdateMOR() throws Exception {
|
||||||
|
testSimpleTagLocationAndUpdate(HoodieTableType.MERGE_ON_READ);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSimpleTagLocationAndUpdate(HoodieTableType tableType) throws Exception {
|
||||||
|
metaClient = HoodieTestUtils.init(hadoopConf, basePath(), tableType);
|
||||||
|
|
||||||
final String newCommitTime = "001";
|
final String newCommitTime = "001";
|
||||||
final int numRecords = 10;
|
final int numRecords = 10;
|
||||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
|
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
|
||||||
@@ -138,8 +150,7 @@ public class TestHBaseIndex extends FunctionalTestHarness {
|
|||||||
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
|
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
|
||||||
assertNoWriteErrors(writeStatues.collect());
|
assertNoWriteErrors(writeStatues.collect());
|
||||||
|
|
||||||
// Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
|
// Now tagLocation for these records, hbaseIndex should not tag them since commit never occurred
|
||||||
// commit
|
|
||||||
JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
|
JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
|
||||||
assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
|
assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user