diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 7679f2c24..c49f7f167 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -470,30 +470,34 @@ public class StreamWriteFunction boolean flushBucket = bucket.detector.detect(item); boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize); if (flushBucket) { - flushBucket(bucket); - this.tracer.countDown(bucket.detector.totalSize); - bucket.reset(); + if (flushBucket(bucket)) { + this.tracer.countDown(bucket.detector.totalSize); + bucket.reset(); + } } else if (flushBuffer) { // find the max size bucket and flush it out List sortedBuckets = this.buckets.values().stream() .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize)) .collect(Collectors.toList()); final DataBucket bucketToFlush = sortedBuckets.get(0); - flushBucket(bucketToFlush); - this.tracer.countDown(bucketToFlush.detector.totalSize); - bucketToFlush.reset(); + if (flushBucket(bucketToFlush)) { + this.tracer.countDown(bucketToFlush.detector.totalSize); + bucketToFlush.reset(); + } else { + LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize); + } } bucket.records.add(item); } @SuppressWarnings("unchecked, rawtypes") - private void flushBucket(DataBucket bucket) { + private boolean flushBucket(DataBucket bucket) { String instant = this.writeClient.getLastPendingInstant(this.actionType); if (instant == null) { // in case there are empty checkpoints that has no input data - LOG.info("No inflight instant when flushing data, cancel."); - return; + LOG.info("No inflight instant when flushing data, skip."); + return false; } // if exactly-once semantics turns on, @@ -536,6 +540,7 @@ public class StreamWriteFunction .isEndInput(false) .build(); this.eventGateway.sendEventToCoordinator(event); + return true; } @SuppressWarnings("unchecked, rawtypes") @@ -543,8 +548,7 @@ public class StreamWriteFunction this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); if (this.currentInstant == null) { // in case there are empty checkpoints that has no input data - LOG.info("No inflight instant when flushing data, cancel."); - return; + throw new HoodieException("No inflight instant when flushing data!"); } final List writeStatus; if (buckets.size() > 0) {