diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 95e21812a..3268458ff 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -312,6 +312,12 @@ public class FlinkOptions extends HoodieConfig { .withDescription("Maximum memory in MB for a write task, when the threshold hits,\n" + "it flushes the max size data bucket to avoid OOM, default 1GB"); + public static final ConfigOption<Long> WRITE_RATE_LIMIT = ConfigOptions + .key("write.rate.limit") + .longType() + .defaultValue(0L) // default no limit + .withDescription("Write record rate limit per second to prevent traffic jitter and improve stability, default 0 (no limit)"); + public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions .key("write.batch.size") .doubleType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java index 505a8e2f1..f57571377 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java @@ -43,7 +43,7 @@ import static org.apache.hudi.util.StreamerUtil.flinkConf2TypedProperties; /** * Function that transforms RowData to HoodieRecord. */ -public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?>> +public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord> extends RichMapFunction<I, O> { /** * Row type of the input.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java new file mode 100644 index 000000000..4526c6ff9 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.transform; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.configuration.FlinkOptions; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +/** + * Function that transforms RowData to a HoodieRecord with RateLimit. + */ +public class RowDataToHoodieFunctionWithRateLimit<I extends RowData, O extends HoodieRecord> + extends RowDataToHoodieFunction<I, O> { + /** + * Total rate limit per second for this job. + */ + private final double totalLimit; + + /** + * Rate limit per second for per task.
+ */ + private transient RateLimiter rateLimiter; + + public RowDataToHoodieFunctionWithRateLimit(RowType rowType, Configuration config) { + super(rowType, config); + this.totalLimit = config.getLong(FlinkOptions.WRITE_RATE_LIMIT); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.rateLimiter = + RateLimiter.create(totalLimit / getRuntimeContext().getNumberOfParallelSubtasks()); + } + + @Override + public O map(I i) throws Exception { + rateLimiter.acquire(); + return super.map(i); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java new file mode 100644 index 000000000..5b811a8b0 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctions.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.transform; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.configuration.FlinkOptions; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +/** + * Utilities for {@link RowDataToHoodieFunction}. + */ +public abstract class RowDataToHoodieFunctions { + private RowDataToHoodieFunctions() {} + + /** + * Creates a {@link RowDataToHoodieFunction} instance based on the given configuration. + */ + @SuppressWarnings("rawtypes") + public static RowDataToHoodieFunction<RowData, HoodieRecord> create(RowType rowType, Configuration conf) { + if (conf.getLong(FlinkOptions.WRITE_RATE_LIMIT) > 0) { + return new RowDataToHoodieFunctionWithRateLimit<>(rowType, conf); + } else { + return new RowDataToHoodieFunction<>(rowType, conf); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index b2b6b54d9..5b0ba4f5a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -31,7 +31,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignOperator; -import org.apache.hudi.sink.transform.RowDataToHoodieFunction; +import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; import org.apache.hudi.sink.transform.Transformer; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.StreamerUtil; @@ -112,7 +112,8 @@ public class HoodieFlinkStreamer { } } - DataStream<HoodieRecord> dataStream2 = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); + DataStream<HoodieRecord> dataStream2 = dataStream
+ .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class)); if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { dataStream2 = dataStream2.rebalance() diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 2cdd27e8f..37d10684f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -32,7 +32,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignOperator; -import org.apache.hudi.sink.transform.RowDataToHoodieFunction; +import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -105,7 +105,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf); DataStream<HoodieRecord> dataStream1 = dataStream - .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); + .map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class)); // bootstrap index // TODO: This is a very time-consuming operation, will optimization