diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index f82f14d5a..7c1f7e00e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -115,6 +115,14 @@ public class HoodieIndexConfig extends HoodieConfig { + "When true, the input RDD will cached to speed up index lookup by reducing IO " + "for computing parallelism or affected partitions"); + public static final ConfigProperty BLOOM_INDEX_USE_METADATA = ConfigProperty + .key("hoodie.bloom.index.use.metadata") + .defaultValue(false) + .sinceVersion("0.11.0") + .withDocumentation("Only applies if index type is BLOOM. " + + "When true, the index lookup uses bloom filters and column stats from metadata " + + "table when available to speed up the process."); + public static final ConfigProperty BLOOM_INDEX_TREE_BASED_FILTER = ConfigProperty .key("hoodie.bloom.index.use.treebased.filter") .defaultValue("true") @@ -490,6 +498,11 @@ public class HoodieIndexConfig extends HoodieConfig { return this; } + public Builder bloomIndexUseMetadata(boolean useMetadata) { + hoodieIndexConfig.setValue(BLOOM_INDEX_USE_METADATA, String.valueOf(useMetadata)); + return this; + } + public Builder bloomIndexTreebasedFilter(boolean useTreeFilter) { hoodieIndexConfig.setValue(BLOOM_INDEX_TREE_BASED_FILTER, String.valueOf(useTreeFilter)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index fd4e93365..d861ffe97 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1541,6 +1541,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING); } + public boolean getBloomIndexUseMetadata() { + return getBooleanOrDefault(HoodieIndexConfig.BLOOM_INDEX_USE_METADATA); + } + public boolean useBloomIndexTreebasedFilter() { return getBoolean(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index 889715229..aeaf78672 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -40,6 +40,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.io.HoodieRangeInfoHandle; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -141,7 +142,7 @@ public class HoodieBloomIndex extends HoodieIndex { if (config.getBloomIndexPruneByRanges()) { // load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available - if (config.isMetadataColumnStatsIndexEnabled() + if (config.getBloomIndexUseMetadata() && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()).contains(COLUMN_STATS.getPartitionPath())) { fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java index bad822c8d..36ee7d967 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java @@ -37,6 +37,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions; +import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS; + /** * Takes a bunch of keys and returns ones that are present in the file group. */ @@ -60,7 +63,9 @@ public class HoodieKeyLookupHandle exten BloomFilter bloomFilter = null; HoodieTimer timer = new HoodieTimer().startTimer(); try { - if (config.isMetadataBloomFilterIndexEnabled()) { + if (config.getBloomIndexUseMetadata() + && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()) + .contains(BLOOM_FILTERS.getPartitionPath())) { bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight()) .orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight())); } else { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java index 1659fe016..9c2f37d56 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java @@ -30,17 +30,22 @@ import org.apache.hudi.data.HoodieJavaPairRDD; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.io.HoodieKeyLookupResult; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; 
-import scala.Tuple2; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import scala.Tuple2; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions; +import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS; + /** * Helper for {@link HoodieBloomIndex} containing Spark-specific logic. */ @@ -75,7 +80,9 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper { + config.getBloomIndexParallelism() + "}"); JavaRDD> keyLookupResultRDD; - if (config.isMetadataBloomFilterIndexEnabled()) { + if (config.getBloomIndexUseMetadata() + && getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()) + .contains(BLOOM_FILTERS.getPartitionPath())) { // Step 1: Sort by file id JavaRDD> sortedFileIdAndKeyPairs = fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);