1
0

[HUDI-2254] Builtin sort operator for flink bulk insert (#3372)

This commit is contained in:
Danny Chan
2021-07-30 16:58:11 +08:00
committed by GitHub
parent 8b19ec9ca0
commit c4e45a0010
9 changed files with 235 additions and 38 deletions

View File

@@ -156,7 +156,7 @@ public class HoodieRowData implements RowData {
@Override @Override
public StringData getString(int ordinal) { public StringData getString(int ordinal) {
if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) { if (ordinal < metaColumnsNum) {
return StringData.fromString(getMetaColumnVal(ordinal)); return StringData.fromString(getMetaColumnVal(ordinal));
} }
return row.getString(ordinal - metaColumnsNum); return row.getString(ordinal - metaColumnsNum);

View File

@@ -350,21 +350,23 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n" .withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
+ "waits for the instant commit success, only for internal use"); + "waits for the instant commit success, only for internal use");
public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION = ConfigOptions public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION = ConfigOptions
.key("sink.shuffle-by-partition.enable") .key("write.bulk_insert.shuffle_by_partition")
.booleanType() .booleanType()
.defaultValue(false) .defaultValue(true)
.withDescription( .withDescription("Whether to shuffle the inputs by partition path for bulk insert tasks, default true");
"The option to enable shuffle data by dynamic partition fields in sink"
+ " phase, this can greatly reduce the number of file for filesystem sink but may"
+ " lead data skew.");
// this is only for internal use public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_BY_PARTITION = ConfigOptions
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_PARTITION_SORTED = ConfigOptions .key("write.bulk_insert.sort_by_partition")
.key("write.bulk_insert.partition.sorted")
.booleanType() .booleanType()
.defaultValue(false) .defaultValue(true)
.withDescription("Whether the bulk insert write task input records are already sorted by the partition path"); .withDescription("Whether to sort the inputs by partition path for bulk insert tasks, default true");
public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
.key("write.sort.memory")
.intType()
.defaultValue(128)
.withDescription("Sort memory in MB, default 128MB");
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Compaction Options // Compaction Options

View File

@@ -72,7 +72,7 @@ public class BulkInsertWriterHelper {
this.taskId = taskId; this.taskId = taskId;
this.taskEpochId = taskEpochId; this.taskEpochId = taskEpochId;
this.rowType = addMetadataFields(rowType); // patch up with metadata fields this.rowType = addMetadataFields(rowType); // patch up with metadata fields
this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED); this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION);
this.fileIdPrefix = UUID.randomUUID().toString(); this.fileIdPrefix = UUID.randomUUID().toString();
this.keyGen = RowDataKeyGen.instance(conf, rowType); this.keyGen = RowDataKeyGen.instance(conf, rowType);
} }

View File

@@ -29,13 +29,15 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
/** /**
* Key generator for {@link RowData}. * Key generator for {@link RowData}.
*/ */
public class RowDataKeyGen { public class RowDataKeyGen implements Serializable {
private static final long serialVersionUID = 1L;
// reference: NonpartitionedAvroKeyGenerator // reference: NonpartitionedAvroKeyGenerator
private static final String EMPTY_PARTITION = ""; private static final String EMPTY_PARTITION = "";

View File

@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bulk.sort;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Operator for batch sort.
 *
 * <p>Copied from org.apache.flink.table.runtime.operators.sort.SortOperator to change the annotation.
 */
public class SortOperator extends TableStreamOperator<RowData>
    implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
  private static final Logger LOG = LoggerFactory.getLogger(SortOperator.class);

  // Code-generated sort key computer/comparator; serialized with the operator,
  // instantiated in open() and then released (set to null) to free the generated code holders.
  private GeneratedNormalizedKeyComputer gComputer;
  private GeneratedRecordComparator gComparator;

  // Runtime-only state, created in open(); transient because the operator is shipped serialized.
  private transient BinaryExternalSorter sorter;
  private transient StreamRecordCollector<RowData> collector;
  private transient BinaryRowDataSerializer binarySerializer;

