[HUDI-2049] StreamWriteFunction should wait for the next inflight instant time before flushing (#3123)
This commit is contained in:
@@ -54,7 +54,6 @@ import java.util.HashMap;
|
|||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
@@ -494,7 +493,7 @@ public class StreamWriteFunction<K, I, O>
|
|||||||
long waitingTime = 0L;
|
long waitingTime = 0L;
|
||||||
long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
|
long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
|
||||||
long interval = 500L;
|
long interval = 500L;
|
||||||
while (Objects.equals(instant, this.currentInstant)) {
|
while (instant == null || instant.equals(this.currentInstant)) {
|
||||||
// sleep for a while
|
// sleep for a while
|
||||||
try {
|
try {
|
||||||
if (waitingTime > ckpTimeout) {
|
if (waitingTime > ckpTimeout) {
|
||||||
|
|||||||
Reference in New Issue
Block a user