Each subtask of the function triggers the index bootstrap when the first element came in,
* the record cannot be sent until all the index records have been sent.
*
*
The output records should then shuffle by the recordKey and thus do scalable write.
*/
-public class BootstrapFunction
- extends ProcessFunction {
+public class BootstrapOperator
+ extends AbstractStreamOperator implements OneInputStreamOperator {
- private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(BootstrapOperator.class);
protected HoodieTable, ?, ?, ?> hoodieTable;
@@ -79,65 +89,62 @@ public class BootstrapFunction
protected transient org.apache.hadoop.conf.Configuration hadoopConf;
protected transient HoodieWriteConfig writeConfig;
- private GlobalAggregateManager aggregateManager;
-
+ private transient ListState instantState;
private final Pattern pattern;
- private boolean alreadyBootstrap;
+ private String lastInstantTime;
+ private HoodieFlinkWriteClient writeClient;
+ private String actionType;
- public BootstrapFunction(Configuration conf) {
+ public BootstrapOperator(Configuration conf) {
this.conf = conf;
this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX));
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ lastInstantTime = this.writeClient.getLastPendingInstant(this.actionType);
+ instantState.update(Collections.singletonList(lastInstantTime));
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ ListStateDescriptor instantStateDescriptor = new ListStateDescriptor<>(
+ "instantStateDescriptor",
+ Types.STRING
+ );
+ instantState = context.getOperatorStateStore().getListState(instantStateDescriptor);
+
+ if (context.isRestored()) {
+ Iterator instantIterator = instantState.get().iterator();
+ if (instantIterator.hasNext()) {
+ lastInstantTime = instantIterator.next();
+ }
+ }
+
this.hadoopConf = StreamerUtil.getHadoopConf();
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
this.hoodieTable = getTable();
- this.aggregateManager = ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager();
+ this.writeClient = StreamerUtil.createWriteClient(this.conf, getRuntimeContext());
+ this.actionType = CommitUtils.getCommitActionType(
+ WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)),
+ HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)));
+
+ String basePath = hoodieTable.getMetaClient().getBasePath();
+ int taskID = getRuntimeContext().getIndexOfThisSubtask();
+ LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
+ for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
+ if (pattern.matcher(partitionPath).matches()) {
+ loadRecords(partitionPath);
+ }
+ }
+
+ LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
}
@Override
@SuppressWarnings("unchecked")
- public void processElement(I value, Context ctx, Collector out) throws Exception {
- if (!alreadyBootstrap) {
- String basePath = hoodieTable.getMetaClient().getBasePath();
- int taskID = getRuntimeContext().getIndexOfThisSubtask();
- LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
- for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
- if (pattern.matcher(partitionPath).matches()) {
- loadRecords(partitionPath, out);
- }
- }
-
- // wait for others bootstrap task send bootstrap complete.
- waitForBootstrapReady(taskID);
-
- alreadyBootstrap = true;
- LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
- }
-
- // send the trigger record
- out.collect((O) value);
- }
-
- /**
- * Wait for other bootstrap tasks to finish the index bootstrap.
- */
- private void waitForBootstrapReady(int taskID) {
- int taskNum = getRuntimeContext().getNumberOfParallelSubtasks();
- int readyTaskNum = 1;
- while (taskNum != readyTaskNum) {
- try {
- readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME, taskID, new BootstrapAggFunction());
- LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskID);
-
- TimeUnit.SECONDS.sleep(5);
- } catch (Exception e) {
- LOG.warn("Update global task bootstrap summary error", e);
- }
- }
+ public void processElement(StreamRecord element) throws Exception {
+ output.collect((StreamRecord) element);
}
private HoodieFlinkTable getTable() {
@@ -153,7 +160,7 @@ public class BootstrapFunction
* @param partitionPath The partition path
*/
@SuppressWarnings("unchecked")
- protected void loadRecords(String partitionPath, Collector out) throws Exception {
+ protected void loadRecords(String partitionPath) throws Exception {
long start = System.currentTimeMillis();
BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
@@ -163,8 +170,11 @@ public class BootstrapFunction
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
- Option latestCommitTime = this.hoodieTable.getMetaClient().getCommitsTimeline()
- .filterCompletedInstants().lastInstant();
+ HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline();
+ if (!StringUtils.isNullOrEmpty(lastInstantTime)) {
+ commitsTimeline = commitsTimeline.findInstantsAfter(lastInstantTime);
+ }
+ Option latestCommitTime = commitsTimeline.filterCompletedInstants().lastInstant();
if (latestCommitTime.isPresent()) {
List fileSlices = this.hoodieTable.getSliceView()
@@ -193,22 +203,22 @@ public class BootstrapFunction
}
for (HoodieKey hoodieKey : hoodieKeys) {
- out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)));
+ output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
}
});
// load avro log records
List logPaths = fileSlice.getLogFiles()
- // filter out crushed files
- .filter(logFile -> isValidFile(logFile.getFileStatus()))
- .map(logFile -> logFile.getPath().toString())
- .collect(toList());
+ // filter out crushed files
+ .filter(logFile -> isValidFile(logFile.getFileStatus()))
+ .map(logFile -> logFile.getPath().toString())
+ .collect(toList());
HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
writeConfig, hadoopConf);
try {
for (String recordKey : scanner.getRecords().keySet()) {
- out.collect((O) new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice)));
+ output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice))));
}
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
@@ -241,7 +251,7 @@ public class BootstrapFunction
}
@VisibleForTesting
- public boolean isAlreadyBootstrap() {
- return alreadyBootstrap;
+ public boolean isAlreadyBootstrap() throws Exception {
+ return instantState.get().iterator().hasNext();
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
deleted file mode 100644
index 14630a1f8..000000000
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.bootstrap.aggregate;
-
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Bootstrap ready task id accumulator.
- */
-public class BootstrapAccumulator implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final Set readyTaskSet;
-
- public BootstrapAccumulator() {
- this.readyTaskSet = new HashSet<>();
- }
-
- public void update(int taskId) {
- readyTaskSet.add(taskId);
- }
-
- public int readyTaskNum() {
- return readyTaskSet.size();
- }
-
- public BootstrapAccumulator merge(BootstrapAccumulator acc) {
- if (acc == null) {
- return this;
- }
-
- readyTaskSet.addAll(acc.readyTaskSet);
- return this;
- }
-}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java
deleted file mode 100644
index 075de6dc8..000000000
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.bootstrap.aggregate;
-
-import org.apache.flink.api.common.functions.AggregateFunction;
-
-/**
- * Aggregate Function that accumulates the loaded task number of
- * function {@link org.apache.hudi.sink.bootstrap.BootstrapFunction}.
- */
-public class BootstrapAggFunction implements AggregateFunction {
- public static final String NAME = BootstrapAggFunction.class.getSimpleName();
-
- @Override
- public BootstrapAccumulator createAccumulator() {
- return new BootstrapAccumulator();
- }
-
- @Override
- public BootstrapAccumulator add(Integer taskId, BootstrapAccumulator bootstrapAccumulator) {
- bootstrapAccumulator.update(taskId);
- return bootstrapAccumulator;
- }
-
- @Override
- public Integer getResult(BootstrapAccumulator bootstrapAccumulator) {
- return bootstrapAccumulator.readyTaskNum();
- }
-
- @Override
- public BootstrapAccumulator merge(BootstrapAccumulator bootstrapAccumulator, BootstrapAccumulator acc) {
- return bootstrapAccumulator.merge(acc);
- }
-}
\ No newline at end of file
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
similarity index 73%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
index 237858da8..3d97276ad 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/batch/BatchBootstrapOperator.java
@@ -19,17 +19,17 @@
package org.apache.hudi.sink.bootstrap.batch;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.HashSet;
import java.util.Set;
/**
- * The function to load index from existing hoodieTable.
+ * The operator to load index from existing hoodieTable.
*
*
This function should only be used for bounded source.
*
@@ -39,34 +39,30 @@ import java.util.Set;
*
*
The input records should shuffle by the partition path to avoid repeated loading.
*/
-public class BatchBootstrapFunction
- extends BootstrapFunction {
+public class BatchBootstrapOperator
+ extends BootstrapOperator {
- private Set partitionPathSet;
- private boolean haveSuccessfulCommits;
+ private final Set partitionPathSet;
+ private final boolean haveSuccessfulCommits;
- public BatchBootstrapFunction(Configuration conf) {
+ public BatchBootstrapOperator(Configuration conf) {
super(conf);
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
this.partitionPathSet = new HashSet<>();
this.haveSuccessfulCommits = StreamerUtil.haveSuccessfulCommits(hoodieTable.getMetaClient());
}
@Override
- public void processElement(I value, Context context, Collector out) throws Exception {
- final HoodieRecord record = (HoodieRecord>) value;
+ @SuppressWarnings("unchecked")
+ public void processElement(StreamRecord element) throws Exception {
+ final HoodieRecord record = (HoodieRecord>) element.getValue();
final String partitionPath = record.getKey().getPartitionPath();
if (haveSuccessfulCommits && !partitionPathSet.contains(partitionPath)) {
- loadRecords(partitionPath, out);
+ loadRecords(partitionPath);
partitionPathSet.add(partitionPath);
}
// send the trigger record
- out.collect((O) value);
+ output.collect((StreamRecord) element);
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index b71b18013..f31645cc4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
-import org.apache.hudi.sink.bootstrap.BootstrapFunction;
-import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapFunction;
+import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -104,11 +104,11 @@ public class Pipelines {
DataStream dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- dataStream1 = dataStream1.rebalance()
+ dataStream1 = dataStream1
.transform(
"index_bootstrap",
TypeInformation.of(HoodieRecord.class),
- new ProcessOperator<>(new BootstrapFunction<>(conf)))
+ new BootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
@@ -133,7 +133,7 @@ public class Pipelines {
.transform(
"batch_index_bootstrap",
TypeInformation.of(HoodieRecord.class),
- new ProcessOperator<>(new BatchBootstrapFunction<>(conf)))
+ new BatchBootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
.uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index 7fde34e9b..3d3f804af 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
-import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -114,11 +114,11 @@ public class HoodieFlinkStreamer {
.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- dataStream2 = dataStream2.rebalance()
+ dataStream2 = dataStream2
.transform(
"index_bootstrap",
TypeInformation.of(HoodieRecord.class),
- new ProcessOperator<>(new BootstrapFunction<>(conf)))
+ new BootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 6aea1533e..a40a01069 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.bootstrap.BootstrapFunction;
+import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -277,7 +277,7 @@ public class StreamWriteITCase extends TestLogger {
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
TypeInformation.of(HoodieRecord.class),
- new ProcessOperator<>(new BootstrapFunction<>(conf)));
+ new BootstrapOperator<>(conf));
}
DataStream