[HUDI-1673] Replace scala.Tuple2 to Pair in FlinkHoodieBloomIndex (#2642)
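The substitution in this commit is mechanical and one-to-one: new Tuple2<>(a, b) becomes Pair.of(a, b), the accessors _1/_2 become getLeft()/getRight(), and the method references Tuple2::_1/Tuple2::_2 become Pair::getLeft/Pair::getRight. A minimal before/after sketch, assuming commons-lang3's Pair (the diff does not show which Pair import is used; Hudi's own Pair exposes the same of/getLeft/getRight API):

    import org.apache.commons.lang3.tuple.Pair; // assumption: commons-lang3 Pair; Hudi's own Pair has the same API

    public class Tuple2ToPairSketch {
      public static void main(String[] args) {
        // Before: Tuple2<String, String> t = new Tuple2<>("2015/03/12", "f1"); t._1; t._2
        // After:
        Pair<String, String> p = Pair.of("2015/03/12", "f1");
        System.out.println(p.getLeft());  // "2015/03/12", replaces t._1
        System.out.println(p.getRight()); // "f1", replaces t._2
      }
    }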
@@ -44,8 +44,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

-import scala.Tuple2;
-
 import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
@@ -106,14 +104,14 @@ public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends FlinkH
     List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

     // Step 2: Load all involved files as <Partition, filename> pairs
-    List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
+    List<Pair<String, BloomIndexFileInfo>> fileInfoList =
         loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
     final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-        fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
+        fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));

     // Step 3: Obtain a List, for each incoming record, that already exists, with the file id,
     // that contains it.
-    List<Tuple2<String, HoodieKey>> fileComparisons =
+    List<Pair<String, HoodieKey>> fileComparisons =
         explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap);
     return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
   }
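For reference, the collector used in Step 2 above groups the pair list by partition path. A self-contained sketch of the same pattern, with String standing in for BloomIndexFileInfo so it runs on its own (the Pair import is an assumption, as noted earlier):

    import java.util.List;
    import java.util.Map;
    import org.apache.commons.lang3.tuple.Pair; // assumption: any Pair with getLeft/getRight behaves the same

    import static java.util.stream.Collectors.groupingBy;
    import static java.util.stream.Collectors.mapping;
    import static java.util.stream.Collectors.toList;

    public class GroupPairsSketch {
      public static void main(String[] args) {
        List<Pair<String, String>> fileInfoList = List.of(
            Pair.of("2015/03/12", "f1"),
            Pair.of("2015/03/12", "f3"),
            Pair.of("2016/04/01", "f2"));
        // Same shape as partitionToFileInfo: partition path -> file infos in that partition.
        Map<String, List<String>> partitionToFileInfo =
            fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
        System.out.println(partitionToFileInfo); // e.g. {2016/04/01=[f2], 2015/03/12=[f1, f3]} (map order not guaranteed)
      }
    }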
@@ -122,7 +120,7 @@ public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends FlinkH
    * Load all involved files as <Partition, filename> pair List.
    */
   //TODO duplicate code with spark, we can optimize this method later
-  List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
+  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
                                                              final HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
@@ -136,15 +134,15 @@ public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends FlinkH
         try {
           HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
           String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
+          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
         } catch (MetadataNotFoundException me) {
           LOG.warn("Unable to find range metadata in file :" + pf);
-          return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
         }
       }, Math.max(partitionPathFileIDList.size(), 1));
     } else {
       return partitionPathFileIDList.stream()
-          .map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
+          .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
     }
   }

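The hunk above maps each (partition, fileId) entry to a Pair, falling back to a BloomIndexFileInfo without key ranges when range metadata is missing. A standalone sketch of that fallback shape, with a hypothetical getMinMaxKeys method standing in for HoodieRangeInfoHandle, NoSuchElementException standing in for MetadataNotFoundException, and plain Strings for the file info:

    import java.util.List;
    import java.util.NoSuchElementException;
    import org.apache.commons.lang3.tuple.Pair; // assumption: any Pair with of/getKey/getValue

    import static java.util.stream.Collectors.toList;

    public class RangeFallbackSketch {
      // Hypothetical stand-in for HoodieRangeInfoHandle#getMinMaxKeys.
      static String[] getMinMaxKeys(String fileId) {
        if (fileId.endsWith("noMeta")) {
          throw new NoSuchElementException("no range metadata for " + fileId);
        }
        return new String[] {"000", "009"};
      }

      public static void main(String[] args) {
        List<Pair<String, String>> partitionPathFileIdList =
            List.of(Pair.of("2015/03/12", "f1"), Pair.of("2016/04/01", "f2-noMeta"));

        List<Pair<String, String>> fileInfoList = partitionPathFileIdList.stream().map(pf -> {
          try {
            String[] minMax = getMinMaxKeys(pf.getValue());
            return Pair.of(pf.getKey(), pf.getValue() + " [" + minMax[0] + ", " + minMax[1] + "]");
          } catch (NoSuchElementException e) {
            // Same fallback as the diff: keep the file, but without key ranges.
            return Pair.of(pf.getKey(), pf.getValue());
          }
        }).collect(toList());

        System.out.println(fileInfoList);
      }
    }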
@@ -186,19 +184,19 @@ public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends FlinkH
    * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
    * recordKey ranges in the index info.
    */
-  List<Tuple2<String, HoodieKey>> explodeRecordsWithFileComparisons(
+  List<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       Map<String, List<String>> partitionRecordKeyMap) {
     IndexFileFilter indexFileFilter =
         config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
             : new ListBasedIndexFileFilter(partitionToFileIndexInfo);

-    List<Tuple2<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
+    List<Pair<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
     partitionRecordKeyMap.keySet().forEach(partitionPath -> {
       List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
       hoodieRecordKeys.forEach(hoodieRecordKey -> {
         indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> {
-          fileRecordPairs.add(new Tuple2<>(partitionFileIdPair.getRight(),
+          fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
               new HoodieKey(hoodieRecordKey, partitionPath)));
         });
       });
@@ -210,10 +208,10 @@ public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends FlinkH
    * Find out <RowKey, filename> pair.
    */
   Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
-      List<Tuple2<String, HoodieKey>> fileComparisons,
+      List<Pair<String, HoodieKey>> fileComparisons,
       HoodieTable hoodieTable) {

-    fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1._1.compareTo(o2._1)).collect(toList());
+    fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1.getLeft().compareTo(o2.getLeft())).collect(toList());

     List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();

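The sort above orders the file/key comparisons by file id via an explicit lambda on getLeft. An equivalent spelling with the JDK Comparator factory is sketched below; this is only an alternative, not what the patch does:

    import java.util.Comparator;
    import java.util.List;
    import org.apache.commons.lang3.tuple.Pair; // assumption: any Pair with getLeft behaves the same

    import static java.util.stream.Collectors.toList;

    public class SortByFileIdSketch {
      public static void main(String[] args) {
        List<Pair<String, String>> fileComparisons =
            List.of(Pair.of("f3", "key-1"), Pair.of("f1", "key-2"));
        // Sort by the left element (file id), like the lambda in findMatchingFilesForRecordKeys.
        fileComparisons = fileComparisons.stream()
            .sorted(Comparator.comparing(Pair::getLeft))
            .collect(toList());
        System.out.println(fileComparisons); // [(f1,key-2), (f3,key-1)]
      }
    }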
@@ -244,17 +242,17 @@ public class FlinkHoodieBloomIndex<T extends HoodieRecordPayload> extends FlinkH
     records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
     // Here as the record might have more data than rowKey (some rowKeys' fileId is null),
     // so we do left outer join.
-    List<Tuple2<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
+    List<Pair<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
     keyRecordPairMap.keySet().forEach(k -> {
       if (keyFilenamePair.containsKey(k)) {
-        newList.add(new Tuple2(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
+        newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
       } else {
-        newList.add(new Tuple2(keyRecordPairMap.get(k), null));
+        newList.add(Pair.of(keyRecordPairMap.get(k), null));
       }
     });
     List<HoodieRecord<T>> res = Lists.newArrayList();
-    for (Tuple2<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
-      res.add(HoodieIndexUtils.getTaggedRecord(v._1, Option.ofNullable(v._2)));
+    for (Pair<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
+      res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), Option.ofNullable(v.getRight())));
     }
     return res;
   }
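The left-outer-join shape in the previous hunk (pair every record with its location, or with null when no file contains its key) looks like this in isolation, with plain Strings standing in for HoodieRecord and HoodieRecordLocation and java.util.Optional standing in for Hudi's Option:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import org.apache.commons.lang3.tuple.Pair; // assumption: a Pair implementation that tolerates a null right value

    public class LeftOuterJoinSketch {
      public static void main(String[] args) {
        Map<String, String> keyRecordPairMap = Map.of("k1", "record-1", "k2", "record-2");
        Map<String, String> keyFilenamePair = Map.of("k1", "file-a"); // k2 has no matching file

        List<Pair<String, String>> joined = new ArrayList<>();
        keyRecordPairMap.keySet().forEach(k ->
            // Left outer join: keep every record, attach the location when present, null otherwise.
            joined.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k))));

        for (Pair<String, String> v : joined) {
          System.out.println(v.getLeft() + " -> " + Optional.ofNullable(v.getRight()));
        }
      }
    }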
@@ -33,14 +33,12 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;

-import scala.Tuple2;
-
 /**
  * Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files.
  */
 //TODO we can move this class into the hudi-client-common and reuse it for spark client
 public class HoodieFlinkBloomIndexCheckFunction
-    implements Function<Iterator<Tuple2<String, HoodieKey>>, Iterator<List<KeyLookupResult>>> {
+    implements Function<Iterator<Pair<String, HoodieKey>>, Iterator<List<KeyLookupResult>>> {

   private final HoodieTable hoodieTable;

@@ -52,25 +50,25 @@ public class HoodieFlinkBloomIndexCheckFunction
   }

   @Override
-  public Iterator<List<KeyLookupResult>> apply(Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
+  public Iterator<List<KeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
     return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
   }

   @Override
-  public <V> Function<V, Iterator<List<KeyLookupResult>>> compose(Function<? super V, ? extends Iterator<Tuple2<String, HoodieKey>>> before) {
+  public <V> Function<V, Iterator<List<KeyLookupResult>>> compose(Function<? super V, ? extends Iterator<Pair<String, HoodieKey>>> before) {
     return null;
   }

   @Override
-  public <V> Function<Iterator<Tuple2<String, HoodieKey>>, V> andThen(Function<? super Iterator<List<KeyLookupResult>>, ? extends V> after) {
+  public <V> Function<Iterator<Pair<String, HoodieKey>>, V> andThen(Function<? super Iterator<List<KeyLookupResult>>, ? extends V> after) {
     return null;
   }

-  class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
+  class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<KeyLookupResult>> {

     private HoodieKeyLookupHandle keyLookupHandle;

-    LazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
+    LazyKeyCheckIterator(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
       super(filePartitionRecordKeyTripletItr);
     }

@@ -84,10 +82,10 @@ public class HoodieFlinkBloomIndexCheckFunction
       try {
         // process one file in each go.
         while (inputItr.hasNext()) {
-          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
-          String fileId = currentTuple._1;
-          String partitionPath = currentTuple._2.getPartitionPath();
-          String recordKey = currentTuple._2.getRecordKey();
+          Pair<String, HoodieKey> currentTuple = inputItr.next();
+          String fileId = currentTuple.getLeft();
+          String partitionPath = currentTuple.getRight().getPartitionPath();
+          String recordKey = currentTuple.getRight().getRecordKey();
           Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);

           // lazily init state
@@ -50,8 +50,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;

-import scala.Tuple2;
-
 import static java.util.Arrays.asList;
 import static java.util.UUID.randomUUID;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -130,7 +128,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
         new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);

     List<String> partitions = asList("2016/01/21", "2016/04/01", "2015/03/12");
-    List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, context, hoodieTable);
+    List<Pair<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, context, hoodieTable);
     // Still 0, as no valid commit
     assertEquals(0, filesList.size());

@@ -145,20 +143,20 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {

     if (rangePruning) {
       // these files will not have the key ranges
-      assertNull(filesList.get(0)._2().getMaxRecordKey());
-      assertNull(filesList.get(0)._2().getMinRecordKey());
-      assertFalse(filesList.get(1)._2().hasKeyRanges());
-      assertNotNull(filesList.get(2)._2().getMaxRecordKey());
-      assertNotNull(filesList.get(2)._2().getMinRecordKey());
-      assertTrue(filesList.get(3)._2().hasKeyRanges());
+      assertNull(filesList.get(0).getRight().getMaxRecordKey());
+      assertNull(filesList.get(0).getRight().getMinRecordKey());
+      assertFalse(filesList.get(1).getRight().hasKeyRanges());
+      assertNotNull(filesList.get(2).getRight().getMaxRecordKey());
+      assertNotNull(filesList.get(2).getRight().getMinRecordKey());
+      assertTrue(filesList.get(3).getRight().hasKeyRanges());

       // no longer sorted, but should have same files.

-      List<Tuple2<String, BloomIndexFileInfo>> expected =
-          asList(new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")),
-              new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")),
-              new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000", "000")),
-              new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001", "003")));
+      List<Pair<String, BloomIndexFileInfo>> expected =
+          asList(Pair.of("2016/04/01", new BloomIndexFileInfo("2")),
+              Pair.of("2015/03/12", new BloomIndexFileInfo("1")),
+              Pair.of("2015/03/12", new BloomIndexFileInfo("3", "000", "000")),
+              Pair.of("2015/03/12", new BloomIndexFileInfo("4", "001", "003")));
       assertEquals(expected, filesList);
     }
   }
@@ -176,20 +174,20 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
             new BloomIndexFileInfo("f5", "009", "010")));

     Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
-    asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"),
-        new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", "004"))
+    asList(Pair.of("2017/10/22", "003"), Pair.of("2017/10/22", "002"),
+        Pair.of("2017/10/22", "005"), Pair.of("2017/10/22", "004"))
         .forEach(t -> {
-          List<String> recordKeyList = partitionRecordKeyMap.getOrDefault(t._1, new ArrayList<>());
-          recordKeyList.add(t._2);
-          partitionRecordKeyMap.put(t._1, recordKeyList);
+          List<String> recordKeyList = partitionRecordKeyMap.getOrDefault(t.getLeft(), new ArrayList<>());
+          recordKeyList.add(t.getRight());
+          partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
         });

-    List<scala.Tuple2<String, HoodieKey>> comparisonKeyList =
+    List<Pair<String, HoodieKey>> comparisonKeyList =
         index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyMap);

     assertEquals(10, comparisonKeyList.size());
     java.util.Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
-        .collect(java.util.stream.Collectors.groupingBy(t -> t._2.getRecordKey(), java.util.stream.Collectors.mapping(t -> t._1, java.util.stream.Collectors.toList())));
+        .collect(java.util.stream.Collectors.groupingBy(t -> t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t -> t.getLeft(), java.util.stream.Collectors.toList())));

     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("002")));