1
0

[HUDI-2342] Optimize Bootstrap operator (#3516)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
yuzhaojing
2021-08-21 20:03:03 +08:00
committed by GitHub
parent c7c517f14c
commit ab3fbb8895
12 changed files with 239 additions and 235 deletions

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.sink.bootstrap;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
@@ -26,15 +27,19 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.format.FormatUtils;
@@ -42,35 +47,40 @@ import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.util.StreamerUtil.isValidFile;
/**
* The function to load index from existing hoodieTable.
* The operator to load index from existing hoodieTable.
*
* <p>Each subtask of the function triggers the index bootstrap when the first element came in,
* the record cannot be sent until all the index records have been sent.
*
* <p>The output records should then shuffle by the recordKey and thus do scalable write.
*/
public class BootstrapFunction<I, O extends HoodieRecord>
extends ProcessFunction<I, O> {
public class BootstrapOperator<I, O extends HoodieRecord>
extends AbstractStreamOperator<O> implements OneInputStreamOperator<I, O> {
private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
private static final Logger LOG = LoggerFactory.getLogger(BootstrapOperator.class);
protected HoodieTable<?, ?, ?, ?> hoodieTable;
@@ -79,65 +89,62 @@ public class BootstrapFunction<I, O extends HoodieRecord>
protected transient org.apache.hadoop.conf.Configuration hadoopConf;
protected transient HoodieWriteConfig writeConfig;
private GlobalAggregateManager aggregateManager;
private transient ListState<String> instantState;
private final Pattern pattern;
private boolean alreadyBootstrap;
private String lastInstantTime;
private HoodieFlinkWriteClient writeClient;
private String actionType;
public BootstrapFunction(Configuration conf) {
public BootstrapOperator(Configuration conf) {
this.conf = conf;
this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX));
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
public void snapshotState(StateSnapshotContext context) throws Exception {
lastInstantTime = this.writeClient.getLastPendingInstant(this.actionType);
instantState.update(Collections.singletonList(lastInstantTime));
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
ListStateDescriptor<String> instantStateDescriptor = new ListStateDescriptor<>(
"instantStateDescriptor",
Types.STRING
);
instantState = context.getOperatorStateStore().getListState(instantStateDescriptor);
if (context.isRestored()) {
Iterator<String> instantIterator = instantState.get().iterator();
if (instantIterator.hasNext()) {
lastInstantTime = instantIterator.next();
}
}
this.hadoopConf = StreamerUtil.getHadoopConf();
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
this.hoodieTable = getTable();
this.aggregateManager = ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager();
this.writeClient = StreamerUtil.createWriteClient(this.conf, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)));
String basePath = hoodieTable.getMetaClient().getBasePath();
int taskID = getRuntimeContext().getIndexOfThisSubtask();
LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
if (pattern.matcher(partitionPath).matches()) {
loadRecords(partitionPath);
}
}
LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
}
@Override
@SuppressWarnings("unchecked")
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
if (!alreadyBootstrap) {
String basePath = hoodieTable.getMetaClient().getBasePath();
int taskID = getRuntimeContext().getIndexOfThisSubtask();
LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
if (pattern.matcher(partitionPath).matches()) {
loadRecords(partitionPath, out);
}
}
// wait for others bootstrap task send bootstrap complete.
waitForBootstrapReady(taskID);
alreadyBootstrap = true;
LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
}
// send the trigger record
out.collect((O) value);
}
/**
* Wait for other bootstrap tasks to finish the index bootstrap.
*/
private void waitForBootstrapReady(int taskID) {
int taskNum = getRuntimeContext().getNumberOfParallelSubtasks();
int readyTaskNum = 1;
while (taskNum != readyTaskNum) {
try {
readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME, taskID, new BootstrapAggFunction());
LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskID);
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
LOG.warn("Update global task bootstrap summary error", e);
}
}
public void processElement(StreamRecord<I> element) throws Exception {
output.collect((StreamRecord<O>) element);
}
private HoodieFlinkTable getTable() {
@@ -153,7 +160,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
* @param partitionPath The partition path
*/
@SuppressWarnings("unchecked")
protected void loadRecords(String partitionPath, Collector<O> out) throws Exception {
protected void loadRecords(String partitionPath) throws Exception {
long start = System.currentTimeMillis();
BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
@@ -163,8 +170,11 @@ public class BootstrapFunction<I, O extends HoodieRecord>
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
Option<HoodieInstant> latestCommitTime = this.hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline();
if (!StringUtils.isNullOrEmpty(lastInstantTime)) {
commitsTimeline = commitsTimeline.findInstantsAfter(lastInstantTime);
}
Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedInstants().lastInstant();
if (latestCommitTime.isPresent()) {
List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
@@ -193,22 +203,22 @@ public class BootstrapFunction<I, O extends HoodieRecord>
}
for (HoodieKey hoodieKey : hoodieKeys) {
out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)));
output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
}
});
// load avro log records
List<String> logPaths = fileSlice.getLogFiles()
// filter out crushed files
.filter(logFile -> isValidFile(logFile.getFileStatus()))
.map(logFile -> logFile.getPath().toString())
.collect(toList());
// filter out crushed files
.filter(logFile -> isValidFile(logFile.getFileStatus()))
.map(logFile -> logFile.getPath().toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
writeConfig, hadoopConf);
try {
for (String recordKey : scanner.getRecords().keySet()) {
out.collect((O) new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice)));
output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice))));
}
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
@@ -241,7 +251,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
}
@VisibleForTesting
public boolean isAlreadyBootstrap() {
return alreadyBootstrap;
public boolean isAlreadyBootstrap() throws Exception {
return instantState.get().iterator().hasNext();
}
}

