
[HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups (#4352)

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

- Today, base files carry their bloom filter in the file footer, so index
  lookups have to open the base file to perform any bloom checks. Even with
  interval-tree based file pruning, we still incur a significant amount of
  base file reads just to fetch bloom filters during key lookups. This
  index lookup can be made more performant by storing all the bloom filters
  in a new metadata partition and doing point lookups by key.
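The point-lookup idea above can be sketched as follows. This is a hypothetical illustration, not the Hudi API: the class, method names, and the use of a plain `Set` in place of a real bloom filter are all assumptions made for clarity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: with bloom filters stored in a metadata table
// partition, an index lookup becomes a point read keyed by file name,
// instead of opening every candidate base file to read its footer.
public class BloomPointLookupSketch {
    // Stand-in for the metadata partition: file name -> bloom filter.
    // A plain Set is used here in place of a real bloom filter, so this
    // sketch has no false positives.
    private final Map<String, Set<String>> bloomByFile = new HashMap<>();

    public void putBloom(String fileName, Set<String> keys) {
        bloomByFile.put(fileName, keys);
    }

    // Point lookup: no base-file footer read needed.
    public boolean mightContain(String fileName, String recordKey) {
        return Optional.ofNullable(bloomByFile.get(fileName))
                .map(b -> b.contains(recordKey))
                .orElse(true); // no filter recorded: the key may exist
    }
}
```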

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Adding indexing support for the clean, restore and rollback operations.
   Each of these operations will now additionally be converted into index
   records for the bloom filter and column stats partitions.

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Making the hoodie key consistent for both the column stats and bloom
   indexes by including the fileId instead of the fileName, in both the
   read and write paths.

 - Performance optimization for looking up records in the metadata table.

 - Avoiding the multi-column sorting needed for HoodieBloomMetaIndexBatchCheckFunction

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - HoodieBloomMetaIndexBatchCheckFunction cleanup to remove unused classes

 - Checking that the base file exists before reading its footer for bloom
   filter or column stats

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Updating the bloom filter and column stats indexes to include the full
   file name in the key instead of just the file id.

 - Minor test fixes.

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Fixed the Flink commit method to handle metadata table all-partition
   update records

 - TestBloomIndex fixes

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - SparkHoodieBloomIndexHelper code simplification for various config modes

 - Signature change for getBloomFilters() and getColumnStats(): callers can
   now just pass in the partitions and file names of interest, and the index
   key is constructed internally from the passed-in parameters.

 - KeyLookupHandle and KeyLookupResults code refactoring

 - Metadata schema changes - removed the reserved field
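The signature change described above can be sketched like this. Everything here is an assumption for illustration: the key layout `"<partition>/<fileName>"`, the class name, and the use of a plain map as the backing index store are not the actual Hudi implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the described signature change: callers hand in
// (partition, fileName) pairs and the index key is built internally, rather
// than every caller assembling metadata index keys itself.
public class MetaIndexKeySketch {
    // Assumed key layout for illustration only.
    static String toIndexKey(String partition, String fileName) {
        return partition + "/" + fileName;
    }

    public static Map<String, String> getBloomFilters(List<Map.Entry<String, String>> partitionFileNames,
                                                      Map<String, String> indexStore) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> pf : partitionFileNames) {
            String key = toIndexKey(pf.getKey(), pf.getValue()); // built internally
            String bloom = indexStore.get(key);
            if (bloom != null) {
                result.put(key, bloom);
            }
        }
        return result;
    }
}
```

Keeping key construction internal means the key layout can evolve without touching every caller.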

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Removing HoodieColumnStatsMetadata and using HoodieColumnRangeMetadata instead.
   Fixed the users of the removed class.

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Extending the meta index tests to cover delete, compaction, clean
   and restore table operations. Also fixed getBloomFilters()
   and getColumnStats() to account for deleted entries.

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Addressing review comments: javadoc for the new classes, key sorting for
   lookups, and index method renames.

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Consolidated the bloom filter key checking into a single
   HoodieMetadataBloomIndexCheckFunction instead of separate batch
   and lazy modes. Removed all the configs around it.

 - Made the metadata table partition file group count configurable.

 - Fixed HoodieKeyLookupHandle to use an auto-closable file reader
   when checking bloom filters and range keys.

 - Config property renames. Test fixes.
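The auto-closable reader fix mentioned above corresponds to the try-with-resources pattern, sketched here with illustrative class names (not the actual Hudi classes): the reader is closed even if the bloom or range check throws.

```java
// Sketch of the auto-closable reader pattern: opening the reader in a
// try-with-resources block guarantees close() runs on every exit path.
public class AutoCloseReaderSketch {
    public static class FileReader implements AutoCloseable {
        public boolean closed = false;
        public String[] readMinMaxRecordKeys() { return new String[] {"a", "z"}; }
        @Override public void close() { closed = true; }
    }

    public static String[] getMinMaxKeys(FileReader reader) {
        try (FileReader r = reader) { // closed automatically on scope exit
            return r.readMinMaxRecordKeys();
        }
    }
}
```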

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Enabling column stats indexing for all columns by default

 - Handling column stat generation errors and test update

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Metadata table partition file group count is now taken from the
   existing file slices when the table is bootstrapped.

 - Refactored preparing records for the commit into the base class

 - HoodieFileReader interface changes for filtering keys

 - Multi-column and multiple data type support for the column stats index
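Multi-column, multi-type column stats can be sketched as below. This is a hypothetical model, not the Hudi schema: stats are keyed per (column, file), and min/max are held as `Comparable` values so different data types work through one interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of multi-column support in a column stats index.
public class MultiColumnStatsSketch {
    public static final class Range<T extends Comparable<T>> {
        final T min;
        final T max;
        Range(T min, T max) { this.min = min; this.max = max; }
        // True when [lo, hi] intersects [min, max].
        boolean overlaps(T lo, T hi) {
            return hi.compareTo(min) >= 0 && lo.compareTo(max) <= 0;
        }
    }

    // Keyed per (column, file) so any indexed column can be consulted.
    private final Map<String, Range<?>> statsByColumnAndFile = new HashMap<>();

    public <T extends Comparable<T>> void put(String column, String fileName, T min, T max) {
        statsByColumnAndFile.put(column + "|" + fileName, new Range<>(min, max));
    }

    @SuppressWarnings("unchecked")
    public <T extends Comparable<T>> boolean mayContain(String column, String fileName, T lo, T hi) {
        Range<T> r = (Range<T>) statsByColumnAndFile.get(column + "|" + fileName);
        return r == null || r.overlaps(lo, hi); // unknown column: cannot prune
    }
}
```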

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Rebased onto the latest master and merged fixes for the build and test failures

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Extending the metadata column stats payload schema to include more
   statistics about the column ranges, to help query integration.
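An extended per-(file, column) stats payload of the kind described might look like the following sketch. The field names and the skipping rule are assumptions for illustration, not the actual HoodieMetadataColumnStats schema.

```java
// Illustrative column-stats payload: value range plus extra statistics
// that a query engine can use for file-level pruning.
public class ColumnStatsSketch {
    public final String fileName;
    public final String columnName;
    public final String minValue;
    public final String maxValue;
    public final long nullCount;
    public final long valueCount;

    public ColumnStatsSketch(String fileName, String columnName, String minValue,
                             String maxValue, long nullCount, long valueCount) {
        this.fileName = fileName;
        this.columnName = columnName;
        this.minValue = minValue;
        this.maxValue = maxValue;
        this.nullCount = nullCount;
        this.valueCount = valueCount;
    }

    // A file can be skipped when the queried range misses [minValue, maxValue].
    public boolean canSkip(String queryMin, String queryMax) {
        return queryMax.compareTo(minValue) < 0 || queryMin.compareTo(maxValue) > 0;
    }
}
```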

* [HUDI-1295] Metadata Index - Bloom filter and Column stats index to speed up index lookups

 - Addressing review comments
Author: Manoj Govindassamy
Date: 2022-02-03 04:42:48 -08:00
Committed by: GitHub
Parent commit: d681824982
Commit: 5927bdd1c0
49 changed files with 2304 additions and 522 deletions


@@ -1437,6 +1437,14 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
}
public boolean isMetadataBloomFilterIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isBloomFilterIndexEnabled();
}
public boolean isMetadataIndexColumnStatsForAllColumnsEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isMetadataColumnStatsIndexForAllColumnsEnabled();
}
public int getBloomIndexKeysPerBucket() {
return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET);
}


@@ -18,17 +18,30 @@
package org.apache.hudi.index;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import static java.util.stream.Collectors.toList;
@@ -37,6 +50,8 @@ import static java.util.stream.Collectors.toList;
*/
public class HoodieIndexUtils {
private static final Logger LOG = LogManager.getLogger(HoodieIndexUtils.class);
/**
* Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
*
@@ -101,4 +116,34 @@ public class HoodieIndexUtils {
}
return record;
}
/**
* Given a list of row keys and one file, return only row keys existing in that file.
*
* @param filePath - File to filter keys from
* @param candidateRecordKeys - Candidate keys to filter
* @return List of candidate keys that are available in the file
*/
public static List<String> filterKeysFromFile(Path filePath, List<String> candidateRecordKeys,
Configuration configuration) throws HoodieIndexException {
ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
List<String> foundRecordKeys = new ArrayList<>();
try {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
if (LOG.isDebugEnabled()) {
LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
}
}
} catch (Exception e) {
throw new HoodieIndexException("Error checking candidate keys against file.", e);
}
return foundRecordKeys;
}
}


@@ -25,7 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.io.HoodieKeyLookupHandle;
import org.apache.hudi.io.HoodieKeyLookupHandle.KeyLookupResult;
import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.hudi.table.HoodieTable;
import java.util.function.Function;
@@ -37,7 +37,7 @@ import java.util.List;
* Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files.
*/
public class HoodieBaseBloomIndexCheckFunction
implements Function<Iterator<Pair<String, HoodieKey>>, Iterator<List<KeyLookupResult>>> {
implements Function<Iterator<Pair<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
private final HoodieTable hoodieTable;
@@ -49,11 +49,11 @@ public class HoodieBaseBloomIndexCheckFunction
}
@Override
public Iterator<List<KeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
}
class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<KeyLookupResult>> {
class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<HoodieKeyLookupResult>> {
private HoodieKeyLookupHandle keyLookupHandle;
@@ -66,8 +66,8 @@ public class HoodieBaseBloomIndexCheckFunction
}
@Override
protected List<KeyLookupResult> computeNext() {
List<KeyLookupResult> ret = new ArrayList<>();
protected List<HoodieKeyLookupResult> computeNext() {
List<HoodieKeyLookupResult> ret = new ArrayList<>();
try {
// process one file in each go.
while (inputItr.hasNext()) {
@@ -83,7 +83,7 @@ public class HoodieBaseBloomIndexCheckFunction
}
// if continue on current file
if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
if (keyLookupHandle.getPartitionPathFileIDPair().equals(partitionPathFilePair)) {
keyLookupHandle.addKey(recordKey);
} else {
// do the actual checking of file & break out


@@ -19,11 +19,13 @@
package org.apache.hudi.index.bloom;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -33,6 +35,7 @@ import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
@@ -46,6 +49,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
@@ -111,13 +115,19 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload<T>>
private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
HoodiePairData<String, String> partitionRecordKeyPairs, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
// Obtain records per partition, in the incoming records
// Step 1: Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition = partitionRecordKeyPairs.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
List<Pair<String, BloomIndexFileInfo>> fileInfoList =
loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
List<Pair<String, BloomIndexFileInfo>> fileInfoList;
if (config.getBloomIndexPruneByRanges()) {
fileInfoList = (config.getMetadataConfig().isColumnStatsIndexEnabled()
? loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable)
: loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable));
} else {
fileInfoList = getFileInfoForLatestBaseFiles(affectedPartitionPathList, context, hoodieTable);
}
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
@@ -133,30 +143,84 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload<T>>
/**
* Load all involved files as <Partition, filename> pair List.
*/
List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
if (config.getBloomIndexPruneByRanges()) {
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
return context.map(partitionPathFileIDList, pf -> {
try {
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
LOG.warn("Unable to find range metadata in file :" + pf);
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
return context.map(partitionPathFileIDList, pf -> {
try {
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
LOG.warn("Unable to find range metadata in file :" + pf);
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
}
}, Math.max(partitionPathFileIDList.size(), 1));
}
/**
* Get BloomIndexFileInfo for all the latest base files for the requested partitions.
*
* @param partitions - List of partitions to get the base files for
* @param context - Engine context
* @param hoodieTable - Hoodie Table
* @return List of partition and file column range info pairs
*/
private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context,
hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
return partitionPathFileIDList.stream()
.map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
}
/**
* Load the column stats index as BloomIndexFileInfo for all the involved files in the partition.
*
* @param partitions - List of partitions for which column stats need to be loaded
* @param context - Engine context
* @param hoodieTable - Hoodie table
* @return List of partition and file column range info pairs
*/
protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices");
final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
return context.flatMap(partitions, partitionName -> {
// Partition and file name pairs
List<Pair<String, String>> partitionFileNameList = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
hoodieTable).stream().map(baseFile -> Pair.of(partitionName, baseFile.getFileName()))
.sorted()
.collect(toList());
if (partitionFileNameList.isEmpty()) {
return Stream.empty();
}
try {
Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap = hoodieTable
.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
result.add(Pair.of(entry.getKey().getLeft(),
new BloomIndexFileInfo(
FSUtils.getFileId(entry.getKey().getRight()),
entry.getValue().getMinValue(),
entry.getValue().getMaxValue()
)));
}
}, Math.max(partitionPathFileIDList.size(), 1));
} else {
return partitionPathFileIDList.stream()
.map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
}
return result.stream();
} catch (MetadataNotFoundException me) {
throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me);
}
}, Math.max(partitions.size(), 1));
}
@Override


@@ -55,11 +55,11 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload<T>> extends Ho
* Load all involved files as <Partition, filename> pairs from all partitions in the table.
*/
@Override
List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(List<String> partitions, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), metaClient.getBasePath());
return super.loadInvolvedFiles(allPartitionPaths, context, hoodieTable);
return super.loadColumnRangesFromFiles(allPartitionPaths, context, hoodieTable);
}
/**


@@ -28,7 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieKeyLookupHandle;
import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.hudi.table.HoodieTable;
import java.util.ArrayList;
@@ -63,9 +63,8 @@ public class ListBasedHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper
HoodieList.getList(fileComparisonPairs).stream()
.sorted(Comparator.comparing(ImmutablePair::getLeft)).collect(toList());
List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();
Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(
List<HoodieKeyLookupResult> keyLookupResults = new ArrayList<>();
Iterator<List<HoodieKeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(
hoodieTable, config).apply(fileComparisonPairList.iterator());
while (iterator.hasNext()) {
keyLookupResults.addAll(iterator.next());
@@ -77,7 +76,7 @@ public class ListBasedHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper
lookupResult.getMatchingRecordKeys().stream()
.map(recordKey -> new ImmutablePair<>(lookupResult, recordKey)).iterator()
).mapToPair(pair -> {
HoodieKeyLookupHandle.KeyLookupResult lookupResult = pair.getLeft();
HoodieKeyLookupResult lookupResult = pair.getLeft();
String recordKey = pair.getRight();
return new ImmutablePair<>(
new HoodieKey(recordKey, lookupResult.getPartitionPath()),


@@ -19,6 +19,8 @@
package org.apache.hudi.io;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -31,8 +33,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload, I, K, O> {
protected final FileSystem fs;
protected final HoodieTable<T, I, K, O> hoodieTable;
HoodieIOHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable) {
this.instantTime = instantTime;
HoodieIOHandle(HoodieWriteConfig config, Option<String> instantTime, HoodieTable<T, I, K, O> hoodieTable) {
this.instantTime = instantTime.orElse(StringUtils.EMPTY_STRING);
this.config = config;
this.hoodieTable = hoodieTable;
this.fs = getFileSystem();


@@ -47,7 +47,7 @@ public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload, I, K, O
public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, HoodieBaseFile> partitionPathBaseFilePair, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, null, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
super(config, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
this.partitionPathBaseFilePair = partitionPathBaseFilePair;
this.keyGeneratorOpt = keyGeneratorOpt;
}


@@ -19,25 +19,30 @@
package org.apache.hudi.io;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Takes a bunch of keys and returns ones that are present in the file group.
@@ -46,52 +51,58 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload, I, K, O> exten
private static final Logger LOG = LogManager.getLogger(HoodieKeyLookupHandle.class);
private final HoodieTableType tableType;
private final BloomFilter bloomFilter;
private final List<String> candidateRecordKeys;
private final boolean useMetadataTableIndex;
private Option<String> fileName = Option.empty();
private long totalKeysChecked;
public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFilePair) {
super(config, null, hoodieTable, partitionPathFilePair);
this.tableType = hoodieTable.getMetaClient().getTableType();
this.candidateRecordKeys = new ArrayList<>();
this.totalKeysChecked = 0;
HoodieTimer timer = new HoodieTimer().startTimer();
try {
this.bloomFilter = createNewFileReader().readBloomFilter();
} catch (IOException e) {
throw new HoodieIndexException(String.format("Error reading bloom filter from %s: %s", partitionPathFilePair, e));
}
LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer()));
Pair<String, String> partitionPathFileIDPair) {
this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
}
/**
* Given a list of row keys and one file, return only row keys existing in that file.
*/
public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
Path filePath) throws HoodieIndexException {
List<String> foundRecordKeys = new ArrayList<>();
public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFileIDPair, Option<String> fileName,
boolean useMetadataTableIndex) {
super(config, hoodieTable, partitionPathFileIDPair);
this.candidateRecordKeys = new ArrayList<>();
this.totalKeysChecked = 0;
if (fileName.isPresent()) {
ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()),
"File name '" + fileName.get() + "' doesn't match this lookup handle fileid '" + getFileId() + "'");
this.fileName = fileName;
}
this.useMetadataTableIndex = useMetadataTableIndex;
this.bloomFilter = getBloomFilter();
}
private BloomFilter getBloomFilter() {
BloomFilter bloomFilter = null;
HoodieTimer timer = new HoodieTimer().startTimer();
try {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
if (LOG.isDebugEnabled()) {
LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
if (this.useMetadataTableIndex) {
ValidationUtils.checkArgument(this.fileName.isPresent(),
"File name not available to fetch bloom filter from the metadata table index.");
Option<ByteBuffer> bloomFilterByteBuffer =
hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), fileName.get());
if (!bloomFilterByteBuffer.isPresent()) {
throw new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight());
}
bloomFilter =
new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
BloomFilterTypeCode.DYNAMIC_V0);
} else {
try (HoodieFileReader reader = createNewFileReader()) {
bloomFilter = reader.readBloomFilter();
}
}
} catch (Exception e) {
throw new HoodieIndexException("Error checking candidate keys against file.", e);
} catch (IOException e) {
throw new HoodieIndexException(String.format("Error reading bloom filter from %s/%s - %s",
getPartitionPathFileIDPair().getLeft(), this.fileName, e));
}
return foundRecordKeys;
LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
return bloomFilter;
}
/**
@@ -101,7 +112,7 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload, I, K, O> exten
// check record key against bloom filter of current file & add to possible keys if needed
if (bloomFilter.mightContain(recordKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Record key " + recordKey + " matches bloom filter in " + partitionPathFilePair);
LOG.debug("Record key " + recordKey + " matches bloom filter in " + partitionPathFileIDPair);
}
candidateRecordKeys.add(recordKey);
}
@@ -111,53 +122,18 @@ public class HoodieKeyLookupHandle<T extends HoodieRecordPayload, I, K, O> exten
/**
* Of all the keys, that were added, return a list of keys that were actually found in the file group.
*/
public KeyLookupResult getLookupResult() {
public HoodieKeyLookupResult getLookupResult() {
if (LOG.isDebugEnabled()) {
LOG.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys);
LOG.debug("#The candidate row keys for " + partitionPathFileIDPair + " => " + candidateRecordKeys);
}
HoodieBaseFile dataFile = getLatestDataFile();
List<String> matchingKeys =
checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()), candidateRecordKeys,
hoodieTable.getHadoopConf());
LOG.info(
String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked,
candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
return new HoodieKeyLookupResult(partitionPathFileIDPair.getRight(), partitionPathFileIDPair.getLeft(),
dataFile.getCommitTime(), matchingKeys);
}
/**
* Encapsulates the result from a key lookup.
*/
public static class KeyLookupResult {
private final String fileId;
private final String baseInstantTime;
private final List<String> matchingRecordKeys;
private final String partitionPath;
public KeyLookupResult(String fileId, String partitionPath, String baseInstantTime,
List<String> matchingRecordKeys) {
this.fileId = fileId;
this.partitionPath = partitionPath;
this.baseInstantTime = baseInstantTime;
this.matchingRecordKeys = matchingRecordKeys;
}
public String getFileId() {
return fileId;
}
public String getBaseInstantTime() {
return baseInstantTime;
}
public String getPartitionPath() {
return partitionPath;
}
public List<String> getMatchingRecordKeys() {
return matchingRecordKeys;
}
}
}


@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io;
import java.util.List;
/**
* Encapsulates the result from a key lookup.
*/
public class HoodieKeyLookupResult {
private final String fileId;
private final String baseInstantTime;
private final List<String> matchingRecordKeys;
private final String partitionPath;
public HoodieKeyLookupResult(String fileId, String partitionPath, String baseInstantTime,
List<String> matchingRecordKeys) {
this.fileId = fileId;
this.partitionPath = partitionPath;
this.baseInstantTime = baseInstantTime;
this.matchingRecordKeys = matchingRecordKeys;
}
public String getFileId() {
return fileId;
}
public String getBaseInstantTime() {
return baseInstantTime;
}
public String getPartitionPath() {
return partitionPath;
}
public List<String> getMatchingRecordKeys() {
return matchingRecordKeys;
}
}


@@ -21,6 +21,7 @@ package org.apache.hudi.io;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.table.HoodieTable;
import java.io.IOException;
@@ -32,10 +33,12 @@ public class HoodieRangeInfoHandle<T extends HoodieRecordPayload, I, K, O> exten
public HoodieRangeInfoHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFilePair) {
super(config, null, hoodieTable, partitionPathFilePair);
super(config, hoodieTable, partitionPathFilePair);
}
public String[] getMinMaxKeys() throws IOException {
return createNewFileReader().readMinMaxRecordKeys();
try (HoodieFileReader reader = createNewFileReader()) {
return reader.readMinMaxRecordKeys();
}
}
}
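The change above is a resource-safety fix: the old code leaked the file reader, while the new code closes it via try-with-resources even if the read throws. A minimal sketch of the pattern, where `KeyRangeReader` is a hypothetical stand-in for `HoodieFileReader`:

```java
public class RangeReadExample {
    // Hypothetical stand-in for HoodieFileReader; tracks whether close() ran.
    static class KeyRangeReader implements AutoCloseable {
        static boolean closed = false;

        String[] readMinMaxRecordKeys() {
            return new String[] {"key-000", "key-999"};
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static String[] getMinMaxKeys() {
        // Mirrors the patched getMinMaxKeys(): try-with-resources guarantees
        // the reader is closed instead of being leaked after the call.
        try (KeyRangeReader reader = new KeyRangeReader()) {
            return reader.readMinMaxRecordKeys();
        }
    }

    public static void main(String[] args) {
        String[] minMax = getMinMaxKeys();
        System.out.println(minMax[0] + " .. " + minMax[1] + ", closed=" + KeyRangeReader.closed);
    }
}
```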

View File

@@ -18,8 +18,11 @@
package org.apache.hudi.io;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -28,20 +31,17 @@ import org.apache.hudi.table.HoodieTable;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Base class for read operations done logically on the file group.
*/
public abstract class HoodieReadHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieIOHandle<T, I, K, O> {
protected final Pair<String, String> partitionPathFilePair;
protected final Pair<String, String> partitionPathFileIDPair;
public HoodieReadHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFilePair) {
super(config, instantTime, hoodieTable);
this.partitionPathFilePair = partitionPathFilePair;
public HoodieReadHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> hoodieTable,
Pair<String, String> partitionPathFileIDPair) {
super(config, Option.empty(), hoodieTable);
this.partitionPathFileIDPair = partitionPathFileIDPair;
}
@Override
@@ -49,17 +49,17 @@ public abstract class HoodieReadHandle<T extends HoodieRecordPayload, I, K, O> e
return hoodieTable.getMetaClient().getFs();
}
public Pair<String, String> getPartitionPathFilePair() {
return partitionPathFilePair;
public Pair<String, String> getPartitionPathFileIDPair() {
return partitionPathFileIDPair;
}
public String getFileId() {
return partitionPathFilePair.getRight();
return partitionPathFileIDPair.getRight();
}
protected HoodieBaseFile getLatestDataFile() {
return hoodieTable.getBaseFileOnlyView()
.getLatestBaseFile(partitionPathFilePair.getLeft(), partitionPathFilePair.getRight()).get();
.getLatestBaseFile(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight()).get();
}
protected HoodieFileReader createNewFileReader() throws IOException {

View File

@@ -108,7 +108,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
HoodieTable<T, I, K, O> hoodieTable, Option<Schema> overriddenSchema,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable);
super(config, Option.of(instantTime), hoodieTable);
this.partitionPath = partitionPath;
this.fileId = fileId;
this.tableSchema = overriddenSchema.orElseGet(() -> getSpecifiedTableSchema(config));

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -39,6 +40,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -50,6 +52,7 @@ import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -109,6 +112,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
protected boolean enabled;
protected SerializableConfiguration hadoopConf;
protected final transient HoodieEngineContext engineContext;
// TODO: HUDI-3258 Support secondary key via multiple partitions within a single type
protected final List<MetadataPartitionType> enabledPartitionTypes;
/**
* Hudi backed table metadata writer.
@@ -128,6 +133,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
this.dataWriteConfig = writeConfig;
this.engineContext = engineContext;
this.hadoopConf = new SerializableConfiguration(hadoopConf);
this.metrics = Option.empty();
this.enabledPartitionTypes = new ArrayList<>();
if (writeConfig.isMetadataTableEnabled()) {
this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
@@ -145,22 +152,67 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
"File listing cannot be used for Metadata Table");
initRegistry();
this.dataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
enablePartitions();
initRegistry();
initialize(engineContext, actionMetadata, inflightInstantTimestamp);
initTableMetadata();
} else {
enabled = false;
this.metrics = Option.empty();
}
}
public HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext) {
HoodieEngineContext engineContext) {
this(hadoopConf, writeConfig, engineContext, Option.empty(), Option.empty());
}
/**
* Enable metadata table partitions based on config.
*/
private void enablePartitions() {
final HoodieMetadataConfig metadataConfig = dataWriteConfig.getMetadataConfig();
boolean isBootstrapCompleted;
Option<HoodieTableMetaClient> metaClient = Option.empty();
try {
isBootstrapCompleted = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
if (isBootstrapCompleted) {
metaClient = Option.of(HoodieTableMetaClient.builder().setConf(hadoopConf.get())
.setBasePath(metadataWriteConfig.getBasePath()).build());
}
} catch (IOException e) {
throw new HoodieException("Failed to enable metadata partitions!", e);
}
Option<HoodieTableFileSystemView> fsView = Option.ofNullable(
metaClient.isPresent() ? HoodieTableMetadataUtil.getFileSystemView(metaClient.get()) : null);
enablePartition(MetadataPartitionType.FILES, metadataConfig, metaClient, fsView, isBootstrapCompleted);
if (metadataConfig.isBloomFilterIndexEnabled()) {
enablePartition(MetadataPartitionType.BLOOM_FILTERS, metadataConfig, metaClient, fsView, isBootstrapCompleted);
}
if (metadataConfig.isColumnStatsIndexEnabled()) {
enablePartition(MetadataPartitionType.COLUMN_STATS, metadataConfig, metaClient, fsView, isBootstrapCompleted);
}
}
/**
* Enable metadata table partition.
*
* @param partitionType - Metadata table partition type
* @param metadataConfig - Table config
* @param metaClient - Meta client for the metadata table
* @param fsView - Metadata table filesystem view to use
* @param isBootstrapCompleted - Is metadata table bootstrap completed
*/
private void enablePartition(final MetadataPartitionType partitionType, final HoodieMetadataConfig metadataConfig,
final Option<HoodieTableMetaClient> metaClient, Option<HoodieTableFileSystemView> fsView, boolean isBootstrapCompleted) {
final int fileGroupCount = HoodieTableMetadataUtil.getPartitionFileGroupCount(partitionType, metaClient, fsView,
metadataConfig, isBootstrapCompleted);
partitionType.setFileGroupCount(fileGroupCount);
this.enabledPartitionTypes.add(partitionType);
}
protected abstract void initRegistry();
/**
@@ -257,10 +309,14 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
return metadataWriteConfig;
}
public HoodieBackedTableMetadata metadata() {
public HoodieBackedTableMetadata getTableMetadata() {
return metadata;
}
public List<MetadataPartitionType> getEnabledPartitionTypes() {
return this.enabledPartitionTypes;
}
/**
* Initialize the metadata table if it does not exist.
*
@@ -460,7 +516,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
initTableMetadata();
initializeFileGroups(dataMetaClient, MetadataPartitionType.FILES, createInstantTime, 1);
initializeEnabledFileGroups(dataMetaClient, createInstantTime);
// List all partitions in the basePath of the containing dataset
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
@@ -529,6 +585,20 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
return partitionsToBootstrap;
}
/**
* Initialize file groups for all the enabled partition types.
*
* @param dataMetaClient - Meta client for the data table
* @param createInstantTime - Metadata table create instant time
* @throws IOException if initializing the file groups fails
*/
private void initializeEnabledFileGroups(HoodieTableMetaClient dataMetaClient, String createInstantTime) throws IOException {
for (MetadataPartitionType enabledPartitionType : this.enabledPartitionTypes) {
initializeFileGroups(dataMetaClient, enabledPartitionType, createInstantTime,
enabledPartitionType.getFileGroupCount());
}
}
/**
* Initialize file groups for a partition. For file listing, we just have one file group.
*
@@ -550,12 +620,12 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0], blockHeader);
LOG.info(String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s",
fileGroupCount, metadataPartition.partitionPath(), metadataPartition.getFileIdPrefix(), instantTime));
fileGroupCount, metadataPartition.getPartitionPath(), metadataPartition.getFileIdPrefix(), instantTime));
for (int i = 0; i < fileGroupCount; ++i) {
final String fileGroupFileId = String.format("%s%04d", metadataPartition.getFileIdPrefix(), i);
try {
HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), metadataPartition.partitionPath()))
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), metadataPartition.getPartitionPath()))
.withFileId(fileGroupFileId).overBaseCommit(instantTime)
.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
.withFileSize(0L)
@@ -567,7 +637,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
writer.appendBlock(block);
writer.close();
} catch (InterruptedException e) {
throw new HoodieException("Failed to create fileGroup " + fileGroupFileId + " for partition " + metadataPartition.partitionPath(), e);
throw new HoodieException("Failed to create fileGroup " + fileGroupFileId + " for partition " + metadataPartition.getPartitionPath(), e);
}
}
}
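The loop above derives each file group's id from the partition's fileId prefix plus a zero-padded index via `String.format("%s%04d", ...)`. A quick sketch of that naming scheme (the prefix value below is illustrative, not Hudi's actual constant):

```java
public class FileGroupIdExample {
    // Mirrors the String.format("%s%04d", ...) naming used when
    // initializing file groups for a metadata table partition.
    static String fileGroupId(String fileIdPrefix, int index) {
        return String.format("%s%04d", fileIdPrefix, index);
    }

    public static void main(String[] args) {
        // Zero-padding keeps ids fixed-width and lexically ordered.
        for (int i = 0; i < 3; i++) {
            System.out.println(fileGroupId("col-stats-", i));
        }
    }
}
```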
@@ -577,7 +647,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* Updates from different commit metadata use the same method to convert to HoodieRecords.
*/
private interface ConvertMetadataFunction {
List<HoodieRecord> convertMetadata();
Map<MetadataPartitionType, HoodieData<HoodieRecord>> convertMetadata();
}
/**
@@ -589,8 +659,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
*/
private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
if (enabled && metadata != null) {
List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
commit(instantTime, partitionRecordsMap, canTriggerTableService);
}
}
@@ -602,7 +672,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
*/
@Override
public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction) {
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime), !isTableServiceAction);
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, enabledPartitionTypes,
commitMetadata, dataMetaClient, dataWriteConfig.isMetadataIndexColumnStatsForAllColumnsEnabled(), instantTime), !isTableServiceAction);
}
/**
@@ -613,8 +684,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
*/
@Override
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime),
false);
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, enabledPartitionTypes,
cleanMetadata, dataMetaClient, instantTime), false);
}
/**
@@ -625,8 +696,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
*/
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(),
restoreMetadata, instantTime, metadata.getSyncedInstantTime()), false);
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
enabledPartitionTypes, metadataMetaClient.getActiveTimeline(), restoreMetadata, dataMetaClient, instantTime,
metadata.getSyncedInstantTime()), false);
}
/**
@@ -650,9 +722,11 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
}
}
List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), rollbackMetadata, instantTime,
metadata.getSyncedInstantTime(), wasSynced);
commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, false);
Map<MetadataPartitionType, HoodieData<HoodieRecord>> records =
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, enabledPartitionTypes,
metadataMetaClient.getActiveTimeline(), rollbackMetadata, dataMetaClient, instantTime,
metadata.getSyncedInstantTime(), wasSynced);
commit(instantTime, records, false);
}
}
@@ -665,12 +739,47 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
/**
* Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
* @param records The HoodieData of records to be written.
* @param partitionName The partition to which the records are to be written.
* @param instantTime The timestamp to use for the deltacommit.
*
* @param instantTime - Action instant time for this commit
* @param partitionRecordsMap - Map of partition name to its records to commit
* @param canTriggerTableService true if table services can be scheduled and executed. false otherwise.
*/
protected abstract void commit(HoodieData<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService);
protected abstract void commit(
String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap,
boolean canTriggerTableService);
/**
* Tag each record with the location in the given partition.
* The record is tagged with respective file slice's location based on its record key.
*/
protected HoodieData<HoodieRecord> prepRecords(Map<MetadataPartitionType,
HoodieData<HoodieRecord>> partitionRecordsMap) {
// The result set
HoodieData<HoodieRecord> allPartitionRecords = engineContext.emptyHoodieData();
HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : partitionRecordsMap.entrySet()) {
final String partitionName = entry.getKey().getPartitionPath();
final int fileGroupCount = entry.getKey().getFileGroupCount();
HoodieData<HoodieRecord> records = entry.getValue();
List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), partitionName);
ValidationUtils.checkArgument(fileSlices.size() == fileGroupCount,
String.format("Invalid number of file groups for partition:%s, found=%d, required=%d",
partitionName, fileSlices.size(), fileGroupCount));
HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
fileGroupCount));
r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
return r;
});
allPartitionRecords = allPartitionRecords.union(rddSinglePartitionRecords);
}
return allPartitionRecords;
}
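In `prepRecords` above, each record is tagged with a fixed file slice by hashing its record key into one of the partition's file groups via `HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex`. A plausible sketch under the assumption of a simple hash-mod scheme (Hudi's actual hash function may differ):

```java
public class KeyToFileGroupExample {
    // Hypothetical stand-in for HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex:
    // deterministically bucket a record key into one of fileGroupCount groups,
    // so the same key is always routed to the same file slice.
    static int mapRecordKeyToFileGroupIndex(String recordKey, int fileGroupCount) {
        int h = 0;
        for (int i = 0; i < recordKey.length(); i++) {
            h = 31 * h + recordKey.charAt(i);
        }
        return Math.abs(h % fileGroupCount);
    }

    public static void main(String[] args) {
        String key = "2021/12/20+uuid-1234";
        int idx = mapRecordKeyToFileGroupIndex(key, 4);
        // A stable index means the record's location (file slice) never changes
        // between lookups, which is what setCurrentLocation relies on.
        System.out.println("file group index = " + idx);
    }
}
```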
/**
* Perform a compaction on the Metadata Table.
@@ -735,14 +844,19 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
List<String> partitions = partitionInfoList.stream().map(p ->
p.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : p.getRelativePath()).collect(Collectors.toList());
final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum();
final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
// Record which saves the list of all partitions
HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
if (partitions.isEmpty()) {
// in case of boostrapping of a fresh table, there won't be any partitions, but we need to make a boostrap commit
commit(engineContext.parallelize(Collections.singletonList(allPartitionRecord), 1), MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
// in case of bootstrapping of a fresh table, there won't be any partitions, but we need to make a bootstrap commit
final HoodieData<HoodieRecord> allPartitionRecordsRDD = engineContext.parallelize(
Collections.singletonList(allPartitionRecord), 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD);
commit(createInstantTime, partitionToRecordsMap, false);
return;
}
HoodieData<HoodieRecord> partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
if (!partitionInfoList.isEmpty()) {
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
@@ -762,7 +876,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata");
ValidationUtils.checkState(partitionRecords.count() == (partitions.size() + 1));
commit(partitionRecords, MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
partitionToRecordsMap.put(MetadataPartitionType.FILES, partitionRecords);
commit(createInstantTime, partitionToRecordsMap, false);
}
/**

View File

@@ -768,4 +768,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
return Option.empty();
}
public HoodieTableMetadata getMetadataTable() {
return this.metadata;
}
}