diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index 876cce132..7f85deadc 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -55,6 +55,7 @@ public class FlinkCreateHandle private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class); private long lastFileSize = 0L; + private long totalRecordsWritten = 0L; public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { @@ -143,6 +144,7 @@ public class FlinkCreateHandle try { setupWriteStatus(); // reset the write status + totalRecordsWritten += recordsWritten; recordsWritten = 0; recordsDeleted = 0; insertRecordsWritten = 0; @@ -169,7 +171,7 @@ public class FlinkCreateHandle @Override public void finishWrite() { - LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); + LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + totalRecordsWritten); try { fileWriter.close(); } catch (IOException e) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 9db472f52..5a04a35da 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -142,6 +142,22 @@ public class StreamWriteFunction */ private transient TotalSizeTracer tracer; + /** + * Flag saying whether the write task is waiting for the checkpoint success notification + * after it finished a checkpoint. + * + *

The flag is needed because the write task does not block during the waiting interval, so + * some data buckets may still flush out with the old instant time. There are two cases in which the flush may produce + * corrupted files if the old instant is committed successfully: + * 1) the write handle was writing data but got interrupted, leaving a corrupted parquet file; + * 2) the write handle finished the write but was not closed, leaving an empty parquet file. + *

To solve, when this flag was set to true, we flush the data buffer with a new instant time = old instant time + 1ms, + * the new instant time would affect the write file name. The filesystem view does not recognize the file as committed because + * it always filters the data files based on successful commit time. + */ + private volatile boolean confirming = false; + /** * Constructs a StreamingSinkFunction. * @@ -191,7 +207,7 @@ public class StreamWriteFunction @Override public void notifyCheckpointComplete(long checkpointId) { - this.writeClient.cleanHandles(); + this.confirming = false; } /** @@ -392,21 +408,27 @@ public class StreamWriteFunction @SuppressWarnings("unchecked, rawtypes") private void flushBucket(DataBucket bucket) { - this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); - if (this.currentInstant == null) { + final String instant = this.writeClient.getLastPendingInstant(this.actionType); + + if (instant == null) { // in case there are empty checkpoints that has no input data LOG.info("No inflight instant when flushing data, cancel."); return; } + + // if we are waiting for the checkpoint notification, shift the write instant time. + boolean shift = confirming && StreamerUtil.equal(instant, this.currentInstant); + final String flushInstant = shift ? 
StreamerUtil.instantTimePlus(instant, 1) : instant; + List records = bucket.records; ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) { records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); } - final List writeStatus = new ArrayList<>(writeFunction.apply(records, currentInstant)); + final List writeStatus = new ArrayList<>(writeFunction.apply(records, flushInstant)); final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder() .taskID(taskID) - .instantTime(currentInstant) + .instantTime(instant) // the write instant may shift but the event still use the currentInstant. .writeStatus(writeStatus) .isLastBatch(false) .isEndInput(false) @@ -451,6 +473,7 @@ public class StreamWriteFunction this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); this.tracer.reset(); - this.currentInstant = ""; + this.writeClient.cleanHandles(); + this.confirming = true; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index d08d68e62..e585c72c6 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -58,6 +58,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.BufferedReader; import java.io.IOException; import java.io.StringReader; @@ -324,4 +326,19 @@ public class StreamerUtil { return new HoodieFlinkWriteClient<>(context, getHoodieClientConfig(conf)); } + + /** + * Plus the old instant time with given milliseconds and returns. 
+ */ + public static String instantTimePlus(String oldInstant, long milliseconds) { + long oldTime = Long.parseLong(oldInstant); + return String.valueOf(oldTime + milliseconds); + } + + /** + * Copied from Objects#equal. + */ + public static boolean equal(@Nullable Object a, @Nullable Object b) { + return a == b || (a != null && a.equals(b)); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index a4b6c16a3..5050109a8 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -163,7 +163,6 @@ public class StreamWriteFunctionWrapper { functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); coordinator.notifyCheckpointComplete(checkpointId); this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); - this.writeFunction.notifyCheckpointComplete(checkpointId); if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) { try { compactFunctionWrapper.compact(checkpointId);