1
0

[HUDI-2215] Add rateLimiter when Flink writes to hudi. (#3338)

Co-authored-by: wangminchao <wangminchao@asinking.com>
This commit is contained in:
mincwang
2021-07-28 08:23:23 +08:00
committed by GitHub
parent 60758b36ea
commit 00cd35f90a
6 changed files with 118 additions and 5 deletions

View File

@@ -312,6 +312,12 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Maximum memory in MB for a write task, when the threshold hits,\n"
+ "it flushes the max size data bucket to avoid OOM, default 1GB");
public static final ConfigOption<Long> WRITE_RATE_LIMIT = ConfigOptions
.key("write.rate.limit")
.longType()
.defaultValue(0L) // default no limit
.withDescription("Write record rate limit per second to prevent traffic jitter and improve stability, default 0 (no limit)");
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
.key("write.batch.size")
.doubleType()

View File

@@ -43,7 +43,7 @@ import static org.apache.hudi.util.StreamerUtil.flinkConf2TypedProperties;
/**
* Function that transforms RowData to HoodieRecord.
*/
public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?>>
public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord>
extends RichMapFunction<I, O> {
/**
* Row type of the input.

View File

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.transform;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
/**
 * Function that transforms RowData to a HoodieRecord with RateLimit.
 */
public class RowDataToHoodieFunctionWithRateLimit<I extends RowData, O extends HoodieRecord>
    extends RowDataToHoodieFunction<I, O> {

  /**
   * Total rate limit per second for this job.
   */
  private final double totalLimit;

  /**
   * Per-subtask rate limiter; rebuilt in {@link #open} because it is not serializable.
   */
  private transient RateLimiter rateLimiter;

  public RowDataToHoodieFunctionWithRateLimit(RowType rowType, Configuration config) {
    super(rowType, config);
    this.totalLimit = config.getLong(FlinkOptions.WRITE_RATE_LIMIT);
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // Split the job-wide budget evenly across the parallel subtasks.
    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    this.rateLimiter = RateLimiter.create(totalLimit / parallelism);
  }

  @Override
  public O map(I i) throws Exception {
    // Block until a permit is available, then delegate the actual conversion.
    rateLimiter.acquire();
    return super.map(i);
  }
}

View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.transform;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
/**
 * Utilities for {@link RowDataToHoodieFunction}.
 */
public final class RowDataToHoodieFunctions {
  // Utility class: final with a private constructor, no instantiation or subclassing.
  private RowDataToHoodieFunctions() {
  }

  /**
   * Creates a {@link RowDataToHoodieFunction} instance based on the given configuration:
   * a rate-limited variant when {@code write.rate.limit} is positive, otherwise the
   * plain mapping function.
   *
   * @param rowType row type of the input records
   * @param conf    the Flink job configuration holding the hudi write options
   * @return the function mapping {@link RowData} into {@link HoodieRecord}
   */
  @SuppressWarnings("rawtypes")
  public static RowDataToHoodieFunction<RowData, HoodieRecord> create(RowType rowType, Configuration conf) {
    if (conf.getLong(FlinkOptions.WRITE_RATE_LIMIT) > 0) {
      return new RowDataToHoodieFunctionWithRateLimit<>(rowType, conf);
    } else {
      return new RowDataToHoodieFunction<>(rowType, conf);
    }
  }
}

View File

@@ -31,7 +31,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
@@ -112,7 +112,8 @@ public class HoodieFlinkStreamer {
}
}
DataStream<HoodieRecord> dataStream2 = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
DataStream<HoodieRecord> dataStream2 = dataStream
.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
dataStream2 = dataStream2.rebalance()

View File

@@ -32,7 +32,7 @@ import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -105,7 +105,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
DataStream<HoodieRecord> dataStream1 = dataStream
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
// bootstrap index
// TODO: this is a very time-consuming operation; optimize it later