1
0

[HUDI-3776] Fix BloomIndex incorrectly using ColStats to lookup records locations (#5213)

This commit is contained in:
Sagar Sumit
2022-04-03 03:52:57 +05:30
committed by GitHub
parent 20964df770
commit 74eb09be9b
3 changed files with 98 additions and 16 deletions

View File

@@ -53,7 +53,10 @@ import java.util.stream.Stream;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
/**
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata.
@@ -118,14 +121,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
List<Pair<String, BloomIndexFileInfo>> fileInfoList;
if (config.getBloomIndexPruneByRanges()) {
fileInfoList = (config.isMetadataColumnStatsIndexEnabled()
? loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable)
: loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable));
} else {
fileInfoList = getFileInfoForLatestBaseFiles(affectedPartitionPathList, context, hoodieTable);
}
List<Pair<String, BloomIndexFileInfo>> fileInfoList = getBloomIndexFileInfoForPartitions(context, hoodieTable, affectedPartitionPathList);
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
@@ -138,6 +134,28 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
partitionRecordKeyPairs, fileComparisonPairs, partitionToFileInfo, recordsPerPartition);
}
private List<Pair<String, BloomIndexFileInfo>> getBloomIndexFileInfoForPartitions(HoodieEngineContext context,
HoodieTable hoodieTable,
List<String> affectedPartitionPathList) {
List<Pair<String, BloomIndexFileInfo>> fileInfoList = new ArrayList<>();
if (config.getBloomIndexPruneByRanges()) {
// load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
if (config.isMetadataColumnStatsIndexEnabled()
&& getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()).contains(COLUMN_STATS.getPartitionPath())) {
fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
}
// fallback to loading column ranges from files
if (isNullOrEmpty(fileInfoList)) {
fileInfoList = loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable);
}
} else {
fileInfoList = getFileInfoForLatestBaseFiles(affectedPartitionPathList, context, hoodieTable);
}
return fileInfoList;
}
/**
* Load all involved files as <Partition, filename> pair List.
*/

View File

@@ -78,6 +78,10 @@ import java.util.stream.Stream;
import scala.Tuple2;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -198,6 +202,62 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
}
@Test
public void testLookupIndexWithOrWithoutColumnStats() throws Exception {
setUp(IndexType.BLOOM, true, true);
String newCommitTime = "001";
int totalRecords = 10 + random.nextInt(20);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
// Test tagLocation without any entries in index
JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writeRecords, hoodieTable);
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
// Insert totalRecords records
writeClient.startCommitWithTime(newCommitTime);
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
Assertions.assertNoWriteErrors(writeStatues.collect());
// Now tagLocation for these records
javaRDD = tagLocation(index, writeRecords, hoodieTable);
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
// Now commit this & update location of records inserted
writeClient.commit(newCommitTime, writeStatues);
// check column_stats partition exists
metaClient = HoodieTableMetaClient.reload(metaClient);
assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS));
assertTrue(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
// delete the column_stats partition
deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS);
// Now tagLocation for these records, they should be tagged correctly despite column_stats being enabled but not present
hoodieTable = HoodieSparkTable.create(config, context, metaClient);
javaRDD = tagLocation(index, writeRecords, hoodieTable);
Map<String, String> recordKeyToPartitionPathMap = new HashMap();
List<HoodieRecord> hoodieRecords = writeRecords.collect();
hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
assertEquals(totalRecords, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
assertEquals(totalRecords, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
assertEquals(totalRecords, javaRDD.filter(record -> (record.getCurrentLocation() != null
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch"));
JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey());
JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = getRecordLocations(hoodieKeyJavaRDD, hoodieTable);
List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
assertEquals(totalRecords, recordLocations.collect().size());
assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count());
recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
}
@ParameterizedTest
@MethodSource("indexTypeParams")
public void testTagLocationAndDuplicateUpdate(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception {