1
0

[HUDI-2502] Refactor index in hudi-client module (#3778)

- Refactor Index to reduce Line of Code and re-use across engines.
This commit is contained in:
Y Ethan Guo
2021-10-28 01:16:00 -07:00
committed by GitHub
parent e5b6b8602c
commit 0223c442ec
70 changed files with 2196 additions and 1567 deletions

View File

@@ -101,7 +101,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
protected final transient HoodieMetrics metrics;
private final transient HoodieIndex<T, I, K, O> index;
private final transient HoodieIndex<T, ?, ?, ?> index;
protected transient Timer.Context writeTimer = null;
protected transient Timer.Context compactionTimer;
@@ -138,7 +138,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
this.txnManager = new TransactionManager(config, fs);
}
protected abstract HoodieIndex<T, I, K, O> createIndex(HoodieWriteConfig writeConfig);
protected abstract HoodieIndex<T, ?, ?, ?> createIndex(HoodieWriteConfig writeConfig);
public void setOperationType(WriteOperationType operationType) {
this.operationType = operationType;
@@ -1006,7 +1006,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
return metrics;
}
public HoodieIndex<T, I, K, O> getIndex() {
public HoodieIndex<T, ?, ?, ?> getIndex() {
return index;
}

View File

@@ -21,12 +21,16 @@ package org.apache.hudi.index;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.table.HoodieTable;
import java.io.Serializable;
@@ -35,9 +39,9 @@ import java.io.Serializable;
* Base class for different types of indexes to determine the mapping from uuid.
*
* @param <T> Sub type of HoodieRecordPayload
* @param <I> Type of inputs
* @param <K> Type of keys
* @param <O> Type of outputs
* @param <I> Type of inputs for deprecated APIs
* @param <K> Type of keys for deprecated APIs
* @param <O> Type of outputs for deprecated APIs
*/
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O> implements Serializable {
@@ -52,18 +56,39 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O> implem
* Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually
* present).
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public abstract I tagLocation(I records, HoodieEngineContext context,
HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException;
@Deprecated
@PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)
public I tagLocation(I records, HoodieEngineContext context,
HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException {
throw new HoodieNotSupportedException("Deprecated API should not be called");
}
/**
* Extracts the location of written records, and updates the index.
* <p>
* TODO(vc): We may need to propagate the record as well in a WriteStatus class
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public abstract O updateLocation(O writeStatuses, HoodieEngineContext context,
HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException;
@Deprecated
@PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)
public O updateLocation(O writeStatuses, HoodieEngineContext context,
HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException {
throw new HoodieNotSupportedException("Deprecated API should not be called");
}
/**
* Looks up the index and tags each incoming record with a location of a file that contains
* the row (if it is actually present).
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract HoodieData<HoodieRecord<T>> tagLocation(
HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException;
/**
* Extracts the location of written records, and updates the index.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException;
/**
* Rollback the effects of the commit made at instantTime.

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.index.bloom;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* Helper for {@link HoodieBloomIndex} containing engine-specific logic.
*/
public abstract class BaseHoodieBloomIndexHelper implements Serializable {
/**
* Find out <RowKey, filename> pair.
*
* @param config Write config.
* @param context {@link HoodieEngineContext} instance to use.
* @param hoodieTable {@link HoodieTable} instance to use.
* @param partitionRecordKeyPairs Pairs of partition path and record key.
* @param fileComparisonPairs Pairs of filename and record key based on file comparisons.
* @param partitionToFileInfo Partition path to {@link BloomIndexFileInfo} map.
* @param recordsPerPartition Number of records per partition in a map.
* @return {@link HoodiePairData} of {@link HoodieKey} and {@link HoodieRecordLocation} pairs.
*/
public abstract HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
HoodiePairData<String, String> partitionRecordKeyPairs,
HoodieData<ImmutablePair<String, HoodieKey>> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
Map<String, Long> recordsPerPartition);
}

View File

@@ -1,261 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.index.bloom;
import com.beust.jcommander.internal.Lists;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieKeyLookupHandle;
import org.apache.hudi.io.HoodieRangeInfoHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
@SuppressWarnings("checkstyle:LineLength")
public class HoodieBaseBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(HoodieBaseBloomIndex.class);
public HoodieBaseBloomIndex(HoodieWriteConfig config) {
super(config);
}
@Override
public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records, HoodieEngineContext context,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
// Step 1: Extract out thinner Map of (partitionPath, recordKey)
Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
records.forEach(record -> {
if (partitionRecordKeyMap.containsKey(record.getPartitionPath())) {
partitionRecordKeyMap.get(record.getPartitionPath()).add(record.getRecordKey());
} else {
List<String> recordKeys = Lists.newArrayList();
recordKeys.add(record.getRecordKey());
partitionRecordKeyMap.put(record.getPartitionPath(), recordKeys);
}
});
// Step 2: Lookup indexes for all the partition/recordkey pair
Map<HoodieKey, HoodieRecordLocation> keyFilenamePairMap =
lookupIndex(partitionRecordKeyMap, context, hoodieTable);
if (LOG.isDebugEnabled()) {
long totalTaggedRecords = keyFilenamePairMap.values().size();
LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
}
// Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
List<HoodieRecord<T>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairMap, records);
return taggedRecords;
}
/**
* Lookup the location for each record key and return the pair<record_key,location> for all record keys already
* present and drop the record keys if not present.
*/
private Map<HoodieKey, HoodieRecordLocation> lookupIndex(
Map<String, List<String>> partitionRecordKeyMap, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
// Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition = new HashMap<>();
partitionRecordKeyMap.keySet().forEach(k -> recordsPerPartition.put(k, Long.valueOf(partitionRecordKeyMap.get(k).size())));
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
List<Pair<String, BloomIndexFileInfo>> fileInfoList =
loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
// Step 3: Obtain a List, for each incoming record, that already exists, with the file id,
// that contains it.
List<Pair<String, HoodieKey>> fileComparisons =
explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap);
return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
}
/**
* Load all involved files as <Partition, filename> pair List.
*/
//TODO duplicate code with spark, we can optimize this method later
List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
if (config.getBloomIndexPruneByRanges()) {
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
return context.map(partitionPathFileIDList, pf -> {
try {
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
LOG.warn("Unable to find range metadata in file :" + pf);
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
}
}, Math.max(partitionPathFileIDList.size(), 1));
} else {
return partitionPathFileIDList.stream()
.map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
}
}
@Override
public boolean rollbackCommit(String instantTime) {
// Nope, don't need to do anything.
return true;
}
/**
* This is not global, since we depend on the partitionPath to do the lookup.
*/
@Override
public boolean isGlobal() {
return false;
}
/**
* No indexes into log files yet.
*/
@Override
public boolean canIndexLogFiles() {
return false;
}
/**
* Bloom filters are stored, into the same data files.
*/
@Override
public boolean isImplicitWithStorage() {
return true;
}
/**
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
* to be compared gets cut down a lot from range pruning.
* <p>
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info.
*/
List<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
Map<String, List<String>> partitionRecordKeyMap) {
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
List<Pair<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
partitionRecordKeyMap.keySet().forEach(partitionPath -> {
List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
hoodieRecordKeys.forEach(hoodieRecordKey -> {
indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> {
fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
new HoodieKey(hoodieRecordKey, partitionPath)));
});
});
});
return fileRecordPairs;
}
/**
* Find out <RowKey, filename> pair.
*/
Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
List<Pair<String, HoodieKey>> fileComparisons,
HoodieTable hoodieTable) {
fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1.getLeft().compareTo(o2.getLeft())).collect(toList());
List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();
Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(hoodieTable, config).apply(fileComparisons.iterator());
while (iterator.hasNext()) {
keyLookupResults.addAll(iterator.next());
}
Map<HoodieKey, HoodieRecordLocation> hoodieRecordLocationMap = new HashMap<>();
keyLookupResults = keyLookupResults.stream().filter(lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList());
keyLookupResults.forEach(lookupResult -> {
lookupResult.getMatchingRecordKeys().forEach(r -> {
hoodieRecordLocationMap.put(new HoodieKey(r, lookupResult.getPartitionPath()), new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()));
});
});
return hoodieRecordLocationMap;
}
/**
* Tag the <rowKey, filename> back to the original HoodieRecord List.
*/
protected List<HoodieRecord<T>> tagLocationBacktoRecords(
Map<HoodieKey, HoodieRecordLocation> keyFilenamePair, List<HoodieRecord<T>> records) {
Map<HoodieKey, HoodieRecord<T>> keyRecordPairMap = new HashMap<>();
records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
// Here as the record might have more data than rowKey (some rowKeys' fileId is null),
// so we do left outer join.
List<Pair<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
keyRecordPairMap.keySet().forEach(k -> {
if (keyFilenamePair.containsKey(k)) {
newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
} else {
newList.add(Pair.of(keyRecordPairMap.get(k), null));
}
});
List<HoodieRecord<T>> res = Lists.newArrayList();
for (Pair<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), Option.ofNullable(v.getRight())));
}
return res;
}
@Override
public List<WriteStatus> updateLocation(List<WriteStatus> writeStatusList, HoodieEngineContext context,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
return writeStatusList;
}
}

