[HUDI-3807] Add a new config to control the use of metadata index in HoodieBloomIndex (#5268)
This commit is contained in:
@@ -115,6 +115,14 @@ public class HoodieIndexConfig extends HoodieConfig {
|
|||||||
+ "When true, the input RDD will cached to speed up index lookup by reducing IO "
|
+ "When true, the input RDD will cached to speed up index lookup by reducing IO "
|
||||||
+ "for computing parallelism or affected partitions");
|
+ "for computing parallelism or affected partitions");
|
||||||
|
|
||||||
|
public static final ConfigProperty<Boolean> BLOOM_INDEX_USE_METADATA = ConfigProperty
|
||||||
|
.key("hoodie.bloom.index.use.metadata")
|
||||||
|
.defaultValue(false)
|
||||||
|
.sinceVersion("0.11.0")
|
||||||
|
.withDocumentation("Only applies if index type is BLOOM."
|
||||||
|
+ "When true, the index lookup uses bloom filters and column stats from metadata "
|
||||||
|
+ "table when available to speed up the process.");
|
||||||
|
|
||||||
public static final ConfigProperty<String> BLOOM_INDEX_TREE_BASED_FILTER = ConfigProperty
|
public static final ConfigProperty<String> BLOOM_INDEX_TREE_BASED_FILTER = ConfigProperty
|
||||||
.key("hoodie.bloom.index.use.treebased.filter")
|
.key("hoodie.bloom.index.use.treebased.filter")
|
||||||
.defaultValue("true")
|
.defaultValue("true")
|
||||||
@@ -490,6 +498,11 @@ public class HoodieIndexConfig extends HoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder bloomIndexUseMetadata(boolean useMetadata) {
|
||||||
|
hoodieIndexConfig.setValue(BLOOM_INDEX_USE_METADATA, String.valueOf(useMetadata));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder bloomIndexTreebasedFilter(boolean useTreeFilter) {
|
public Builder bloomIndexTreebasedFilter(boolean useTreeFilter) {
|
||||||
hoodieIndexConfig.setValue(BLOOM_INDEX_TREE_BASED_FILTER, String.valueOf(useTreeFilter));
|
hoodieIndexConfig.setValue(BLOOM_INDEX_TREE_BASED_FILTER, String.valueOf(useTreeFilter));
|
||||||
return this;
|
return this;
|
||||||
|
|||||||
@@ -1541,6 +1541,10 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING);
|
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean getBloomIndexUseMetadata() {
|
||||||
|
return getBooleanOrDefault(HoodieIndexConfig.BLOOM_INDEX_USE_METADATA);
|
||||||
|
}
|
||||||
|
|
||||||
public boolean useBloomIndexTreebasedFilter() {
|
public boolean useBloomIndexTreebasedFilter() {
|
||||||
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER);
|
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ import org.apache.hudi.index.HoodieIndex;
|
|||||||
import org.apache.hudi.index.HoodieIndexUtils;
|
import org.apache.hudi.index.HoodieIndexUtils;
|
||||||
import org.apache.hudi.io.HoodieRangeInfoHandle;
|
import org.apache.hudi.io.HoodieRangeInfoHandle;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -141,7 +142,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
|
|||||||
|
|
||||||
if (config.getBloomIndexPruneByRanges()) {
|
if (config.getBloomIndexPruneByRanges()) {
|
||||||
// load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
|
// load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
|
||||||
if (config.isMetadataColumnStatsIndexEnabled()
|
if (config.getBloomIndexUseMetadata()
|
||||||
&& getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()).contains(COLUMN_STATS.getPartitionPath())) {
|
&& getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig()).contains(COLUMN_STATS.getPartitionPath())) {
|
||||||
fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
|
fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,6 +37,9 @@ import java.io.IOException;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||||
|
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Takes a bunch of keys and returns ones that are present in the file group.
|
* Takes a bunch of keys and returns ones that are present in the file group.
|
||||||
*/
|
*/
|
||||||
@@ -60,7 +63,9 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload, I, K, O> exten
|
|||||||
BloomFilter bloomFilter = null;
|
BloomFilter bloomFilter = null;
|
||||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||||
try {
|
try {
|
||||||
if (config.isMetadataBloomFilterIndexEnabled()) {
|
if (config.getBloomIndexUseMetadata()
|
||||||
|
&& getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig())
|
||||||
|
.contains(BLOOM_FILTERS.getPartitionPath())) {
|
||||||
bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight())
|
bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight())
|
||||||
.orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()));
|
.orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()));
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -30,17 +30,22 @@ import org.apache.hudi.data.HoodieJavaPairRDD;
|
|||||||
import org.apache.hudi.data.HoodieJavaRDD;
|
import org.apache.hudi.data.HoodieJavaRDD;
|
||||||
import org.apache.hudi.io.HoodieKeyLookupResult;
|
import org.apache.hudi.io.HoodieKeyLookupResult;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.Partitioner;
|
import org.apache.spark.Partitioner;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
|
||||||
|
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper for {@link HoodieBloomIndex} containing Spark-specific logic.
|
* Helper for {@link HoodieBloomIndex} containing Spark-specific logic.
|
||||||
*/
|
*/
|
||||||
@@ -75,7 +80,9 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
|
|||||||
+ config.getBloomIndexParallelism() + "}");
|
+ config.getBloomIndexParallelism() + "}");
|
||||||
|
|
||||||
JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
|
JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
|
||||||
if (config.isMetadataBloomFilterIndexEnabled()) {
|
if (config.getBloomIndexUseMetadata()
|
||||||
|
&& getCompletedMetadataPartitions(hoodieTable.getMetaClient().getTableConfig())
|
||||||
|
.contains(BLOOM_FILTERS.getPartitionPath())) {
|
||||||
// Step 1: Sort by file id
|
// Step 1: Sort by file id
|
||||||
JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
|
JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
|
||||||
fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
|
fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
|
||||||
|
|||||||
Reference in New Issue
Block a user