[MINOR] Sync HUDI-1196 to FlinkWriteHelper (#2415)
This commit is contained in:
@@ -59,10 +59,8 @@ public class FlinkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractW
|
|||||||
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
|
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
|
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
|
||||||
// we cannot allow the user to change the key or partitionPath, since that will affect
|
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
|
||||||
// everything
|
return new HoodieRecord<T>(reducedKey, reducedData);
|
||||||
// so pick it from one of the records.
|
|
||||||
return new HoodieRecord<T>(rec1.getKey(), reducedData);
|
|
||||||
}).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
|
}).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user