From ab3fbb88950ca351723f2f46da9f40ba325a03b5 Mon Sep 17 00:00:00 2001 From: yuzhaojing <32435329+yuzhaojing@users.noreply.github.com> Date: Sat, 21 Aug 2021 20:03:03 +0800 Subject: [PATCH] [HUDI-2342] Optimize Bootstrap operator (#3516) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 喻兆靖 --- ...apFunction.java => BootstrapOperator.java} | 142 ++++++++++-------- .../aggregate/BootstrapAccumulator.java | 53 ------- .../aggregate/BootstrapAggFunction.java | 50 ------ ...ction.java => BatchBootstrapOperator.java} | 30 ++-- .../org/apache/hudi/sink/utils/Pipelines.java | 10 +- .../hudi/streamer/HoodieFlinkStreamer.java | 6 +- .../apache/hudi/sink/StreamWriteITCase.java | 6 +- .../hudi/sink/TestWriteCopyOnWrite.java | 4 +- .../hudi/sink/utils/CollectorOutput.java | 78 ++++++++++ .../sink/utils/CompactFunctionWrapper.java | 2 - ...va => MockStateInitializationContext.java} | 17 ++- .../utils/StreamWriteFunctionWrapper.java | 76 ++++++---- 12 files changed, 239 insertions(+), 235 deletions(-) rename hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/{BootstrapFunction.java => BootstrapOperator.java} (63%) delete mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java delete mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java rename hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/{BatchBootstrapFunction.java => BatchBootstrapOperator.java} (73%) create mode 100644 hudi-flink/src/test/java/org/apache/hudi/sink/utils/CollectorOutput.java rename hudi-flink/src/test/java/org/apache/hudi/sink/utils/{MockFunctionInitializationContext.java => MockStateInitializationContext.java} (71%) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java similarity index 63% rename from hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java rename to hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index a73831b7b..3ac7aa1e6 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -19,6 +19,7 @@ package org.apache.hudi.sink.bootstrap; import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; @@ -26,15 +27,19 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.format.FormatUtils; @@ -42,35 +47,40 @@ import org.apache.hudi.util.StreamerUtil; import org.apache.avro.Schema; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.util.Collector; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.Iterator; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import static java.util.stream.Collectors.toList; import static org.apache.hudi.util.StreamerUtil.isValidFile; /** - * The function to load index from existing hoodieTable. + * The operator to load index from existing hoodieTable. * *

Each subtask of the function triggers the index bootstrap when the first element came in, * the record cannot be sent until all the index records have been sent. * *

The output records should then shuffle by the recordKey and thus do scalable write. */ -public class BootstrapFunction - extends ProcessFunction { +public class BootstrapOperator + extends AbstractStreamOperator implements OneInputStreamOperator { - private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class); + private static final Logger LOG = LoggerFactory.getLogger(BootstrapOperator.class); protected HoodieTable hoodieTable; @@ -79,65 +89,62 @@ public class BootstrapFunction protected transient org.apache.hadoop.conf.Configuration hadoopConf; protected transient HoodieWriteConfig writeConfig; - private GlobalAggregateManager aggregateManager; - + private transient ListState instantState; private final Pattern pattern; - private boolean alreadyBootstrap; + private String lastInstantTime; + private HoodieFlinkWriteClient writeClient; + private String actionType; - public BootstrapFunction(Configuration conf) { + public BootstrapOperator(Configuration conf) { this.conf = conf; this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX)); } @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); + public void snapshotState(StateSnapshotContext context) throws Exception { + lastInstantTime = this.writeClient.getLastPendingInstant(this.actionType); + instantState.update(Collections.singletonList(lastInstantTime)); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + ListStateDescriptor instantStateDescriptor = new ListStateDescriptor<>( + "instantStateDescriptor", + Types.STRING + ); + instantState = context.getOperatorStateStore().getListState(instantStateDescriptor); + + if (context.isRestored()) { + Iterator instantIterator = instantState.get().iterator(); + if (instantIterator.hasNext()) { + lastInstantTime = instantIterator.next(); + } + } + this.hadoopConf = StreamerUtil.getHadoopConf(); this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); this.hoodieTable = getTable(); - this.aggregateManager = ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager(); + this.writeClient = StreamerUtil.createWriteClient(this.conf, getRuntimeContext()); + this.actionType = CommitUtils.getCommitActionType( + WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)), + HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE))); + + String basePath = hoodieTable.getMetaClient().getBasePath(); + int taskID = getRuntimeContext().getIndexOfThisSubtask(); + LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID); + for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) { + if (pattern.matcher(partitionPath).matches()) { + loadRecords(partitionPath); + } + } + + LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask()); } @Override @SuppressWarnings("unchecked") - public void processElement(I value, Context ctx, Collector out) throws Exception { - if (!alreadyBootstrap) { - String basePath = hoodieTable.getMetaClient().getBasePath(); - int taskID = getRuntimeContext().getIndexOfThisSubtask(); - LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID); - for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) { - if (pattern.matcher(partitionPath).matches()) { - loadRecords(partitionPath, out); - } - } - - // wait for others bootstrap task send bootstrap complete. - waitForBootstrapReady(taskID); - - alreadyBootstrap = true; - LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask()); - } - - // send the trigger record - out.collect((O) value); - } - - /** - * Wait for other bootstrap tasks to finish the index bootstrap. - */ - private void waitForBootstrapReady(int taskID) { - int taskNum = getRuntimeContext().getNumberOfParallelSubtasks(); - int readyTaskNum = 1; - while (taskNum != readyTaskNum) { - try { - readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME, taskID, new BootstrapAggFunction()); - LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskID); - - TimeUnit.SECONDS.sleep(5); - } catch (Exception e) { - LOG.warn("Update global task bootstrap summary error", e); - } - } + public void processElement(StreamRecord element) throws Exception { + output.collect((StreamRecord) element); } private HoodieFlinkTable getTable() { @@ -153,7 +160,7 @@ public class BootstrapFunction * @param partitionPath The partition path */ @SuppressWarnings("unchecked") - protected void loadRecords(String partitionPath, Collector out) throws Exception { + protected void loadRecords(String partitionPath) throws Exception { long start = System.currentTimeMillis(); BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat()); @@ -163,8 +170,11 @@ public class BootstrapFunction final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); final int taskID = getRuntimeContext().getIndexOfThisSubtask(); - Option latestCommitTime = this.hoodieTable.getMetaClient().getCommitsTimeline() - .filterCompletedInstants().lastInstant(); + HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline(); + if (!StringUtils.isNullOrEmpty(lastInstantTime)) { + commitsTimeline = commitsTimeline.findInstantsAfter(lastInstantTime); + } + Option latestCommitTime = commitsTimeline.filterCompletedInstants().lastInstant(); if (latestCommitTime.isPresent()) { List fileSlices = this.hoodieTable.getSliceView() @@ -193,22 +203,22 @@ public class BootstrapFunction } for (HoodieKey hoodieKey : hoodieKeys) { - out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))); + output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)))); } }); // load avro log records List logPaths = fileSlice.getLogFiles() - // filter out crushed files - .filter(logFile -> isValidFile(logFile.getFileStatus())) - .map(logFile -> logFile.getPath().toString()) - .collect(toList()); + // filter out crushed files + .filter(logFile -> isValidFile(logFile.getFileStatus())) + .map(logFile -> logFile.getPath().toString()) + .collect(toList()); HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(), writeConfig, hadoopConf); try { for (String recordKey : scanner.getRecords().keySet()) { - out.collect((O) new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice))); + output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice)))); } } catch (Exception e) { throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e); @@ -241,7 +251,7 @@ public class BootstrapFunction } @VisibleForTesting - public boolean isAlreadyBootstrap() { - return alreadyBootstrap; + public boolean isAlreadyBootstrap() throws Exception { + return instantState.get().iterator().hasNext(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java deleted file mode 100644 index 14630a1f8..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.bootstrap.aggregate; - -import java.io.Serializable; -import java.util.HashSet; -import java.util.Set; - -/** - * Bootstrap ready task id accumulator. - */ -public class BootstrapAccumulator implements Serializable { - private static final long serialVersionUID = 1L; - - private final Set readyTaskSet; - - public BootstrapAccumulator() { - this.readyTaskSet = new HashSet<>(); - } - - public void update(int taskId) { - readyTaskSet.add(taskId); - } - - public int readyTaskNum() { - return readyTaskSet.size(); - } - - public BootstrapAccumulator merge(BootstrapAccumulator acc) { - if (acc == null) { - return this; - } - - readyTaskSet.addAll(acc.readyTaskSet); - return this; - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java deleted file mode 100644 index 075de6dc8..000000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.bootstrap.aggregate; - -import org.apache.flink.api.common.functions.AggregateFunction; - -/** - * Aggregate Function that accumulates the loaded task number of - * function {@link org.apache.hudi.sink.bootstrap.BootstrapFunction}. - */ -public class BootstrapAggFunction implements AggregateFunction { - public static final String NAME = BootstrapAggFunction.class.getSimpleName(); - - @Override - public BootstrapAccumulator createAccumulator() { - return new BootstrapAccumulator(); - } - - @Override - public BootstrapAccumulator add(Integer taskId, BootstrapAccumulator bootstrapAccumulator) { - bootstrapAccumulator.update(taskId); - return bootstrapAccumulator; - } - - @Override - public Integer getResult(BootstrapAccumulator bootstrapAccumulator) { - return bootstrapAccumulator.readyTaskNum(); - } - - @Override - public BootstrapAccumulator merge(BootstrapAccumulator bootstrapAccumulator, BootstrapAccumulator acc) { - return bootstrapAccumulator.merge(acc); - } -} \ No newline at end of file diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java similarity index 73% rename from hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java rename to hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java index 237858da8..3d97276ad 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java @@ -19,17 +19,17 @@ package org.apache.hudi.sink.bootstrap.batch; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.sink.bootstrap.BootstrapFunction; +import org.apache.hudi.sink.bootstrap.BootstrapOperator; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.Collector; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.HashSet; import java.util.Set; /** - * The function to load index from existing hoodieTable. + * The operator to load index from existing hoodieTable. * *

