
add BootstrapFunction to support index bootstrap (#3024)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
yuzhaojing
2021-06-08 13:55:25 +08:00
committed by GitHub
parent 57611d10b5
commit cf83f10f5b
11 changed files with 507 additions and 113 deletions


@@ -93,6 +93,12 @@ public class FlinkOptions {
.withDescription("Whether to update index for the old partition path\n"
+ "if same key record with different partition path came in, default false");
public static final ConfigOption<String> INDEX_PARTITION_REGEX = ConfigOptions
.key("index.partition.regex")
.stringType()
.defaultValue(".*")
.withDescription("Whether to load partitions in state if partition path matching default *");
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------
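Note: a minimal sketch of how these two options could be set on a job configuration, assuming only the constants above; the regex value "par.*" is an illustration, not a recommended default:

import java.util.regex.Pattern;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class IndexBootstrapConfigSketch {
public static void main(String[] args) {
Configuration conf = new Configuration();
// enable the index bootstrap operator (checked by the sink pipelines below)
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
// only partitions whose path matches the regex are loaded into state
conf.setString(FlinkOptions.INDEX_PARTITION_REGEX, "par.*");
// BootstrapFunction compiles the regex once and filters partition paths with it
Pattern pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX));
System.out.println(pattern.matcher("par1").matches()); // true
System.out.println(pattern.matcher("2021/06/08").matches()); // false
}
}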


@@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bootstrap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunc;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
* The function to load the index from the existing hoodie table.
*
* <p>Each subtask of the function triggers the index bootstrap with its first input element;
* the incoming records are not sent downstream until all of the index records have been emitted.
*
* <p>The output records are then shuffled by record key so that the writes scale.
*/
public class BootstrapFunction<I, O extends HoodieRecord>
extends ProcessFunction<I, O>
implements CheckpointedFunction, CheckpointListener {
private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
private HoodieTable<?, ?, ?, ?> hoodieTable;
private final Configuration conf;
private transient org.apache.hadoop.conf.Configuration hadoopConf;
private GlobalAggregateManager aggregateManager;
private ListState<Boolean> bootstrapState;
private final Pattern pattern;
private boolean alreadyBootstrap;
public BootstrapFunction(Configuration conf) {
this.conf = conf;
this.pattern = Pattern.compile(conf.getString(FlinkOptions.INDEX_PARTITION_REGEX));
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.bootstrapState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(
"bootstrap-state",
TypeInformation.of(new TypeHint<Boolean>() {})
)
);
if (context.isRestored()) {
LOG.info("Restoring state for the {}.", getClass().getSimpleName());
for (Boolean alreadyBootstrap : bootstrapState.get()) {
this.alreadyBootstrap = alreadyBootstrap;
}
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.hadoopConf = StreamerUtil.getHadoopConf();
this.hoodieTable = getTable();
this.aggregateManager = ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager();
}
@Override
@SuppressWarnings("unchecked")
public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
if (!alreadyBootstrap) {
LOG.info("Start loading records in table {} into the index state, taskId = {}", conf.getString(FlinkOptions.PATH), getRuntimeContext().getIndexOfThisSubtask());
String basePath = hoodieTable.getMetaClient().getBasePath();
for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
if (pattern.matcher(partitionPath).matches()) {
loadRecords(partitionPath, out);
}
}
// wait for the other bootstrap tasks to report bootstrap complete.
updateAndWaiting();
alreadyBootstrap = true;
LOG.info("Finish send index to BucketAssign, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
}
// send data to next operator
out.collect((O) value);
}
/**
* Waits until all the other bootstrap tasks report bootstrap complete.
*/
private void updateAndWaiting() {
int taskNum = getRuntimeContext().getNumberOfParallelSubtasks();
int readyTaskNum = 1;
while (taskNum != readyTaskNum) {
try {
readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunc.NAME, getRuntimeContext().getIndexOfThisSubtask(), new BootstrapAggFunc());
LOG.info("Waiting for others bootstrap task complete, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
LOG.warn("update global aggregate error", e);
}
}
}
private HoodieFlinkTable getTable() {
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
new SerializableConfiguration(this.hadoopConf),
new FlinkTaskContextSupplier(getRuntimeContext()));
return HoodieFlinkTable.create(writeConfig, context);
}
/**
* Loads all the index records under the given partition path and sends them downstream.
*
* @param partitionPath The partition path
* @param out           The collector to send the index records to
*/
@SuppressWarnings("unchecked")
private void loadRecords(String partitionPath, Collector<O> out) {
long start = System.currentTimeMillis();
BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
List<HoodieBaseFile> latestBaseFiles =
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable);
LOG.info("All baseFile in partition {} size = {}", partitionPath, latestBaseFiles.size());
final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
for (HoodieBaseFile baseFile : latestBaseFiles) {
boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
baseFile.getFileId(), maxParallelism, parallelism) == taskID;
if (shouldLoad) {
LOG.info("Load records from file {}.", baseFile);
final List<HoodieKey> hoodieKeys;
try {
hoodieKeys =
fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
}
for (HoodieKey hoodieKey : hoodieKeys) {
out.collect((O) new BootstrapRecord(generateHoodieRecord(hoodieKey, baseFile)));
}
}
}
long cost = System.currentTimeMillis() - start;
LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
this.getClass().getSimpleName(), taskID, partitionPath, cost);
}
@SuppressWarnings("unchecked")
public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, HoodieBaseFile baseFile) {
HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null);
hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId()));
hoodieRecord.seal();
return hoodieRecord;
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
// no operation
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
this.bootstrapState.add(alreadyBootstrap);
}
@VisibleForTesting
public boolean isAlreadyBootstrap() {
return alreadyBootstrap;
}
}
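As a side note, here is a minimal sketch of the file-to-subtask assignment used in loadRecords above; maxParallelism, parallelism and fileId are made-up values, while the assignment call is the same one the function uses:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class FileAssignmentSketch {
public static void main(String[] args) {
final int maxParallelism = 128; // assumed job max parallelism
final int parallelism = 4; // assumed bootstrap operator parallelism
final String fileId = "file-0001"; // hypothetical base file id
int owner = KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, parallelism);
// only the subtask whose index equals `owner` loads the keys of this base file,
// so each file is read exactly once across all bootstrap subtasks
System.out.println("file " + fileId + " is loaded by subtask " + owner);
}
}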


@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bootstrap;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
/**
* A marker record that flags a {@link HoodieRecord} as an index (bootstrap) record.
*/
public class BootstrapRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> {
private static final long serialVersionUID = 1L;
public BootstrapRecord(HoodieRecord<T> record) {
super(record);
}
}
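For illustration, a small sketch of building an index-only record and wrapping it, mirroring BootstrapFunction.generateHoodieRecord above; the key, instant time and file id are made-up values:

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;

public class BootstrapRecordSketch {
@SuppressWarnings("unchecked")
public static void main(String[] args) {
HoodieKey key = new HoodieKey("id1", "par1"); // hypothetical record key and partition
HoodieRecord record = new HoodieRecord(key, null); // no payload: only key and location matter
record.setCurrentLocation(new HoodieRecordGlobalLocation("par1", "20210608135525", "file-0001"));
record.seal();
// the wrapper type lets BucketAssignFunction tell index records apart from data records
BootstrapRecord bootstrapRecord = new BootstrapRecord(record);
}
}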


@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bootstrap.aggregate;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
/**
* Aggregate accumulator that tracks the set of subtasks which have finished the index bootstrap.
*/
public class BootstrapAccumulator implements Serializable {
private static final long serialVersionUID = 1L;
private final Set<Integer> readyTaskSet;
public BootstrapAccumulator() {
this.readyTaskSet = new HashSet<>();
}
public void update(int taskId) {
readyTaskSet.add(taskId);
}
public int readyTaskNum() {
return readyTaskSet.size();
}
public BootstrapAccumulator merge(BootstrapAccumulator acc) {
if (acc == null) {
return this;
}
readyTaskSet.addAll(acc.readyTaskSet);
return this;
}
}
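A quick sketch of the accumulator semantics: duplicate task ids are counted once, and merge is a set union:

public class BootstrapAccumulatorSketch {
public static void main(String[] args) {
BootstrapAccumulator a = new BootstrapAccumulator();
a.update(0);
a.update(0); // reporting the same subtask twice is idempotent
BootstrapAccumulator b = new BootstrapAccumulator();
b.update(1);
a = a.merge(b); // set union of the ready task ids
System.out.println(a.readyTaskNum()); // prints 2
}
}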


@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bootstrap.aggregate;
import org.apache.flink.api.common.functions.AggregateFunction;
/**
* Aggregate function that counts the subtasks of {@link org.apache.hudi.sink.bootstrap.BootstrapFunction} that have finished loading the index.
*/
public class BootstrapAggFunc implements AggregateFunction<Integer, BootstrapAccumulator, Integer> {
public static final String NAME = BootstrapAggFunc.class.getSimpleName();
@Override
public BootstrapAccumulator createAccumulator() {
return new BootstrapAccumulator();
}
@Override
public BootstrapAccumulator add(Integer taskId, BootstrapAccumulator bootstrapAccumulator) {
bootstrapAccumulator.update(taskId);
return bootstrapAccumulator;
}
@Override
public Integer getResult(BootstrapAccumulator bootstrapAccumulator) {
return bootstrapAccumulator.readyTaskNum();
}
@Override
public BootstrapAccumulator merge(BootstrapAccumulator bootstrapAccumulator, BootstrapAccumulator acc) {
return bootstrapAccumulator.merge(acc);
}
}
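For reference, a minimal sketch of the polling barrier this function backs, mirroring BootstrapFunction#updateAndWaiting; aggregateManager, subtaskId and parallelism are assumed to come from the runtime context:

import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;

public class BootstrapBarrierSketch {
static void waitForAllSubtasks(GlobalAggregateManager aggregateManager, int subtaskId, int parallelism) throws Exception {
int readyTaskNum = 0;
while (readyTaskNum != parallelism) {
// register this subtask and read back the global ready count
readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunc.NAME, subtaskId, new BootstrapAggFunc());
TimeUnit.SECONDS.sleep(5); // back off between polls
}
}
}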


@@ -22,43 +22,32 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.BaseAvroPayload;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.sink.bootstrap.BootstrapRecord;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.runtime.util.StateTtlConfigUtil;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
/**
@@ -80,8 +69,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
extends KeyedProcessFunction<K, I, O>
implements CheckpointedFunction, CheckpointListener {
private static final Logger LOG = LoggerFactory.getLogger(BucketAssignFunction.class);
private BucketAssignOperator.Context context;
/**
@@ -108,13 +95,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
private final boolean isChangingRecords;
private final boolean bootstrapIndex;
/**
* State to book-keep which partition is loaded into the index state {@code indexState}.
*/
private MapState<String, Integer> partitionLoadState;
/**
* Used to create DELETE payload.
*/
@@ -130,7 +110,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
this.bootstrapIndex = conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
}
@@ -168,31 +147,29 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
}
indexState = context.getKeyedStateStore().getState(indexStateDesc);
if (bootstrapIndex) {
MapStateDescriptor<String, Integer> partitionLoadStateDesc =
new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
}
@Override
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
if (value instanceof BootstrapRecord) {
BootstrapRecord bootstrapRecord = (BootstrapRecord) value;
this.context.setCurrentKey(bootstrapRecord.getRecordKey());
this.indexState.update((HoodieRecordGlobalLocation) bootstrapRecord.getCurrentLocation());
} else {
processRecord((HoodieRecord<?>) value, out);
}
}
@SuppressWarnings("unchecked")
private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
// 1. put the record into the BucketAssigner;
// 2. look up the state for location, if the record has a location, just send it out;
// 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
final HoodieKey hoodieKey = record.getKey();
final String recordKey = hoodieKey.getRecordKey();
final String partitionPath = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
// The dataset may be huge, thus the processing would block for long,
// disabled by default.
if (bootstrapIndex && !partitionLoadState.contains(partitionPath)) {
// If the partition records are never loaded, load the records first.
loadRecords(partitionPath, recordKey);
}
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
HoodieRecordGlobalLocation oldLoc = indexState.value();
@@ -216,6 +193,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
}
} else {
location = getNewRecordLocation(partitionPath);
this.context.setCurrentKey(recordKey);
if (isChangingRecords) {
updateIndexState(partitionPath, location);
}
@@ -264,62 +242,8 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
this.context = context;
}
/**
* Load all the indices of the given partition path into the backup state.
*
* @param partitionPath The partition path
* @param curRecordKey The current record key
* @throws Exception when error occurs for state update
*/
private void loadRecords(String partitionPath, String curRecordKey) throws Exception {
LOG.info("Start loading records under partition {} into the index state", partitionPath);
HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
BaseFileUtils fileUtils = BaseFileUtils.getInstance(hoodieTable.getBaseFileFormat());
List<HoodieBaseFile> latestBaseFiles =
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
for (HoodieBaseFile baseFile : latestBaseFiles) {
final List<HoodieKey> hoodieKeys;
try {
hoodieKeys =
fileUtils.fetchRecordKeyPartitionPath(hadoopConf, new Path(baseFile.getPath()));
} catch (Exception e) {
// in case there was some empty parquet file when the pipeline
// crushes exceptionally.
LOG.error("Error when loading record keys from file: {}", baseFile);
continue;
}
hoodieKeys.forEach(hoodieKey -> {
try {
// Reference: org.apache.flink.streaming.api.datastream.KeyedStream,
// the input records is shuffled by record key
boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID;
if (shouldLoad) {
this.context.setCurrentKey(hoodieKey.getRecordKey());
this.indexState.update(
new HoodieRecordGlobalLocation(
hoodieKey.getPartitionPath(),
baseFile.getCommitTime(),
baseFile.getFileId()));
}
} catch (Exception e) {
LOG.error("Error when putting record keys into the state from file: {}", baseFile);
}
});
}
// recover the currentKey
this.context.setCurrentKey(curRecordKey);
// Mark the partition path as loaded.
partitionLoadState.put(partitionPath, 0);
LOG.info("Finish loading records under partition {} into the index state", partitionPath);
}
@VisibleForTesting
public void clearIndexState() {
this.partitionLoadState.clear();
this.indexState.clear();
}
}


@@ -22,11 +22,12 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -50,6 +52,8 @@ import java.util.Properties;
/**
* A utility which can incrementally consume data from Kafka and apply it to the target table.
* Currently, it only supports the COW table type and the insert and upsert operations.
*
* note: HoodieFlinkStreamer is not suitable for initializing large tables when there is no checkpoint to restore from.
*/
public class HoodieFlinkStreamer {
public static void main(String[] args) throws Exception {
@@ -82,7 +86,7 @@ public class HoodieFlinkStreamer {
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);
DataStream<HoodieRecord> hoodieDataStream = env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
rowType,
@@ -93,8 +97,15 @@ public class HoodieFlinkStreamer {
), kafkaProps))
.name("kafka_source")
.uid("uid_kafka_source")
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)));
}
DataStream<Object> pipeline = hoodieDataStream
// Key-by record key, to avoid multiple subtasks writing to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",


@@ -23,6 +23,8 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.bootstrap.BootstrapRecord;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -38,6 +40,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -74,10 +77,22 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
DataStream<HoodieRecord> hoodieDataStream = dataStream
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
// TODO: this is a very time-consuming operation, to be optimized
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)));
}
DataStream<Object> pipeline = hoodieDataStream
.transform("index_bootstrap",
TypeInformation.of(BootstrapRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)))
// Key-by record key, to avoid multiple subtasks writing to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),


@@ -21,6 +21,7 @@ package org.apache.hudi.sink;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -49,6 +50,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -109,14 +111,22 @@ public class StreamWriteITCase extends TestLogger {
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();
DataStream<HoodieRecord> hoodieDataStream = execEnv
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4)
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)));
}
DataStream<Object> pipeline = hoodieDataStream
// Key-by record key, to avoid multiple subtasks writing to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
@@ -128,7 +138,7 @@ public class StreamWriteITCase extends TestLogger {
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write");
execEnv.addOperator(pipeline.getTransformation());
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
// wait for the streaming job to finish
@@ -171,12 +181,20 @@ public class StreamWriteITCase extends TestLogger {
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName("UTF-8");
DataStream<HoodieRecord> hoodieDataStream = execEnv
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4)
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)));
}
DataStream<Object> pipeline = hoodieDataStream
// Key-by record key, to avoid multiple subtasks writing to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(


@@ -576,10 +576,6 @@ public class TestWriteCopyOnWrite {
@Test
public void testIndexStateBootstrap() throws Exception {
// open the function and ingest data
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_INSERT) {
@@ -598,13 +594,21 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointComplete(1);
// the data is not flushed yet
checkWrittenData(tempFile, EXPECTED1);
// reset the config option
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
// upsert another data buffer
funcWrapper.openFunction();
for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
funcWrapper.invoke(rowData);
}
assertTrue(funcWrapper.isAlreadyBootstrap());
checkIndexLoaded(
new HoodieKey("id1", "par1"),
new HoodieKey("id2", "par1"),
@@ -613,11 +617,13 @@ public class TestWriteCopyOnWrite {
new HoodieKey("id5", "par3"),
new HoodieKey("id6", "par3"),
new HoodieKey("id7", "par4"),
new HoodieKey("id8", "par4"));
// the data is not flushed yet
checkWrittenData(tempFile, EXPECTED1);
new HoodieKey("id8", "par4"),
new HoodieKey("id9", "par3"),
new HoodieKey("id10", "par4"),
new HoodieKey("id11", "par4"));
// this triggers the data write and event send
funcWrapper.checkpointFunction(1);
String instant = funcWrapper.getWriteClient()
.getLastPendingInstant(getTableType());
@@ -631,7 +637,7 @@ public class TestWriteCopyOnWrite {
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant);
funcWrapper.checkpointComplete(1);
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
checkWrittenData(tempFile, EXPECTED2);


@@ -23,6 +23,8 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.bootstrap.BootstrapRecord;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -46,6 +48,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -68,6 +71,8 @@ public class StreamWriteFunctionWrapper<I> {
/** Function that converts row data to HoodieRecord. */
private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
/** Function that loads the index into state. */
private BootstrapFunction<HoodieRecord<?>, HoodieRecord<?>> bootstrapFunction;
/** Function that assigns bucket ID. */
private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction;
/** BucketAssignOperator context. **/
@@ -94,6 +99,8 @@ public class StreamWriteFunctionWrapper<I> {
// one function
this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
this.bucketAssignOperatorContext = new MockBucketAssignOperatorContext();
this.functionInitializationContext = new MockFunctionInitializationContext();
@@ -106,6 +113,13 @@ public class StreamWriteFunctionWrapper<I> {
toHoodieFunction.setRuntimeContext(runtimeContext);
toHoodieFunction.open(conf);
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
bootstrapFunction = new BootstrapFunction<>(conf);
bootstrapFunction.setRuntimeContext(runtimeContext);
bootstrapFunction.initializeState(this.functionInitializationContext);
bootstrapFunction.open(conf);
}
bucketAssignerFunction = new BucketAssignFunction<>(conf);
bucketAssignerFunction.setRuntimeContext(runtimeContext);
bucketAssignerFunction.open(conf);
@@ -136,6 +150,32 @@ public class StreamWriteFunctionWrapper<I> {
}
};
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
List<HoodieRecord<?>> bootstrapRecords = new ArrayList<>();
Collector<HoodieRecord<?>> bootstrapCollector = new Collector<HoodieRecord<?>>() {
@Override
public void collect(HoodieRecord<?> record) {
if (record instanceof BootstrapRecord) {
bootstrapRecords.add(record);
}
}
@Override
public void close() {
}
};
bootstrapFunction.processElement(hoodieRecord, null, bootstrapCollector);
for (HoodieRecord bootstrapRecord : bootstrapRecords) {
bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
}
this.bucketAssignOperatorContext.setCurrentKey(hoodieRecord.getRecordKey());
}
bucketAssignerFunction.processElement(hoodieRecord, null, collector);
writeFunction.processElement(hoodieRecords[0], null, null);
}
@@ -210,6 +250,10 @@ public class StreamWriteFunctionWrapper<I> {
return this.writeFunction.isConfirming();
}
public boolean isAlreadyBootstrap() {
return this.bootstrapFunction.isAlreadyBootstrap();
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------