From de8161ae96c4cec1e4c9e40069f286439f0c3261 Mon Sep 17 00:00:00 2001 From: luokey <854194341@qq.com> Date: Fri, 18 Feb 2022 17:31:38 +0800 Subject: [PATCH] HoodieSortedMergeHandle#close write data disorder (#4841) Co-authored-by: 854194341@qq.com --- .../java/org/apache/hudi/io/HoodieSortedMergeHandle.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java index 4111abfae..897491b90 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java @@ -108,8 +108,9 @@ public class HoodieSortedMergeHandle ext @Override public List close() { // write out any pending records (this can happen when inserts are turned into updates) - newRecordKeysSorted.stream().forEach(key -> { + while (!newRecordKeysSorted.isEmpty()) { try { + String key = newRecordKeysSorted.poll(); HoodieRecord hoodieRecord = keyToNewRecords.get(key); if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) { if (useWriterSchema) { @@ -122,7 +123,7 @@ public class HoodieSortedMergeHandle ext } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); } - }); + } newRecordKeysSorted.clear(); keyToNewRecords.clear();