  /**
   * Creates the sort operator.
   *
   * @param gComputer generated normalized-key computer used by the external sorter
   * @param gComparator generated record comparator used by the external sorter
   */
  public SortOperator(
      GeneratedNormalizedKeyComputer gComputer, GeneratedRecordComparator gComparator) {
    this.gComputer = gComputer;
    this.gComparator = gComparator;
  }

  /**
   * Instantiates the generated comparators, builds the external sorter backed by the task's
   * managed memory, starts its worker threads and registers sorter metrics.
   */
  @Override
  public void open() throws Exception {
    super.open();
    LOG.info("Opening SortOperator");

    ClassLoader cl = getContainingTask().getUserCodeClassLoader();

    AbstractRowDataSerializer inputSerializer =
        (AbstractRowDataSerializer)
            getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
    this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());

    NormalizedKeyComputer computer = gComputer.newInstance(cl);
    RecordComparator comparator = gComparator.newInstance(cl);
    // Release the generated-code holders once instantiated; they are no longer needed.
    gComputer = null;
    gComparator = null;

    MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
    this.sorter =
        new BinaryExternalSorter(
            this.getContainingTask(),
            memManager,
            computeMemorySize(),
            this.getContainingTask().getEnvironment().getIOManager(),
            inputSerializer,
            binarySerializer,
            computer,
            comparator,
            getContainingTask().getJobConfiguration());
    this.sorter.startThreads();

    collector = new StreamRecordCollector<>(output);

    // register the metrics.
    getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
    getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
    getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
  }

  /** Feeds one input record into the external sorter; nothing is emitted until endInput(). */
  @Override
  public void processElement(StreamRecord<RowData> element) throws Exception {
    this.sorter.write(element.getValue());
  }

  /**
   * Called once all input has been consumed: drains the sorter's iterator and emits the
   * records downstream in sorted order, reusing a single BinaryRowData instance.
   */
  @Override
  public void endInput() throws Exception {
    BinaryRowData row = binarySerializer.createInstance();
    MutableObjectIterator<BinaryRowData> iterator = sorter.getIterator();
    while ((row = iterator.next(row)) != null) {
      collector.collect(row);
    }
  }

  /** Releases the sorter (threads, memory, spill files) after the operator chain is closed. */
  @Override
  public void close() throws Exception {
    LOG.info("Closing SortOperator");
    super.close();
    if (sorter != null) {
      sorter.close();
    }
  }
}

View File

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bulk.sort;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
/**
 * Tools to generate the sort operator.
 */
public class SortOperatorGen {
  // Field positions of the sort keys within the input row type.
  private final int[] sortIndices;
  // Logical types of the sort keys, parallel to {@link #sortIndices}.
  private final LogicalType[] sortTypes;
  private final TableConfig tableConfig = new TableConfig();

  /**
   * Creates the generator.
   *
   * @param rowType the input row type
   * @param sortFields names of the fields to sort by, in sort-priority order
   * @throws IllegalArgumentException if any sort field does not exist in {@code rowType}
   */
  public SortOperatorGen(RowType rowType, String[] sortFields) {
    // RowType#getFieldIndex returns -1 for an unknown name; fail fast with a clear
    // message instead of letting fields.get(-1) throw an opaque IndexOutOfBoundsException.
    this.sortIndices = Arrays.stream(sortFields)
        .mapToInt(fieldName -> {
          int idx = rowType.getFieldIndex(fieldName);
          if (idx < 0) {
            throw new IllegalArgumentException(
                "Sort field '" + fieldName + "' does not exist in the input row type: " + rowType);
          }
          return idx;
        })
        .toArray();
    List<RowType.RowField> fields = rowType.getFields();
    this.sortTypes = Arrays.stream(sortIndices)
        .mapToObj(idx -> fields.get(idx).getType())
        .toArray(LogicalType[]::new);
  }

  /**
   * Creates the sort operator with code-generated key computer and comparator
   * for the configured sort fields.
   *
   * @return a one-input stream operator that sorts its bounded input
   */
  public OneInputStreamOperator<RowData, RowData> createSortOperator() {
    SortCodeGenerator codeGen = createSortCodeGenerator();
    return new SortOperator(
        codeGen.generateNormalizedKeyComputer("SortComputer"),
        codeGen.generateRecordComparator("SortComparator"));
  }

