[HUDI-1196] Update HoodieKey when deduplicating records with global index (#2248)
- Works only for overwrite payload (default) - Does not alter current semantics otherwise Co-authored-by: Ryan Pifer <ryanpife@amazon.com>
This commit is contained in:
@@ -59,10 +59,9 @@ public class SparkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractW
|
|||||||
}).reduceByKey((rec1, rec2) -> {
|
}).reduceByKey((rec1, rec2) -> {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
|
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
|
||||||
// we cannot allow the user to change the key or partitionPath, since that will affect
|
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
|
||||||
// everything
|
|
||||||
// so pick it from one of the records.
|
return new HoodieRecord<T>(reducedKey, reducedData);
|
||||||
return new HoodieRecord<T>(rec1.getKey(), reducedData);
|
|
||||||
}, parallelism).map(Tuple2::_2);
|
}, parallelism).map(Tuple2::_2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -243,6 +243,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
|||||||
when(index.isGlobal()).thenReturn(true);
|
when(index.isGlobal()).thenReturn(true);
|
||||||
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
|
List<HoodieRecord<RawTripTestPayload>> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
|
||||||
assertEquals(1, dedupedRecs.size());
|
assertEquals(1, dedupedRecs.size());
|
||||||
|
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
|
||||||
assertNodupesWithinPartition(dedupedRecs);
|
assertNodupesWithinPartition(dedupedRecs);
|
||||||
|
|
||||||
// non-Global dedup should be done based on both recordKey and partitionPath
|
// non-Global dedup should be done based on both recordKey and partitionPath
|
||||||
|
|||||||
Reference in New Issue
Block a user