[HUDI-2193] Remove state in BootstrapFunction
This commit is contained in:
@@ -41,16 +41,9 @@ import org.apache.hudi.util.StreamerUtil;
|
|||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.flink.annotation.VisibleForTesting;
|
import org.apache.flink.annotation.VisibleForTesting;
|
||||||
import org.apache.flink.api.common.state.CheckpointListener;
|
|
||||||
import org.apache.flink.api.common.state.ListState;
|
|
||||||
import org.apache.flink.api.common.state.ListStateDescriptor;
|
|
||||||
import org.apache.flink.api.scala.typeutils.Types;
|
|
||||||
import org.apache.flink.configuration.Configuration;
|
import org.apache.flink.configuration.Configuration;
|
||||||
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
|
||||||
import org.apache.flink.runtime.state.FunctionSnapshotContext;
|
|
||||||
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
|
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
|
||||||
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
|
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
|
||||||
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
|
|
||||||
import org.apache.flink.streaming.api.functions.ProcessFunction;
|
import org.apache.flink.streaming.api.functions.ProcessFunction;
|
||||||
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
|
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
|
||||||
import org.apache.flink.util.Collector;
|
import org.apache.flink.util.Collector;
|
||||||
@@ -73,8 +66,7 @@ import static java.util.stream.Collectors.toList;
|
|||||||
* <p>The output records should then shuffle by the recordKey and thus do scalable write.
|
* <p>The output records should then shuffle by the recordKey and thus do scalable write.
|
||||||
*/
|
*/
|
||||||
public class BootstrapFunction<I, O extends HoodieRecord>
|
public class BootstrapFunction<I, O extends HoodieRecord>
|
||||||
extends ProcessFunction<I, O>
|
extends ProcessFunction<I, O> {
|
||||||
implements CheckpointedFunction, CheckpointListener {
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
|
private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
|
||||||
|
|
||||||
@@ -86,7 +78,6 @@ public class BootstrapFunction<I, O extends HoodieRecord>
|
|||||||
private transient HoodieWriteConfig writeConfig;
|
private transient HoodieWriteConfig writeConfig;
|
||||||
|
|
||||||
private GlobalAggregateManager aggregateManager;
|
private GlobalAggregateManager aggregateManager;
|
||||||
private ListState<Boolean> bootstrapState;
|
|
||||||
|
|
||||||
private final Pattern pattern;
|
private final Pattern pattern;
|
||||||
private boolean alreadyBootstrap;
|
private boolean alreadyBootstrap;
|
||||||
@@ -96,20 +87,6 @@ public class BootstrapFunction<I, O extends HoodieRecord>
|
|||||||
this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX));
|
this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void initializeState(FunctionInitializationContext context) throws Exception {
|
|
||||||
this.bootstrapState = context.getOperatorStateStore()
|
|
||||||
.getListState(new ListStateDescriptor<>("bootstrap-state", Types.BOOLEAN()));
|
|
||||||
|
|
||||||
if (context.isRestored()) {
|
|
||||||
LOG.info("Restoring state for the {}.", getClass().getSimpleName());
|
|
||||||
|
|
||||||
for (Boolean alreadyBootstrap : bootstrapState.get()) {
|
|
||||||
this.alreadyBootstrap = alreadyBootstrap;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
@@ -277,16 +254,6 @@ public class BootstrapFunction<I, O extends HoodieRecord>
|
|||||||
fileId, maxParallelism, parallelism) == taskID;
|
fileId, maxParallelism, parallelism) == taskID;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void notifyCheckpointComplete(long checkpointId) {
|
|
||||||
// no operation
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void snapshotState(FunctionSnapshotContext context) throws Exception {
|
|
||||||
this.bootstrapState.add(alreadyBootstrap);
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public boolean isAlreadyBootstrap() {
|
public boolean isAlreadyBootstrap() {
|
||||||
return alreadyBootstrap;
|
return alreadyBootstrap;
|
||||||
|
|||||||
@@ -130,7 +130,6 @@ public class StreamWriteFunctionWrapper<I> {
|
|||||||
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
|
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
|
||||||
bootstrapFunction = new BootstrapFunction<>(conf);
|
bootstrapFunction = new BootstrapFunction<>(conf);
|
||||||
bootstrapFunction.setRuntimeContext(runtimeContext);
|
bootstrapFunction.setRuntimeContext(runtimeContext);
|
||||||
bootstrapFunction.initializeState(this.functionInitializationContext);
|
|
||||||
bootstrapFunction.open(conf);
|
bootstrapFunction.open(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user