
[HUDI-2656] Generalize HoodieIndex for flexible record data type (#3893)

Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
Author: Y Ethan Guo
Date: 2022-02-03 20:24:04 -08:00
Committed by: GitHub
Parent: 69dfcda116
Commit: b8601a9f58
105 changed files with 564 additions and 504 deletions

File: BaseHoodieWriteClient.java

@@ -105,7 +105,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
private static final Logger LOG = LogManager.getLogger(BaseHoodieWriteClient.class);
protected final transient HoodieMetrics metrics;
- private final transient HoodieIndex<T, ?, ?, ?> index;
+ private final transient HoodieIndex<?, ?> index;
protected transient Timer.Context writeTimer = null;
protected transient Timer.Context compactionTimer;
@@ -142,7 +142,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
this.txnManager = new TransactionManager(config, fs);
}
- protected abstract HoodieIndex<T, ?, ?, ?> createIndex(HoodieWriteConfig writeConfig);
+ protected abstract HoodieIndex<?, ?> createIndex(HoodieWriteConfig writeConfig);
public void setOperationType(WriteOperationType operationType) {
this.operationType = operationType;
@@ -1160,7 +1160,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
return metrics;
}
- public HoodieIndex<T, ?, ?, ?> getIndex() {
+ public HoodieIndex<?, ?> getIndex() {
return index;
}
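
The pattern behind this change, as a minimal self-contained sketch (illustrative names, not Hudi code): the record-payload type parameter is dropped from the class, and any method that genuinely varies by record data type declares its own method-level parameter, so a single wildcard-typed index instance can serve records of any data representation.

import java.util.List;

// Stand-in for HoodieRecord<R>: generic over an arbitrary data representation.
class Rec<R> {
  final R data;
  Rec(R data) { this.data = data; }
}

// Before: the index is bound to one record data type T for its whole lifetime.
abstract class IndexBefore<T, I, K, O> {
  abstract List<Rec<T>> tagLocation(List<Rec<T>> records);
}

// After: the class is payload-agnostic; each call site picks its own R.
abstract class IndexAfter<I, O> {
  abstract <R> List<Rec<R>> tagLocation(List<Rec<R>> records);
}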

File: HoodieLazyInsertIterable.java

@@ -18,7 +18,6 @@
package org.apache.hudi.execution;
- import java.util.Properties;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -36,6 +35,7 @@ import org.apache.avro.generic.IndexedRecord;
import java.util.Iterator;
import java.util.List;
+ import java.util.Properties;
import java.util.function.Function;
/**
@@ -87,7 +87,7 @@ public abstract class HoodieLazyInsertIterable<T extends HoodieRecordPayload>
public HoodieInsertValueGenResult(T record, Schema schema, Properties properties) {
this.record = record;
try {
- this.insertValue = record.getData().getInsertValue(schema, properties);
+ this.insertValue = ((HoodieRecordPayload) record.getData()).getInsertValue(schema, properties);
} catch (Exception e) {
this.exception = Option.of(e);
}
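
Why the cast appears: the record's data is no longer statically known to be a HoodieRecordPayload, so Avro-path call sites downcast before using payload APIs. A minimal sketch of the situation (simplified types, not Hudi code):

// Record type whose data parameter is unconstrained.
class RecBox<R> {
  private final R data;
  RecBox(R data) { this.data = data; }
  R getData() { return data; }
}

// Payload-specific API that only some data representations implement.
interface Payload {
  String getInsertValue(String schema);
}

class AvroPath {
  // The compiler no longer guarantees R is a Payload, so the Avro write path
  // must cast explicitly (checked at runtime) before calling payload methods.
  static String insertValue(RecBox<?> record, String schema) {
    return ((Payload) record.getData()).getInsertValue(schema);
  }
}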

File: HoodieIndex.java

@@ -27,7 +27,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
- import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
@@ -39,13 +38,11 @@ import java.io.Serializable;
/**
* Base class for different types of indexes to determine the mapping from uuid.
*
- * @param <T> Sub type of HoodieRecordPayload
* @param <I> Type of inputs for deprecated APIs
- * @param <K> Type of keys for deprecated APIs
* @param <O> Type of outputs for deprecated APIs
*/
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
- public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+ public abstract class HoodieIndex<I, O> implements Serializable {
protected final HoodieWriteConfig config;
@@ -60,7 +57,7 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O> implem
@Deprecated
@PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)
public I tagLocation(I records, HoodieEngineContext context,
- HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException {
+ HoodieTable hoodieTable) throws HoodieIndexException {
throw new HoodieNotSupportedException("Deprecated API should not be called");
}
@@ -70,7 +67,7 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O> implem
@Deprecated
@PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)
public O updateLocation(O writeStatuses, HoodieEngineContext context,
- HoodieTable<T, I, K, O> hoodieTable) throws HoodieIndexException {
+ HoodieTable hoodieTable) throws HoodieIndexException {
throw new HoodieNotSupportedException("Deprecated API should not be called");
}
@@ -79,8 +76,8 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O> implem
* the row (if it is actually present).
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public abstract HoodieData<HoodieRecord<T>> tagLocation(
- HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
+ public abstract <R> HoodieData<HoodieRecord<R>> tagLocation(
+ HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException;
/**
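
Usage under the new contract, sketched against the signatures in this diff (everything other than the method shapes shown above — writeConfig, context, hoodieTable, records, MyData — is an assumed, in-scope placeholder):

// The client holds a wildcard-typed index; no payload type leaks into the field.
HoodieIndex<?, ?> index = createIndex(writeConfig);

// R is inferred per call from the records passed in, so the same index
// instance can tag batches of different record data types.
HoodieData<HoodieRecord<MyData>> tagged = index.tagLocation(records, context, hoodieTable);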

File: HoodieIndexUtils.java

@@ -102,14 +102,14 @@ public class HoodieIndexUtils {
* @return the tagged {@link HoodieRecord}
*/
public static HoodieRecord getTaggedRecord(HoodieRecord inputRecord, Option<HoodieRecordLocation> location) {
- HoodieRecord record = inputRecord;
+ HoodieRecord<?> record = inputRecord;
if (location.isPresent()) {
// When you have a record in multiple files in the same partition, then <row key, record> collection
// will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2
// separate filenames that the record is found in. This will result in setting
// currentLocation 2 times and it will fail the second time. So creating a new in memory
// copy of the hoodie record.
- record = new HoodieRecord<>(inputRecord);
+ record = inputRecord.newInstance();
record.unseal();
record.setCurrentLocation(location.get());
record.seal();
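
The switch from the copy constructor to newInstance() is the polymorphic-copy pattern: once HoodieRecord has concrete subclasses per data representation (e.g. HoodieAvroRecord), the base class cannot know which concrete type to construct, so each subclass clones itself. A minimal sketch (illustrative names, not Hudi code):

abstract class BaseRec {
  // Each concrete record representation knows how to copy itself.
  abstract BaseRec newInstance();
}

class AvroRec extends BaseRec {
  final String payload;
  AvroRec(String payload) { this.payload = payload; }
  @Override
  AvroRec newInstance() { return new AvroRec(payload); }
}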

File: BaseHoodieBloomIndexHelper.java

@@ -24,7 +24,7 @@ import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
- import org.apache.hudi.common.util.collection.ImmutablePair;
+ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -51,7 +51,7 @@ public abstract class BaseHoodieBloomIndexHelper implements Serializable {
public abstract HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
HoodiePairData<String, String> partitionRecordKeyPairs,
- HoodieData<ImmutablePair<String, HoodieKey>> fileComparisonPairs,
+ HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
Map<String, Long> recordsPerPartition);
}

File: HoodieBloomIndex.java

@@ -29,7 +29,6 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
- import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
@@ -59,8 +58,7 @@ import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPar
/**
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata.
*/
- public class HoodieBloomIndex<T extends HoodieRecordPayload<T>>
- extends HoodieIndex<T, Object, Object, Object> {
+ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
private static final Logger LOG = LogManager.getLogger(HoodieBloomIndex.class);
private final BaseHoodieBloomIndexHelper bloomIndexHelper;
@@ -71,8 +69,8 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload<T>>
}
@Override
- public HoodieData<HoodieRecord<T>> tagLocation(
- HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
+ public <R> HoodieData<HoodieRecord<R>> tagLocation(
+ HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) {
// Step 0: cache the input records if needed
if (config.getBloomIndexUseCaching()) {
@@ -98,7 +96,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload<T>>
}
// Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
- HoodieData<HoodieRecord<T>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairs, records);
+ HoodieData<HoodieRecord<R>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairs, records);
if (config.getBloomIndexUseCaching()) {
records.unpersist();
@@ -133,7 +131,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload<T>>
// Step 3: Obtain a HoodieData, for each incoming record, that already exists, with the file id,
// that contains it.
- HoodieData<ImmutablePair<String, HoodieKey>> fileComparisonPairs =
+ HoodieData<Pair<String, HoodieKey>> fileComparisonPairs =
explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs);
return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable,
@@ -261,7 +259,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload<T>>
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info.
*/
- HoodieData<ImmutablePair<String, HoodieKey>> explodeRecordsWithFileComparisons(
+ HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodiePairData<String, String> partitionRecordKeyPairs) {
IndexFileFilter indexFileFilter =
@@ -273,7 +271,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload<T>>
String partitionPath = partitionRecordKeyPair.getLeft();
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
- .map(partitionFileIdPair -> new ImmutablePair<>(partitionFileIdPair.getRight(),
+ .map(partitionFileIdPair -> (Pair<String, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionPath)))
.collect(Collectors.toList());
}).flatMap(List::iterator);
@@ -282,10 +280,10 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload<T>>
/**
* Tag the <rowKey, filename> back to the original HoodieRecord List.
*/
- protected HoodieData<HoodieRecord<T>> tagLocationBacktoRecords(
+ protected <R> HoodieData<HoodieRecord<R>> tagLocationBacktoRecords(
HoodiePairData<HoodieKey, HoodieRecordLocation> keyFilenamePair,
- HoodieData<HoodieRecord<T>> records) {
- HoodiePairData<HoodieKey, HoodieRecord<T>> keyRecordPairs =
+ HoodieData<HoodieRecord<R>> records) {
+ HoodiePairData<HoodieKey, HoodieRecord<R>> keyRecordPairs =
records.mapToPair(record -> new ImmutablePair<>(record.getKey(), record));
// Here as the records might have more data than keyFilenamePairs (some row keys' fileId is null),
// so we do left outer join.
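
The step-3 tagging join, reduced to plain collections (the real code joins distributed HoodiePairData; this sketch only shows the left-outer-join semantics: keys found in the index get a location and become updates, the rest stay untagged and are handled as inserts):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class TagJoinSketch {
  // Left outer join of incoming record keys against known key -> location pairs.
  static List<Map.Entry<String, Optional<String>>> tagBack(
      List<String> incomingKeys, Map<String, String> keyToLocation) {
    List<Map.Entry<String, Optional<String>>> tagged = new ArrayList<>();
    for (String key : incomingKeys) {
      // A missing key yields an empty location: the record is a new insert.
      tagged.add(Map.entry(key, Optional.ofNullable(keyToLocation.get(key))));
    }
    return tagged;
  }
}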

File: HoodieGlobalBloomIndex.java

@@ -24,6 +24,7 @@ import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+ import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -46,7 +47,7 @@ import java.util.stream.Collectors;
* This filter will only work with hoodie table since it will only load partitions
* with .hoodie_partition_metadata file in it.
*/
- public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload<T>> extends HoodieBloomIndex<T> {
+ public class HoodieGlobalBloomIndex extends HoodieBloomIndex {
public HoodieGlobalBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelper bloomIndexHelper) {
super(config, bloomIndexHelper);
}
@@ -73,7 +74,7 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload<T>> extends Ho
*/
@Override
- HoodieData<ImmutablePair<String, HoodieKey>> explodeRecordsWithFileComparisons(
+ HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodiePairData<String, String> partitionRecordKeyPairs) {
@@ -86,7 +87,7 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload<T>> extends Ho
String partitionPath = partitionRecordKeyPair.getLeft();
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
- .map(partitionFileIdPair -> new ImmutablePair<>(partitionFileIdPair.getRight(),
+ .map(partitionFileIdPair -> (Pair<String, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
.collect(Collectors.toList());
}).flatMap(List::iterator);
@@ -96,11 +97,11 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload<T>> extends Ho
* Tagging for global index should only consider the record key.
*/
@Override
- protected HoodieData<HoodieRecord<T>> tagLocationBacktoRecords(
+ protected <R> HoodieData<HoodieRecord<R>> tagLocationBacktoRecords(
HoodiePairData<HoodieKey, HoodieRecordLocation> keyLocationPairs,
- HoodieData<HoodieRecord<T>> records) {
+ HoodieData<HoodieRecord<R>> records) {
- HoodiePairData<String, HoodieRecord<T>> incomingRowKeyRecordPairs =
+ HoodiePairData<String, HoodieRecord<R>> incomingRowKeyRecordPairs =
records.mapToPair(record -> new ImmutablePair<>(record.getRecordKey(), record));
HoodiePairData<String, Pair<HoodieRecordLocation, HoodieKey>> existingRecordKeyToRecordLocationHoodieKeyMap =
@@ -109,29 +110,29 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload<T>> extends Ho
// Here as the records might have more data than rowKeys (some rowKeys' fileId is null), so we do left outer join.
return incomingRowKeyRecordPairs.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().flatMap(record -> {
- final HoodieRecord<T> hoodieRecord = record.getLeft();
+ final HoodieRecord<R> hoodieRecord = record.getLeft();
final Option<Pair<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record.getRight();
if (recordLocationHoodieKeyPair.isPresent()) {
// Record key matched to file
if (config.getBloomIndexUpdatePartitionPath()
&& !recordLocationHoodieKeyPair.get().getRight().getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
// Create an empty record to delete the record in the old partition
- HoodieRecord<T> deleteRecord = new HoodieRecord(recordLocationHoodieKeyPair.get().getRight(),
+ HoodieRecord<R> deleteRecord = new HoodieAvroRecord(recordLocationHoodieKeyPair.get().getRight(),
new EmptyHoodieRecordPayload());
deleteRecord.setCurrentLocation(recordLocationHoodieKeyPair.get().getLeft());
deleteRecord.seal();
// Tag the incoming record for inserting to the new partition
- HoodieRecord<T> insertRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
+ HoodieRecord<R> insertRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
return Arrays.asList(deleteRecord, insertRecord).iterator();
} else {
// Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
// When it differs, the record will still be updated at its old partition.
return Collections.singletonList(
- (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get().getRight(), hoodieRecord.getData()),
+ (HoodieRecord<R>) HoodieIndexUtils.getTaggedRecord(new HoodieAvroRecord(recordLocationHoodieKeyPair.get().getRight(), (HoodieRecordPayload) hoodieRecord.getData()),
Option.ofNullable(recordLocationHoodieKeyPair.get().getLeft()))).iterator();
}
} else {
- return Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty())).iterator();
+ return Collections.singletonList((HoodieRecord<R>) HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty())).iterator();
}
});
}
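
The branch above encodes the update-partition-path semantics of the global index. Reduced to plain logic (simplified types, not Hudi code), the three outcomes are:

import java.util.List;

class GlobalTagSketch {
  record Tagged(String key, String partition, boolean isDelete) {}

  static List<Tagged> tag(String key, String incomingPartition,
                          String existingPartition, boolean updatePartitionPath) {
    if (existingPartition == null) {
      // Unknown key: a plain insert into the incoming partition.
      return List.of(new Tagged(key, incomingPartition, false));
    }
    if (updatePartitionPath && !existingPartition.equals(incomingPartition)) {
      // Partition changed: delete from the old partition, insert into the new.
      return List.of(new Tagged(key, existingPartition, true),
                     new Tagged(key, incomingPartition, false));
    }
    // Otherwise route the update back to the record's existing partition.
    return List.of(new Tagged(key, existingPartition, false));
  }
}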

File: ListBasedHoodieBloomIndexHelper.java

@@ -57,11 +57,11 @@ public class ListBasedHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper
public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
HoodiePairData<String, String> partitionRecordKeyPairs,
- HoodieData<ImmutablePair<String, HoodieKey>> fileComparisonPairs,
+ HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, Long> recordsPerPartition) {
List<Pair<String, HoodieKey>> fileComparisonPairList =
HoodieList.getList(fileComparisonPairs).stream()
- .sorted(Comparator.comparing(ImmutablePair::getLeft)).collect(toList());
+ .sorted(Comparator.comparing(Pair::getLeft)).collect(toList());
List<HoodieKeyLookupResult> keyLookupResults = new ArrayList<>();
Iterator<List<HoodieKeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(

File: HoodieBucketIndex.java

@@ -24,7 +24,6 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
- import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -34,6 +33,7 @@ import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -42,10 +42,8 @@ import java.util.Map;
/**
* Hash indexing mechanism.
- * @param <T>
*/
- public class HoodieBucketIndex<T extends HoodieRecordPayload<T>>
- extends HoodieIndex<T, Object, Object, Object> {
+ public class HoodieBucketIndex extends HoodieIndex<Object, Object> {
private static final Logger LOG = LogManager.getLogger(HoodieBucketIndex.class);
@@ -66,14 +64,14 @@ public class HoodieBucketIndex<T extends HoodieRecordPayload<T>>
}
@Override
- public HoodieData<HoodieRecord<T>> tagLocation(HoodieData<HoodieRecord<T>> records,
- HoodieEngineContext context,
+ public <R> HoodieData<HoodieRecord<R>> tagLocation(
+ HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable)
throws HoodieIndexException {
- HoodieData<HoodieRecord<T>> taggedRecords = records.mapPartitions(recordIter -> {
+ HoodieData<HoodieRecord<R>> taggedRecords = records.mapPartitions(recordIter -> {
// partitionPath -> bucketId -> fileInfo
Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList = new HashMap<>();
- return new LazyIterableIterator<HoodieRecord<T>, HoodieRecord<T>>(recordIter) {
+ return new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(recordIter) {
@Override
protected void start() {
@@ -81,7 +79,7 @@ public class HoodieBucketIndex<T extends HoodieRecordPayload<T>>
}
@Override
- protected HoodieRecord<T> computeNext() {
+ protected HoodieRecord<R> computeNext() {
HoodieRecord record = recordIter.next();
int bucketId = BucketIdentifier.getBucketId(record, config.getBucketIndexHashField(), numBuckets);
String partitionPath = record.getPartitionPath();
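
For context, the bucket index needs no key lookup at all: the target file group is a pure function of the record's hash field. A simplified sketch of the idea (the real hashing lives in BucketIdentifier and is not this exact function):

class BucketSketch {
  // Map a record's hash-field value to one of numBuckets buckets; each bucket
  // corresponds to a fixed file group within the partition.
  static int bucketId(String hashFieldValue, int numBuckets) {
    return (hashFieldValue.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }
}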

File: HoodieInMemoryHashIndex.java

@@ -25,7 +25,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
- import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
@@ -41,8 +40,8 @@ import java.util.concurrent.ConcurrentMap;
* <p>
* ONLY USE FOR LOCAL TESTING
*/
- public class HoodieInMemoryHashIndex<T extends HoodieRecordPayload<T>>
- extends HoodieIndex<T, Object, Object, Object> {
+ public class HoodieInMemoryHashIndex
+ extends HoodieIndex<Object, Object> {
private static ConcurrentMap<HoodieKey, HoodieRecordLocation> recordLocationMap;
@@ -56,13 +55,13 @@ public class HoodieInMemoryHashIndex<T extends HoodieRecordPayload<T>>
}
@Override
- public HoodieData<HoodieRecord<T>> tagLocation(
- HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
+ public <R> HoodieData<HoodieRecord<R>> tagLocation(
+ HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) {
return records.mapPartitions(hoodieRecordIterator -> {
- List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
+ List<HoodieRecord<R>> taggedRecords = new ArrayList<>();
while (hoodieRecordIterator.hasNext()) {
- HoodieRecord<T> record = hoodieRecordIterator.next();
+ HoodieRecord<R> record = hoodieRecordIterator.next();
if (recordLocationMap.containsKey(record.getKey())) {
record.unseal();
record.setCurrentLocation(recordLocationMap.get(record.getKey()));

File: HoodieGlobalSimpleIndex.java

@@ -24,6 +24,7 @@ import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+ import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -47,17 +48,15 @@ import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPar
/**
* A global simple index which reads interested fields(record key and partition path) from base files and
* joins with incoming records to find the tagged location.
- *
- * @param <T>
*/
- public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload<T>> extends HoodieSimpleIndex<T> {
+ public class HoodieGlobalSimpleIndex extends HoodieSimpleIndex {
public HoodieGlobalSimpleIndex(HoodieWriteConfig config, Option<BaseKeyGenerator> keyGeneratorOpt) {
super(config, keyGeneratorOpt);
}
@Override
- public HoodieData<HoodieRecord<T>> tagLocation(
- HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
+ public <R> HoodieData<HoodieRecord<R>> tagLocation(
+ HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) {
return tagLocationInternal(records, context, hoodieTable);
}
@@ -71,11 +70,11 @@ public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload<T>> extends H
* @return {@link HoodieData} of records with record locations set
*/
@Override
- protected HoodieData<HoodieRecord<T>> tagLocationInternal(
- HoodieData<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
+ protected <R> HoodieData<HoodieRecord<R>> tagLocationInternal(
+ HoodieData<HoodieRecord<R>> inputRecords, HoodieEngineContext context,
HoodieTable hoodieTable) {
- HoodiePairData<String, HoodieRecord<T>> keyedInputRecords =
+ HoodiePairData<String, HoodieRecord<R>> keyedInputRecords =
inputRecords.mapToPair(entry -> new ImmutablePair<>(entry.getRecordKey(), entry));
HoodiePairData<HoodieKey, HoodieRecordLocation> allRecordLocationsInTable =
fetchAllRecordLocations(context, hoodieTable, config.getGlobalSimpleIndexParallelism());
@@ -114,8 +113,8 @@ public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload<T>> extends H
* @param existingRecords existing records with {@link HoodieRecordLocation}s
* @return {@link HoodieData} of {@link HoodieRecord}s with tagged {@link HoodieRecordLocation}s
*/
- private HoodieData<HoodieRecord<T>> getTaggedRecords(
- HoodiePairData<String, HoodieRecord<T>> incomingRecords,
+ private <R> HoodieData<HoodieRecord<R>> getTaggedRecords(
+ HoodiePairData<String, HoodieRecord<R>> incomingRecords,
HoodiePairData<HoodieKey, HoodieRecordLocation> existingRecords) {
HoodiePairData<String, Pair<String, HoodieRecordLocation>> existingRecordByRecordKey =
existingRecords.mapToPair(
@@ -124,29 +123,29 @@ public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload<T>> extends H
return incomingRecords.leftOuterJoin(existingRecordByRecordKey).values()
.flatMap(entry -> {
- HoodieRecord<T> inputRecord = entry.getLeft();
+ HoodieRecord<R> inputRecord = entry.getLeft();
Option<Pair<String, HoodieRecordLocation>> partitionPathLocationPair = Option.ofNullable(entry.getRight().orElse(null));
- List<HoodieRecord<T>> taggedRecords;
+ List<HoodieRecord<R>> taggedRecords;
if (partitionPathLocationPair.isPresent()) {
String partitionPath = partitionPathLocationPair.get().getKey();
HoodieRecordLocation location = partitionPathLocationPair.get().getRight();
if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) {
// Create an empty record to delete the record in the old partition
- HoodieRecord<T> deleteRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+ HoodieRecord<R> deleteRecord = new HoodieAvroRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
deleteRecord.setCurrentLocation(location);
deleteRecord.seal();
// Tag the incoming record for inserting to the new partition
- HoodieRecord<T> insertRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
+ HoodieRecord<R> insertRecord = (HoodieRecord<R>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
taggedRecords = Arrays.asList(deleteRecord, insertRecord);
} else {
// Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
// When it differs, the record will still be updated at its old partition.
- HoodieRecord<T> newRecord = new HoodieRecord<>(new HoodieKey(inputRecord.getRecordKey(), partitionPath), inputRecord.getData());
- taggedRecords = Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(newRecord, Option.ofNullable(location)));
+ HoodieRecord<R> newRecord = new HoodieAvroRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), (HoodieRecordPayload) inputRecord.getData());
+ taggedRecords = Collections.singletonList((HoodieRecord<R>) HoodieIndexUtils.getTaggedRecord(newRecord, Option.ofNullable(location)));
}
} else {
- taggedRecords = Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty()));
+ taggedRecords = Collections.singletonList((HoodieRecord<R>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty()));
}
return taggedRecords.iterator();
});

File: HoodieSimpleIndex.java

@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
- import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
@@ -47,11 +46,9 @@ import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPar
/**
* A simple index which reads interested fields(record key and partition path) from base files and
* joins with incoming records to find the tagged location.
- *
- * @param <T> type of {@link HoodieRecordPayload}
*/
- public class HoodieSimpleIndex<T extends HoodieRecordPayload<T>>
- extends HoodieIndex<T, Object, Object, Object> {
+ public class HoodieSimpleIndex
+ extends HoodieIndex<Object, Object> {
private final Option<BaseKeyGenerator> keyGeneratorOpt;
@@ -88,8 +85,8 @@ public class HoodieSimpleIndex<T extends HoodieRecordPayload<T>>
}
@Override
- public HoodieData<HoodieRecord<T>> tagLocation(
- HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
+ public <R> HoodieData<HoodieRecord<R>> tagLocation(
+ HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) {
return tagLocationInternal(records, context, hoodieTable);
}
@@ -102,23 +99,23 @@ public class HoodieSimpleIndex<T extends HoodieRecordPayload<T>>
* @param hoodieTable instance of {@link HoodieTable} to use
* @return {@link HoodieData} of records with record locations set
*/
- protected HoodieData<HoodieRecord<T>> tagLocationInternal(
- HoodieData<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
+ protected <R> HoodieData<HoodieRecord<R>> tagLocationInternal(
+ HoodieData<HoodieRecord<R>> inputRecords, HoodieEngineContext context,
HoodieTable hoodieTable) {
if (config.getSimpleIndexUseCaching()) {
inputRecords.persist(new HoodieConfig(config.getProps())
.getString(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL_VALUE));
}
- HoodiePairData<HoodieKey, HoodieRecord<T>> keyedInputRecords =
+ HoodiePairData<HoodieKey, HoodieRecord<R>> keyedInputRecords =
inputRecords.mapToPair(record -> new ImmutablePair<>(record.getKey(), record));
HoodiePairData<HoodieKey, HoodieRecordLocation> existingLocationsOnTable =
fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keys(), context, hoodieTable,
config.getSimpleIndexParallelism());
- HoodieData<HoodieRecord<T>> taggedRecords =
+ HoodieData<HoodieRecord<R>> taggedRecords =
keyedInputRecords.leftOuterJoin(existingLocationsOnTable).map(entry -> {
- final HoodieRecord<T> untaggedRecord = entry.getRight().getLeft();
+ final HoodieRecord<R> untaggedRecord = entry.getRight().getLeft();
final Option<HoodieRecordLocation> location = Option.ofNullable(entry.getRight().getRight().orElse(null));
return HoodieIndexUtils.getTaggedRecord(untaggedRecord, location);
});

File: HoodieAppendHandle.java

@@ -393,7 +393,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
@Override
public void write(HoodieRecord record, Option<IndexedRecord> insertValue) {
- Option<Map<String, String>> recordMetadata = record.getData().getMetadata();
+ Option<Map<String, String>> recordMetadata = ((HoodieRecordPayload) record.getData()).getMetadata();
try {
init(record);
flushToDiskIfRequired(record);

File: HoodieCreateHandle.java

@@ -18,7 +18,6 @@
package org.apache.hudi.io;
- import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -37,6 +36,7 @@ import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;
+ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
@@ -128,7 +128,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
*/
@Override
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
- Option recordMetadata = record.getData().getMetadata();
+ Option recordMetadata = ((HoodieRecordPayload) record.getData()).getMetadata();
if (HoodieOperation.isDelete(record.getOperation())) {
avroRecord = Option.empty();
}

File: HoodieMergeHandle.java

@@ -324,7 +324,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
if (keyToNewRecords.containsKey(key)) {
// If we have duplicate records that we are updating, then the hoodie record will be deflated after
// writing the first record. So make a copy of the record to be merged
- HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
+ HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
try {
Option<IndexedRecord> combinedAvroRecord =
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,

File: HoodieSortedMergeHandle.java

@@ -85,7 +85,7 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
}
// This is a new insert
- HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(keyToPreWrite));
+ HoodieRecord<T> hoodieRecord = keyToNewRecords.get(keyToPreWrite).newInstance();
if (writtenRecordKeys.contains(keyToPreWrite)) {
throw new HoodieUpsertException("Insert/Update not in sorted order");
}

File: HoodieWriteHandle.java

@@ -210,7 +210,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
* Perform the actual writing of the given record into the backing file.
*/
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<Exception> exception) {
- Option recordMetadata = record.getData().getMetadata();
+ Option recordMetadata = ((HoodieRecordPayload) record.getData()).getMetadata();
if (exception.isPresent() && exception.get() instanceof Throwable) {
// Not throwing exception from here, since we don't want to fail the entire job for a single record
writeStatus.markFailure(record, exception.get(), recordMetadata);

File: HoodieTable.java

@@ -102,7 +102,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
- protected final HoodieIndex<T, ?, ?, ?> index;
+ protected final HoodieIndex<?, ?> index;
private SerializableConfiguration hadoopConfiguration;
protected final TaskContextSupplier taskContextSupplier;
private final HoodieTableMetadata metadata;
@@ -128,7 +128,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
this.taskContextSupplier = context.getTaskContextSupplier();
}
- protected abstract HoodieIndex<T, ?, ?, ?> getIndex(HoodieWriteConfig config, HoodieEngineContext context);
+ protected abstract HoodieIndex<?, ?> getIndex(HoodieWriteConfig config, HoodieEngineContext context);
protected HoodieStorageLayout getStorageLayout(HoodieWriteConfig config) {
return HoodieLayoutFactory.createLayout(config);
@@ -367,7 +367,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
/**
* Return the index.
*/
- public HoodieIndex<T, ?, ?, ?> getIndex() {
+ public HoodieIndex<?, ?> getIndex() {
return index;
}

File: BootstrapRecordConsumer.java

@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.bootstrap;
import org.apache.hudi.common.model.HoodieRecord;
+ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieBootstrapHandle;
@@ -39,7 +40,8 @@ public class BootstrapRecordConsumer extends BoundedInMemoryQueueConsumer<Hoodie
@Override
protected void consumeOneRecord(HoodieRecord record) {
try {
- bootstrapHandle.write(record, record.getData().getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
+ bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData())
+ .getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}

File: BaseWriteHelper.java

@@ -86,5 +86,5 @@ public abstract class BaseWriteHelper<T extends HoodieRecordPayload, I, K, O, R>
}
public abstract I deduplicateRecords(
- I records, HoodieIndex<T, ?, ?, ?> index, int parallelism);
+ I records, HoodieIndex<?, ?> index, int parallelism);
}
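
deduplicateRecords now receives the wildcard-typed index; the index still matters because whether it is global decides the dedup key. A sketch of that decision with plain collections (simplified: real deduplication also merges duplicate payloads via precombine, omitted here):

import java.util.HashMap;
import java.util.Map;

class DedupSketch {
  record Key(String recordKey, String partition) {}

  // With a global index, a record key is unique across partitions, so dedup
  // keys on the record key alone; otherwise on (record key, partition).
  static <R> Map<Object, R> dedup(Map<Key, R> records, boolean indexIsGlobal) {
    Map<Object, R> out = new HashMap<>();
    records.forEach((k, v) -> out.put(indexIsGlobal ? k.recordKey() : k, v));
    return out;
  }
}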

File: TestHoodieHFileReaderWriter.java

@@ -23,6 +23,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+ import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -122,7 +123,7 @@ public class TestHoodieHFileReaderWriter {
record.put("time", Integer.toString(RANDOM.nextInt()));
record.put("number", i);
if (testAvroWithMeta) {
- writer.writeAvroWithMetadata(record, new HoodieRecord(new HoodieKey((String) record.get("_row_key"),
+ writer.writeAvroWithMetadata(record, new HoodieAvroRecord(new HoodieKey((String) record.get("_row_key"),
Integer.toString((Integer) record.get("number"))), new EmptyHoodieRecordPayload())); // payload does not matter. GenericRecord passed in is what matters
// only HoodieKey will be looked up from the 2nd arg(HoodieRecord).
} else {
@@ -170,4 +171,4 @@ public class TestHoodieHFileReaderWriter {
}
return rowKeys;
}
- }
+ }

File: HoodieWriteableTestTable.java

@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
+ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -41,6 +42,7 @@ import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieOrcWriter;
import org.apache.hudi.io.storage.HoodieParquetWriter;
+ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -48,7 +50,6 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
- import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.CompressionKind;
@@ -118,7 +119,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
config, schema, contextSupplier, populateMetaFields)) {
int seqId = 1;
for (HoodieRecord record : records) {
- GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
+ GenericRecord avroRecord = (GenericRecord) ((HoodieRecordPayload) record.getData()).getInsertValue(schema).get();
if (populateMetaFields) {
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
@@ -141,7 +142,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
config, schema, contextSupplier)) {
int seqId = 1;
for (HoodieRecord record : records) {
- GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
+ GenericRecord avroRecord = (GenericRecord) ((HoodieRecordPayload) record.getData()).getInsertValue(schema).get();
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++));
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
writer.writeAvro(record.getRecordKey(), avroRecord);
@@ -175,7 +176,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
try {
- GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
+ GenericRecord val = (GenericRecord) ((HoodieRecordPayload) r.getData()).getInsertValue(schema).get();
HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
return (IndexedRecord) val;
} catch (IOException e) {