From c4a2ad2702c28fc03392a917a878570af40062db Mon Sep 17 00:00:00 2001 From: yuzhaojing <32435329+yuzhaojing@users.noreply.github.com> Date: Sat, 5 Jun 2021 11:48:08 +0800 Subject: [PATCH] [HUDI-1954] only reset bucket when flush bucket success (#3029) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 喻兆靖 --- .../apache/hudi/sink/StreamWriteFunction.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 7679f2c24..c49f7f167 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -470,30 +470,34 @@ public class StreamWriteFunction boolean flushBucket = bucket.detector.detect(item); boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize); if (flushBucket) { - flushBucket(bucket); - this.tracer.countDown(bucket.detector.totalSize); - bucket.reset(); + if (flushBucket(bucket)) { + this.tracer.countDown(bucket.detector.totalSize); + bucket.reset(); + } } else if (flushBuffer) { // find the max size bucket and flush it out List sortedBuckets = this.buckets.values().stream() .sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize)) .collect(Collectors.toList()); final DataBucket bucketToFlush = sortedBuckets.get(0); - flushBucket(bucketToFlush); - this.tracer.countDown(bucketToFlush.detector.totalSize); - bucketToFlush.reset(); + if (flushBucket(bucketToFlush)) { + this.tracer.countDown(bucketToFlush.detector.totalSize); + bucketToFlush.reset(); + } else { + LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize); + } } bucket.records.add(item); } @SuppressWarnings("unchecked, rawtypes") - private void flushBucket(DataBucket bucket) { + private boolean flushBucket(DataBucket bucket) { String instant = this.writeClient.getLastPendingInstant(this.actionType); if (instant == null) { // in case there are empty checkpoints that has no input data - LOG.info("No inflight instant when flushing data, cancel."); - return; + LOG.info("No inflight instant when flushing data, skip."); + return false; } // if exactly-once semantics turns on, @@ -536,6 +540,7 @@ public class StreamWriteFunction .isEndInput(false) .build(); this.eventGateway.sendEventToCoordinator(event); + return true; } @SuppressWarnings("unchecked, rawtypes") @@ -543,8 +548,7 @@ public class StreamWriteFunction this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); if (this.currentInstant == null) { // in case there are empty checkpoints that has no input data - LOG.info("No inflight instant when flushing data, cancel."); - return; + throw new HoodieException("No inflight instant when flushing data!"); } final List writeStatus; if (buckets.size() > 0) {