View File

@@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.index.bloom;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieRangeInfoHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
/**
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata.
*/
public class HoodieBloomIndex<T extends HoodieRecordPayload<T>>
extends HoodieIndex<T, Object, Object, Object> {
private static final Logger LOG = LogManager.getLogger(HoodieBloomIndex.class);
private final BaseHoodieBloomIndexHelper bloomIndexHelper;
public HoodieBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper bloomIndexHelper) {
super(config);
this.bloomIndexHelper = bloomIndexHelper;
}
@Override
public HoodieData<HoodieRecord<T>> tagLocation(
HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
HoodieTable hoodieTable) {
// Step 0: cache the input records if needed
if (config.getBloomIndexUseCaching()) {
records.persist(new HoodieConfig(config.getProps())
.getString(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL_VALUE));
}
// Step 1: Extract out thinner pairs of (partitionPath, recordKey)
HoodiePairData<String, String> partitionRecordKeyPairs = records.mapToPair(
record -> new ImmutablePair<>(record.getPartitionPath(), record.getRecordKey()));
// Step 2: Lookup indexes for all the partition/recordkey pair
HoodiePairData<HoodieKey, HoodieRecordLocation> keyFilenamePairs =
lookupIndex(partitionRecordKeyPairs, context, hoodieTable);
// Cache the result, for subsequent stages.
if (config.getBloomIndexUseCaching()) {
keyFilenamePairs.persist("MEMORY_AND_DISK_SER");
}
if (LOG.isDebugEnabled()) {
long totalTaggedRecords = keyFilenamePairs.count();
LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
}
// Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
HoodieData<HoodieRecord<T>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairs, records);
if (config.getBloomIndexUseCaching()) {
records.unpersist();
keyFilenamePairs.unpersist();
}
return taggedRecords;
}
/**
* Lookup the location for each record key and return the pair<record_key,location> for all record keys already
* present and drop the record keys if not present.
*/
private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
HoodiePairData<String, String> partitionRecordKeyPairs, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
// Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition = partitionRecordKeyPairs.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
List<Pair<String, BloomIndexFileInfo>> fileInfoList =
loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
// Step 3: Obtain a HoodieData, for each incoming record, that already exists, with the file id,
// that contains it.
HoodieData<ImmutablePair<String, HoodieKey>> fileComparisonPairs =
explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs);
return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable,
partitionRecordKeyPairs, fileComparisonPairs, partitionToFileInfo, recordsPerPartition);
}
/**
* Load all involved files as <Partition, filename> pair List.
*/
List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
if (config.getBloomIndexPruneByRanges()) {
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
return context.map(partitionPathFileIDList, pf -> {
try {
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
LOG.warn("Unable to find range metadata in file :" + pf);
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
}
}, Math.max(partitionPathFileIDList.size(), 1));
} else {
return partitionPathFileIDList.stream()
.map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
}
}
@Override
public boolean rollbackCommit(String instantTime) {
// Nope, don't need to do anything.
return true;
}
/**
* This is not global, since we depend on the partitionPath to do the lookup.
*/
@Override
public boolean isGlobal() {
return false;
}
/**
* No indexes into log files yet.
*/
@Override
public boolean canIndexLogFiles() {
return false;
}
/**
* Bloom filters are stored, into the same data files.
*/
@Override
public boolean isImplicitWithStorage() {
return true;
}
/**
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
* to be compared gets cut down a lot from range pruning.
* <p>
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info.
*/
HoodieData<ImmutablePair<String, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodiePairData<String, String> partitionRecordKeyPairs) {
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
return partitionRecordKeyPairs.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair.getRight();
String partitionPath = partitionRecordKeyPair.getLeft();
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> new ImmutablePair<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionPath)))
.collect(Collectors.toList());
}).flatMap(List::iterator);
}
/**
* Tag the <rowKey, filename> back to the original HoodieRecord List.
*/
protected HoodieData<HoodieRecord<T>> tagLocationBacktoRecords(
HoodiePairData<HoodieKey, HoodieRecordLocation> keyFilenamePair,
HoodieData<HoodieRecord<T>> records) {
HoodiePairData<HoodieKey, HoodieRecord<T>> keyRecordPairs =
records.mapToPair(record -> new ImmutablePair<>(record.getKey(), record));
// Here as the records might have more data than keyFilenamePairs (some row keys' fileId is null),
// so we do left outer join.
return keyRecordPairs.leftOuterJoin(keyFilenamePair).values()
.map(v -> HoodieIndexUtils.getTaggedRecord(v.getLeft(), Option.ofNullable(v.getRight().orElse(null))));
}
@Override
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatusData, HoodieEngineContext context,
HoodieTable hoodieTable) {
return writeStatusData;
}
}

