1
0

[HUDI-2198] Clean and reset the bootstrap events for coordinator when task failover (#3304)

This commit is contained in:
Danny Chan
2021-07-21 10:13:05 +08:00
committed by GitHub
parent 634163a990
commit 858e84b5b2
6 changed files with 57 additions and 16 deletions

View File

@@ -458,7 +458,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     HoodieTimeline unCompletedTimeline = FlinkClientUtil.createMetaClient(basePath)
         .getCommitsTimeline().filterInflightsAndRequested();
     return unCompletedTimeline.getInstants()
-        .filter(x -> x.getAction().equals(actionType))
+        .filter(x -> x.getAction().equals(actionType) && x.isInflight())
         .map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList()).stream()
         .max(Comparator.naturalOrder())

View File

@@ -214,6 +214,7 @@ public class StreamWriteFunction<K, I, O>
     }
     // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
+    this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
   }

   @Override
@@ -534,6 +535,7 @@ public class StreamWriteFunction<K, I, O>
     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
         k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
     final DataItem item = DataItem.fromHoodieRecord(value);
     boolean flushBucket = bucket.detector.detect(item);
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {

NOTE(review): the second hunk header declares one added line (+535,7 vs -534,6) that is not visible in this capture — confirm against the upstream commit.

View File

@@ -243,7 +243,9 @@ public class StreamWriteOperatorCoordinator
   @Override
   public void subtaskFailed(int i, @Nullable Throwable throwable) {
-    // no operation
+    // reset the event
+    this.eventBuffer[i] = null;
+    LOG.warn("Reset the event for task [" + i + "]", throwable);
   }

   @Override
@@ -328,8 +330,8 @@ public class StreamWriteOperatorCoordinator
   }

   private void handleBootstrapEvent(WriteMetadataEvent event) {
-    addEventToBuffer(event);
-    if (Arrays.stream(eventBuffer).allMatch(Objects::nonNull)) {
+    this.eventBuffer[event.getTaskID()] = event;
+    if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) {
       // start to initialize the instant.
       initInstant(event.getInstantTime());
     }

View File

@@ -194,10 +194,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     } else {
       location = getNewRecordLocation(partitionPath);
       this.context.setCurrentKey(recordKey);
-    }
-    // always refresh the index
-    if (isChangingRecords) {
-      updateIndexState(partitionPath, location);
+      if (isChangingRecords) {
+        updateIndexState(partitionPath, location);
+      }
     }
     record.setCurrentLocation(location);
     out.collect((O) record);
   }

NOTE(review): the visible lines do not fully reconcile with the hunk header's line counts (-194,10 +194,11); some changed lines were likely lost in extraction — verify against the upstream commit.

View File

@@ -218,6 +218,29 @@ public class TestWriteCopyOnWrite {
     funcWrapper.checkpointFails(2);
   }

+  @Test
+  public void testSubtaskFails() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    // no data written and triggers checkpoint fails,
+    // then we should revert the start instant
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+    funcWrapper.getNextEvent();
+
+    String instant1 = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
+    assertNotNull(instant1);
+
+    // fails the subtask
+    funcWrapper.subTaskFails(0);
+
+    String instant2 = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
+    assertNotEquals(instant2, instant1,
+        "The previous instant should be rolled back when starting new instant");
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null);
+  }
+
   @Test
   public void testInsert() throws Exception {
     // open the function and ingest data

View File

@@ -110,7 +110,7 @@ public class StreamWriteFunctionWrapper<I> {
     this.gateway = new MockOperatorEventGateway();
     this.conf = conf;
     // one function
-    this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
+    this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1, false);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
     this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
     this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
@@ -139,14 +139,7 @@ public class StreamWriteFunctionWrapper<I> {
     bucketAssignerFunction.setContext(bucketAssignOperatorContext);
     bucketAssignerFunction.initializeState(this.functionInitializationContext);

-    writeFunction = new StreamWriteFunction<>(conf);
-    writeFunction.setRuntimeContext(runtimeContext);
-    writeFunction.setOperatorEventGateway(gateway);
-    writeFunction.initializeState(this.functionInitializationContext);
-    writeFunction.open(conf);
-
-    // handle the bootstrap event
-    coordinator.handleEventFromOperator(0, getNextEvent());
+    setupWriteFunction();

     if (asyncCompaction) {
       compactFunctionWrapper.openFunction();
@@ -240,6 +233,11 @@ public class StreamWriteFunctionWrapper<I> {
     coordinator.notifyCheckpointAborted(checkpointId);
   }

+  public void subTaskFails(int taskID) throws Exception {
+    coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
+    setupWriteFunction();
+  }
+
   public void close() throws Exception {
     coordinator.close();
     ioManager.close();
@@ -270,6 +268,21 @@ public class StreamWriteFunctionWrapper<I> {
     return this.bootstrapFunction.isAlreadyBootstrap();
   }

+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void setupWriteFunction() throws Exception {
+    writeFunction = new StreamWriteFunction<>(conf);
+    writeFunction.setRuntimeContext(runtimeContext);
+    writeFunction.setOperatorEventGateway(gateway);
+    writeFunction.initializeState(this.functionInitializationContext);
+    writeFunction.open(conf);
+    // handle the bootstrap event
+    coordinator.handleEventFromOperator(0, getNextEvent());
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------