From 858e84b5b2f1c5fbd4266922b494ad0d16f5b92a Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 21 Jul 2021 10:13:05 +0800 Subject: [PATCH] [HUDI-2198] Clean and reset the bootstrap events for coordinator when task failover (#3304) --- .../hudi/client/HoodieFlinkWriteClient.java | 2 +- .../apache/hudi/sink/StreamWriteFunction.java | 2 ++ .../sink/StreamWriteOperatorCoordinator.java | 8 +++-- .../partitioner/BucketAssignFunction.java | 7 +++-- .../hudi/sink/TestWriteCopyOnWrite.java | 23 ++++++++++++++ .../utils/StreamWriteFunctionWrapper.java | 31 +++++++++++++------ 6 files changed, 57 insertions(+), 16 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 05e4481ec..45701bbba 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -458,7 +458,7 @@ public class HoodieFlinkWriteClient extends HoodieTimeline unCompletedTimeline = FlinkClientUtil.createMetaClient(basePath) .getCommitsTimeline().filterInflightsAndRequested(); return unCompletedTimeline.getInstants() - .filter(x -> x.getAction().equals(actionType)) + .filter(x -> x.getAction().equals(actionType) && x.isInflight()) .map(HoodieInstant::getTimestamp) .collect(Collectors.toList()).stream() .max(Comparator.naturalOrder()) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 788e928b9..6b5b9f939 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -214,6 +214,7 @@ public class StreamWriteFunction } // blocks flushing until the coordinator starts a new instant this.confirming = true; + this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); } @Override @@ -534,6 +535,7 @@ public class StreamWriteFunction DataBucket bucket = this.buckets.computeIfAbsent(bucketID, k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value)); final DataItem item = DataItem.fromHoodieRecord(value); + boolean flushBucket = bucket.detector.detect(item); boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize); if (flushBucket) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 303532aa9..2236415a7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -243,7 +243,9 @@ public class StreamWriteOperatorCoordinator @Override public void subtaskFailed(int i, @Nullable Throwable throwable) { - // no operation + // reset the event + this.eventBuffer[i] = null; + LOG.warn("Reset the event for task [" + i + "]", throwable); } @Override @@ -328,8 +330,8 @@ public class StreamWriteOperatorCoordinator } private void handleBootstrapEvent(WriteMetadataEvent event) { - addEventToBuffer(event); - if (Arrays.stream(eventBuffer).allMatch(Objects::nonNull)) { + this.eventBuffer[event.getTaskID()] = event; + if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) { // start to initialize the instant. initInstant(event.getInstantTime()); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 75a345430..ff859eef1 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -194,9 +194,10 @@ public class BucketAssignFunction> } else { location = getNewRecordLocation(partitionPath); this.context.setCurrentKey(recordKey); - if (isChangingRecords) { - updateIndexState(partitionPath, location); - } + } + // always refresh the index + if (isChangingRecords) { + updateIndexState(partitionPath, location); } record.setCurrentLocation(location); out.collect((O) record); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index e145076bc..7e060f710 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -218,6 +218,29 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointFails(2); } + @Test + public void testSubtaskFails() throws Exception { + // open the function and ingest data + funcWrapper.openFunction(); + // no data written and triggers checkpoint fails, + // then we should revert the start instant + + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + funcWrapper.getNextEvent(); + + String instant1 = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); + assertNotNull(instant1); + + // fails the subtask + funcWrapper.subTaskFails(0); + + String instant2 = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); + assertNotEquals(instant2, instant1, "The previous instant should be rolled back when starting new instant"); + + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, null); + } + @Test public void testInsert() throws Exception { // open the function and ingest data diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index fb3971444..d1475bb3d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -110,7 +110,7 @@ public class StreamWriteFunctionWrapper { this.gateway = new MockOperatorEventGateway(); this.conf = conf; // one function - this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1); + this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1, false); this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext(); @@ -139,14 +139,7 @@ public class StreamWriteFunctionWrapper { bucketAssignerFunction.setContext(bucketAssignOperatorContext); bucketAssignerFunction.initializeState(this.functionInitializationContext); - writeFunction = new StreamWriteFunction<>(conf); - writeFunction.setRuntimeContext(runtimeContext); - writeFunction.setOperatorEventGateway(gateway); - writeFunction.initializeState(this.functionInitializationContext); - writeFunction.open(conf); - - // handle the bootstrap event - coordinator.handleEventFromOperator(0, getNextEvent()); + setupWriteFunction(); if (asyncCompaction) { compactFunctionWrapper.openFunction(); @@ -240,6 +233,11 @@ public class StreamWriteFunctionWrapper { coordinator.notifyCheckpointAborted(checkpointId); } + public void subTaskFails(int taskID) throws Exception { + coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception")); + setupWriteFunction(); + } + public void close() throws Exception { coordinator.close(); ioManager.close(); @@ -270,6 +268,21 @@ public class StreamWriteFunctionWrapper { return this.bootstrapFunction.isAlreadyBootstrap(); } + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void setupWriteFunction() throws Exception { + writeFunction = new StreamWriteFunction<>(conf); + writeFunction.setRuntimeContext(runtimeContext); + writeFunction.setOperatorEventGateway(gateway); + writeFunction.initializeState(this.functionInitializationContext); + writeFunction.open(conf); + + // handle the bootstrap event + coordinator.handleEventFromOperator(0, getNextEvent()); + } + // ------------------------------------------------------------------------- // Inner Class // -------------------------------------------------------------------------