View File

@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.index.bloom;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* This filter will only work with hoodie table since it will only load partitions
* with .hoodie_partition_metadata file in it.
*/
public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload<T>> extends HoodieBloomIndex<T> {
public HoodieGlobalBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper bloomIndexHelper) {
super(config, bloomIndexHelper);
}
/**
* Load all involved files as <Partition, filename> pairs from all partitions in the table.
*/
@Override
List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
return super.loadInvolvedFiles(allPartitionPaths, context, hoodieTable);
}
/**
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
* to be compared gets cut down a lot from range pruning.
* <p>
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info. the partition path of the incoming record (partitionRecordKeyPairs._2()) will
* be ignored since the search scope should be bigger than that
*/
@Override
HoodieData<ImmutablePair<String, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodiePairData<String, String> partitionRecordKeyPairs) {
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
: new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo);
return partitionRecordKeyPairs.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair.getRight();
String partitionPath = partitionRecordKeyPair.getLeft();
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> new ImmutablePair<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
.collect(Collectors.toList());
}).flatMap(List::iterator);
}
/**
* Tagging for global index should only consider the record key.
*/
@Override
protected HoodieData<HoodieRecord<T>> tagLocationBacktoRecords(
HoodiePairData<HoodieKey, HoodieRecordLocation> keyLocationPairs,
HoodieData<HoodieRecord<T>> records) {
HoodiePairData<String, HoodieRecord<T>> incomingRowKeyRecordPairs =
records.mapToPair(record -> new ImmutablePair<>(record.getRecordKey(), record));
HoodiePairData<String, Pair<HoodieRecordLocation, HoodieKey>> existingRecordKeyToRecordLocationHoodieKeyMap =
keyLocationPairs.mapToPair(p -> new ImmutablePair<>(
p.getKey().getRecordKey(), new ImmutablePair<>(p.getValue(), p.getKey())));
// Here as the records might have more data than rowKeys (some rowKeys' fileId is null), so we do left outer join.
return incomingRowKeyRecordPairs.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().flatMap(record -> {
final HoodieRecord<T> hoodieRecord = record.getLeft();
final Option<Pair<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record.getRight();
if (recordLocationHoodieKeyPair.isPresent()) {
// Record key matched to file
if (config.getBloomIndexUpdatePartitionPath()
&& !recordLocationHoodieKeyPair.get().getRight().getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
// Create an empty record to delete the record in the old partition
HoodieRecord<T> deleteRecord = new HoodieRecord(recordLocationHoodieKeyPair.get().getRight(),
new EmptyHoodieRecordPayload());
deleteRecord.setCurrentLocation(recordLocationHoodieKeyPair.get().getLeft());
deleteRecord.seal();
// Tag the incoming record for inserting to the new partition
HoodieRecord<T> insertRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
return Arrays.asList(deleteRecord, insertRecord).iterator();
} else {
// Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
// When it differs, the record will still be updated at its old partition.
return Collections.singletonList(
(HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get().getRight(), hoodieRecord.getData()),
Option.ofNullable(recordLocationHoodieKeyPair.get().getLeft()))).iterator();
}
} else {
return Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty())).iterator();
}
});
}
@Override
public boolean isGlobal() {
return true;
}
}