View File

@@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bootstrap.aggregate;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
/**
* Bootstrap ready task id accumulator.
*/
public class BootstrapAccumulator implements Serializable {
private static final long serialVersionUID = 1L;
private final Set<Integer> readyTaskSet;
public BootstrapAccumulator() {
this.readyTaskSet = new HashSet<>();
}
public void update(int taskId) {
readyTaskSet.add(taskId);
}
public int readyTaskNum() {
return readyTaskSet.size();
}
public BootstrapAccumulator merge(BootstrapAccumulator acc) {
if (acc == null) {
return this;
}
readyTaskSet.addAll(acc.readyTaskSet);
return this;
}
}

View File

@@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bootstrap.aggregate;
import org.apache.flink.api.common.functions.AggregateFunction;
/**
 * Aggregate function that accumulates the ids of index-bootstrap subtasks
 * that have finished loading, and returns how many are ready so far.
 *
 * <p>Registered with Flink's {@code GlobalAggregateManager} under {@link #NAME}
 * so each bootstrap subtask can report its own completion and poll the global
 * ready count until all parallel subtasks are done.
 */
public class BootstrapAggFunction implements AggregateFunction<Integer, BootstrapAccumulator, Integer> {
  // Key under which this aggregate is registered with the GlobalAggregateManager.
  public static final String NAME = BootstrapAggFunction.class.getSimpleName();

  @Override
  public BootstrapAccumulator createAccumulator() {
    // Fresh accumulator with an empty ready-task set.
    return new BootstrapAccumulator();
  }

  @Override
  public BootstrapAccumulator add(Integer taskId, BootstrapAccumulator bootstrapAccumulator) {
    // Record the reporting subtask id; duplicates are de-duplicated by the accumulator.
    bootstrapAccumulator.update(taskId);
    return bootstrapAccumulator;
  }

  @Override
  public Integer getResult(BootstrapAccumulator bootstrapAccumulator) {
    // Number of distinct subtasks that reported ready.
    return bootstrapAccumulator.readyTaskNum();
  }

  @Override
  public BootstrapAccumulator merge(BootstrapAccumulator bootstrapAccumulator, BootstrapAccumulator acc) {
    return bootstrapAccumulator.merge(acc);
  }
}

View File

