[HUDI-1994] Release the new records iterator for append handle #close (#3058)
This commit is contained in:
@@ -386,6 +386,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
|
|||||||
try {
|
try {
|
||||||
// flush any remaining records to disk
|
// flush any remaining records to disk
|
||||||
appendDataAndDeleteBlocks(header);
|
appendDataAndDeleteBlocks(header);
|
||||||
|
recordItr = null;
|
||||||
if (writer != null) {
|
if (writer != null) {
|
||||||
writer.close();
|
writer.close();
|
||||||
|
|
||||||
|
|||||||
@@ -532,6 +532,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
}
|
}
|
||||||
bucket.preWrite(records);
|
bucket.preWrite(records);
|
||||||
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
|
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
|
||||||
|
records.clear();
|
||||||
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
|
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
|
||||||
.taskID(taskID)
|
.taskID(taskID)
|
||||||
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
|
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
|
||||||
@@ -564,6 +565,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
}
|
}
|
||||||
bucket.preWrite(records);
|
bucket.preWrite(records);
|
||||||
writeStatus.addAll(writeFunction.apply(records, currentInstant));
|
writeStatus.addAll(writeFunction.apply(records, currentInstant));
|
||||||
|
records.clear();
|
||||||
bucket.reset();
|
bucket.reset();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user