View File

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.index.bloom;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieKeyLookupHandle;
import org.apache.hudi.table.HoodieTable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.toList;
/**
* Helper for {@link HoodieBloomIndex} containing Java {@link List}-based logic.
*/
public class ListBasedHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper {
private static final ListBasedHoodieBloomIndexHelper SINGLETON_INSTANCE = new ListBasedHoodieBloomIndexHelper();
protected ListBasedHoodieBloomIndexHelper() {
}
public static ListBasedHoodieBloomIndexHelper getInstance() {
return SINGLETON_INSTANCE;
}
@Override
public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
HoodiePairData<String, String> partitionRecordKeyPairs,
HoodieData<ImmutablePair<String, HoodieKey>> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, Long> recordsPerPartition) {
List<Pair<String, HoodieKey>> fileComparisonPairList =
HoodieList.getList(fileComparisonPairs).stream()
.sorted(Comparator.comparing(ImmutablePair::getLeft)).collect(toList());
List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();
Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(
hoodieTable, config).apply(fileComparisonPairList.iterator());
while (iterator.hasNext()) {
keyLookupResults.addAll(iterator.next());
}
keyLookupResults = keyLookupResults.stream().filter(
lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList());
return context.parallelize(keyLookupResults).flatMap(lookupResult ->
lookupResult.getMatchingRecordKeys().stream()
.map(recordKey -> new ImmutablePair<>(lookupResult, recordKey)).iterator()
).mapToPair(pair -> {
HoodieKeyLookupHandle.KeyLookupResult lookupResult = pair.getLeft();
String recordKey = pair.getRight();
return new ImmutablePair<>(
new HoodieKey(recordKey, lookupResult.getPartitionPath()),
new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()));
});
}
}

