1
0

[HUDI-2696] Remove the aborted checkpoint notification from coordinator (#3926)

This commit is contained in:
Danny Chan
2021-11-05 16:37:23 +08:00
committed by GitHub
parent f67da0c7d0
commit 3af6568d31
7 changed files with 32 additions and 66 deletions

View File

@@ -235,16 +235,6 @@ public class StreamWriteOperatorCoordinator
); );
} }
@Override
public void notifyCheckpointAborted(long checkpointId) {
// once the checkpoint was aborted, unblock the writer tasks to
// reuse the last instant.
if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
executor.execute(() -> sendCommitAckEvents(checkpointId),
"unblock data write with aborted checkpoint %s", checkpointId);
}
}
@Override @Override
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) { public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
// no operation // no operation
@@ -334,8 +324,10 @@ public class StreamWriteOperatorCoordinator
private void startInstant() { private void startInstant() {
final String instant = HoodieActiveTimeline.createNewInstantTime(); final String instant = HoodieActiveTimeline.createNewInstantTime();
this.writeClient.startCommitWithTime(instant, tableState.commitAction); // put the assignment in front of metadata generation,
// because the instant request from write task is asynchronous.
this.instant = instant; this.instant = instant;
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant); this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
this.writeClient.upgradeDowngrade(this.instant); this.writeClient.upgradeDowngrade(this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,

View File

@@ -183,14 +183,20 @@ public class BulkInsertWriteFunction<I>
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
} }
/**
* Returns the last pending instant time.
*/
protected String lastPendingInstant() {
return StreamerUtil.getLastPendingInstant(this.metaClient);
}
private String instantToWrite() { private String instantToWrite() {
String instant = StreamerUtil.getLastPendingInstant(this.metaClient); String instant = lastPendingInstant();
// if exactly-once semantics turns on, // if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits. // waits for the checkpoint notification until the checkpoint timeout threshold hits.
TimeWait timeWait = TimeWait.builder() TimeWait timeWait = TimeWait.builder()
.timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)) .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
.action("instant initialize") .action("instant initialize")
.throwsT(true)
.build(); .build();
while (instant == null || instant.equals(this.initInstant)) { while (instant == null || instant.equals(this.initInstant)) {
// wait condition: // wait condition:
@@ -199,7 +205,7 @@ public class BulkInsertWriteFunction<I>
// sleep for a while // sleep for a while
timeWait.waitFor(); timeWait.waitFor();
// refresh the inflight instant // refresh the inflight instant
instant = StreamerUtil.getLastPendingInstant(this.metaClient); instant = lastPendingInstant();
} }
return instant; return instant;
} }

View File

@@ -114,11 +114,6 @@ public abstract class AbstractStreamWriteFunction<I>
*/ */
protected List<WriteStatus> writeStatuses; protected List<WriteStatus> writeStatuses;
/**
* Current checkpoint id.
*/
private long checkpointId = -1;
/** /**
* Constructs a StreamWriteFunctionBase. * Constructs a StreamWriteFunctionBase.
* *
@@ -152,7 +147,6 @@ public abstract class AbstractStreamWriteFunction<I>
@Override @Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
this.checkpointId = functionSnapshotContext.getCheckpointId();
snapshotState(); snapshotState();
// Reload the snapshot state as the current state. // Reload the snapshot state as the current state.
reloadWriteMetaState(); reloadWriteMetaState();
@@ -216,11 +210,8 @@ public abstract class AbstractStreamWriteFunction<I>
public void handleOperatorEvent(OperatorEvent event) { public void handleOperatorEvent(OperatorEvent event) {
ValidationUtils.checkArgument(event instanceof CommitAckEvent, ValidationUtils.checkArgument(event instanceof CommitAckEvent,
"The write function can only handle CommitAckEvent"); "The write function can only handle CommitAckEvent");
long checkpointId = ((CommitAckEvent) event).getCheckpointId();
if (checkpointId == -1 || checkpointId == this.checkpointId) {
this.confirming = false; this.confirming = false;
} }
}
/** /**
* Returns the last pending instant time. * Returns the last pending instant time.
@@ -249,16 +240,9 @@ public abstract class AbstractStreamWriteFunction<I>
// 2. the inflight instant does not change and the checkpoint has buffering data // 2. the inflight instant does not change and the checkpoint has buffering data
if (instant == null || (instant.equals(this.currentInstant) && hasData)) { if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while // sleep for a while
boolean timeout = timeWait.waitFor(); timeWait.waitFor();
if (timeout && instant != null) {
// if the timeout threshold hits but the last instant still not commit,
// and the task does not receive commit ask event(no data or aborted checkpoint),
// assumes the checkpoint was canceled silently and unblock the data flushing
confirming = false;
} else {
// refresh the inflight instant // refresh the inflight instant
instant = lastPendingInstant(); instant = lastPendingInstant();
}
} else { } else {
// the pending instant changed, that means the last instant was committed // the pending instant changed, that means the last instant was committed
// successfully. // successfully.

View File

@@ -35,14 +35,13 @@ public class TimeWait {
private final long timeout; // timeout in SECONDS private final long timeout; // timeout in SECONDS
private final long interval; // interval in MILLISECONDS private final long interval; // interval in MILLISECONDS
private final String action; // action to report error message private final String action; // action to report error message
private final boolean throwsT; // whether to throw when timeout
private long waitingTime = 0L; private long waitingTime = 0L;
private TimeWait(long timeout, long interval, String action, boolean throwsT) { private TimeWait(long timeout, long interval, String action) {
this.timeout = timeout; this.timeout = timeout;
this.interval = interval; this.interval = interval;
this.action = action; this.action = action;
this.throwsT = throwsT;
} }
public static Builder builder() { public static Builder builder() {
@@ -51,23 +50,14 @@ public class TimeWait {
/** /**
* Wait for an interval time. * Wait for an interval time.
*
* @return true if is timed out
*/ */
public boolean waitFor() { public void waitFor() {
try { try {
if (waitingTime > timeout) { if (waitingTime > timeout) {
final String msg = "Timeout(" + waitingTime + "ms) while waiting for " + action; throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for " + action);
if (this.throwsT) {
throw new HoodieException(msg);
} else {
LOG.warn(msg);
return true;
}
} }
TimeUnit.MILLISECONDS.sleep(interval); TimeUnit.MILLISECONDS.sleep(interval);
waitingTime += interval; waitingTime += interval;
return false;
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new HoodieException("Error while waiting for " + action, e); throw new HoodieException("Error while waiting for " + action, e);
} }
@@ -80,7 +70,6 @@ public class TimeWait {
private long timeout = 5 * 60 * 1000L; // default 5 minutes private long timeout = 5 * 60 * 1000L; // default 5 minutes
private long interval = 1000; private long interval = 1000;
private String action; private String action;
private boolean throwsT = false;
private Builder() { private Builder() {
} }
@@ -102,14 +91,9 @@ public class TimeWait {
return this; return this;
} }
public Builder throwsT(boolean throwsT) {
this.throwsT = throwsT;
return this;
}
public TimeWait build() { public TimeWait build() {
Objects.requireNonNull(this.action); Objects.requireNonNull(this.action);
return new TimeWait(this.timeout, this.interval, this.action, this.throwsT); return new TimeWait(this.timeout, this.interval, this.action);
} }
} }
} }

