[HUDI-4332] The current instant may be wrong under some extreme conditions in AppendWriteFunction. (#5988)
This commit is contained in:
@@ -106,30 +106,29 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
|
|||||||
// Utilities
|
// Utilities
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
private void initWriterHelper() {
|
private void initWriterHelper() {
|
||||||
this.currentInstant = instantToWrite(true);
|
final String instant = instantToWrite(true);
|
||||||
if (this.currentInstant == null) {
|
if (instant == null) {
|
||||||
// in case there are empty checkpoints that has no input data
|
// in case there are empty checkpoints that has no input data
|
||||||
throw new HoodieException("No inflight instant when flushing data!");
|
throw new HoodieException("No inflight instant when flushing data!");
|
||||||
}
|
}
|
||||||
this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
|
this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
|
||||||
this.currentInstant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
|
instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
|
||||||
this.rowType);
|
this.rowType);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void flushData(boolean endInput) {
|
private void flushData(boolean endInput) {
|
||||||
final List<WriteStatus> writeStatus;
|
final List<WriteStatus> writeStatus;
|
||||||
final String instant;
|
|
||||||
if (this.writerHelper != null) {
|
if (this.writerHelper != null) {
|
||||||
writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
|
writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
|
||||||
instant = this.writerHelper.getInstantTime();
|
this.currentInstant = this.writerHelper.getInstantTime();
|
||||||
} else {
|
} else {
|
||||||
writeStatus = Collections.emptyList();
|
writeStatus = Collections.emptyList();
|
||||||
instant = instantToWrite(false);
|
this.currentInstant = instantToWrite(false);
|
||||||
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, instant);
|
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, this.currentInstant);
|
||||||
}
|
}
|
||||||
final WriteMetadataEvent event = WriteMetadataEvent.builder()
|
final WriteMetadataEvent event = WriteMetadataEvent.builder()
|
||||||
.taskID(taskID)
|
.taskID(taskID)
|
||||||
.instantTime(instant)
|
.instantTime(this.currentInstant)
|
||||||
.writeStatus(writeStatus)
|
.writeStatus(writeStatus)
|
||||||
.lastBatch(true)
|
.lastBatch(true)
|
||||||
.endInput(endInput)
|
.endInput(endInput)
|
||||||
|
|||||||
Reference in New Issue
Block a user