View File

@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.index.inmemory;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Hoodie Index implementation backed by an in-memory Hash map.
* <p>
* ONLY USE FOR LOCAL TESTING
*/
public class HoodieInMemoryHashIndex<T extends HoodieRecordPayload<T>>
extends HoodieIndex<T, Object, Object, Object> {
private static ConcurrentMap<HoodieKey, HoodieRecordLocation> recordLocationMap;
public HoodieInMemoryHashIndex(HoodieWriteConfig config) {
super(config);
synchronized (HoodieInMemoryHashIndex.class) {
if (recordLocationMap == null) {
recordLocationMap = new ConcurrentHashMap<>();
}
}
}
@Override
public HoodieData<HoodieRecord<T>> tagLocation(
HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
HoodieTable hoodieTable) {
return records.mapPartitions(hoodieRecordIterator -> {
List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
while (hoodieRecordIterator.hasNext()) {
HoodieRecord<T> record = hoodieRecordIterator.next();
if (recordLocationMap.containsKey(record.getKey())) {
record.unseal();
record.setCurrentLocation(recordLocationMap.get(record.getKey()));
record.seal();
}
taggedRecords.add(record);
}
return taggedRecords.iterator();
}, true);
}
@Override
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) {
return writeStatuses.map(writeStatus -> {
for (HoodieRecord record : writeStatus.getWrittenRecords()) {
if (!writeStatus.isErrored(record.getKey())) {
HoodieKey key = record.getKey();
Option<HoodieRecordLocation> newLocation = record.getNewLocation();
if (newLocation.isPresent()) {
recordLocationMap.put(key, newLocation.get());
} else {
// Delete existing index for a deleted record
recordLocationMap.remove(key);
}
}
}
return writeStatus;
});
}
@Override
public boolean rollbackCommit(String instantTime) {
return true;
}
/**
* Only looks up by recordKey.
*/
@Override
public boolean isGlobal() {
return true;
}
/**
* Mapping is available in HBase already.
*/
@Override
public boolean canIndexLogFiles() {
return true;
}
/**
* Index needs to be explicitly updated after storage write.
*/
@Override
public boolean isImplicitWithStorage() {
return false;
}
}