View File

@@ -468,7 +468,7 @@ public class StreamerUtil {
if (reloadTimeline) { if (reloadTimeline) {
metaClient.reloadActiveTimeline(); metaClient.reloadActiveTimeline();
} }
return metaClient.getCommitsTimeline().filterInflightsAndRequested() return metaClient.getCommitsTimeline().filterInflights()
.lastInstant() .lastInstant()
.map(HoodieInstant::getTimestamp) .map(HoodieInstant::getTimestamp)
.orElse(null); .orElse(null);

View File

@@ -95,8 +95,8 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.assertEmptyEvent() .assertEmptyEvent()
.checkpointFails(1) .checkpointFails(1)
.consume(TestData.DATA_SET_INSERT) .consume(TestData.DATA_SET_INSERT)
.checkpointNotThrow(2, .checkpointThrows(2,
"The stream writer reuse the last instant time when waiting for the last instant commit timeout") "Timeout(1000ms) while waiting for instant initialize")
// do not send the write event and fails the checkpoint, // do not send the write event and fails the checkpoint,
// behaves like the last checkpoint is successful. // behaves like the last checkpoint is successful.
.checkpointFails(2) .checkpointFails(2)
@@ -390,7 +390,8 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.consume(TestData.DATA_SET_INSERT) .consume(TestData.DATA_SET_INSERT)
.assertNotConfirming() .assertNotConfirming()
.checkpoint(2) .checkpoint(2)
.assertConsumeDoesNotThrow(TestData.DATA_SET_INSERT) .assertConsumeThrows(TestData.DATA_SET_INSERT,
"Timeout(1000ms) while waiting for instant initialize")
.end(); .end();
} }

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData; import org.apache.hudi.utils.TestData;
@@ -51,11 +52,11 @@ import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
/** /**
@@ -151,10 +152,8 @@ public class TestWriteBase {
return this; return this;
} }
public TestHarness assertConsumeDoesNotThrow(List<RowData> inputs) { public TestHarness assertConsumeThrows(List<RowData> inputs, String message) {
assertDoesNotThrow(() -> { assertThrows(HoodieException.class, () -> consume(inputs), message);
consume(inputs);
}, "The stream writer reuse the last instant time when waiting for the last instant commit timeout");
return this; return this;
} }
@@ -294,9 +293,9 @@ public class TestWriteBase {
return this; return this;
} }
public TestHarness checkpointNotThrow(long checkpointId, String message) { public TestHarness checkpointThrows(long checkpointId, String message) {
// this returns early because there is no inflight instant // this returns early because there is no inflight instant
assertDoesNotThrow(() -> checkpoint(checkpointId), message); assertThrows(HoodieException.class, () -> checkpoint(checkpointId), message);
return this; return this;
} }