[HUDI-1954] only reset bucket when flush bucket success (#3029)
Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
@@ -470,30 +470,34 @@ public class StreamWriteFunction<K, I, O>
|
||||
boolean flushBucket = bucket.detector.detect(item);
|
||||
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
|
||||
if (flushBucket) {
|
||||
flushBucket(bucket);
|
||||
this.tracer.countDown(bucket.detector.totalSize);
|
||||
bucket.reset();
|
||||
if (flushBucket(bucket)) {
|
||||
this.tracer.countDown(bucket.detector.totalSize);
|
||||
bucket.reset();
|
||||
}
|
||||
} else if (flushBuffer) {
|
||||
// find the max size bucket and flush it out
|
||||
List<DataBucket> sortedBuckets = this.buckets.values().stream()
|
||||
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
|
||||
.collect(Collectors.toList());
|
||||
final DataBucket bucketToFlush = sortedBuckets.get(0);
|
||||
flushBucket(bucketToFlush);
|
||||
this.tracer.countDown(bucketToFlush.detector.totalSize);
|
||||
bucketToFlush.reset();
|
||||
if (flushBucket(bucketToFlush)) {
|
||||
this.tracer.countDown(bucketToFlush.detector.totalSize);
|
||||
bucketToFlush.reset();
|
||||
} else {
|
||||
LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
|
||||
}
|
||||
}
|
||||
bucket.records.add(item);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked, rawtypes")
|
||||
private void flushBucket(DataBucket bucket) {
|
||||
private boolean flushBucket(DataBucket bucket) {
|
||||
String instant = this.writeClient.getLastPendingInstant(this.actionType);
|
||||
|
||||
if (instant == null) {
|
||||
// in case there are empty checkpoints that has no input data
|
||||
LOG.info("No inflight instant when flushing data, cancel.");
|
||||
return;
|
||||
LOG.info("No inflight instant when flushing data, skip.");
|
||||
return false;
|
||||
}
|
||||
|
||||
// if exactly-once semantics turns on,
|
||||
@@ -536,6 +540,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
.isEndInput(false)
|
||||
.build();
|
||||
this.eventGateway.sendEventToCoordinator(event);
|
||||
return true;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked, rawtypes")
|
||||
@@ -543,8 +548,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
|
||||
if (this.currentInstant == null) {
|
||||
// in case there are empty checkpoints that has no input data
|
||||
LOG.info("No inflight instant when flushing data, cancel.");
|
||||
return;
|
||||
throw new HoodieException("No inflight instant when flushing data!");
|
||||
}
|
||||
final List<WriteStatus> writeStatus;
|
||||
if (buckets.size() > 0) {
|
||||
|
||||
Reference in New Issue
Block a user