[HUDI-389] Fixing Index look up to return right partitions for a given key along with fileId with Global Bloom (#1091)
* Fixing Index look up to return partitions for a given key along with fileId with Global Bloom * Addressing some of the comments * Fixing test in TestHoodieGlobalBloomIndex to test the fix
This commit is contained in:
committed by
vinoth chandar
parent
94aec965f5
commit
9c4217a3e1
@@ -105,7 +105,6 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
|||||||
recordRDD.unpersist(); // unpersist the input Record RDD
|
recordRDD.unpersist(); // unpersist the input Record RDD
|
||||||
keyFilenamePairRDD.unpersist();
|
keyFilenamePairRDD.unpersist();
|
||||||
}
|
}
|
||||||
|
|
||||||
return taggedRecordRDD;
|
return taggedRecordRDD;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -321,8 +320,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
|||||||
String recordKey = partitionRecordKeyPair._2();
|
String recordKey = partitionRecordKeyPair._2();
|
||||||
String partitionPath = partitionRecordKeyPair._1();
|
String partitionPath = partitionRecordKeyPair._1();
|
||||||
|
|
||||||
return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
|
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
|
||||||
.map(matchingFile -> new Tuple2<>(matchingFile, new HoodieKey(recordKey, partitionPath)))
|
.map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
|
||||||
|
new HoodieKey(recordKey, partitionPath)))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}).flatMap(List::iterator);
|
}).flatMap(List::iterator);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -66,7 +66,8 @@ public class HoodieBloomIndexCheckFunction
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void start() {}
|
protected void start() {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
|
protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
|
||||||
@@ -113,6 +114,7 @@ public class HoodieBloomIndexCheckFunction
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void end() {}
|
protected void end() {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,12 +33,11 @@ import com.google.common.annotations.VisibleForTesting;
|
|||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.Optional;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
@@ -74,7 +73,7 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
|
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
|
||||||
* checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
|
* checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
|
||||||
* to be compared gets cut down a lot from range pruning.
|
* to be compared gets cut down a lot from range pruning.
|
||||||
*
|
* <p>
|
||||||
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
|
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
|
||||||
* recordKey ranges in the index info. the partition path of the incoming record (partitionRecordKeyPairRDD._2()) will
|
* recordKey ranges in the index info. the partition path of the incoming record (partitionRecordKeyPairRDD._2()) will
|
||||||
* be ignored since the search scope should be bigger than that
|
* be ignored since the search scope should be bigger than that
|
||||||
@@ -85,10 +84,6 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
||||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
||||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
|
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
|
||||||
Map<String, String> indexToPartitionMap = new HashMap<>();
|
|
||||||
for (Entry<String, List<BloomIndexFileInfo>> entry : partitionToFileIndexInfo.entrySet()) {
|
|
||||||
entry.getValue().forEach(indexFile -> indexToPartitionMap.put(indexFile.getFileId(), entry.getKey()));
|
|
||||||
}
|
|
||||||
|
|
||||||
IndexFileFilter indexFileFilter =
|
IndexFileFilter indexFileFilter =
|
||||||
config.getBloomIndexPruneByRanges() ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
|
config.getBloomIndexPruneByRanges() ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
|
||||||
@@ -98,26 +93,37 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
String recordKey = partitionRecordKeyPair._2();
|
String recordKey = partitionRecordKeyPair._2();
|
||||||
String partitionPath = partitionRecordKeyPair._1();
|
String partitionPath = partitionRecordKeyPair._1();
|
||||||
|
|
||||||
return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
|
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
|
||||||
.map(file -> new Tuple2<>(file, new HoodieKey(recordKey, indexToPartitionMap.get(file))))
|
.map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(),
|
||||||
|
new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}).flatMap(List::iterator);
|
}).flatMap(List::iterator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tagging for global index should only consider the record key.
|
* Tagging for global index should only consider the record key.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
|
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
|
||||||
JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
|
JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
|
||||||
JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD =
|
|
||||||
|
JavaPairRDD<String, HoodieRecord<T>> incomingRowKeyRecordPairRDD =
|
||||||
recordRDD.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
|
recordRDD.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
|
||||||
|
|
||||||
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
|
JavaPairRDD<String, Tuple2<HoodieRecordLocation, HoodieKey>> existingRecordKeyToRecordLocationHoodieKeyMap =
|
||||||
// so we do left outer join.
|
keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1)));
|
||||||
return rowKeyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), p._2)))
|
|
||||||
.values().map(value -> getTaggedRecord(value._1, Option.ofNullable(value._2.orNull())));
|
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join.
|
||||||
|
return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record -> {
|
||||||
|
final HoodieRecord<T> hoodieRecord = record._1;
|
||||||
|
final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2;
|
||||||
|
if (recordLocationHoodieKeyPair.isPresent()) {
|
||||||
|
// Record key matched to file
|
||||||
|
return getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1));
|
||||||
|
} else {
|
||||||
|
return getTaggedRecord(hoodieRecord, Option.empty());
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hudi.index.bloom;
|
package org.apache.hudi.index.bloom;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
@@ -27,12 +29,13 @@ import java.util.Set;
|
|||||||
public interface IndexFileFilter extends Serializable {
|
public interface IndexFileFilter extends Serializable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetches all matching files for a given record key and partition.
|
* Fetches all matching files and partition pair for a given record key and partition path.
|
||||||
*
|
*
|
||||||
* @param partitionPath the partition path of interest
|
* @param partitionPath the partition path of interest
|
||||||
* @param recordKey the record key to be looked up
|
* @param recordKey the record key to be looked up
|
||||||
* @return the {@link Set} of matching file names where the record could potentially be present.
|
* @return the {@link Set} of matching <Partition path, file name> pairs where the record could potentially be
|
||||||
|
* present.
|
||||||
*/
|
*/
|
||||||
Set<String> getMatchingFiles(String partitionPath, String recordKey);
|
Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,13 +18,15 @@
|
|||||||
|
|
||||||
package org.apache.hudi.index.bloom;
|
package org.apache.hudi.index.bloom;
|
||||||
|
|
||||||
import java.util.Collection;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interval Tree based index look up for Global Index. Builds an {@link KeyRangeLookupTree} for all index files (across
|
* Interval Tree based index look up for Global Index. Builds an {@link KeyRangeLookupTree} for all index files (across
|
||||||
@@ -34,6 +36,7 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter {
|
|||||||
|
|
||||||
private final KeyRangeLookupTree indexLookUpTree = new KeyRangeLookupTree();
|
private final KeyRangeLookupTree indexLookUpTree = new KeyRangeLookupTree();
|
||||||
private final Set<String> filesWithNoRanges = new HashSet<>();
|
private final Set<String> filesWithNoRanges = new HashSet<>();
|
||||||
|
private final Map<String, String> fileIdToPartitionPathMap = new HashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiates {@link IntervalTreeBasedGlobalIndexFileFilter}.
|
* Instantiates {@link IntervalTreeBasedGlobalIndexFileFilter}.
|
||||||
@@ -41,8 +44,15 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter {
|
|||||||
* @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s
|
* @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s
|
||||||
*/
|
*/
|
||||||
IntervalTreeBasedGlobalIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
|
IntervalTreeBasedGlobalIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
|
||||||
List<BloomIndexFileInfo> allIndexFiles =
|
List<BloomIndexFileInfo> allIndexFiles = new ArrayList<>();
|
||||||
partitionToFileIndexInfo.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
|
|
||||||
|
partitionToFileIndexInfo.forEach((parition, bloomIndexFileInfoList) -> {
|
||||||
|
bloomIndexFileInfoList.forEach(file -> {
|
||||||
|
fileIdToPartitionPathMap.put(file.getFileId(), parition);
|
||||||
|
allIndexFiles.add(file);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
// Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
|
// Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
|
||||||
// So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed
|
// So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed
|
||||||
// which could result in N search time instead of NlogN.
|
// which could result in N search time instead of NlogN.
|
||||||
@@ -58,10 +68,12 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
|
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
|
||||||
Set<String> toReturn = new HashSet<>();
|
Set<String> matchingFiles = new HashSet<>();
|
||||||
toReturn.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey));
|
matchingFiles.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey));
|
||||||
toReturn.addAll(filesWithNoRanges);
|
matchingFiles.addAll(filesWithNoRanges);
|
||||||
|
Set<Pair<String, String>> toReturn = new HashSet<>();
|
||||||
|
matchingFiles.forEach(file -> toReturn.add(Pair.of(fileIdToPartitionPathMap.get(file), file)));
|
||||||
return toReturn;
|
return toReturn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hudi.index.bloom;
|
package org.apache.hudi.index.bloom;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
@@ -62,14 +64,16 @@ class IntervalTreeBasedIndexFileFilter implements IndexFileFilter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
|
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
|
||||||
Set<String> toReturn = new HashSet<>();
|
Set<Pair<String, String>> toReturn = new HashSet<>();
|
||||||
// could be null, if there are no files in a given partition yet or if all index files have no ranges
|
// could be null, if there are no files in a given partition yet or if all index files have no ranges
|
||||||
if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) {
|
if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) {
|
||||||
toReturn.addAll(partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey));
|
partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(file ->
|
||||||
|
toReturn.add(Pair.of(partitionPath, file)));
|
||||||
}
|
}
|
||||||
if (partitionToFilesWithNoRanges.containsKey(partitionPath)) {
|
if (partitionToFilesWithNoRanges.containsKey(partitionPath)) {
|
||||||
toReturn.addAll(partitionToFilesWithNoRanges.get(partitionPath));
|
partitionToFilesWithNoRanges.get(partitionPath).forEach(file ->
|
||||||
|
toReturn.add(Pair.of(partitionPath, file)));
|
||||||
}
|
}
|
||||||
return toReturn;
|
return toReturn;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -73,8 +73,8 @@ class KeyRangeNode implements Comparable<KeyRangeNode>, Serializable {
|
|||||||
*
|
*
|
||||||
* @param that the {@link KeyRangeNode} to be compared with
|
* @param that the {@link KeyRangeNode} to be compared with
|
||||||
* @return the result of comparison. 0 if both min and max are equal in both. 1 if this {@link KeyRangeNode} is
|
* @return the result of comparison. 0 if both min and max are equal in both. 1 if this {@link KeyRangeNode} is
|
||||||
* greater than the {@code that} keyRangeNode. -1 if {@code that} keyRangeNode is greater than this
|
* greater than the {@code that} keyRangeNode. -1 if {@code that} keyRangeNode is greater than this {@link
|
||||||
* {@link KeyRangeNode}
|
* KeyRangeNode}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public int compareTo(KeyRangeNode that) {
|
public int compareTo(KeyRangeNode that) {
|
||||||
|
|||||||
@@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hudi.index.bloom;
|
package org.apache.hudi.index.bloom;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -35,18 +37,15 @@ class ListBasedGlobalIndexFileFilter extends ListBasedIndexFileFilter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
|
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
|
||||||
Set<String> toReturn = new HashSet<>();
|
Set<Pair<String, String>> toReturn = new HashSet<>();
|
||||||
partitionToFileIndexInfo.values().forEach(indexInfos -> {
|
partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoList) -> {
|
||||||
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
|
bloomIndexFileInfoList.forEach(file -> {
|
||||||
// for each candidate file in partition, that needs to be compared.
|
if (shouldCompareWithFile(file, recordKey)) {
|
||||||
for (BloomIndexFileInfo indexInfo : indexInfos) {
|
toReturn.add(Pair.of(partition, file.getFileId()));
|
||||||
if (shouldCompareWithFile(indexInfo, recordKey)) {
|
|
||||||
toReturn.add(indexInfo.getFileId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
});
|
||||||
return toReturn;
|
return toReturn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hudi.index.bloom;
|
package org.apache.hudi.index.bloom;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -41,14 +43,14 @@ class ListBasedIndexFileFilter implements IndexFileFilter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
|
public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
|
||||||
List<BloomIndexFileInfo> indexInfos = partitionToFileIndexInfo.get(partitionPath);
|
List<BloomIndexFileInfo> indexInfos = partitionToFileIndexInfo.get(partitionPath);
|
||||||
Set<String> toReturn = new HashSet<>();
|
Set<Pair<String, String>> toReturn = new HashSet<>();
|
||||||
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
|
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
|
||||||
// for each candidate file in partition, that needs to be compared.
|
// for each candidate file in partition, that needs to be compared.
|
||||||
for (BloomIndexFileInfo indexInfo : indexInfos) {
|
for (BloomIndexFileInfo indexInfo : indexInfos) {
|
||||||
if (shouldCompareWithFile(indexInfo, recordKey)) {
|
if (shouldCompareWithFile(indexInfo, recordKey)) {
|
||||||
toReturn.add(indexInfo.getFileId());
|
toReturn.add(Pair.of(partitionPath, indexInfo.getFileId()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -112,6 +112,9 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
|
|||||||
return getConfigBuilder().build();
|
return getConfigBuilder().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected HoodieWriteConfig getConfig(IndexType indexType) {
|
||||||
|
return getConfigBuilder(indexType).build();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get Config builder with default configs set.
|
* Get Config builder with default configs set.
|
||||||
@@ -127,7 +130,20 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
|
|||||||
*
|
*
|
||||||
* @return Config Builder
|
* @return Config Builder
|
||||||
*/
|
*/
|
||||||
|
HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
|
||||||
|
return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
|
||||||
|
}
|
||||||
|
|
||||||
HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
|
HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
|
||||||
|
return getConfigBuilder(schemaStr, IndexType.BLOOM);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get Config builder with default configs set.
|
||||||
|
*
|
||||||
|
* @return Config Builder
|
||||||
|
*/
|
||||||
|
HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
|
||||||
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
|
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
|
||||||
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
|
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
|
||||||
.withWriteStatusClass(MetadataMergeWriteStatus.class)
|
.withWriteStatusClass(MetadataMergeWriteStatus.class)
|
||||||
@@ -135,7 +151,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
|
|||||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
|
||||||
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
||||||
.forTable("test-trip-table")
|
.forTable("test-trip-table")
|
||||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
|
||||||
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
|
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
|
||||||
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
|
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
|
|||||||
import org.apache.hudi.exception.HoodieCommitException;
|
import org.apache.hudi.exception.HoodieCommitException;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
|
import org.apache.hudi.index.HoodieIndex.IndexType;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
@@ -382,6 +383,59 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
|
|||||||
HoodieWriteClient::upsert, true, 50, 150, 2);
|
HoodieWriteClient::upsert, true, 50, 150, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test update of a record to different partition with Global Index.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUpsertToDiffPartitionGlobalIndex() throws Exception {
|
||||||
|
HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false);
|
||||||
|
/**
|
||||||
|
* Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
|
||||||
|
*/
|
||||||
|
String newCommitTime = "001";
|
||||||
|
List<HoodieRecord> inserts1 = dataGen.generateInserts(newCommitTime, 10);
|
||||||
|
|
||||||
|
// Write 1 (only inserts)
|
||||||
|
client.startCommitWithTime(newCommitTime);
|
||||||
|
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(inserts1, 1);
|
||||||
|
|
||||||
|
JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
|
||||||
|
List<WriteStatus> statuses = result.collect();
|
||||||
|
assertNoWriteErrors(statuses);
|
||||||
|
|
||||||
|
// check the partition metadata is written out
|
||||||
|
assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs);
|
||||||
|
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
|
||||||
|
for (int i = 0; i < fullPartitionPaths.length; i++) {
|
||||||
|
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
|
||||||
|
}
|
||||||
|
assertEquals("Must contain " + 10 + " records", 10,
|
||||||
|
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write 2. Updates with different partition
|
||||||
|
*/
|
||||||
|
newCommitTime = "004";
|
||||||
|
client.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
|
List<HoodieRecord> updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1);
|
||||||
|
JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(updates1, 1);
|
||||||
|
|
||||||
|
JavaRDD<WriteStatus> result1 = client.upsert(updateRecords, newCommitTime);
|
||||||
|
List<WriteStatus> statuses1 = result1.collect();
|
||||||
|
assertNoWriteErrors(statuses1);
|
||||||
|
|
||||||
|
// check the partition metadata is written out
|
||||||
|
assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs);
|
||||||
|
// Check the entire dataset has all records still
|
||||||
|
fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
|
||||||
|
for (int i = 0; i < fullPartitionPaths.length; i++) {
|
||||||
|
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
|
||||||
|
}
|
||||||
|
assertEquals("Must contain " + 10 + " records", 10,
|
||||||
|
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test scenario of new file-group getting added during upsert().
|
* Test scenario of new file-group getting added during upsert().
|
||||||
*/
|
*/
|
||||||
@@ -391,7 +445,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
|
|||||||
final int insertSplitLimit = 100;
|
final int insertSplitLimit = 100;
|
||||||
// setup the small file handling params
|
// setup the small file handling params
|
||||||
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
|
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
|
||||||
dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
|
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
|
||||||
|
|
||||||
HoodieWriteClient client = getHoodieWriteClient(config, false);
|
HoodieWriteClient client = getHoodieWriteClient(config, false);
|
||||||
|
|
||||||
@@ -504,7 +558,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
|
|||||||
final int insertSplitLimit = 100;
|
final int insertSplitLimit = 100;
|
||||||
// setup the small file handling params
|
// setup the small file handling params
|
||||||
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
|
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
|
||||||
dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
|
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
|
||||||
HoodieWriteClient client = getHoodieWriteClient(config, false);
|
HoodieWriteClient client = getHoodieWriteClient(config, false);
|
||||||
|
|
||||||
// Inserts => will write file1
|
// Inserts => will write file1
|
||||||
@@ -516,7 +570,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
|
|||||||
List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
|
List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
|
||||||
|
|
||||||
assertNoWriteErrors(statuses);
|
assertNoWriteErrors(statuses);
|
||||||
assertPartitionMetadata(new String[]{testPartitionPath}, fs);
|
assertPartitionMetadata(new String[] {testPartitionPath}, fs);
|
||||||
|
|
||||||
assertEquals("Just 1 file needs to be added.", 1, statuses.size());
|
assertEquals("Just 1 file needs to be added.", 1, statuses.size());
|
||||||
String file1 = statuses.get(0).getFileId();
|
String file1 = statuses.get(0).getFileId();
|
||||||
@@ -586,7 +640,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
|
|||||||
List<String> keysSoFar = new ArrayList<>();
|
List<String> keysSoFar = new ArrayList<>();
|
||||||
// setup the small file handling params
|
// setup the small file handling params
|
||||||
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
|
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
|
||||||
dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
|
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
|
||||||
|
|
||||||
HoodieWriteClient client = getHoodieWriteClient(config, false);
|
HoodieWriteClient client = getHoodieWriteClient(config, false);
|
||||||
|
|
||||||
@@ -714,7 +768,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
|
|||||||
final int insertSplitLimit = 100;
|
final int insertSplitLimit = 100;
|
||||||
// setup the small file handling params
|
// setup the small file handling params
|
||||||
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, true); // hold upto 200 records max
|
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, true); // hold upto 200 records max
|
||||||
dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
|
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
|
||||||
|
|
||||||
HoodieWriteClient client = getHoodieWriteClient(config, false);
|
HoodieWriteClient client = getHoodieWriteClient(config, false);
|
||||||
|
|
||||||
|
|||||||
@@ -315,6 +315,24 @@ public class HoodieTestDataGenerator {
|
|||||||
return updates;
|
return updates;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<HoodieRecord> generateUpdatesWithDiffPartition(String commitTime, List<HoodieRecord> baseRecords)
|
||||||
|
throws IOException {
|
||||||
|
List<HoodieRecord> updates = new ArrayList<>();
|
||||||
|
for (HoodieRecord baseRecord : baseRecords) {
|
||||||
|
String partition = baseRecord.getPartitionPath();
|
||||||
|
String newPartition = "";
|
||||||
|
if (partitionPaths[0].equalsIgnoreCase(partition)) {
|
||||||
|
newPartition = partitionPaths[1];
|
||||||
|
} else {
|
||||||
|
newPartition = partitionPaths[0];
|
||||||
|
}
|
||||||
|
HoodieKey key = new HoodieKey(baseRecord.getRecordKey(), newPartition);
|
||||||
|
HoodieRecord record = generateUpdateRecord(key, commitTime);
|
||||||
|
updates.add(record);
|
||||||
|
}
|
||||||
|
return updates;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned
|
* Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned
|
||||||
* list
|
* list
|
||||||
|
|||||||
@@ -22,7 +22,9 @@ import org.apache.hudi.HoodieClientTestHarness;
|
|||||||
import org.apache.hudi.config.HoodieHBaseIndexConfig;
|
import org.apache.hudi.config.HoodieHBaseIndexConfig;
|
||||||
import org.apache.hudi.config.HoodieIndexConfig;
|
import org.apache.hudi.config.HoodieIndexConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.index.HoodieIndex.IndexType;
|
||||||
import org.apache.hudi.index.bloom.HoodieBloomIndex;
|
import org.apache.hudi.index.bloom.HoodieBloomIndex;
|
||||||
|
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
|
||||||
import org.apache.hudi.index.hbase.HBaseIndex;
|
import org.apache.hudi.index.hbase.HBaseIndex;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
@@ -62,5 +64,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
|
|||||||
config = clientConfigBuilder.withPath(basePath)
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
|
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
|
||||||
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HoodieBloomIndex);
|
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HoodieBloomIndex);
|
||||||
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
|
.withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build();
|
||||||
|
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HoodieGlobalBloomIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,7 +61,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
|||||||
private String schemaStr;
|
private String schemaStr;
|
||||||
private Schema schema;
|
private Schema schema;
|
||||||
|
|
||||||
public TestHoodieGlobalBloomIndex() throws Exception {}
|
public TestHoodieGlobalBloomIndex() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
@@ -171,7 +172,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
|||||||
partitionToFileIndexInfo.put("2017/10/23",
|
partitionToFileIndexInfo.put("2017/10/23",
|
||||||
Arrays.asList(new BloomIndexFileInfo("f4", "002", "007"), new BloomIndexFileInfo("f5", "009", "010")));
|
Arrays.asList(new BloomIndexFileInfo("f4", "002", "007"), new BloomIndexFileInfo("f5", "009", "010")));
|
||||||
|
|
||||||
// the partition partition of the key of the incoming records will be ignored
|
// the partition of the key of the incoming records will be ignored
|
||||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD =
|
JavaPairRDD<String, String> partitionRecordKeyPairRDD =
|
||||||
jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"),
|
jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"),
|
||||||
new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t);
|
new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t);
|
||||||
@@ -240,7 +241,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
|||||||
TestRawTripPayload rowChange5 =
|
TestRawTripPayload rowChange5 =
|
||||||
new TestRawTripPayload("{\"_row_key\":\"003\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
|
new TestRawTripPayload("{\"_row_key\":\"003\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
|
||||||
HoodieRecord record5 =
|
HoodieRecord record5 =
|
||||||
new HoodieRecord(new HoodieKey(rowChange5.getRowKey(), rowChange5.getPartitionPath()), rowChange4);
|
new HoodieRecord(new HoodieKey(rowChange5.getRowKey(), rowChange5.getPartitionPath()), rowChange5);
|
||||||
|
|
||||||
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
|
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
|
||||||
|
|
||||||
@@ -257,7 +258,6 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
|||||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
|
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
|
||||||
|
|
||||||
|
|
||||||
// Add some commits
|
// Add some commits
|
||||||
new File(basePath + "/.hoodie").mkdirs();
|
new File(basePath + "/.hoodie").mkdirs();
|
||||||
|
|
||||||
@@ -267,12 +267,19 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
|||||||
for (HoodieRecord record : taggedRecordRDD.collect()) {
|
for (HoodieRecord record : taggedRecordRDD.collect()) {
|
||||||
if (record.getRecordKey().equals("000")) {
|
if (record.getRecordKey().equals("000")) {
|
||||||
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename0)));
|
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename0)));
|
||||||
|
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData());
|
||||||
} else if (record.getRecordKey().equals("001")) {
|
} else if (record.getRecordKey().equals("001")) {
|
||||||
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2)));
|
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2)));
|
||||||
|
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData());
|
||||||
} else if (record.getRecordKey().equals("002")) {
|
} else if (record.getRecordKey().equals("002")) {
|
||||||
assertTrue(!record.isCurrentLocationKnown());
|
assertTrue(!record.isCurrentLocationKnown());
|
||||||
|
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData());
|
||||||
|
} else if (record.getRecordKey().equals("003")) {
|
||||||
|
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
|
||||||
|
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData());
|
||||||
} else if (record.getRecordKey().equals("004")) {
|
} else if (record.getRecordKey().equals("004")) {
|
||||||
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
|
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
|
||||||
|
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user