1
0

Fixed HUDI-116 : Handle duplicate record keys across partitions

- Join based on HoodieKey and not RecordKey during tagging
 - Unit tests changed to run with duplicate keys
 - Special casing GlobalBloom to still join by recordkey
This commit is contained in:
Vinoth Chandar
2019-05-21 18:59:10 -07:00
committed by vinoth chandar
parent f120427607
commit 4074c5eb23
5 changed files with 82 additions and 90 deletions

View File

@@ -44,7 +44,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -86,25 +85,25 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
JavaPairRDD<String, String> rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
JavaPairRDD<HoodieKey, String> keyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
// Cache the result, for subsequent stages.
if (config.getBloomIndexUseCaching()) {
rowKeyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
}
if (logger.isDebugEnabled()) {
long totalTaggedRecords = rowKeyFilenamePairRDD.count();
long totalTaggedRecords = keyFilenamePairRDD.count();
logger.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
}
// Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys
// Cost: 4 sec.
JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(rowKeyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD,
recordRDD);
if (config.getBloomIndexUseCaching()) {
recordRDD.unpersist(); // unpersist the input Record RDD
rowKeyFilenamePairRDD.unpersist();
keyFilenamePairRDD.unpersist();
}
return taggedRecordRDD;
@@ -116,23 +115,21 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
JavaPairRDD<String, String> rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
JavaPairRDD<HoodieKey, String> keyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
JavaPairRDD<HoodieKey, String> keyHoodieKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key, null));
JavaPairRDD<String, HoodieKey> rowKeyHoodieKeyPairRDD = hoodieKeys
.mapToPair(key -> new Tuple2<>(key.getRecordKey(), key));
return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).mapToPair(keyPathTuple -> {
return keyHoodieKeyPairRDD.leftOuterJoin(keyFilenamePairRDD).mapToPair(keyPathTuple -> {
Optional<String> recordLocationPath;
if (keyPathTuple._2._2.isPresent()) {
String fileName = keyPathTuple._2._2.get();
String partitionPath = keyPathTuple._2._1.getPartitionPath();
String partitionPath = keyPathTuple._1.getPartitionPath();
recordLocationPath = Optional
.of(new Path(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath), fileName)
.toUri().getPath());
} else {
recordLocationPath = Optional.absent();
}
return new Tuple2<>(keyPathTuple._2._1, recordLocationPath);
return new Tuple2<>(keyPathTuple._1, recordLocationPath);
});
}
@@ -140,7 +137,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
* Lookup the location for each record key and return the pair<record_key,location> for all record keys already
* present and drop the record keys if not present
*/
private JavaPairRDD<String, String> lookupIndex(
private JavaPairRDD<HoodieKey, String> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext
jsc, final HoodieTable hoodieTable) {
// Obtain records per partition, in the incoming records
@@ -327,7 +324,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
* parallelism for tagging location
*/
@VisibleForTesting
JavaPairRDD<String, String> findMatchingFilesForRecordKeys(
JavaPairRDD<HoodieKey, String> findMatchingFilesForRecordKeys(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTableMetaClient metaClient,
Map<String, Long> fileGroupToComparisons) {
@@ -354,38 +351,41 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
.flatMap(List::iterator)
.filter(lr -> lr.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
.map(recordKey -> new Tuple2<>(recordKey, lookupResult.getFileName()))
.map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
lookupResult.getFileName()))
.collect(Collectors.toList())
.iterator());
}
HoodieRecord<T> getTaggedRecord(HoodieRecord<T> inputRecord, org.apache.spark.api.java.Optional<String> location) {
HoodieRecord<T> record = inputRecord;
if (location.isPresent()) {
// When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD
// will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2
// separate filenames that the record is found in. This will result in setting
// currentLocation 2 times and it will fail the second time. So creating a new in memory
// copy of the hoodie record.
record = new HoodieRecord<>(inputRecord);
String filename = location.get();
if (filename != null && !filename.isEmpty()) {
record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename)));
}
}
return record;
}
/**
* Tag the <rowKey, filename> back to the original HoodieRecord RDD.
*/
private JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
JavaPairRDD<String, String> rowKeyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
JavaPairRDD<HoodieKey, String> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD = recordRDD
.mapToPair(record -> new Tuple2<>(record.getKey(), record));
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
// so we do left outer join.
return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(v1 -> {
HoodieRecord<T> record = v1._1();
if (v1._2().isPresent()) {
// When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD
// will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2
// separate filenames that the record is found in. This will result in setting
// currentLocation 2 times and it will fail the second time. So creating a new in memory
// copy of the hoodie record.
record = new HoodieRecord<>(v1._1());
String filename = v1._2().get();
if (filename != null && !filename.isEmpty()) {
record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename)));
}
}
return record;
});
return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values().map(v1 -> getTaggedRecord(v1._1, v1._2));
}
@Override

View File

@@ -177,7 +177,7 @@ public class HoodieBloomIndexCheckFunction implements
checkAndAddCandidates(recordKey);
} else {
// do the actual checking of file & break out
ret.add(new KeyLookupResult(currentFile, checkAgainstCurrentFile()));
ret.add(new KeyLookupResult(currentFile, currentPartitionPath, checkAgainstCurrentFile()));
initState(fileName, partitionPath);
checkAndAddCandidates(recordKey);
break;
@@ -186,7 +186,7 @@ public class HoodieBloomIndexCheckFunction implements
// handle case, where we ran out of input, close pending work, update return val
if (!inputItr.hasNext()) {
ret.add(new KeyLookupResult(currentFile, checkAgainstCurrentFile()));
ret.add(new KeyLookupResult(currentFile, currentPartitionPath, checkAgainstCurrentFile()));
}
} catch (Throwable e) {
if (e instanceof HoodieException) {

View File

@@ -20,6 +20,7 @@ package com.uber.hoodie.index.bloom;
import com.google.common.annotations.VisibleForTesting;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
@@ -32,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -99,4 +99,25 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
.collect(Collectors.toList());
}).flatMap(List::iterator);
}
/**
* Tagging for global index should only consider the record key
*/
@Override
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
JavaPairRDD<HoodieKey, String> keyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
// so we do left outer join.
return rowKeyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), p._2)))
.values().map(value -> getTaggedRecord(value._1, value._2));
}
@Override
public boolean isGlobal() {
return true;
}
}

View File

@@ -27,9 +27,11 @@ public class KeyLookupResult {
private final String fileName;
private final List<String> matchingRecordKeys;
private final String partitionPath;
public KeyLookupResult(String fileName, List<String> matchingRecordKeys) {
public KeyLookupResult(String fileName, String partitionPath, List<String> matchingRecordKeys) {
this.fileName = fileName;
this.partitionPath = partitionPath;
this.matchingRecordKeys = matchingRecordKeys;
}
@@ -40,4 +42,8 @@ public class KeyLookupResult {
public List<String> getMatchingRecordKeys() {
return matchingRecordKeys;
}
public String getPartitionPath() {
return partitionPath;
}
}