This function should only be used for bounded source. * @@ -39,34 +39,30 @@ import java.util.Set; * *

The input records should shuffle by the partition path to avoid repeated loading. */ -public class BatchBootstrapFunction - extends BootstrapFunction { +public class BatchBootstrapOperator + extends BootstrapOperator { - private Set partitionPathSet; - private boolean haveSuccessfulCommits; + private final Set partitionPathSet; + private final boolean haveSuccessfulCommits; - public BatchBootstrapFunction(Configuration conf) { + public BatchBootstrapOperator(Configuration conf) { super(conf); - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); this.partitionPathSet = new HashSet<>(); this.haveSuccessfulCommits = StreamerUtil.haveSuccessfulCommits(hoodieTable.getMetaClient()); } @Override - public void processElement(I value, Context context, Collector out) throws Exception { - final HoodieRecord record = (HoodieRecord) value; + @SuppressWarnings("unchecked") + public void processElement(StreamRecord element) throws Exception { + final HoodieRecord record = (HoodieRecord) element.getValue(); final String partitionPath = record.getKey().getPartitionPath(); if (haveSuccessfulCommits && !partitionPathSet.contains(partitionPath)) { - loadRecords(partitionPath, out); + loadRecords(partitionPath); partitionPathSet.add(partitionPath); } // send the trigger record - out.collect((O) value); + output.collect((StreamRecord) element); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index b71b18013..f31645cc4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -22,8 +22,8 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperatorFactory; -import org.apache.hudi.sink.bootstrap.BootstrapFunction; -import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapFunction; +import org.apache.hudi.sink.bootstrap.BootstrapOperator; +import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator; import org.apache.hudi.sink.bulk.BulkInsertWriteOperator; import org.apache.hudi.sink.bulk.RowDataKeyGen; import org.apache.hudi.sink.bulk.sort.SortOperatorGen; @@ -104,11 +104,11 @@ public class Pipelines { DataStream dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream); if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - dataStream1 = dataStream1.rebalance() + dataStream1 = dataStream1 .transform( "index_bootstrap", TypeInformation.of(HoodieRecord.class), - new ProcessOperator<>(new BootstrapFunction<>(conf))) + new BootstrapOperator<>(conf)) .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism)) .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); } @@ -133,7 +133,7 @@ public class Pipelines { .transform( "batch_index_bootstrap", TypeInformation.of(HoodieRecord.class), - new ProcessOperator<>(new BatchBootstrapFunction<>(conf))) + new BatchBootstrapOperator<>(conf)) .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism)) .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 7fde34e9b..3d3f804af 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperatorFactory; -import org.apache.hudi.sink.bootstrap.BootstrapFunction; +import org.apache.hudi.sink.bootstrap.BootstrapOperator; import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; @@ -114,11 +114,11 @@ public class HoodieFlinkStreamer { .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class)); if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - dataStream2 = dataStream2.rebalance() + dataStream2 = dataStream2 .transform( "index_bootstrap", TypeInformation.of(HoodieRecord.class), - new ProcessOperator<>(new BootstrapFunction<>(conf))) + new BootstrapOperator<>(conf)) .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism)) .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 6aea1533e..a40a01069 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -28,7 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.bootstrap.BootstrapFunction; +import org.apache.hudi.sink.bootstrap.BootstrapOperator; import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; @@ -277,7 +277,7 @@ public class StreamWriteITCase extends TestLogger { if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { hoodieDataStream = hoodieDataStream.transform("index_bootstrap", TypeInformation.of(HoodieRecord.class), - new ProcessOperator<>(new BootstrapFunction<>(conf))); + new BootstrapOperator<>(conf)); } DataStream pipeline = hoodieDataStream @@ -370,7 +370,7 @@ public class StreamWriteITCase extends TestLogger { if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { hoodieDataStream = hoodieDataStream.transform("index_bootstrap", TypeInformation.of(HoodieRecord.class), - new ProcessOperator<>(new BootstrapFunction<>(conf))); + new BootstrapOperator<>(conf)); } DataStream pipeline = hoodieDataStream diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 8e7f8099b..0c5224159 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -725,8 +725,6 @@ public class TestWriteCopyOnWrite { funcWrapper.invoke(rowData); } - assertTrue(funcWrapper.isAlreadyBootstrap()); - checkIndexLoaded( new HoodieKey("id1", "par1"), new HoodieKey("id2", "par1"), @@ -743,6 +741,8 @@ public class TestWriteCopyOnWrite { // this triggers the data write and event send funcWrapper.checkpointFunction(1); + assertTrue(funcWrapper.isAlreadyBootstrap()); + String instant = funcWrapper.getWriteClient() .getLastPendingInstant(getTableType()); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CollectorOutput.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CollectorOutput.java new file mode 100644 index 000000000..3da21e6eb --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CollectorOutput.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.OutputTag; + +import java.io.IOException; +import java.util.List; + +/** + * Collecting {@link Output} for {@link StreamRecord}. + */ +public class CollectorOutput implements Output> { + + private final List list; + + public CollectorOutput(List list) { + this.list = list; + } + + public List getList() { + return list; + } + + @Override + public void emitWatermark(Watermark mark) { + list.add(mark); + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + list.add(latencyMarker); + } + + @Override + public void collect(StreamRecord record) { + try { + ClassLoader cl = record.getClass().getClassLoader(); + T copied = + InstantiationUtil.deserializeObject( + InstantiationUtil.serializeObject(record.getValue()), cl); + list.add(record.copy(copied)); + } catch (IOException | ClassNotFoundException ex) { + throw new RuntimeException("Unable to deserialize record: " + record, ex); + } + } + + @Override + public void collect(OutputTag outputTag, StreamRecord record) { + throw new UnsupportedOperationException("Side output not supported for CollectorOutput"); + } + + @Override + public void close() { + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java index 4edfe9fb1..d53d58ebb 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java @@ -52,7 +52,6 @@ public class CompactFunctionWrapper { private final IOManager ioManager; private final StreamingRuntimeContext runtimeContext; - private final MockFunctionInitializationContext functionInitializationContext; /** Function that generates the {@link HoodieCompactionPlan}. */ private CompactionPlanOperator compactionPlanOperator; @@ -70,7 +69,6 @@ public class CompactFunctionWrapper { .build(); this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); this.conf = conf; - this.functionInitializationContext = new MockFunctionInitializationContext(); } public void openFunction() throws Exception { diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockFunctionInitializationContext.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java similarity index 71% rename from hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockFunctionInitializationContext.java rename to hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java index e3da4c942..dd89f7111 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockFunctionInitializationContext.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java @@ -19,15 +19,18 @@ package org.apache.hudi.sink.utils; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StatePartitionStreamProvider; /** * A {@link FunctionInitializationContext} for testing purpose. */ -public class MockFunctionInitializationContext implements FunctionInitializationContext { +public class MockStateInitializationContext implements StateInitializationContext { private final MockOperatorStateStore operatorStateStore; - public MockFunctionInitializationContext() { + public MockStateInitializationContext() { operatorStateStore = new MockOperatorStateStore(); } @@ -45,4 +48,14 @@ public class MockFunctionInitializationContext implements FunctionInitialization public KeyedStateStore getKeyedStateStore() { return operatorStateStore; } + + @Override + public Iterable getRawOperatorStateInputs() { + return null; + } + + @Override + public Iterable getRawKeyedStateInputs() { + return null; + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 6f87ef73c..4ada5172b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -25,8 +25,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.StreamWriteFunction; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; -import org.apache.hudi.sink.bootstrap.BootstrapFunction; -import org.apache.hudi.sink.bootstrap.IndexRecord; +import org.apache.hudi.sink.bootstrap.BootstrapOperator; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignOperator; @@ -34,6 +33,7 @@ import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -43,8 +43,14 @@ import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCo import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.MockStreamTask; +import org.apache.flink.streaming.util.MockStreamTaskBuilder; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; @@ -68,7 +74,7 @@ public class StreamWriteFunctionWrapper { private final MockOperatorEventGateway gateway; private final MockOperatorCoordinatorContext coordinatorContext; private final StreamWriteOperatorCoordinator coordinator; - private final MockFunctionInitializationContext functionInitializationContext; + private final MockStateInitializationContext stateInitializationContext; /** * Function that converts row data to HoodieRecord. @@ -77,7 +83,7 @@ public class StreamWriteFunctionWrapper { /** * Function that load index in state. */ - private BootstrapFunction, HoodieRecord> bootstrapFunction; + private BootstrapOperator, HoodieRecord> bootstrapOperator; /** * Function that assigns bucket ID. */ @@ -93,6 +99,12 @@ public class StreamWriteFunctionWrapper { private CompactFunctionWrapper compactFunctionWrapper; + private final Output>> output; + + private final MockStreamTask streamTask; + + private final StreamConfig streamConfig; + private final boolean asyncCompaction; public StreamWriteFunctionWrapper(String tablePath) throws Exception { @@ -114,10 +126,17 @@ public class StreamWriteFunctionWrapper { this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext(); - this.functionInitializationContext = new MockFunctionInitializationContext(); + this.stateInitializationContext = new MockStateInitializationContext(); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); this.asyncCompaction = StreamerUtil.needsAsyncCompaction(conf); this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext(); + this.output = new CollectorOutput<>(new ArrayList<>()); + this.streamConfig = new StreamConfig(conf); + streamConfig.setOperatorID(new OperatorID()); + this.streamTask = new MockStreamTaskBuilder(environment) + .setConfig(new StreamConfig(conf)) + .setExecutionConfig(new ExecutionConfig().enableObjectReuse()) + .build(); } public void openFunction() throws Exception { @@ -128,16 +147,16 @@ public class StreamWriteFunctionWrapper { toHoodieFunction.open(conf); if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - bootstrapFunction = new BootstrapFunction<>(conf); - bootstrapFunction.setRuntimeContext(runtimeContext); - bootstrapFunction.open(conf); + bootstrapOperator = new BootstrapOperator<>(conf); + bootstrapOperator.setup(streamTask, streamConfig, output); + bootstrapOperator.initializeState(this.stateInitializationContext); } bucketAssignerFunction = new BucketAssignFunction<>(conf); bucketAssignerFunction.setRuntimeContext(runtimeContext); bucketAssignerFunction.open(conf); bucketAssignerFunction.setContext(bucketAssignOperatorContext); - bucketAssignerFunction.initializeState(this.functionInitializationContext); + bucketAssignerFunction.initializeState(this.stateInitializationContext); setupWriteFunction(); @@ -146,6 +165,7 @@ public class StreamWriteFunctionWrapper { } } + @SuppressWarnings("unchecked") public void invoke(I record) throws Exception { HoodieRecord hoodieRecord = toHoodieFunction.map((RowData) record); HoodieRecord[] hoodieRecords = new HoodieRecord[1]; @@ -162,27 +182,16 @@ public class StreamWriteFunctionWrapper { }; if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - List> bootstrapRecords = new ArrayList<>(); - - Collector> bootstrapCollector = new Collector>() { - @Override - public void collect(HoodieRecord record) { - if (record instanceof IndexRecord) { - bootstrapRecords.add(record); - } + List list = ((CollectorOutput) output).getList(); + for (StreamElement streamElement : list) { + if (streamElement.isRecord()) { + HoodieRecord bootstrapRecord = (HoodieRecord) streamElement.asRecord().getValue(); + bucketAssignerFunction.processElement(bootstrapRecord, null, collector); } - - @Override - public void close() { - - } - }; - - bootstrapFunction.processElement(hoodieRecord, null, bootstrapCollector); - for (HoodieRecord bootstrapRecord : bootstrapRecords) { - bucketAssignerFunction.processElement(bootstrapRecord, null, collector); } + bootstrapOperator.processElement(new StreamRecord<>(hoodieRecord)); + list.clear(); this.bucketAssignOperatorContext.setCurrentKey(hoodieRecord.getRecordKey()); } @@ -210,14 +219,17 @@ public class StreamWriteFunctionWrapper { public void checkpointFunction(long checkpointId) throws Exception { // checkpoint the coordinator first this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); + if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { + bootstrapOperator.snapshotState(null); + } bucketAssignerFunction.snapshotState(null); writeFunction.snapshotState(null); - functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId); + stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId); } public void checkpointComplete(long checkpointId) { - functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); + stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); coordinator.notifyCheckpointComplete(checkpointId); this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); if (asyncCompaction) { @@ -264,8 +276,8 @@ public class StreamWriteFunctionWrapper { return this.writeFunction.isConfirming(); } - public boolean isAlreadyBootstrap() { - return this.bootstrapFunction.isAlreadyBootstrap(); + public boolean isAlreadyBootstrap() throws Exception { + return this.bootstrapOperator.isAlreadyBootstrap(); } // ------------------------------------------------------------------------- @@ -276,7 +288,7 @@ public class StreamWriteFunctionWrapper { writeFunction = new StreamWriteFunction<>(conf); writeFunction.setRuntimeContext(runtimeContext); writeFunction.setOperatorEventGateway(gateway); - writeFunction.initializeState(this.functionInitializationContext); + writeFunction.initializeState(this.stateInitializationContext); writeFunction.open(conf); // handle the bootstrap event