View File

@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.index.simple;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
/**
* A global simple index which reads interested fields(record key and partition path) from base files and
* joins with incoming records to find the tagged location.
*
* @param <T>
*/
public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload<T>> extends HoodieSimpleIndex<T> {
public HoodieGlobalSimpleIndex(HoodieWriteConfig config, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, keyGeneratorOpt);
}
@Override
public HoodieData<HoodieRecord<T>> tagLocation(
HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
HoodieTable hoodieTable) {
return tagLocationInternal(records, context, hoodieTable);
}
/**
* Tags records location for incoming records.
*
* @param inputRecords {@link HoodieData} of incoming records
* @param context instance of {@link HoodieEngineContext} to use
* @param hoodieTable instance of {@link HoodieTable} to use
* @return {@link HoodieData} of records with record locations set
*/
@Override
protected HoodieData<HoodieRecord<T>> tagLocationInternal(
HoodieData<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
HoodieTable hoodieTable) {
HoodiePairData<String, HoodieRecord<T>> keyedInputRecords =
inputRecords.mapToPair(entry -> new ImmutablePair<>(entry.getRecordKey(), entry));
HoodiePairData<HoodieKey, HoodieRecordLocation> allRecordLocationsInTable =
fetchAllRecordLocations(context, hoodieTable, config.getGlobalSimpleIndexParallelism());
return getTaggedRecords(keyedInputRecords, allRecordLocationsInTable);
}
/**
* Fetch record locations for passed in {@link HoodieKey}s.
*
* @param context instance of {@link HoodieEngineContext} to use
* @param hoodieTable instance of {@link HoodieTable} of interest
* @param parallelism parallelism to use
* @return {@link HoodiePairData} of {@link HoodieKey} and {@link HoodieRecordLocation}
*/
protected HoodiePairData<HoodieKey, HoodieRecordLocation> fetchAllRecordLocations(
HoodieEngineContext context, HoodieTable hoodieTable, int parallelism) {
List<Pair<String, HoodieBaseFile>> latestBaseFiles = getAllBaseFilesInTable(context, hoodieTable);
return fetchRecordLocations(context, hoodieTable, parallelism, latestBaseFiles);
}
/**
* Load all files for all partitions as <Partition, filename> pair data.
*/
protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(
final HoodieEngineContext context, final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
// Obtain the latest data files from all the partitions.
return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable);
}
/**
* Tag records with right {@link HoodieRecordLocation}.
*
* @param incomingRecords incoming {@link HoodieRecord}s
* @param existingRecords existing records with {@link HoodieRecordLocation}s
* @return {@link HoodieData} of {@link HoodieRecord}s with tagged {@link HoodieRecordLocation}s
*/
private HoodieData<HoodieRecord<T>> getTaggedRecords(
HoodiePairData<String, HoodieRecord<T>> incomingRecords,
HoodiePairData<HoodieKey, HoodieRecordLocation> existingRecords) {
HoodiePairData<String, Pair<String, HoodieRecordLocation>> existingRecordByRecordKey =
existingRecords.mapToPair(
entry -> new ImmutablePair<>(entry.getLeft().getRecordKey(),
Pair.of(entry.getLeft().getPartitionPath(), entry.getRight())));
return incomingRecords.leftOuterJoin(existingRecordByRecordKey).values()
.flatMap(entry -> {
HoodieRecord<T> inputRecord = entry.getLeft();
Option<Pair<String, HoodieRecordLocation>> partitionPathLocationPair = Option.ofNullable(entry.getRight().orElse(null));
List<HoodieRecord<T>> taggedRecords;
if (partitionPathLocationPair.isPresent()) {
String partitionPath = partitionPathLocationPair.get().getKey();
HoodieRecordLocation location = partitionPathLocationPair.get().getRight();
if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) {
// Create an empty record to delete the record in the old partition
HoodieRecord<T> deleteRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
deleteRecord.setCurrentLocation(location);
deleteRecord.seal();
// Tag the incoming record for inserting to the new partition
HoodieRecord<T> insertRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
taggedRecords = Arrays.asList(deleteRecord, insertRecord);
} else {
// Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
// When it differs, the record will still be updated at its old partition.
HoodieRecord<T> newRecord = new HoodieRecord<>(new HoodieKey(inputRecord.getRecordKey(), partitionPath), inputRecord.getData());
taggedRecords = Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(newRecord, Option.ofNullable(location)));
}
} else {
taggedRecords = Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty()));
}
return taggedRecords.iterator();
});
}
@Override
public boolean isGlobal() {
return true;
}
}

