[HUDI-2316] Support Flink batch upsert (#3494)
This commit is contained in:
@@ -72,12 +72,12 @@ public class BootstrapFunction<I, O extends HoodieRecord>
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class);
|
||||
|
||||
private HoodieTable<?, ?, ?, ?> hoodieTable;
|
||||
protected HoodieTable<?, ?, ?, ?> hoodieTable;
|
||||
|
||||
private final Configuration conf;
|
||||
protected final Configuration conf;
|
||||
|
||||
private transient org.apache.hadoop.conf.Configuration hadoopConf;
|
||||
private transient HoodieWriteConfig writeConfig;
|
||||
protected transient org.apache.hadoop.conf.Configuration hadoopConf;
|
||||
protected transient HoodieWriteConfig writeConfig;
|
||||
|
||||
private GlobalAggregateManager aggregateManager;
|
||||
|
||||
@@ -153,7 +153,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
|
||||
* @param partitionPath The partition path
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private void loadRecords(String partitionPath, Collector<O> out) throws Exception {
|
||||
protected void loadRecords(String partitionPath, Collector<O> out) throws Exception {
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.sink.bootstrap.batch;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* The function to load specify partition index from existing hoodieTable.
|
||||
*/
|
||||
public class BatchBootstrapFunction<I, O extends HoodieRecord>
|
||||
extends BootstrapFunction<I, O> {
|
||||
|
||||
private Set<String> partitionPathSet;
|
||||
private boolean haveSuccessfulCommits;
|
||||
|
||||
public BatchBootstrapFunction(Configuration conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
this.partitionPathSet = new HashSet<>();
|
||||
this.haveSuccessfulCommits = StreamerUtil.haveSuccessfulCommits(hoodieTable.getMetaClient());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processElement(I value, Context context, Collector<O> out) throws Exception {
|
||||
final HoodieRecord record = (HoodieRecord<?>) value;
|
||||
final String partitionPath = record.getKey().getPartitionPath();
|
||||
|
||||
if (haveSuccessfulCommits && !partitionPathSet.contains(partitionPath)) {
|
||||
loadRecords(partitionPath, out);
|
||||
partitionPathSet.add(partitionPath);
|
||||
}
|
||||
|
||||
// send the trigger record
|
||||
out.collect((O) value);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.sink.utils;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.sink.CleanFunction;
|
||||
import org.apache.hudi.sink.StreamWriteOperatorFactory;
|
||||
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
|
||||
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapFunction;
|
||||
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
|
||||
import org.apache.hudi.sink.bulk.RowDataKeyGen;
|
||||
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
|
||||
import org.apache.hudi.sink.compact.CompactFunction;
|
||||
import org.apache.hudi.sink.compact.CompactionCommitEvent;
|
||||
import org.apache.hudi.sink.compact.CompactionCommitSink;
|
||||
import org.apache.hudi.sink.compact.CompactionPlanEvent;
|
||||
import org.apache.hudi.sink.compact.CompactionPlanOperator;
|
||||
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
|
||||
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
|
||||
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
|
||||
import org.apache.hudi.table.format.FilePathUtils;
|
||||
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStreamSink;
|
||||
import org.apache.flink.streaming.api.operators.ProcessOperator;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
|
||||
/**
|
||||
* Utilities to generate all kinds of sub-pipelines.
|
||||
*/
|
||||
public class Pipelines {
|
||||
|
||||
public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
|
||||
BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
|
||||
|
||||
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
|
||||
if (partitionFields.length > 0) {
|
||||
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
|
||||
|
||||
// shuffle by partition keys
|
||||
dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
|
||||
}
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
|
||||
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
|
||||
// sort by partition keys
|
||||
dataStream = dataStream
|
||||
.transform("partition_key_sorter",
|
||||
TypeInformation.of(RowData.class),
|
||||
sortOperatorGen.createSortOperator())
|
||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
|
||||
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
|
||||
}
|
||||
}
|
||||
return dataStream
|
||||
.transform("hoodie_bulk_insert_write",
|
||||
TypeInformation.of(Object.class),
|
||||
operatorFactory)
|
||||
// follow the parallelism of upstream operators to avoid shuffle
|
||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
|
||||
.addSink(new CleanFunction<>(conf))
|
||||
.setParallelism(1)
|
||||
.name("clean_commits");
|
||||
}
|
||||
|
||||
public static DataStream<HoodieRecord> bootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream) {
|
||||
DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
|
||||
|
||||
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
|
||||
dataStream1 = dataStream1.rebalance()
|
||||
.transform(
|
||||
"index_bootstrap",
|
||||
TypeInformation.of(HoodieRecord.class),
|
||||
new ProcessOperator<>(new BootstrapFunction<>(conf)))
|
||||
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
|
||||
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
|
||||
}
|
||||
|
||||
return dataStream1;
|
||||
}
|
||||
|
||||
public static DataStream<HoodieRecord> batchBootstrap(Configuration conf, RowType rowType, int defaultParallelism, DataStream<RowData> dataStream) {
|
||||
// shuffle and sort by partition keys
|
||||
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
|
||||
if (partitionFields.length > 0) {
|
||||
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
|
||||
// shuffle by partition keys
|
||||
dataStream = dataStream
|
||||
.keyBy(rowDataKeyGen::getPartitionPath);
|
||||
}
|
||||
|
||||
return rowDataToHoodieRecord(conf, rowType, dataStream)
|
||||
.transform(
|
||||
"batch_index_bootstrap",
|
||||
TypeInformation.of(HoodieRecord.class),
|
||||
new ProcessOperator<>(new BatchBootstrapFunction<>(conf)))
|
||||
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
|
||||
.uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
|
||||
}
|
||||
|
||||
public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
|
||||
return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
|
||||
}
|
||||
|
||||
public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
|
||||
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
|
||||
return dataStream
|
||||
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
|
||||
.keyBy(HoodieRecord::getRecordKey)
|
||||
.transform(
|
||||
"bucket_assigner",
|
||||
TypeInformation.of(HoodieRecord.class),
|
||||
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
|
||||
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
|
||||
// shuffle by fileId(bucket id)
|
||||
.keyBy(record -> record.getCurrentLocation().getFileId())
|
||||
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
||||
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||
}
|
||||
|
||||
public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
|
||||
return dataStream.transform("compact_plan_generate",
|
||||
TypeInformation.of(CompactionPlanEvent.class),
|
||||
new CompactionPlanOperator(conf))
|
||||
.setParallelism(1) // plan generate must be singleton
|
||||
.rebalance()
|
||||
.transform("compact_task",
|
||||
TypeInformation.of(CompactionCommitEvent.class),
|
||||
new ProcessOperator<>(new CompactFunction(conf)))
|
||||
.setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
|
||||
.addSink(new CompactionCommitSink(conf))
|
||||
.name("compact_commit")
|
||||
.setParallelism(1); // compaction commit should be singleton
|
||||
}
|
||||
|
||||
public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
|
||||
return dataStream.addSink(new CleanFunction<>(conf))
|
||||
.setParallelism(1)
|
||||
.name("clean_commits");
|
||||
}
|
||||
}
|
||||
@@ -21,37 +21,19 @@ package org.apache.hudi.table;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.sink.CleanFunction;
|
||||
import org.apache.hudi.sink.StreamWriteOperatorFactory;
|
||||
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
|
||||
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
|
||||
import org.apache.hudi.sink.bulk.RowDataKeyGen;
|
||||
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
|
||||
import org.apache.hudi.sink.compact.CompactFunction;
|
||||
import org.apache.hudi.sink.compact.CompactionCommitEvent;
|
||||
import org.apache.hudi.sink.compact.CompactionCommitSink;
|
||||
import org.apache.hudi.sink.compact.CompactionPlanEvent;
|
||||
import org.apache.hudi.sink.compact.CompactionPlanOperator;
|
||||
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
|
||||
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
|
||||
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
|
||||
import org.apache.hudi.table.format.FilePathUtils;
|
||||
import org.apache.hudi.sink.utils.Pipelines;
|
||||
import org.apache.hudi.util.ChangelogModes;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.annotation.VisibleForTesting;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.operators.ProcessOperator;
|
||||
import org.apache.flink.table.catalog.ResolvedSchema;
|
||||
import org.apache.flink.table.connector.ChangelogMode;
|
||||
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
|
||||
import org.apache.flink.table.connector.sink.DynamicTableSink;
|
||||
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
|
||||
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
|
||||
import java.util.Map;
|
||||
@@ -90,90 +72,23 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
|
||||
// bulk_insert mode
|
||||
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
|
||||
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
|
||||
BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType);
|
||||
|
||||
final String[] partitionFields = FilePathUtils.extractPartitionKeys(this.conf);
|
||||
if (partitionFields.length > 0) {
|
||||
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
|
||||
|
||||
// shuffle by partition keys
|
||||
dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
|
||||
}
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
|
||||
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
|
||||
// sort by partition keys
|
||||
dataStream = dataStream
|
||||
.transform("partition_key_sorter",
|
||||
TypeInformation.of(RowData.class),
|
||||
sortOperatorGen.createSortOperator())
|
||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
|
||||
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
|
||||
}
|
||||
}
|
||||
return dataStream
|
||||
.transform("hoodie_bulk_insert_write",
|
||||
TypeInformation.of(Object.class),
|
||||
operatorFactory)
|
||||
// follow the parallelism of upstream operators to avoid shuffle
|
||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
|
||||
.addSink(new CleanFunction<>(conf))
|
||||
.setParallelism(1)
|
||||
.name("clean_commits");
|
||||
return Pipelines.bulkInsert(conf, rowType, dataStream);
|
||||
}
|
||||
|
||||
// stream write
|
||||
// default parallelism
|
||||
int parallelism = dataStream.getExecutionConfig().getParallelism();
|
||||
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
|
||||
final DataStream<HoodieRecord> dataStream1 = context.isBounded()
|
||||
? Pipelines.batchBootstrap(conf, rowType, parallelism, dataStream)
|
||||
: Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
|
||||
|
||||
DataStream<HoodieRecord> dataStream1 = dataStream
|
||||
.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
|
||||
// write pipeline
|
||||
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, dataStream1);
|
||||
|
||||
// bootstrap index
|
||||
// TODO: This is a very time-consuming operation, will optimization
|
||||
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
|
||||
dataStream1 = dataStream1.rebalance()
|
||||
.transform(
|
||||
"index_bootstrap",
|
||||
TypeInformation.of(HoodieRecord.class),
|
||||
new ProcessOperator<>(new BootstrapFunction<>(conf)))
|
||||
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
|
||||
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
|
||||
}
|
||||
|
||||
DataStream<Object> pipeline = dataStream1
|
||||
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
|
||||
.keyBy(HoodieRecord::getRecordKey)
|
||||
.transform(
|
||||
"bucket_assigner",
|
||||
TypeInformation.of(HoodieRecord.class),
|
||||
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
|
||||
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
|
||||
// shuffle by fileId(bucket id)
|
||||
.keyBy(record -> record.getCurrentLocation().getFileId())
|
||||
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
||||
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||
// compaction
|
||||
if (StreamerUtil.needsAsyncCompaction(conf)) {
|
||||
return pipeline.transform("compact_plan_generate",
|
||||
TypeInformation.of(CompactionPlanEvent.class),
|
||||
new CompactionPlanOperator(conf))
|
||||
.setParallelism(1) // plan generate must be singleton
|
||||
.rebalance()
|
||||
.transform("compact_task",
|
||||
TypeInformation.of(CompactionCommitEvent.class),
|
||||
new ProcessOperator<>(new CompactFunction(conf)))
|
||||
.setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
|
||||
.addSink(new CompactionCommitSink(conf))
|
||||
.name("compact_commit")
|
||||
.setParallelism(1); // compaction commit should be singleton
|
||||
return Pipelines.compact(conf, pipeline);
|
||||
} else {
|
||||
return pipeline.addSink(new CleanFunction<>(conf))
|
||||
.setParallelism(1)
|
||||
.name("clean_commits");
|
||||
return Pipelines.clean(conf, pipeline);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -379,4 +379,8 @@ public class StreamerUtil {
|
||||
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
|
||||
return operationType == WriteOperationType.INSERT && !conf.getBoolean(FlinkOptions.INSERT_DEDUP);
|
||||
}
|
||||
|
||||
public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
|
||||
return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user