From cf83f10f5b369e1f16e63b4b68750d3ad2dc0240 Mon Sep 17 00:00:00 2001 From: yuzhaojing <32435329+yuzhaojing@users.noreply.github.com> Date: Tue, 8 Jun 2021 13:55:25 +0800 Subject: [PATCH] add BootstrapFunction to support index bootstrap (#3024) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 喻兆靖 --- .../hudi/configuration/FlinkOptions.java | 6 + .../sink/bootstrap/BootstrapFunction.java | 235 ++++++++++++++++++ .../hudi/sink/bootstrap/BootstrapRecord.java | 33 +++ .../aggregate/BootstrapAccumulator.java | 53 ++++ .../bootstrap/aggregate/BootstrapAggFunc.java | 49 ++++ .../partitioner/BucketAssignFunction.java | 102 +------- .../hudi/streamer/HoodieFlinkStreamer.java | 21 +- .../apache/hudi/table/HoodieTableSink.java | 21 +- .../apache/hudi/sink/StreamWriteITCase.java | 28 ++- .../hudi/sink/TestWriteCopyOnWrite.java | 28 ++- .../utils/StreamWriteFunctionWrapper.java | 44 ++++ 11 files changed, 507 insertions(+), 113 deletions(-) create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 7466b413e..57fd5014b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -93,6 +93,12 @@ public class FlinkOptions { .withDescription("Whether to update index for the old partition path\n" + "if same key record with different partition path came in, default false"); + public static final ConfigOption INDEX_PARTITION_REGEX = ConfigOptions + .key("index.partition.regex") + .stringType() + .defaultValue(".*") + .withDescription("Whether to load partitions in state if partition path matching, default *"); + // ------------------------------------------------------------------------ // Read Options // ------------------------------------------------------------------------ diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java new file mode 100644 index 000000000..5f81559ec --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.bootstrap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordGlobalLocation; +import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunc; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.util.StreamerUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +/** + * The function that loads the index from the existing hoodie table. + * + *

Each subtask of the function triggers the index bootstrap when the first element arrives; + * the received record cannot be sent downstream until all of the index records have been sent. + * + *

The output records should then be shuffled by the record key so that the write can scale. + * + * @see org.apache.hudi.sink.partitioner.BucketAssignFunction + */ +public class BootstrapFunction<I, O extends HoodieRecord<?>> + extends ProcessFunction<I, O> + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class); + + private HoodieTable hoodieTable; + + private final Configuration conf; + + private transient org.apache.hadoop.conf.Configuration hadoopConf; + + private GlobalAggregateManager aggregateManager; + private ListState<Boolean> bootstrapState; + + private final Pattern pattern; + private boolean alreadyBootstrap; + + public BootstrapFunction(Configuration conf) { + this.conf = conf; + this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX)); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + this.bootstrapState = context.getOperatorStateStore().getListState( + new ListStateDescriptor<>( + "bootstrap-state", + TypeInformation.of(new TypeHint<Boolean>() {}) + ) + ); + + if (context.isRestored()) { + LOG.info("Restoring state for the {}.", getClass().getSimpleName()); + + for (Boolean alreadyBootstrap : bootstrapState.get()) { + this.alreadyBootstrap = alreadyBootstrap; + } + } + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.hadoopConf = StreamerUtil.getHadoopConf(); + this.hoodieTable = getTable(); + this.aggregateManager = ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager(); + } + + @Override + @SuppressWarnings("unchecked") + public void processElement(I value, Context ctx, Collector<O> out) throws IOException { + if (!alreadyBootstrap) { + LOG.info("Start loading records in table {} into the index state, taskId = {}", conf.getString(FlinkOptions.PATH), getRuntimeContext().getIndexOfThisSubtask()); + String basePath = hoodieTable.getMetaClient().getBasePath(); + for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) { + if (pattern.matcher(partitionPath).matches()) { + loadRecords(partitionPath, out); + } + } + + // wait until all the other bootstrap tasks finish sending their index records. + updateAndWaiting(); + + alreadyBootstrap = true; + LOG.info("Finished sending index records to the bucket assigner, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask()); + } + + // send the data to the next operator + out.collect((O) value); + } + + /** + * Waits until all the other bootstrap tasks have reported completion through the global aggregate. + */ + private void updateAndWaiting() { + int taskNum = getRuntimeContext().getNumberOfParallelSubtasks(); + int readyTaskNum = 1; + while (taskNum != readyTaskNum) { + try { + readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunc.NAME, getRuntimeContext().getIndexOfThisSubtask(), new BootstrapAggFunc()); + LOG.info("Waiting for the other bootstrap tasks to complete, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask()); + + TimeUnit.SECONDS.sleep(5); + } catch (Exception e) { + LOG.warn("Update global aggregate error", e); + } + } + } + + private HoodieFlinkTable getTable() { + HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); + HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( + new SerializableConfiguration(this.hadoopConf), + new FlinkTaskContextSupplier(getRuntimeContext())); + return HoodieFlinkTable.create(writeConfig, context); + } + + /** + * Loads all the indices of the given partition path into the index state. + * + * @param partitionPath The partition path + * @param out           The collector to send the index records with + */ + @SuppressWarnings("unchecked") + private void loadRecords(String partitionPath, Collector<O> out) { + long start = System.currentTimeMillis(); + BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat()); + List<HoodieBaseFile> latestBaseFiles = + HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable); + LOG.info("All base files in partition {}, size = {}", partitionPath, latestBaseFiles.size()); + + final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); + final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); + final int taskID = getRuntimeContext().getIndexOfThisSubtask(); + for (HoodieBaseFile baseFile : latestBaseFiles) { + boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator( + baseFile.getFileId(), maxParallelism, parallelism) == taskID; + + if (shouldLoad) { + LOG.info("Load records from file {}.", baseFile); + final List<HoodieKey> hoodieKeys; + try { + hoodieKeys = + fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath())); + } catch (Exception e) { + throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e); + } + + for (HoodieKey hoodieKey : hoodieKeys) { + out.collect((O) new BootstrapRecord(generateHoodieRecord(hoodieKey, baseFile))); + } + } + } + + long cost = System.currentTimeMillis() - start; + LOG.info("Task [{}:{}] finished loading the index under partition {} and sending it downstream, time cost: {} milliseconds.", + this.getClass().getSimpleName(), taskID, partitionPath, cost); + } + + @SuppressWarnings("unchecked") + public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, HoodieBaseFile baseFile) { + HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null); + hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId())); + hoodieRecord.seal(); + + return hoodieRecord; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + // no operation + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + this.bootstrapState.add(alreadyBootstrap); + } + + @VisibleForTesting + public boolean isAlreadyBootstrap() { + return alreadyBootstrap; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java new file mode 100644 index 000000000..025d844b2 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.sink.bootstrap; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; + +/** + * An record to mark HoodieRecord or IndexRecord. + */ +public class BootstrapRecord extends HoodieRecord { + private static final long serialVersionUID = 1L; + + public BootstrapRecord(HoodieRecord record) { + super(record); + } +} \ No newline at end of file diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java new file mode 100644 index 000000000..80067f067 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.bootstrap.aggregate; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; + +/** + * Aggregate accumulator. + */ +public class BootstrapAccumulator implements Serializable { + private static final long serialVersionUID = 1L; + + private final Set readyTaskSet; + + public BootstrapAccumulator() { + this.readyTaskSet = new HashSet<>(); + } + + public void update(int taskId) { + readyTaskSet.add(taskId); + } + + public int readyTaskNum() { + return readyTaskSet.size(); + } + + public BootstrapAccumulator merge(BootstrapAccumulator acc) { + if (acc == null) { + return this; + } + + readyTaskSet.addAll(acc.readyTaskSet); + return this; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java new file mode 100644 index 000000000..2233e8422 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.bootstrap.aggregate; + +import org.apache.flink.api.common.functions.AggregateFunction; + +/** + * Aggregate Function that accumulates the loaded task number of function {@link org.apache.hudi.sink.bootstrap.BootstrapFunction}. + */ +public class BootstrapAggFunc implements AggregateFunction { + public static final String NAME = BootstrapAggFunc.class.getSimpleName(); + + @Override + public BootstrapAccumulator createAccumulator() { + return new BootstrapAccumulator(); + } + + @Override + public BootstrapAccumulator add(Integer taskId, BootstrapAccumulator bootstrapAccumulator) { + bootstrapAccumulator.update(taskId); + return bootstrapAccumulator; + } + + @Override + public Integer getResult(BootstrapAccumulator bootstrapAccumulator) { + return bootstrapAccumulator.readyTaskNum(); + } + + @Override + public BootstrapAccumulator merge(BootstrapAccumulator bootstrapAccumulator, BootstrapAccumulator acc) { + return bootstrapAccumulator.merge(acc); + } +} \ No newline at end of file diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 36014a8ec..00a82747f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -22,43 +22,32 @@ import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.BaseAvroPayload; -import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.sink.bootstrap.BootstrapRecord; import org.apache.hudi.sink.utils.PayloadCreation; -import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.CheckpointListener; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.table.runtime.util.StateTtlConfigUtil; import org.apache.flink.util.Collector; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import 
org.slf4j.LoggerFactory; -import java.util.List; import java.util.Objects; /** @@ -80,8 +69,6 @@ public class BucketAssignFunction> extends KeyedProcessFunction implements CheckpointedFunction, CheckpointListener { - private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class); - private BucketAssignOperator.Context context; /** @@ -108,13 +95,6 @@ public class BucketAssignFunction> private final boolean isChangingRecords; - private final boolean bootstrapIndex; - - /** - * State to book-keep which partition is loaded into the index state {@code indexState}. - */ - private MapState partitionLoadState; - /** * Used to create DELETE payload. */ @@ -130,7 +110,6 @@ public class BucketAssignFunction> this.conf = conf; this.isChangingRecords = WriteOperationType.isChangingRecords( WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); - this.bootstrapIndex = conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED); this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED); } @@ -168,31 +147,29 @@ public class BucketAssignFunction> indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl)); } indexState = context.getKeyedStateStore().getState(indexStateDesc); - if (bootstrapIndex) { - MapStateDescriptor partitionLoadStateDesc = - new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT); - partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc); + } + + @Override + public void processElement(I value, Context ctx, Collector out) throws Exception { + if (value instanceof BootstrapRecord) { + BootstrapRecord bootstrapRecord = (BootstrapRecord) value; + this.context.setCurrentKey(bootstrapRecord.getRecordKey()); + this.indexState.update((HoodieRecordGlobalLocation) bootstrapRecord.getCurrentLocation()); + } else { + processRecord((HoodieRecord) value, out); } } @SuppressWarnings("unchecked") - @Override - public void processElement(I value, Context ctx, Collector out) throws Exception { + private void processRecord(HoodieRecord record, Collector out) throws Exception { // 1. put the record into the BucketAssigner; // 2. look up the state for location, if the record has a location, just send it out; // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out. - HoodieRecord record = (HoodieRecord) value; final HoodieKey hoodieKey = record.getKey(); final String recordKey = hoodieKey.getRecordKey(); final String partitionPath = hoodieKey.getPartitionPath(); final HoodieRecordLocation location; - // The dataset may be huge, thus the processing would block for long, - // disabled by default. - if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) { - // If the partition records are never loaded, load the records first. - loadRecords(partitionPath, recordKey); - } // Only changing records need looking up the index for the location, // append only records are always recognized as INSERT. HoodieRecordGlobalLocation oldLoc = indexState.value(); @@ -216,6 +193,7 @@ public class BucketAssignFunction> } } else { location = getNewRecordLocation(partitionPath); + this.context.setCurrentKey(recordKey); if (isChangingRecords) { updateIndexState(partitionPath, location); } @@ -264,62 +242,8 @@ public class BucketAssignFunction> this.context = context; } - /** - * Load all the indices of give partition path into the backup state. 
- * - * @param partitionPath The partition path - * @param curRecordKey The current record key - * @throws Exception when error occurs for state update - */ - private void loadRecords(String partitionPath, String curRecordKey) throws Exception { - LOG.info("Start loading records under partition {} into the index state", partitionPath); - HoodieTable hoodieTable = bucketAssigner.getTable(); - BaseFileUtils fileUtils = BaseFileUtils.getInstance(hoodieTable.getBaseFileFormat()); - List latestBaseFiles = - HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable); - final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); - final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); - final int taskID = getRuntimeContext().getIndexOfThisSubtask(); - for (HoodieBaseFile baseFile : latestBaseFiles) { - final List hoodieKeys; - try { - hoodieKeys = - fileUtils.fetchRecordKeyPartitionPath(hadoopConf, new Path(baseFile.getPath())); - } catch (Exception e) { - // in case there was some empty parquet file when the pipeline - // crushes exceptionally. - LOG.error("Error when loading record keys from file: {}", baseFile); - continue; - } - hoodieKeys.forEach(hoodieKey -> { - try { - // Reference: org.apache.flink.streaming.api.datastream.KeyedStream, - // the input records is shuffled by record key - boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator( - hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID; - if (shouldLoad) { - this.context.setCurrentKey(hoodieKey.getRecordKey()); - this.indexState.update( - new HoodieRecordGlobalLocation( - hoodieKey.getPartitionPath(), - baseFile.getCommitTime(), - baseFile.getFileId())); - } - } catch (Exception e) { - LOG.error("Error when putting record keys into the state from file: {}", baseFile); - } - }); - } - // recover the currentKey - this.context.setCurrentKey(curRecordKey); - // Mark the partition path as loaded. 
- partitionLoadState.put(partitionPath, 0); - LOG.info("Finish loading records under partition {} into the index state", partitionPath); - } - @VisibleForTesting public void clearIndexState() { - this.partitionLoadState.clear(); this.indexState.clear(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index b0f7ada0f..9e77e73bc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -22,11 +22,12 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperatorFactory; +import org.apache.hudi.sink.bootstrap.BootstrapFunction; +import org.apache.hudi.sink.compact.CompactionCommitEvent; +import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.compact.CompactionPlanEvent; -import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactFunction; -import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.util.AvroSchemaConverter; @@ -41,6 +42,7 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; @@ -50,6 +52,8 @@ import java.util.Properties; /** * An Utility which can incrementally consume data from Kafka and apply it to the target table. * currently, it only support COW table and insert, upsert operation. + * + * note: HoodieFlinkStreamer is not suitable to initialize on large tables when we have no checkpoint to restore from. 
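+ * + * For a first run against an existing table, the bootstrap can be enabled explicitly. A hedged sketch
+ * (the option constants are the ones referenced by this patch; the regex value is illustrative only):
+ * + *   conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);   // load the existing index into state on startup
+ * + *   conf.setString(FlinkOptions.INDEX_PARTITION_REGEX, "par[12]"); // optional, default ".*" loads every partition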
*/ public class HoodieFlinkStreamer { public static void main(String[] args) throws Exception { @@ -82,7 +86,7 @@ public class HoodieFlinkStreamer { StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); - DataStream pipeline = env.addSource(new FlinkKafkaConsumer<>( + DataStream hoodieDataStream = env.addSource(new FlinkKafkaConsumer<>( cfg.kafkaTopic, new JsonRowDataDeserializationSchema( rowType, @@ -93,8 +97,15 @@ public class HoodieFlinkStreamer { ), kafkaProps)) .name("kafka_source") .uid("uid_kafka_source") - .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) - // Key-by record key, to avoid multiple subtasks write to a partition at the same time + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); + if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { + hoodieDataStream = hoodieDataStream.transform("index_bootstrap", + TypeInformation.of(HoodieRecord.class), + new ProcessOperator<>(new BootstrapFunction<>(conf))); + } + + DataStream pipeline = hoodieDataStream + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time .keyBy(HoodieRecord::getRecordKey) .transform( "bucket_assigner", diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 02e76aae3..0d8f7f293 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -23,6 +23,8 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperatorFactory; +import org.apache.hudi.sink.bootstrap.BootstrapFunction; +import org.apache.hudi.sink.bootstrap.BootstrapRecord; import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; @@ -38,6 +40,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; @@ -74,10 +77,22 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); - DataStream pipeline = dataStream - .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) + DataStream hoodieDataStream = dataStream + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); + + // TODO: This is a very time-consuming operation, will optimization + if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { + hoodieDataStream = hoodieDataStream.transform("index_bootstrap", + TypeInformation.of(HoodieRecord.class), + new ProcessOperator<>(new BootstrapFunction<>(conf))); + } + + DataStream pipeline = hoodieDataStream + .transform("index_bootstrap", + TypeInformation.of(BootstrapRecord.class), + new 
ProcessOperator<>(new BootstrapFunction<>(conf))) // Key-by record key, to avoid multiple subtasks write to a bucket at the same time - .keyBy(HoodieRecord::getRecordKey) + .keyBy(BootstrapRecord::getRecordKey) .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 0590a2373..ad9c9dccd 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -21,6 +21,7 @@ package org.apache.hudi.sink; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.bootstrap.BootstrapFunction; import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; @@ -49,6 +50,7 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; +import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; @@ -109,14 +111,22 @@ public class StreamWriteITCase extends TestLogger { String sourcePath = Objects.requireNonNull(Thread.currentThread() .getContextClassLoader().getResource("test_source.data")).toString(); - DataStream dataStream = execEnv + DataStream hoodieDataStream = execEnv // use continuous file source to trigger checkpoint .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2)) .name("continuous_file_source") .setParallelism(1) .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) .setParallelism(4) - .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); + + if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { + hoodieDataStream = hoodieDataStream.transform("index_bootstrap", + TypeInformation.of(HoodieRecord.class), + new ProcessOperator<>(new BootstrapFunction<>(conf))); + } + + DataStream pipeline = hoodieDataStream // Key-by record key, to avoid multiple subtasks write to a bucket at the same time .keyBy(HoodieRecord::getRecordKey) .transform( @@ -128,7 +138,7 @@ public class StreamWriteITCase extends TestLogger { .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) .uid("uid_hoodie_stream_write"); - execEnv.addOperator(dataStream.getTransformation()); + execEnv.addOperator(pipeline.getTransformation()); JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); // wait for the streaming job to finish @@ -171,12 +181,20 @@ public class StreamWriteITCase extends TestLogger { TypeInformation typeInfo = BasicTypeInfo.STRING_TYPE_INFO; format.setCharsetName("UTF-8"); - DataStream pipeline = execEnv + DataStream hoodieDataStream = execEnv // use PROCESS_CONTINUOUSLY mode to trigger 
checkpoint .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8))) .setParallelism(4) - .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)) + .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); + + if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { + hoodieDataStream = hoodieDataStream.transform("index_bootstrap", + TypeInformation.of(HoodieRecord.class), + new ProcessOperator<>(new BootstrapFunction<>(conf))); + } + + DataStream pipeline = hoodieDataStream // Key-by record key, to avoid multiple subtasks write to a bucket at the same time .keyBy(HoodieRecord::getRecordKey) .transform( diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 247e17e6c..5e68b7656 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -576,10 +576,6 @@ public class TestWriteCopyOnWrite { @Test public void testIndexStateBootstrap() throws Exception { - // reset the config option - conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - // open the function and ingest data funcWrapper.openFunction(); for (RowData rowData : TestData.DATA_SET_INSERT) { @@ -598,13 +594,21 @@ public class TestWriteCopyOnWrite { funcWrapper.checkpointComplete(1); - // Mark the index state as not fully loaded to trigger re-load from the filesystem. - funcWrapper.clearIndexState(); + // the data is not flushed yet + checkWrittenData(tempFile, EXPECTED1); + + // reset the config option + conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); // upsert another data buffer + funcWrapper.openFunction(); for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) { funcWrapper.invoke(rowData); } + + assertTrue(funcWrapper.isAlreadyBootstrap()); + checkIndexLoaded( new HoodieKey("id1", "par1"), new HoodieKey("id2", "par1"), @@ -613,11 +617,13 @@ public class TestWriteCopyOnWrite { new HoodieKey("id5", "par3"), new HoodieKey("id6", "par3"), new HoodieKey("id7", "par4"), - new HoodieKey("id8", "par4")); - // the data is not flushed yet - checkWrittenData(tempFile, EXPECTED1); + new HoodieKey("id8", "par4"), + new HoodieKey("id9", "par3"), + new HoodieKey("id10", "par4"), + new HoodieKey("id11", "par4")); + // this triggers the data write and event send - funcWrapper.checkpointFunction(2); + funcWrapper.checkpointFunction(1); String instant = funcWrapper.getWriteClient() .getLastPendingInstant(getTableType()); @@ -631,7 +637,7 @@ public class TestWriteCopyOnWrite { checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); - funcWrapper.checkpointComplete(2); + funcWrapper.checkpointComplete(1); // the coordinator checkpoint commits the inflight instant. 
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkWrittenData(tempFile, EXPECTED2); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 74e43bad0..705780cdc 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -23,6 +23,8 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.bootstrap.BootstrapFunction; +import org.apache.hudi.sink.bootstrap.BootstrapRecord; import org.apache.hudi.sink.StreamWriteFunction; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; @@ -46,6 +48,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; import java.util.HashSet; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -68,6 +71,8 @@ public class StreamWriteFunctionWrapper { /** Function that converts row data to HoodieRecord. */ private RowDataToHoodieFunction> toHoodieFunction; + /** Function that load index in state. */ + private BootstrapFunction, HoodieRecord> bootstrapFunction; /** Function that assigns bucket ID. */ private BucketAssignFunction, HoodieRecord> bucketAssignerFunction; /** BucketAssignOperator context. **/ @@ -94,6 +99,8 @@ public class StreamWriteFunctionWrapper { // one function this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1); this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); + this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); + this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext(); this.functionInitializationContext = new MockFunctionInitializationContext(); this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext(); @@ -106,6 +113,13 @@ public class StreamWriteFunctionWrapper { toHoodieFunction.setRuntimeContext(runtimeContext); toHoodieFunction.open(conf); + if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { + bootstrapFunction = new BootstrapFunction<>(conf); + bootstrapFunction.setRuntimeContext(runtimeContext); + bootstrapFunction.initializeState(this.functionInitializationContext); + bootstrapFunction.open(conf); + } + bucketAssignerFunction = new BucketAssignFunction<>(conf); bucketAssignerFunction.setRuntimeContext(runtimeContext); bucketAssignerFunction.open(conf); @@ -136,6 +150,32 @@ public class StreamWriteFunctionWrapper { } }; + + if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { + List> bootstrapRecords = new ArrayList<>(); + + Collector> bootstrapCollector = new Collector>() { + @Override + public void collect(HoodieRecord record) { + if (record instanceof BootstrapRecord) { + bootstrapRecords.add(record); + } + } + + @Override + public void close() { + + } + }; + + bootstrapFunction.processElement(hoodieRecord, null, bootstrapCollector); + for (HoodieRecord bootstrapRecord : bootstrapRecords) { + bucketAssignerFunction.processElement(bootstrapRecord, null, collector); + } + + 
this.bucketAssignOperatorContext.setCurrentKey(hoodieRecord.getRecordKey()); + } + bucketAssignerFunction.processElement(hoodieRecord, null, collector); writeFunction.processElement(hoodieRecords[0], null, null); } @@ -210,6 +250,10 @@ public class StreamWriteFunctionWrapper { return this.writeFunction.isConfirming(); } + public boolean isAlreadyBootstrap() { + return this.bootstrapFunction.isAlreadyBootstrap(); + } + // ------------------------------------------------------------------------- // Inner Class // -------------------------------------------------------------------------
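
Note: the index bootstrap routing relies on Flink's deterministic key-group assignment, so every base file is read by exactly one subtask even though all subtasks scan the same partition listing. A minimal, self-contained sketch of that assignment (the file ids and parallelism values are made up for illustration; KeyGroupRangeAssignment is the same Flink utility the patch calls in loadRecords):

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class FileRoutingSketch {
      public static void main(String[] args) {
        int parallelism = 4;       // getNumberOfParallelSubtasks()
        int maxParallelism = 128;  // getMaxNumberOfParallelSubtasks()
        for (String fileId : new String[] {"file-0001", "file-0002", "file-0003"}) {
          // The assignment is deterministic: every subtask computes the same owner,
          // so a subtask only loads the base files whose owner index equals its own task id.
          int owner = KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, parallelism);
          System.out.println(fileId + " -> subtask " + owner);
        }
      }
    }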
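Note: the updateAndWaiting() handshake can be exercised without a cluster, since BootstrapAggFunc and BootstrapAccumulator are plain classes introduced by this patch. A minimal sketch (the task ids are illustrative; in a running job the accumulator lives on the JobMaster and is updated through GlobalAggregateManager#updateGlobalAggregate):

    import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAccumulator;
    import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunc;

    public class HandshakeSketch {
      public static void main(String[] args) {
        BootstrapAggFunc aggFunc = new BootstrapAggFunc();
        BootstrapAccumulator acc = aggFunc.createAccumulator();
        // Each bootstrap subtask reports its own index once it has emitted all of its
        // index records; the backing Set absorbs the duplicate report from task 1.
        for (int taskId : new int[] {0, 1, 1, 2}) {
          acc = aggFunc.add(taskId, acc);
        }
        // BootstrapFunction#updateAndWaiting polls this count every 5 seconds
        // until it equals the operator parallelism.
        System.out.println(aggFunc.getResult(acc)); // prints 3
      }
    }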