diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java index 4bb703e10..d39ab7c39 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java @@ -41,16 +41,9 @@ import org.apache.hudi.util.StreamerUtil; import org.apache.avro.Schema; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.state.CheckpointListener; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.scala.typeutils.Types; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.util.Collector; @@ -73,8 +66,7 @@ import static java.util.stream.Collectors.toList; *

The output records should then shuffle by the recordKey and thus do scalable write. */ public class BootstrapFunction - extends ProcessFunction - implements CheckpointedFunction, CheckpointListener { + extends ProcessFunction { private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class); @@ -86,7 +78,6 @@ public class BootstrapFunction private transient HoodieWriteConfig writeConfig; private GlobalAggregateManager aggregateManager; - private ListState bootstrapState; private final Pattern pattern; private boolean alreadyBootstrap; @@ -96,20 +87,6 @@ public class BootstrapFunction this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX)); } - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - this.bootstrapState = context.getOperatorStateStore() - .getListState(new ListStateDescriptor<>("bootstrap-state", Types.BOOLEAN())); - - if (context.isRestored()) { - LOG.info("Restoring state for the {}.", getClass().getSimpleName()); - - for (Boolean alreadyBootstrap : bootstrapState.get()) { - this.alreadyBootstrap = alreadyBootstrap; - } - } - } - @Override public void open(Configuration parameters) throws Exception { super.open(parameters); @@ -277,16 +254,6 @@ public class BootstrapFunction fileId, maxParallelism, parallelism) == taskID; } - @Override - public void notifyCheckpointComplete(long checkpointId) { - // no operation - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - this.bootstrapState.add(alreadyBootstrap); - } - @VisibleForTesting public boolean isAlreadyBootstrap() { return alreadyBootstrap; diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index e78456b0e..fb3971444 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -130,7 +130,6 @@ public class StreamWriteFunctionWrapper { if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { bootstrapFunction = new BootstrapFunction<>(conf); bootstrapFunction.setRuntimeContext(runtimeContext); - bootstrapFunction.initializeState(this.functionInitializationContext); bootstrapFunction.open(conf); }