diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 616fca787..8ee4b46d4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -386,6 +386,7 @@ public class HoodieAppendHandle extends try { // flush any remaining records to disk appendDataAndDeleteBlocks(header); + recordItr = null; if (writer != null) { writer.close(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index c49f7f167..7b1653853 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -532,6 +532,7 @@ public class StreamWriteFunction } bucket.preWrite(records); final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); + records.clear(); final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder() .taskID(taskID) .instantTime(instant) // the write instant may shift but the event still use the currentInstant. @@ -564,6 +565,7 @@ public class StreamWriteFunction } bucket.preWrite(records); writeStatus.addAll(writeFunction.apply(records, currentInstant)); + records.clear(); bucket.reset(); } });