View File

@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.index.simple;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
import java.util.List;
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
/**
* A simple index which reads interested fields(record key and partition path) from base files and
* joins with incoming records to find the tagged location.
*
* @param <T> type of {@link HoodieRecordPayload}
*/
public class HoodieSimpleIndex<T extends HoodieRecordPayload<T>>
extends HoodieIndex<T, Object, Object, Object> {
private final Option<BaseKeyGenerator> keyGeneratorOpt;
public HoodieSimpleIndex(HoodieWriteConfig config, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config);
this.keyGeneratorOpt = keyGeneratorOpt;
}
@Override
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) {
return writeStatuses;
}
@Override
public boolean rollbackCommit(String commitTime) {
return true;
}
@Override
public boolean isGlobal() {
return false;
}
@Override
public boolean canIndexLogFiles() {
return false;
}
@Override
public boolean isImplicitWithStorage() {
return true;
}
@Override
public HoodieData<HoodieRecord<T>> tagLocation(
HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
HoodieTable hoodieTable) {
return tagLocationInternal(records, context, hoodieTable);
}
/**
* Tags records location for incoming records.
*
* @param inputRecords {@link HoodieData} of incoming records
* @param context instance of {@link HoodieEngineContext} to use
* @param hoodieTable instance of {@link HoodieTable} to use
* @return {@link HoodieData} of records with record locations set
*/
protected HoodieData<HoodieRecord<T>> tagLocationInternal(
HoodieData<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
HoodieTable hoodieTable) {
if (config.getSimpleIndexUseCaching()) {
inputRecords.persist(new HoodieConfig(config.getProps())
.getString(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL_VALUE));
}
HoodiePairData<HoodieKey, HoodieRecord<T>> keyedInputRecords =
inputRecords.mapToPair(record -> new ImmutablePair<>(record.getKey(), record));
HoodiePairData<HoodieKey, HoodieRecordLocation> existingLocationsOnTable =
fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keys(), context, hoodieTable,
config.getSimpleIndexParallelism());
HoodieData<HoodieRecord<T>> taggedRecords =
keyedInputRecords.leftOuterJoin(existingLocationsOnTable).map(entry -> {
final HoodieRecord<T> untaggedRecord = entry.getRight().getLeft();
final Option<HoodieRecordLocation> location = Option.ofNullable(entry.getRight().getRight().orElse(null));
return HoodieIndexUtils.getTaggedRecord(untaggedRecord, location);
});
if (config.getSimpleIndexUseCaching()) {
inputRecords.unpersist();
}
return taggedRecords;
}
/**
* Fetch record locations for passed in {@link HoodieKey}s.
*
* @param hoodieKeys {@link HoodieData} of {@link HoodieKey}s for which locations are fetched
* @param context instance of {@link HoodieEngineContext} to use
* @param hoodieTable instance of {@link HoodieTable} of interest
* @param parallelism parallelism to use
* @return {@link HoodiePairData} of {@link HoodieKey} and {@link HoodieRecordLocation}
*/
protected HoodiePairData<HoodieKey, HoodieRecordLocation> fetchRecordLocationsForAffectedPartitions(
HoodieData<HoodieKey> hoodieKeys, HoodieEngineContext context, HoodieTable hoodieTable,
int parallelism) {
List<String> affectedPartitionPathList =
hoodieKeys.map(HoodieKey::getPartitionPath).distinct().collectAsList();
List<Pair<String, HoodieBaseFile>> latestBaseFiles =
getLatestBaseFilesForAllPartitions(affectedPartitionPathList, context, hoodieTable);
return fetchRecordLocations(context, hoodieTable, parallelism, latestBaseFiles);
}
protected HoodiePairData<HoodieKey, HoodieRecordLocation> fetchRecordLocations(
HoodieEngineContext context, HoodieTable hoodieTable, int parallelism,
List<Pair<String, HoodieBaseFile>> baseFiles) {
int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism));
return context.parallelize(baseFiles, fetchParallelism)
.flatMap(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile, keyGeneratorOpt)
.locations().iterator())
.mapToPair(e -> (Pair<HoodieKey, HoodieRecordLocation>) e);
}
}

