[HUDI-1474] Add additional unit tests to TestHBaseIndex (#2349)

Author: Balajee Nagasubramaniam
Date: 2020-12-28 20:04:38 -08:00
Committed by: GitHub
parent b83d1d3e61
commit da51aa64fc

@@ -18,6 +18,7 @@
package org.apache.hudi.index.hbase;
import avro.shaded.com.google.common.collect.Maps;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -310,6 +311,125 @@ public class TestHBaseIndex extends FunctionalTestHarness {
    assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count());
  }

  /*
   * Test case to verify that for tagLocation entries present in HBase, if the corresponding commit instant
   * is missing from the timeline and the commit has not been archived, tagLocation resets the current record
   * location to null. (See the sketch after this test for the validity rule being exercised.)
   */
  @Test
  public void testSimpleTagLocationWithInvalidCommit() throws Exception {
    // Load to memory
    HoodieWriteConfig config = getConfig();
    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
    String newCommitTime = writeClient.startCommit();
    // make a commit with 199 records
    JavaRDD<HoodieRecord> writeRecords = generateAndCommitRecords(writeClient, 199);

    // make a second commit with a single record
    String invalidCommit = writeClient.startCommit();
    JavaRDD<HoodieRecord> invalidWriteRecords = generateAndCommitRecords(writeClient, 1, invalidCommit);

    // verify the location is tagged
    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
    JavaRDD<HoodieRecord> javaRDD0 = index.tagLocation(invalidWriteRecords, context(), hoodieTable);
    assertEquals(1, javaRDD0.collect().size()); // one record present
    assertEquals(1, javaRDD0.filter(HoodieRecord::isCurrentLocationKnown).collect().size()); // it is tagged
    assertEquals(invalidCommit, javaRDD0.collect().get(0).getCurrentLocation().getInstantTime());

    // rollback the invalid commit, so that HBase is left with a stale index entry
    writeClient.rollback(invalidCommit);

    // now tagLocation for the valid records; the HBase index should tag them
    metaClient = HoodieTableMetaClient.reload(metaClient);
    hoodieTable = HoodieSparkTable.create(config, context, metaClient);
    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, context(), hoodieTable);
    assertEquals(199, javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size());

    // tagLocation for the invalid record - the commit is no longer present in the timeline due to the rollback
    JavaRDD<HoodieRecord> javaRDD2 = index.tagLocation(invalidWriteRecords, context(), hoodieTable);
    assertEquals(1, javaRDD2.collect().size()); // one record present
    assertEquals(0, javaRDD2.filter(HoodieRecord::isCurrentLocationKnown).collect().size()); // it is not tagged
  }
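
  /*
   * Illustrative sketch of the validity rule the test above exercises, with a plain ascending-sorted list
   * of completed commit times standing in for the active commit timeline. This helper is hypothetical and
   * is not part of SparkHoodieHBaseIndex: a commit time read back from HBase is trusted only if it is
   * present in the active timeline, or sorts before the timeline's first instant (and is therefore assumed
   * to be archived). The rolled-back invalidCommit above is neither, so its record comes back untagged.
   */
  private static boolean isValidCommitSketch(String commitTs, List<String> completedCommitTimes) {
    if (completedCommitTimes.contains(commitTs)) {
      return true; // still live in the active timeline
    }
    // Older than everything still active: assumed to be archived, hence still valid. invalidCommit sorts
    // after the surviving first commit, so it falls through here and is reported invalid.
    return !completedCommitTimes.isEmpty() && commitTs.compareTo(completedCommitTimes.get(0)) < 0;
  }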

  /*
   * Test case to verify that tagLocation() uses the completed commit timeline to validate the commit
   * timestamp stored in HBase. If checkIfValidCommit() in the HBase index filtered the wrong timeline,
   * this test would fail. (See the worked example after this test.)
   */
  @Test
  public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
    // Load to memory
    HoodieWriteConfig config = getConfig();
    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);

    String commitTime1 = writeClient.startCommit();
    JavaRDD<HoodieRecord> writeRecords1 = generateAndCommitRecords(writeClient, 20, commitTime1);

    // rollback the commit - this leaves a clean file in the timeline
    writeClient.rollback(commitTime1);

    // create a second commit with 20 records
    metaClient = HoodieTableMetaClient.reload(metaClient);
    generateAndCommitRecords(writeClient, 20);

    // now tagLocation for the first (rolled-back) batch of records; the HBase index should still tag them
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords1, context(), hoodieTable);
    assertEquals(20, javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
  }
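
  /*
   * A worked instance of the assumed rule in isValidCommitSketch, showing why the rolled-back first batch
   * above still comes back tagged: after rollback(commitTime1), the completed commit timeline starts at the
   * second commit, so commitTime1 sorts before the timeline start and is treated like an archived (valid)
   * commit rather than an invalid one. This test is illustrative only and is not part of the original
   * change; it assumes assertTrue/assertFalse are statically imported alongside assertEquals.
   */
  @Test
  public void testValidityRuleSketch() {
    // Only the second commit ("002") is still completed; "001" was rolled back.
    assertTrue(isValidCommitSketch("001", java.util.Collections.singletonList("002"))); // before timeline start
    assertFalse(isValidCommitSketch("003", java.util.Collections.singletonList("002"))); // missing, after start
  }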

  private JavaRDD<HoodieRecord> generateAndCommitRecords(SparkRDDWriteClient writeClient, int numRecs) throws Exception {
    String commitTime = writeClient.startCommit();
    return generateAndCommitRecords(writeClient, numRecs, commitTime);
  }

  private JavaRDD<HoodieRecord> generateAndCommitRecords(SparkRDDWriteClient writeClient,
      int numRecs, String commitTime) throws Exception {
    // generate a batch of records for this commit
    List<HoodieRecord> records = dataGen.generateInserts(commitTime, numRecs);
    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
    metaClient = HoodieTableMetaClient.reload(metaClient);

    // insert the records
    JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(writeRecords, commitTime);
    assertNoWriteErrors(writeStatuses.collect());

    // commit this upsert
    writeClient.commit(commitTime, writeStatuses);
    return writeRecords;
  }

  // Verify that the HBase index tags records belonging to an archived commit as valid.
  @Test
  public void testHbaseTagLocationForArchivedCommits() throws Exception {
    // Load to memory
    Map<String, String> params = Maps.newHashMap();
    params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP, "1");
    params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP, "3");
    params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP, "2");
    HoodieWriteConfig config = getConfigBuilder(100, false).withProps(params).build();
    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
    SparkRDDWriteClient writeClient = getHoodieWriteClient(config);

    // make the first commit with 20 records
    JavaRDD<HoodieRecord> writeRecords1 = generateAndCommitRecords(writeClient, 20);

    // make 3 additional commits, so that the first commit gets archived
    for (int nCommit = 0; nCommit < 3; nCommit++) {
      generateAndCommitRecords(writeClient, 20);
    }

    // tagLocation for the first batch of records (from the archived commit); the HBase index should tag them as valid
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords1, context(), hoodieTable);
    assertEquals(20, javaRDD1.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
  }
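
  /*
   * Illustrative sketch of the archival trigger this test relies on (a hypothetical helper, not Hudi's
   * actual archiver): once the active timeline holds more than MAX_COMMITS_TO_KEEP (3) completed commits,
   * it is trimmed back to MIN_COMMITS_TO_KEEP (2). After the four commits above, the first commit has been
   * archived, yet the index must keep treating its records as validly tagged.
   * e.g. archiveSketch(Arrays.asList("001", "002", "003", "004"), 3, 2) returns ["001", "002"].
   */
  private static List<String> archiveSketch(List<String> activeCommits, int maxToKeep, int minToKeep) {
    if (activeCommits.size() <= maxToKeep) {
      return java.util.Collections.emptyList(); // below the threshold, nothing is archived
    }
    // Archive the oldest commits so that only minToKeep remain on the active timeline.
    return activeCommits.subList(0, activeCommits.size() - minToKeep);
  }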

  @Test
  public void testTotalGetsBatching() throws Exception {
    HoodieWriteConfig config = getConfig();