From 125415a8b89bb9190d83b3ad74e68f1d86a05118 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Thu, 10 Jun 2021 19:09:23 +0800 Subject: [PATCH] [HUDI-1994] Release the new records iterator for append handle #close (#3058) --- .../src/main/java/org/apache/hudi/io/HoodieAppendHandle.java | 1 + .../src/main/java/org/apache/hudi/sink/StreamWriteFunction.java | 2 ++ 2 files changed, 3 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 616fca787..8ee4b46d4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -386,6 +386,7 @@ public class HoodieAppendHandle extends try { // flush any remaining records to disk appendDataAndDeleteBlocks(header); + recordItr = null; if (writer != null) { writer.close(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index c49f7f167..7b1653853 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -532,6 +532,7 @@ public class StreamWriteFunction } bucket.preWrite(records); final List writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); + records.clear(); final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder() .taskID(taskID) .instantTime(instant) // the write instant may shift but the event still use the currentInstant. @@ -564,6 +565,7 @@ public class StreamWriteFunction } bucket.preWrite(records); writeStatus.addAll(writeFunction.apply(records, currentInstant)); + records.clear(); bucket.reset(); } });