View File

@@ -101,7 +101,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
protected final HoodieIndex<T, I, K, O> index;
protected final HoodieIndex<T, ?, ?, ?> index;
private SerializableConfiguration hadoopConfiguration;
protected final TaskContextSupplier taskContextSupplier;
private final HoodieTableMetadata metadata;
@@ -125,7 +125,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
this.taskContextSupplier = context.getTaskContextSupplier();
}
protected abstract HoodieIndex<T, I, K, O> getIndex(HoodieWriteConfig config, HoodieEngineContext context);
protected abstract HoodieIndex<T, ?, ?, ?> getIndex(HoodieWriteConfig config, HoodieEngineContext context);
private synchronized FileSystemViewManager getViewManager() {
if (null == viewManager) {
@@ -347,7 +347,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
/**
* Return the index.
*/
public HoodieIndex<T, I, K, O> getIndex() {
public HoodieIndex<T, ?, ?, ?> getIndex() {
return index;
}

View File

@@ -63,11 +63,8 @@ public abstract class AbstractWriteHelper<T extends HoodieRecordPayload, I, K, O
}
}
private I tag(
I dedupedRecords, HoodieEngineContext context, HoodieTable<T, I, K, O> table) {
// perform index loop up to get existing location of records
return table.getIndex().tagLocation(dedupedRecords, context, table);
}
protected abstract I tag(
I dedupedRecords, HoodieEngineContext context, HoodieTable<T, I, K, O> table);
public I combineOnCondition(
boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
@@ -87,5 +84,5 @@ public abstract class AbstractWriteHelper<T extends HoodieRecordPayload, I, K, O
}
public abstract I deduplicateRecords(
I records, HoodieIndex<T, I, K, O> index, int parallelism);
I records, HoodieIndex<T, ?, ?, ?> index, int parallelism);
}