From 78fd12259404cad8117862fdf0b948cbe115e453 Mon Sep 17 00:00:00 2001 From: rmpifer Date: Tue, 1 Dec 2020 13:50:46 -0800 Subject: [PATCH] [HUDI-1196] Update HoodieKey when deduplicating records with global index (#2248) - Works only for overwrite payload (default) - Does not alter current semantics otherwise Co-authored-by: Ryan Pifer --- .../apache/hudi/table/action/commit/SparkWriteHelper.java | 7 +++---- .../hudi/client/TestHoodieClientOnCopyOnWriteStorage.java | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java index 5f1a1ef55..a197c91da 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java @@ -59,10 +59,9 @@ public class SparkWriteHelper extends AbstractW }).reduceByKey((rec1, rec2) -> { @SuppressWarnings("unchecked") T reducedData = (T) rec1.getData().preCombine(rec2.getData()); - // we cannot allow the user to change the key or partitionPath, since that will affect - // everything - // so pick it from one of the records. - return new HoodieRecord(rec1.getKey(), reducedData); + HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(); + + return new HoodieRecord(reducedKey, reducedData); }, parallelism).map(Tuple2::_2); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index d0fda81be..d278b08f3 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -243,6 +243,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { when(index.isGlobal()).thenReturn(true); List> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect(); assertEquals(1, dedupedRecs.size()); + assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath()); assertNodupesWithinPartition(dedupedRecs); // non-Global dedup should be done based on both recordKey and partitionPath