From c151147819abb6b4c418138d9b401f374fc021a0 Mon Sep 17 00:00:00 2001 From: Trevor Date: Sat, 9 Jan 2021 07:57:56 +0800 Subject: [PATCH] [MINOR] Sync HUDI-1196 to FlinkWriteHelper (#2415) --- .../apache/hudi/table/action/commit/FlinkWriteHelper.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index df106ce8d..191071e01 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -59,10 +59,8 @@ public class FlinkWriteHelper extends AbstractW return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { @SuppressWarnings("unchecked") T reducedData = (T) rec1.getData().preCombine(rec2.getData()); - // we cannot allow the user to change the key or partitionPath, since that will affect - // everything - // so pick it from one of the records. - return new HoodieRecord(rec1.getKey(), reducedData); + HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(); + return new HoodieRecord(reducedKey, reducedData); }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList()); } }