@@ -19,17 +19,17 @@
package org.apache.hudi.sink.bootstrap.batch;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.HashSet;
import java.util.Set;
/**
* The function to load index from existing hoodieTable.
* The operator to load index from existing hoodieTable.
*
* <p>This function should only be used for bounded source.
*
@@ -39,34 +39,30 @@ import java.util.Set;
*
* <p>The input records should shuffle by the partition path to avoid repeated loading.
*/
public class BatchBootstrapFunction<I, O extends HoodieRecord>
extends BootstrapFunction<I, O> {
public class BatchBootstrapOperator<I, O extends HoodieRecord>
extends BootstrapOperator<I, O> {
private Set<String> partitionPathSet;
private boolean haveSuccessfulCommits;
private final Set<String> partitionPathSet;
private final boolean haveSuccessfulCommits;
public BatchBootstrapFunction(Configuration conf) {
public BatchBootstrapOperator(Configuration conf) {
super(conf);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.partitionPathSet = new HashSet<>();
this.haveSuccessfulCommits = StreamerUtil.haveSuccessfulCommits(hoodieTable.getMetaClient());
}
@Override
public void processElement(I value, Context context, Collector<O> out) throws Exception {
final HoodieRecord record = (HoodieRecord<?>) value;
@SuppressWarnings("unchecked")
public void processElement(StreamRecord<I> element) throws Exception {
final HoodieRecord record = (HoodieRecord<?>) element.getValue();
final String partitionPath = record.getKey().getPartitionPath();
if (haveSuccessfulCommits && !partitionPathSet.contains(partitionPath)) {
loadRecords(partitionPath, out);
loadRecords(partitionPath);
partitionPathSet.add(partitionPath);
}
// send the trigger record
out.collect((O) value);
output.collect((StreamRecord<O>) element);
}
}

View File

@@ -22,8 +22,8 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapFunction;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -104,11 +104,11 @@ public class Pipelines {
DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
dataStream1 = dataStream1.rebalance()
dataStream1 = dataStream1
.transform(
"index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)))
new BootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
@@ -133,7 +133,7 @@ public class Pipelines {
.transform(
"batch_index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BatchBootstrapFunction<>(conf)))
new BatchBootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
.uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}

View File

@@ -23,7 +23,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -114,11 +114,11 @@ public class HoodieFlinkStreamer {
.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
dataStream2 = dataStream2.rebalance()
dataStream2 = dataStream2
.transform(
"index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)))
new BootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -277,7 +277,7 @@ public class StreamWriteITCase extends TestLogger {
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)));
new BootstrapOperator<>(conf));
}
DataStream<Object> pipeline = hoodieDataStream
@@ -370,7 +370,7 @@ public class StreamWriteITCase extends TestLogger {
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)));
new BootstrapOperator<>(conf));
}
DataStream<Object> pipeline = hoodieDataStream

View File

@@ -725,8 +725,6 @@ public class TestWriteCopyOnWrite {
funcWrapper.invoke(rowData);
}
assertTrue(funcWrapper.isAlreadyBootstrap());
checkIndexLoaded(
new HoodieKey("id1", "par1"),
new HoodieKey("id2", "par1"),
@@ -743,6 +741,8 @@ public class TestWriteCopyOnWrite {
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
assertTrue(funcWrapper.isAlreadyBootstrap());
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());

View File

@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.utils;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import java.io.IOException;
import java.util.List;
/**
 * A test {@link Output} that collects every emitted {@link StreamRecord},
 * watermark and latency marker into a caller-supplied list for assertions.
 *
 * <p>Record values are deep-copied via a serialize/deserialize round-trip so
 * that later mutation or reuse of the emitted value by the operator under test
 * does not corrupt the collected copy.
 */
public class CollectorOutput<T> implements Output<StreamRecord<T>> {

  /** Sink list shared with the test; receives elements in emission order. */
  private final List<StreamElement> list;

  public CollectorOutput(List<StreamElement> list) {
    this.list = list;
  }

  /** Returns the backing list of collected elements. */
  public List<StreamElement> getList() {
    return list;
  }

  @Override
  public void emitWatermark(Watermark mark) {
    list.add(mark);
  }

  @Override
  public void emitLatencyMarker(LatencyMarker latencyMarker) {
    list.add(latencyMarker);
  }

  @Override
  public void collect(StreamRecord<T> record) {
    try {
      // Resolve classes with the VALUE's own classloader, not the wrapper's:
      // StreamRecord is loaded by the framework classloader, which under
      // Flink's child-first user classloading may not see user-code types,
      // so deserializing with it could throw ClassNotFoundException.
      ClassLoader cl = record.getValue().getClass().getClassLoader();
      T copied =
          InstantiationUtil.deserializeObject(
              InstantiationUtil.serializeObject(record.getValue()), cl);
      list.add(record.copy(copied));
    } catch (IOException | ClassNotFoundException ex) {
      throw new RuntimeException("Unable to deserialize record: " + record, ex);
    }
  }

  @Override
  public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    // Side outputs are not needed by these tests.
    throw new UnsupportedOperationException("Side output not supported for CollectorOutput");
  }

  @Override
  public void close() {
    // Nothing to release; the list is owned by the test.
  }
}

