diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java index 7b40718b3..6e275dc8c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java @@ -106,30 +106,29 @@ public class AppendWriteFunction extends AbstractStreamWriteFunction { // Utilities // ------------------------------------------------------------------------- private void initWriterHelper() { - this.currentInstant = instantToWrite(true); - if (this.currentInstant == null) { + final String instant = instantToWrite(true); + if (instant == null) { // in case there are empty checkpoints that has no input data throw new HoodieException("No inflight instant when flushing data!"); } this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(), - this.currentInstant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), + instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), this.rowType); } private void flushData(boolean endInput) { final List writeStatus; - final String instant; if (this.writerHelper != null) { writeStatus = this.writerHelper.getWriteStatuses(this.taskID); - instant = this.writerHelper.getInstantTime(); + this.currentInstant = this.writerHelper.getInstantTime(); } else { writeStatus = Collections.emptyList(); - instant = instantToWrite(false); - LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, instant); + this.currentInstant = instantToWrite(false); + LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, this.currentInstant); } final WriteMetadataEvent event = WriteMetadataEvent.builder() .taskID(taskID) - .instantTime(instant) + .instantTime(this.currentInstant) .writeStatus(writeStatus) .lastBatch(true) .endInput(endInput)