  /** Builds the code generator: every key sorted ascending with nulls first. */
  private SortCodeGenerator createSortCodeGenerator() {
    boolean[] ascendingAndNullsFirst = new boolean[sortIndices.length];
    Arrays.fill(ascendingAndNullsFirst, true);
    return new SortCodeGenerator(
        tableConfig, sortIndices, sortTypes, ascendingAndNullsFirst, ascendingAndNullsFirst);
  }
}

View File

@@ -25,6 +25,8 @@ import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory; import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.bootstrap.BootstrapFunction; import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator; import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -33,6 +35,7 @@ import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator; import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.annotation.VisibleForTesting;
@@ -47,6 +50,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode$;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
@@ -60,18 +64,16 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
private final Configuration conf; private final Configuration conf;
private final TableSchema schema; private final TableSchema schema;
private boolean overwrite = false; private boolean overwrite = false;
private boolean supportsGrouping = false;
public HoodieTableSink(Configuration conf, TableSchema schema) { public HoodieTableSink(Configuration conf, TableSchema schema) {
this.conf = conf; this.conf = conf;
this.schema = schema; this.schema = schema;
} }
public HoodieTableSink(Configuration conf, TableSchema schema, boolean overwrite, boolean supportsGrouping) { public HoodieTableSink(Configuration conf, TableSchema schema, boolean overwrite) {
this.conf = conf; this.conf = conf;
this.schema = schema; this.schema = schema;
this.overwrite = overwrite; this.overwrite = overwrite;
this.supportsGrouping = supportsGrouping;
} }
@Override @Override
@@ -88,16 +90,30 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// bulk_insert mode // bulk_insert mode
final String writeOperation = this.conf.get(FlinkOptions.OPERATION); final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) { if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
this.conf.set(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED, this.supportsGrouping);
if (this.supportsGrouping) {
// if partition grouping is true, the input records would be sorted by the partition
// path, we need to chain the SORT operator and writer operator to keep the record
// sequence
dataStream.getTransformation()
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType); BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType);
return dataStream.transform("hoodie_bulk_insert_write",
final String[] partitionFields = FilePathUtils.extractPartitionKeys(this.conf);
if (partitionFields.length > 0) {
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
// shuffle by partition keys
dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
}
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
// sort by partition keys
dataStream = dataStream
.transform("partition_key_sorter",
TypeInformation.of(RowData.class),
sortOperatorGen.createSortOperator())
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
ExecNode$.MODULE$.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
}
return dataStream
.transform("hoodie_bulk_insert_write",
TypeInformation.of(Object.class), TypeInformation.of(Object.class),
operatorFactory) operatorFactory)
// follow the parallelism of upstream operators to avoid shuffle // follow the parallelism of upstream operators to avoid shuffle
@@ -179,7 +195,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
@Override @Override
public DynamicTableSink copy() { public DynamicTableSink copy() {
return new HoodieTableSink(this.conf, this.schema, this.overwrite, this.supportsGrouping); return new HoodieTableSink(this.conf, this.schema, this.overwrite);
} }
@Override @Override
@@ -205,10 +221,4 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
public void applyOverwrite(boolean b) { public void applyOverwrite(boolean b) {
this.overwrite = b; this.overwrite = b;
} }
@Override
public boolean requiresPartitionGrouping(boolean supportsGrouping) {
this.supportsGrouping = supportsGrouping;
return supportsGrouping;
}
} }

View File

@@ -324,5 +324,4 @@ public class StreamerUtil {
throw new IOException("Could not load transformer class(es) " + classNames, e); throw new IOException("Could not load transformer class(es) " + classNames, e);
} }
} }
} }

View File

@@ -600,7 +600,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
Map<String, String> options = new HashMap<>(); Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.OPERATION.key(), "bulk_insert"); options.put(FlinkOptions.OPERATION.key(), "bulk_insert");
options.put(FlinkOptions.SINK_SHUFFLE_BY_PARTITION.key(), "true"); options.put(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION.key(), "true");
if (hiveStylePartitioning) { if (hiveStylePartitioning) {
options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true"); options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
} }