From ecbd389a3f9215e219cb19b8641f2faea4fa3ad7 Mon Sep 17 00:00:00 2001 From: Shen Hong Date: Sun, 28 Mar 2021 20:28:40 +0800 Subject: [PATCH] [HUDI-1478] Introduce HoodieBloomIndex to hudi-java-client (#2608) --- .../index/bloom/HoodieBaseBloomIndex.java | 261 ++++++++++++++++++ .../HoodieBaseBloomIndexCheckFunction.java} | 4 +- .../apache/hudi/index/FlinkHoodieIndex.java | 2 +- .../index/bloom/FlinkHoodieBloomIndex.java | 235 +--------------- .../hudi/index/JavaHoodieBloomIndex.java | 32 +++ .../apache/hudi/index/JavaHoodieIndex.java | 4 +- 6 files changed, 300 insertions(+), 238 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndex.java rename hudi-client/{hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java => hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java} (96%) create mode 100644 hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieBloomIndex.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndex.java new file mode 100644 index 000000000..75ab693d1 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndex.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bloom; + +import com.beust.jcommander.internal.Lists; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.MetadataNotFoundException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.io.HoodieKeyLookupHandle; +import org.apache.hudi.io.HoodieRangeInfoHandle; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; +import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; + +@SuppressWarnings("checkstyle:LineLength") +public class HoodieBaseBloomIndex extends HoodieIndex>, List, List> { + + private static final Logger LOG = LogManager.getLogger(HoodieBaseBloomIndex.class); + + public HoodieBaseBloomIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public List> tagLocation(List> records, HoodieEngineContext context, + HoodieTable>, List, List> hoodieTable) { + // Step 1: Extract out thinner Map of (partitionPath, recordKey) + Map> partitionRecordKeyMap = new HashMap<>(); + records.forEach(record -> { + if (partitionRecordKeyMap.containsKey(record.getPartitionPath())) { + partitionRecordKeyMap.get(record.getPartitionPath()).add(record.getRecordKey()); + } else { + List recordKeys = Lists.newArrayList(); + recordKeys.add(record.getRecordKey()); + partitionRecordKeyMap.put(record.getPartitionPath(), recordKeys); + } + }); + + // Step 2: Lookup indexes for all the partition/recordkey pair + Map keyFilenamePairMap = + lookupIndex(partitionRecordKeyMap, context, hoodieTable); + + if (LOG.isDebugEnabled()) { + long totalTaggedRecords = keyFilenamePairMap.values().size(); + LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords); + } + + // Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys + List> taggedRecords = tagLocationBacktoRecords(keyFilenamePairMap, records); + + return taggedRecords; + } + + /** + * Lookup the location for each record key and return the pair for all record keys already + * present and drop the record keys if not present. + */ + private Map lookupIndex( + Map> partitionRecordKeyMap, final HoodieEngineContext context, + final HoodieTable hoodieTable) { + // Obtain records per partition, in the incoming records + Map recordsPerPartition = new HashMap<>(); + partitionRecordKeyMap.keySet().forEach(k -> recordsPerPartition.put(k, Long.valueOf(partitionRecordKeyMap.get(k).size()))); + List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); + + // Step 2: Load all involved files as pairs + List> fileInfoList = + loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable); + final Map> partitionToFileInfo = + fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList()))); + + // Step 3: Obtain a List, for each incoming record, that already exists, with the file id, + // that contains it. + List> fileComparisons = + explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap); + return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable); + } + + /** + * Load all involved files as pair List. + */ + //TODO duplicate code with spark, we can optimize this method later + List> loadInvolvedFiles(List partitions, final HoodieEngineContext context, + final HoodieTable hoodieTable) { + // Obtain the latest data files from all the partitions. + List> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream() + .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId())) + .collect(toList()); + + if (config.getBloomIndexPruneByRanges()) { + // also obtain file ranges, if range pruning is enabled + context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)"); + return context.map(partitionPathFileIDList, pf -> { + try { + HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf); + String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys(); + return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1])); + } catch (MetadataNotFoundException me) { + LOG.warn("Unable to find range metadata in file :" + pf); + return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue())); + } + }, Math.max(partitionPathFileIDList.size(), 1)); + } else { + return partitionPathFileIDList.stream() + .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList()); + } + } + + @Override + public boolean rollbackCommit(String instantTime) { + // Nope, don't need to do anything. + return true; + } + + /** + * This is not global, since we depend on the partitionPath to do the lookup. + */ + @Override + public boolean isGlobal() { + return false; + } + + /** + * No indexes into log files yet. + */ + @Override + public boolean canIndexLogFiles() { + return false; + } + + /** + * Bloom filters are stored, into the same data files. + */ + @Override + public boolean isImplicitWithStorage() { + return true; + } + + /** + * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be + * checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files + * to be compared gets cut down a lot from range pruning. + *

+ * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on + * recordKey ranges in the index info. + */ + List> explodeRecordsWithFileComparisons( + final Map> partitionToFileIndexInfo, + Map> partitionRecordKeyMap) { + IndexFileFilter indexFileFilter = + config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo) + : new ListBasedIndexFileFilter(partitionToFileIndexInfo); + + List> fileRecordPairs = new ArrayList<>(); + partitionRecordKeyMap.keySet().forEach(partitionPath -> { + List hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath); + hoodieRecordKeys.forEach(hoodieRecordKey -> { + indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> { + fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(), + new HoodieKey(hoodieRecordKey, partitionPath))); + }); + }); + }); + return fileRecordPairs; + } + + /** + * Find out pair. + */ + Map findMatchingFilesForRecordKeys( + List> fileComparisons, + HoodieTable hoodieTable) { + + fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1.getLeft().compareTo(o2.getLeft())).collect(toList()); + + List keyLookupResults = new ArrayList<>(); + + Iterator> iterator = new HoodieBaseBloomIndexCheckFunction(hoodieTable, config).apply(fileComparisons.iterator()); + while (iterator.hasNext()) { + keyLookupResults.addAll(iterator.next()); + } + + Map hoodieRecordLocationMap = new HashMap<>(); + + keyLookupResults = keyLookupResults.stream().filter(lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList()); + keyLookupResults.forEach(lookupResult -> { + lookupResult.getMatchingRecordKeys().forEach(r -> { + hoodieRecordLocationMap.put(new HoodieKey(r, lookupResult.getPartitionPath()), new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())); + }); + }); + + return hoodieRecordLocationMap; + } + + + /** + * Tag the back to the original HoodieRecord List. + */ + protected List> tagLocationBacktoRecords( + Map keyFilenamePair, List> records) { + Map> keyRecordPairMap = new HashMap<>(); + records.forEach(r -> keyRecordPairMap.put(r.getKey(), r)); + // Here as the record might have more data than rowKey (some rowKeys' fileId is null), + // so we do left outer join. + List, HoodieRecordLocation>> newList = new ArrayList<>(); + keyRecordPairMap.keySet().forEach(k -> { + if (keyFilenamePair.containsKey(k)) { + newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k))); + } else { + newList.add(Pair.of(keyRecordPairMap.get(k), null)); + } + }); + List> res = Lists.newArrayList(); + for (Pair, HoodieRecordLocation> v : newList) { + res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), Option.ofNullable(v.getRight()))); + } + return res; + } + + @Override + public List updateLocation(List writeStatusList, HoodieEngineContext context, + HoodieTable>, List, List> hoodieTable) { + return writeStatusList; + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java similarity index 96% rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java index a147c145d..95a225547 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java @@ -37,14 +37,14 @@ import java.util.List; * Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files. */ //TODO we can move this class into the hudi-client-common and reuse it for spark client -public class HoodieFlinkBloomIndexCheckFunction +public class HoodieBaseBloomIndexCheckFunction implements Function>, Iterator>> { private final HoodieTable hoodieTable; private final HoodieWriteConfig config; - public HoodieFlinkBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) { + public HoodieBaseBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) { this.hoodieTable = hoodieTable; this.config = config; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java index ecd6490bf..272da8c6c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java @@ -46,7 +46,7 @@ public abstract class FlinkHoodieIndex extends Ho super(config); } - public static FlinkHoodieIndex createIndex(HoodieFlinkEngineContext context, HoodieWriteConfig config) { + public static HoodieIndex createIndex(HoodieFlinkEngineContext context, HoodieWriteConfig config) { // first use index class config to create index. if (!StringUtils.isNullOrEmpty(config.getIndexClass())) { Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java index 255a66b49..355dced71 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java @@ -18,248 +18,15 @@ package org.apache.hudi.index.bloom; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.MetadataNotFoundException; -import org.apache.hudi.index.FlinkHoodieIndex; -import org.apache.hudi.index.HoodieIndexUtils; -import org.apache.hudi.io.HoodieKeyLookupHandle; -import org.apache.hudi.io.HoodieRangeInfoHandle; -import org.apache.hudi.table.HoodieTable; - -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import com.beust.jcommander.internal.Lists; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import static java.util.stream.Collectors.mapping; -import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.toList; -import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; /** * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata. */ @SuppressWarnings("checkstyle:LineLength") -public class FlinkHoodieBloomIndex extends FlinkHoodieIndex { - - private static final Logger LOG = LogManager.getLogger(FlinkHoodieBloomIndex.class); - +public class FlinkHoodieBloomIndex extends HoodieBaseBloomIndex { public FlinkHoodieBloomIndex(HoodieWriteConfig config) { super(config); } - - @Override - public List> tagLocation(List> records, HoodieEngineContext context, - HoodieTable>, List, List> hoodieTable) { - // Step 1: Extract out thinner Map of (partitionPath, recordKey) - Map> partitionRecordKeyMap = new HashMap<>(); - records.forEach(record -> { - if (partitionRecordKeyMap.containsKey(record.getPartitionPath())) { - partitionRecordKeyMap.get(record.getPartitionPath()).add(record.getRecordKey()); - } else { - List recordKeys = Lists.newArrayList(); - recordKeys.add(record.getRecordKey()); - partitionRecordKeyMap.put(record.getPartitionPath(), recordKeys); - } - }); - - // Step 2: Lookup indexes for all the partition/recordkey pair - Map keyFilenamePairMap = - lookupIndex(partitionRecordKeyMap, context, hoodieTable); - - if (LOG.isDebugEnabled()) { - long totalTaggedRecords = keyFilenamePairMap.values().size(); - LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords); - } - - // Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys - List> taggedRecords = tagLocationBacktoRecords(keyFilenamePairMap, records); - - return taggedRecords; - } - - /** - * Lookup the location for each record key and return the pair for all record keys already - * present and drop the record keys if not present. - */ - private Map lookupIndex( - Map> partitionRecordKeyMap, final HoodieEngineContext context, - final HoodieTable hoodieTable) { - // Obtain records per partition, in the incoming records - Map recordsPerPartition = new HashMap<>(); - partitionRecordKeyMap.keySet().forEach(k -> recordsPerPartition.put(k, Long.valueOf(partitionRecordKeyMap.get(k).size()))); - List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); - - // Step 2: Load all involved files as pairs - List> fileInfoList = - loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable); - final Map> partitionToFileInfo = - fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList()))); - - // Step 3: Obtain a List, for each incoming record, that already exists, with the file id, - // that contains it. - List> fileComparisons = - explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap); - return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable); - } - - /** - * Load all involved files as pair List. - */ - //TODO duplicate code with spark, we can optimize this method later - List> loadInvolvedFiles(List partitions, final HoodieEngineContext context, - final HoodieTable hoodieTable) { - // Obtain the latest data files from all the partitions. - List> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream() - .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId())) - .collect(toList()); - - if (config.getBloomIndexPruneByRanges()) { - // also obtain file ranges, if range pruning is enabled - context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)"); - return context.map(partitionPathFileIDList, pf -> { - try { - HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf); - String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys(); - return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1])); - } catch (MetadataNotFoundException me) { - LOG.warn("Unable to find range metadata in file :" + pf); - return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue())); - } - }, Math.max(partitionPathFileIDList.size(), 1)); - } else { - return partitionPathFileIDList.stream() - .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList()); - } - } - - @Override - public boolean rollbackCommit(String instantTime) { - // Nope, don't need to do anything. - return true; - } - - /** - * This is not global, since we depend on the partitionPath to do the lookup. - */ - @Override - public boolean isGlobal() { - return false; - } - - /** - * No indexes into log files yet. - */ - @Override - public boolean canIndexLogFiles() { - return false; - } - - /** - * Bloom filters are stored, into the same data files. - */ - @Override - public boolean isImplicitWithStorage() { - return true; - } - - /** - * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be - * checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files - * to be compared gets cut down a lot from range pruning. - *

- * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on - * recordKey ranges in the index info. - */ - List> explodeRecordsWithFileComparisons( - final Map> partitionToFileIndexInfo, - Map> partitionRecordKeyMap) { - IndexFileFilter indexFileFilter = - config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo) - : new ListBasedIndexFileFilter(partitionToFileIndexInfo); - - List> fileRecordPairs = new ArrayList<>(); - partitionRecordKeyMap.keySet().forEach(partitionPath -> { - List hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath); - hoodieRecordKeys.forEach(hoodieRecordKey -> { - indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> { - fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(), - new HoodieKey(hoodieRecordKey, partitionPath))); - }); - }); - }); - return fileRecordPairs; - } - - /** - * Find out pair. - */ - Map findMatchingFilesForRecordKeys( - List> fileComparisons, - HoodieTable hoodieTable) { - - fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1.getLeft().compareTo(o2.getLeft())).collect(toList()); - - List keyLookupResults = new ArrayList<>(); - - Iterator> iterator = new HoodieFlinkBloomIndexCheckFunction(hoodieTable, config).apply(fileComparisons.iterator()); - while (iterator.hasNext()) { - keyLookupResults.addAll(iterator.next()); - } - - Map hoodieRecordLocationMap = new HashMap<>(); - - keyLookupResults = keyLookupResults.stream().filter(lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList()); - keyLookupResults.forEach(lookupResult -> { - lookupResult.getMatchingRecordKeys().forEach(r -> { - hoodieRecordLocationMap.put(new HoodieKey(r, lookupResult.getPartitionPath()), new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())); - }); - }); - - return hoodieRecordLocationMap; - } - - - /** - * Tag the back to the original HoodieRecord List. - */ - protected List> tagLocationBacktoRecords( - Map keyFilenamePair, List> records) { - Map> keyRecordPairMap = new HashMap<>(); - records.forEach(r -> keyRecordPairMap.put(r.getKey(), r)); - // Here as the record might have more data than rowKey (some rowKeys' fileId is null), - // so we do left outer join. - List, HoodieRecordLocation>> newList = new ArrayList<>(); - keyRecordPairMap.keySet().forEach(k -> { - if (keyFilenamePair.containsKey(k)) { - newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k))); - } else { - newList.add(Pair.of(keyRecordPairMap.get(k), null)); - } - }); - List> res = Lists.newArrayList(); - for (Pair, HoodieRecordLocation> v : newList) { - res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), Option.ofNullable(v.getRight()))); - } - return res; - } - - @Override - public List updateLocation(List writeStatusList, HoodieEngineContext context, - HoodieTable>, List, List> hoodieTable) { - return writeStatusList; - } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieBloomIndex.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieBloomIndex.java new file mode 100644 index 000000000..47d47c847 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieBloomIndex.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index; + +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.bloom.HoodieBaseBloomIndex; + +/** + * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata. + */ +public class JavaHoodieBloomIndex extends HoodieBaseBloomIndex { + public JavaHoodieBloomIndex(HoodieWriteConfig config) { + super(config); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java index 0239ee903..fc7a451dc 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java @@ -38,7 +38,7 @@ public abstract class JavaHoodieIndex extends Hoo super(config); } - public static JavaHoodieIndex createIndex(HoodieWriteConfig config) { + public static HoodieIndex createIndex(HoodieWriteConfig config) { // first use index class config to create index. if (!StringUtils.isNullOrEmpty(config.getIndexClass())) { Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config); @@ -52,6 +52,8 @@ public abstract class JavaHoodieIndex extends Hoo switch (config.getIndexType()) { case INMEMORY: return new JavaInMemoryHashIndex(config); + case BLOOM: + return new JavaHoodieBloomIndex(config); default: throw new HoodieIndexException("Unsupported index type " + config.getIndexType()); }