[HUDI-4450] Revert the checkpoint abort notification (#6181)
This commit is contained in:
@@ -267,15 +267,6 @@ public class StreamWriteOperatorCoordinator
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyCheckpointAborted(long checkpointId) {
|
||||
if (checkpointId == this.checkpointId && !WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
|
||||
executor.execute(() -> {
|
||||
this.ckpMetadata.abortInstant(this.instant);
|
||||
}, "abort instant %s", this.instant);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
|
||||
// no operation
|
||||
|
||||
@@ -243,13 +243,6 @@ public abstract class AbstractStreamWriteFunction<I>
|
||||
return this.ckpMetadata.lastPendingInstant();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the instant is fresh new(not aborted).
|
||||
*/
|
||||
protected boolean freshInstant(String instant) {
|
||||
return !this.ckpMetadata.isAborted(instant);
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepares the instant time to write with for next checkpoint.
|
||||
*
|
||||
@@ -286,6 +279,6 @@ public abstract class AbstractStreamWriteFunction<I>
|
||||
* Returns whether the pending instant is invalid to write with.
|
||||
*/
|
||||
private boolean invalidInstant(String instant, boolean hasData) {
|
||||
return instant.equals(this.currentInstant) && hasData && freshInstant(instant);
|
||||
return instant.equals(this.currentInstant) && hasData;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user