From 74eb09be9b40a205365bf920ed98cfff564f79cf Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Sun, 3 Apr 2022 03:52:57 +0530 Subject: [PATCH] [HUDI-3776] Fix BloomIndex incorrectly using ColStats to lookup records locations (#5213) --- .../hudi/index/bloom/HoodieBloomIndex.java | 34 ++++++++--- .../client/functional/TestHoodieIndex.java | 60 +++++++++++++++++++ .../metadata/HoodieBackedTableMetadata.java | 20 ++++--- 3 files changed, 98 insertions(+), 16 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index a923a4c07..522bbb35e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -53,7 +53,10 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; +import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; /** * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata. @@ -118,14 +121,7 @@ public class HoodieBloomIndex extends HoodieIndex { List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); // Step 2: Load all involved files as pairs - List> fileInfoList; - if (config.getBloomIndexPruneByRanges()) { - fileInfoList = (config.isMetadataColumnStatsIndexEnabled() - ? loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable) - : loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable)); - } else { - fileInfoList = getFileInfoForLatestBaseFiles(affectedPartitionPathList, context, hoodieTable); - } + List> fileInfoList = getBloomIndexFileInfoForPartitions(context, hoodieTable, affectedPartitionPathList); final Map> partitionToFileInfo = fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList()))); @@ -138,6 +134,28 @@ public class HoodieBloomIndex extends HoodieIndex { partitionRecordKeyPairs, fileComparisonPairs, partitionToFileInfo, recordsPerPartition); } + private List> getBloomIndexFileInfoForPartitions(HoodieEngineContext context, + HoodieTable hoodieTable, + List affectedPartitionPathList) { + List> fileInfoList = new ArrayList<>(); + + if (config.getBloomIndexPruneByRanges()) { + // load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available + if (config.isMetadataColumnStatsIndexEnabled() + && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()).contains(COLUMN_STATS.getPartitionPath())) { + fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable); + } + // fallback to loading column ranges from files + if (isNullOrEmpty(fileInfoList)) { + fileInfoList = loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable); + } + } else { + fileInfoList = getFileInfoForLatestBaseFiles(affectedPartitionPathList, context, hoodieTable); + } + + return fileInfoList; + } + /** * Load all involved files as pair List. */ diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java index 8c27e488d..024cf1ff5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java @@ -78,6 +78,10 @@ import java.util.stream.Stream; import scala.Tuple2; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -198,6 +202,62 @@ public class TestHoodieIndex extends TestHoodieMetadataBase { recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch")); } + @Test + public void testLookupIndexWithOrWithoutColumnStats() throws Exception { + setUp(IndexType.BLOOM, true, true); + String newCommitTime = "001"; + int totalRecords = 10 + random.nextInt(20); + List records = dataGen.generateInserts(newCommitTime, totalRecords); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); + + // Test tagLocation without any entries in index + JavaRDD javaRDD = tagLocation(index, writeRecords, hoodieTable); + assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + + // Insert totalRecords records + writeClient.startCommitWithTime(newCommitTime); + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + Assertions.assertNoWriteErrors(writeStatues.collect()); + + // Now tagLocation for these records + javaRDD = tagLocation(index, writeRecords, hoodieTable); + assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + // Now commit this & update location of records inserted + writeClient.commit(newCommitTime, writeStatues); + + // check column_stats partition exists + metaClient = HoodieTableMetaClient.reload(metaClient); + assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS)); + assertTrue(getCompletedMetadataPartitions(metaClient.getTableConfig()).contains(COLUMN_STATS.getPartitionPath())); + + // delete the column_stats partition + deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS); + + // Now tagLocation for these records, they should be tagged correctly despite column_stats being enabled but not present + hoodieTable = HoodieSparkTable.create(config, context, metaClient); + javaRDD = tagLocation(index, writeRecords, hoodieTable); + Map recordKeyToPartitionPathMap = new HashMap(); + List hoodieRecords = writeRecords.collect(); + hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath())); + + assertEquals(totalRecords, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); + assertEquals(totalRecords, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(totalRecords, javaRDD.filter(record -> (record.getCurrentLocation() != null + && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); + javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch")); + + JavaRDD hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey()); + JavaPairRDD>> recordLocations = getRecordLocations(hoodieKeyJavaRDD, hoodieTable); + List hoodieKeys = hoodieKeyJavaRDD.collect(); + assertEquals(totalRecords, recordLocations.collect().size()); + assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count()); + recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey")); + recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch")); + } + @ParameterizedTest @MethodSource("indexTypeParams") public void testTagLocationAndDuplicateUpdate(IndexType indexType, boolean populateMetaFields, boolean enableMetadataIndex) throws Exception { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 7b4dbd9a0..a4bc8c552 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -18,9 +18,6 @@ package org.apache.hudi.metadata; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.avro.model.HoodieRestoreMetadata; @@ -52,6 +49,10 @@ import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -67,6 +68,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; + /** * Table metadata provided by an internal DFS backed Hudi metadata table. */ @@ -164,7 +167,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { } }); - ValidationUtils.checkState(keys.size() == fileSlicesKeysCount.get()); return result; } @@ -263,10 +265,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { Map, List> partitionFileSliceToKeysMap = new HashMap<>(); for (String key : keys) { - final FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, - latestFileSlices.size())); - final Pair partitionNameFileSlicePair = Pair.of(partitionName, slice); - partitionFileSliceToKeysMap.computeIfAbsent(partitionNameFileSlicePair, k -> new ArrayList<>()).add(key); + if (!isNullOrEmpty(latestFileSlices)) { + final FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, + latestFileSlices.size())); + final Pair partitionNameFileSlicePair = Pair.of(partitionName, slice); + partitionFileSliceToKeysMap.computeIfAbsent(partitionNameFileSlicePair, k -> new ArrayList<>()).add(key); + } } return partitionFileSliceToKeysMap; }