diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
index 5ecc7e57f..86acc1c1d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
@@ -156,7 +156,7 @@ public class HoodieRowData implements RowData {
 
   @Override
   public StringData getString(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+    if (ordinal < metaColumnsNum) {
       return StringData.fromString(getMetaColumnVal(ordinal));
     }
     return row.getString(ordinal - metaColumnsNum);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index abc67b0ed..44aeb9036 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -350,21 +350,23 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
           + "waits for the instant commit success, only for internal use");
 
-  public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION = ConfigOptions
-      .key("sink.shuffle-by-partition.enable")
-      .booleanType()
-      .defaultValue(false)
-      .withDescription(
-          "The option to enable shuffle data by dynamic partition fields in sink"
-              + " phase, this can greatly reduce the number of file for filesystem sink but may"
-              + " lead data skew.");
+  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION = ConfigOptions
+      .key("write.bulk_insert.shuffle_by_partition")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether to shuffle the inputs by partition path for bulk insert tasks, default true");
 
-  // this is only for internal use
-  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_PARTITION_SORTED = ConfigOptions
-      .key("write.bulk_insert.partition.sorted")
-      .booleanType()
-      .defaultValue(false)
-      .withDescription("Whether the bulk insert write task input records are already sorted by the partition path");
+  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_BY_PARTITION = ConfigOptions
+      .key("write.bulk_insert.sort_by_partition")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether to sort the inputs by partition path for bulk insert tasks, default true");
+
+  public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
+      .key("write.sort.memory")
+      .intType()
+      .defaultValue(128)
+      .withDescription("Sort memory in MB, default 128MB");
 
   // ------------------------------------------------------------------------
   //  Compaction Options
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 50590c0ee..0e3ecb16f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -72,7 +72,7 @@ public class BulkInsertWriterHelper {
     this.taskId = taskId;
     this.taskEpochId = taskEpochId;
     this.rowType = addMetadataFields(rowType); // patch up with metadata fields
-    this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED);
+    this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION);
     this.fileIdPrefix = UUID.randomUUID().toString();
     this.keyGen = RowDataKeyGen.instance(conf, rowType);
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
index 2c44c137c..5b73f91d0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
@@ -29,13 +29,15 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 
 /**
  * Key generator for {@link RowData}.
  */
-public class RowDataKeyGen {
+public class RowDataKeyGen implements Serializable {
+  private static final long serialVersionUID = 1L;
 
   // reference: NonpartitionedAvroKeyGenerator
   private static final String EMPTY_PARTITION = "";
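A note on the RowDataKeyGen change above: the class now implements Serializable because HoodieTableSink (later in this patch) passes rowDataKeyGen::getPartitionPath to DataStream#keyBy, and Flink serializes user-function closures, including every object they capture, when the job graph is shipped to the cluster. A minimal sketch of the failure the change prevents; the class and method names below are hypothetical, only keyBy and getPartitionPath come from the patch:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

import org.apache.hudi.sink.bulk.RowDataKeyGen;

public class ShuffleByPartitionSketch {

  // Without `RowDataKeyGen implements Serializable`, job submission would typically
  // fail with an InvalidProgramException complaining that the KeySelector is not
  // serializable, because the method reference captures the keyGen instance.
  public static DataStream<RowData> shuffleByPartition(DataStream<RowData> input, RowDataKeyGen keyGen) {
    KeySelector<RowData, String> selector = keyGen::getPartitionPath;
    return input.keyBy(selector);
  }
}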

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java
new file mode 100644
index 000000000..7be2b410d
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk.sort;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
+import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Operator for batch sort.
+ *
+ * <p>Copied from org.apache.flink.table.runtime.operators.sort.SortOperator to change the annotation.
+ */
+public class SortOperator extends TableStreamOperator<RowData>
+    implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SortOperator.class);
+
+  private GeneratedNormalizedKeyComputer gComputer;
+  private GeneratedRecordComparator gComparator;
+
+  private transient BinaryExternalSorter sorter;
+  private transient StreamRecordCollector<RowData> collector;
+  private transient BinaryRowDataSerializer binarySerializer;
+
+  public SortOperator(
+      GeneratedNormalizedKeyComputer gComputer, GeneratedRecordComparator gComparator) {
+    this.gComputer = gComputer;
+    this.gComparator = gComparator;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    LOG.info("Opening SortOperator");
+
+    ClassLoader cl = getContainingTask().getUserCodeClassLoader();
+
+    AbstractRowDataSerializer inputSerializer =
+        (AbstractRowDataSerializer)
+            getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
+    this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());
+
+    NormalizedKeyComputer computer = gComputer.newInstance(cl);
+    RecordComparator comparator = gComparator.newInstance(cl);
+    gComputer = null;
+    gComparator = null;
+
+    MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
+    this.sorter =
+        new BinaryExternalSorter(
+            this.getContainingTask(),
+            memManager,
+            computeMemorySize(),
+            this.getContainingTask().getEnvironment().getIOManager(),
+            inputSerializer,
+            binarySerializer,
+            computer,
+            comparator,
+            getContainingTask().getJobConfiguration());
+    this.sorter.startThreads();
+
+    collector = new StreamRecordCollector<>(output);
+
+    // register the metrics.
+    getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
+    getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
+    getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
+  }
+
+  @Override
+  public void processElement(StreamRecord<RowData> element) throws Exception {
+    this.sorter.write(element.getValue());
+  }
+
+  @Override
+  public void endInput() throws Exception {
+    BinaryRowData row = binarySerializer.createInstance();
+    MutableObjectIterator<BinaryRowData> iterator = sorter.getIterator();
+    while ((row = iterator.next(row)) != null) {
+      collector.collect(row);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing SortOperator");
+    super.close();
+    if (sorter != null) {
+      sorter.close();
+    }
+  }
+}
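The operator above relies on the BoundedOneInput contract: endInput() fires exactly once after the bounded input is exhausted, so a blocking sort can absorb every record in processElement() and emit the fully ordered output in endInput(). A simplified sketch of that buffer-then-drain pattern, with an in-heap list and a plain Comparator standing in for BinaryExternalSorter and the code-generated comparator; the class name and constructor are hypothetical:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;

public class InHeapSortOperator extends AbstractStreamOperator<RowData>
    implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {

  // must be a Serializable comparator in practice: the operator itself is
  // serialized when the job graph is shipped
  private final Comparator<RowData> comparator;
  private transient List<RowData> buffer;

  public InHeapSortOperator(Comparator<RowData> comparator) {
    this.comparator = comparator;
  }

  @Override
  public void open() throws Exception {
    super.open();
    this.buffer = new ArrayList<>();
  }

  @Override
  public void processElement(StreamRecord<RowData> element) {
    // the real SortOperator serializes records into managed-memory pages instead;
    // buffering the raw object is only safe when the runtime does not reuse it
    buffer.add(element.getValue());
  }

  @Override
  public void endInput() {
    // bounded input is exhausted: sort once and drain downstream
    buffer.sort(comparator);
    for (RowData row : buffer) {
      output.collect(new StreamRecord<>(row));
    }
  }
}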
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
new file mode 100644
index 000000000..d17866175
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk.sort;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/**
+ * Tools to generate the sort operator.
+ */
+public class SortOperatorGen {
+  private final int[] sortIndices;
+  private final LogicalType[] sortTypes;
+  private final TableConfig tableConfig = new TableConfig();
+
+  public SortOperatorGen(RowType rowType, String[] sortFields) {
+    this.sortIndices = Arrays.stream(sortFields).mapToInt(rowType::getFieldIndex).toArray();
+    List<RowType.RowField> fields = rowType.getFields();
+    this.sortTypes = Arrays.stream(sortIndices).mapToObj(idx -> fields.get(idx).getType()).toArray(LogicalType[]::new);
+  }
+
+  public OneInputStreamOperator<RowData, RowData> createSortOperator() {
+    SortCodeGenerator codeGen = createSortCodeGenerator();
+    return new SortOperator(
+        codeGen.generateNormalizedKeyComputer("SortComputer"),
+        codeGen.generateRecordComparator("SortComparator"));
+  }
+
+  private SortCodeGenerator createSortCodeGenerator() {
+    boolean[] padBooleans = new boolean[sortIndices.length];
+    IntStream.range(0, sortIndices.length).forEach(i -> padBooleans[i] = true);
+    return new SortCodeGenerator(tableConfig, sortIndices, sortTypes, padBooleans, padBooleans);
+  }
+}
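For context on the generator: it is driven only by the RowType and the sort field names, and both flag arrays handed to SortCodeGenerator are filled with true, which in this Flink version requests ascending order with nulls last for every sort key. A hedged wiring sketch follows; the field names "dt" and "hh", the helper class, and the parallelism are hypothetical, while the transform and managed-memory calls mirror the HoodieTableSink change further below:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode$;
import org.apache.flink.table.types.logical.RowType;

import org.apache.hudi.sink.bulk.sort.SortOperatorGen;

public class SortWiringSketch {

  // rowType must contain the "dt" and "hh" fields; both names are made up here.
  public static DataStream<RowData> sortByPartitionFields(DataStream<RowData> input, RowType rowType) {
    SortOperatorGen sortGen = new SortOperatorGen(rowType, new String[] {"dt", "hh"});
    DataStream<RowData> sorted = input
        .transform("partition_key_sorter",
            TypeInformation.of(RowData.class),
            sortGen.createSortOperator())
        .setParallelism(4); // match the write task parallelism
    // BinaryExternalSorter draws its buffers from Flink managed memory, so the
    // transformation needs an explicit weight (128 MB here, the write.sort.memory default)
    ExecNode$.MODULE$.setManagedMemoryWeight(sorted.getTransformation(),
        128L * 1024L * 1024L);
    return sorted;
  }
}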
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
@@ -60,18 +64,16 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
   private final Configuration conf;
   private final TableSchema schema;
   private boolean overwrite = false;
-  private boolean supportsGrouping = false;
 
   public HoodieTableSink(Configuration conf, TableSchema schema) {
     this.conf = conf;
     this.schema = schema;
   }
 
-  public HoodieTableSink(Configuration conf, TableSchema schema, boolean overwrite, boolean supportsGrouping) {
+  public HoodieTableSink(Configuration conf, TableSchema schema, boolean overwrite) {
     this.conf = conf;
     this.schema = schema;
     this.overwrite = overwrite;
-    this.supportsGrouping = supportsGrouping;
   }
 
   @Override
@@ -88,18 +90,32 @@
     // bulk_insert mode
     final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
     if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
-      this.conf.set(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED, this.supportsGrouping);
-      if (this.supportsGrouping) {
-        // if partition grouping is true, the input records would be sorted by the partition
-        // path, we need to chain the SORT operator and writer operator to keep the record
-        // sequence
-        dataStream.getTransformation()
-            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
-      }
       BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType);
-      return dataStream.transform("hoodie_bulk_insert_write",
-          TypeInformation.of(Object.class),
-          operatorFactory)
+
+      final String[] partitionFields = FilePathUtils.extractPartitionKeys(this.conf);
+      if (partitionFields.length > 0) {
+        RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+        if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
+
+          // shuffle by partition keys
+          dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
+        }
+        if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
+          SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
+          // sort by partition keys
+          dataStream = dataStream
+              .transform("partition_key_sorter",
+                  TypeInformation.of(RowData.class),
+                  sortOperatorGen.createSortOperator())
+              .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+          ExecNode$.MODULE$.setManagedMemoryWeight(dataStream.getTransformation(),
+              conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+        }
+      }
+      return dataStream
+          .transform("hoodie_bulk_insert_write",
+              TypeInformation.of(Object.class),
+              operatorFactory)
           // follow the parallelism of upstream operators to avoid shuffle
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
           .addSink(new CleanFunction<>(conf))
@@ -179,7 +195,7 @@
 
   @Override
   public DynamicTableSink copy() {
-    return new HoodieTableSink(this.conf, this.schema, this.overwrite, this.supportsGrouping);
+    return new HoodieTableSink(this.conf, this.schema, this.overwrite);
   }
 
   @Override
@@ -205,10 +221,4 @@
   public void applyOverwrite(boolean b) {
     this.overwrite = b;
   }
-
-  @Override
-  public boolean requiresPartitionGrouping(boolean supportsGrouping) {
-    this.supportsGrouping = supportsGrouping;
-    return supportsGrouping;
-  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 91a2e472e..a01cad498 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -324,5 +324,4 @@ public class StreamerUtil {
       throw new IOException("Could not load transformer class(es) " + classNames, e);
     }
   }
-
 }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index c1813dca4..957b9b891 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -600,7 +600,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.OPERATION.key(), "bulk_insert");
-    options.put(FlinkOptions.SINK_SHUFFLE_BY_PARTITION.key(), "true");
+    options.put(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION.key(), "true");
     if (hiveStylePartitioning) {
       options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
     }
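Finally, a hedged sketch of how a job opts in to the renamed bulk-insert knobs; the option keys come from the FlinkOptions change above, while the table path, memory value, and helper class are made up:

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.configuration.FlinkOptions;

public class BulkInsertOptionsSketch {

  public static Map<String, String> bulkInsertOptions() {
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "/tmp/hudi_table"); // hypothetical path
    options.put(FlinkOptions.OPERATION.key(), "bulk_insert");
    // both shuffle and sort default to true after this patch; set explicitly for clarity
    options.put(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION.key(), "true");
    options.put(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION.key(), "true");
    // managed memory for the sorter, in MB
    options.put(FlinkOptions.WRITE_SORT_MEMORY.key(), "256");
    return options;
  }
}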