View File

@@ -52,7 +52,6 @@ public class CompactFunctionWrapper {
private final IOManager ioManager;
private final StreamingRuntimeContext runtimeContext;
private final MockFunctionInitializationContext functionInitializationContext;
/** Function that generates the {@link HoodieCompactionPlan}. */
private CompactionPlanOperator compactionPlanOperator;
@@ -70,7 +69,6 @@ public class CompactFunctionWrapper {
.build();
this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
this.conf = conf;
this.functionInitializationContext = new MockFunctionInitializationContext();
}
public void openFunction() throws Exception {

View File

@@ -19,15 +19,18 @@ package org.apache.hudi.sink.utils;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
/**
* A {@link FunctionInitializationContext} for testing purpose.
*/
public class MockFunctionInitializationContext implements FunctionInitializationContext {
public class MockStateInitializationContext implements StateInitializationContext {
private final MockOperatorStateStore operatorStateStore;
public MockFunctionInitializationContext() {
public MockStateInitializationContext() {
operatorStateStore = new MockOperatorStateStore();
}
@@ -45,4 +48,14 @@ public class MockFunctionInitializationContext implements FunctionInitialization
public KeyedStateStore getKeyedStateStore() {
return operatorStateStore;
}
@Override
public Iterable<StatePartitionStreamProvider> getRawOperatorStateInputs() {
return null;
}
@Override
public Iterable<KeyGroupStatePartitionStreamProvider> getRawKeyedStateInputs() {
return null;
}
}

View File

@@ -25,8 +25,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
@@ -34,6 +33,7 @@ import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -43,8 +43,14 @@ import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCo
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
@@ -68,7 +74,7 @@ public class StreamWriteFunctionWrapper<I> {
private final MockOperatorEventGateway gateway;
private final MockOperatorCoordinatorContext coordinatorContext;
private final StreamWriteOperatorCoordinator coordinator;
private final MockFunctionInitializationContext functionInitializationContext;
private final MockStateInitializationContext stateInitializationContext;
/**
* Function that converts row data to HoodieRecord.
@@ -77,7 +83,7 @@ public class StreamWriteFunctionWrapper<I> {
/**
* Function that load index in state.
*/
private BootstrapFunction<HoodieRecord<?>, HoodieRecord<?>> bootstrapFunction;
private BootstrapOperator<HoodieRecord<?>, HoodieRecord<?>> bootstrapOperator;
/**
* Function that assigns bucket ID.
*/
@@ -93,6 +99,12 @@ public class StreamWriteFunctionWrapper<I> {
private CompactFunctionWrapper compactFunctionWrapper;
private final Output<StreamRecord<HoodieRecord<?>>> output;
private final MockStreamTask streamTask;
private final StreamConfig streamConfig;
private final boolean asyncCompaction;
public StreamWriteFunctionWrapper(String tablePath) throws Exception {
@@ -114,10 +126,17 @@ public class StreamWriteFunctionWrapper<I> {
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
this.functionInitializationContext = new MockFunctionInitializationContext();
this.stateInitializationContext = new MockStateInitializationContext();
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf);
this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
this.output = new CollectorOutput<>(new ArrayList<>());
this.streamConfig = new StreamConfig(conf);
streamConfig.setOperatorID(new OperatorID());
this.streamTask = new MockStreamTaskBuilder(environment)
.setConfig(new StreamConfig(conf))
.setExecutionConfig(new ExecutionConfig().enableObjectReuse())
.build();
}
public void openFunction() throws Exception {
@@ -128,16 +147,16 @@ public class StreamWriteFunctionWrapper<I> {
toHoodieFunction.open(conf);
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
bootstrapFunction = new BootstrapFunction<>(conf);
bootstrapFunction.setRuntimeContext(runtimeContext);
bootstrapFunction.open(conf);
bootstrapOperator = new BootstrapOperator<>(conf);
bootstrapOperator.setup(streamTask, streamConfig, output);
bootstrapOperator.initializeState(this.stateInitializationContext);
}
bucketAssignerFunction = new BucketAssignFunction<>(conf);
bucketAssignerFunction.setRuntimeContext(runtimeContext);
bucketAssignerFunction.open(conf);
bucketAssignerFunction.setContext(bucketAssignOperatorContext);
bucketAssignerFunction.initializeState(this.functionInitializationContext);
bucketAssignerFunction.initializeState(this.stateInitializationContext);
setupWriteFunction();
@@ -146,6 +165,7 @@ public class StreamWriteFunctionWrapper<I> {
}
}
@SuppressWarnings("unchecked")
public void invoke(I record) throws Exception {
HoodieRecord<?> hoodieRecord = toHoodieFunction.map((RowData) record);
HoodieRecord<?>[] hoodieRecords = new HoodieRecord[1];
@@ -162,27 +182,16 @@ public class StreamWriteFunctionWrapper<I> {
};
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
List<HoodieRecord<?>> bootstrapRecords = new ArrayList<>();
Collector<HoodieRecord<?>> bootstrapCollector = new Collector<HoodieRecord<?>>() {
@Override
public void collect(HoodieRecord<?> record) {
if (record instanceof IndexRecord) {
bootstrapRecords.add(record);
}
List<StreamElement> list = ((CollectorOutput) output).getList();
for (StreamElement streamElement : list) {
if (streamElement.isRecord()) {
HoodieRecord<?> bootstrapRecord = (HoodieRecord<?>) streamElement.asRecord().getValue();
bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
}
@Override
public void close() {
}
};
bootstrapFunction.processElement(hoodieRecord, null, bootstrapCollector);
for (HoodieRecord bootstrapRecord : bootstrapRecords) {
bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
}
bootstrapOperator.processElement(new StreamRecord<>(hoodieRecord));
list.clear();
this.bucketAssignOperatorContext.setCurrentKey(hoodieRecord.getRecordKey());
}
@@ -210,14 +219,17 @@ public class StreamWriteFunctionWrapper<I> {
public void checkpointFunction(long checkpointId) throws Exception {
// checkpoint the coordinator first
this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
bootstrapOperator.snapshotState(null);
}
bucketAssignerFunction.snapshotState(null);
writeFunction.snapshotState(null);
functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
}
public void checkpointComplete(long checkpointId) {
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
coordinator.notifyCheckpointComplete(checkpointId);
this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
if (asyncCompaction) {
@@ -264,8 +276,8 @@ public class StreamWriteFunctionWrapper<I> {
return this.writeFunction.isConfirming();
}
public boolean isAlreadyBootstrap() {
return this.bootstrapFunction.isAlreadyBootstrap();
public boolean isAlreadyBootstrap() throws Exception {
return this.bootstrapOperator.isAlreadyBootstrap();
}
// -------------------------------------------------------------------------
@@ -276,7 +288,7 @@ public class StreamWriteFunctionWrapper<I> {
writeFunction = new StreamWriteFunction<>(conf);
writeFunction.setRuntimeContext(runtimeContext);
writeFunction.setOperatorEventGateway(gateway);
writeFunction.initializeState(this.functionInitializationContext);
writeFunction.initializeState(this.stateInitializationContext);
writeFunction.open(conf);
// handle the bootstrap event