1
0

[HUDI-1863] Add rate limiter to Flink writer to avoid OOM for bootstrap (#2891)

This commit is contained in:
Danny Chan
2021-04-29 20:32:10 +08:00
committed by GitHub
parent c9bcb5e33f
commit 6e9c5dd765
7 changed files with 177 additions and 167 deletions

View File

@@ -267,20 +267,20 @@ public class FlinkOptions {
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual write, default is 4");
public static final ConfigOption<Double> WRITE_BUFFER_SIZE = ConfigOptions
.key("write.buffer.size.MB")
.doubleType()
.defaultValue(256D) // 256MB
.withDescription("Total buffer size in MB to flush data into the underneath filesystem, default 256MB");
public static final ConfigOption<Double> WRITE_BUCKET_SIZE = ConfigOptions
.key("write.bucket.size.MB")
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
.key("write.batch.size")
.doubleType()
.defaultValue(64D) // 64MB
.withDescription("Bucket size in MB to flush data into the underneath filesystem, default 64MB");
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB");
public static final ConfigOption<Long> WRITE_RATE_LIMIT = ConfigOptions
.key("write.rate.limit")
.longType()
.defaultValue(-1L) // default no limit
.withDescription("Write records rate limit per second to reduce risk of OOM, default -1 (no limit)");
public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE = ConfigOptions
.key("write.log_block.size.MB")
.key("write.log_block.size")
.intType()
.defaultValue(128)
.withDescription("Max log block size in MB for log file, default 128MB");

View File

@@ -41,21 +41,18 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
/**
* Sink function to write the data to the underneath filesystem.
@@ -63,8 +60,7 @@ import java.util.stream.Collectors;
* <p><h2>Work Flow</h2>
*
* <p>The function firstly buffers the data as a batch of {@link HoodieRecord}s,
* It flushes(write) the records bucket when the bucket size exceeds the configured threshold {@link FlinkOptions#WRITE_BUCKET_SIZE}
* or the whole data buffer size exceeds the configured threshold {@link FlinkOptions#WRITE_BUFFER_SIZE}
* It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE}
* or a Flink checkpoint starts. After a batch has been written successfully,
* the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
*
@@ -102,11 +98,6 @@ public class StreamWriteFunction<K, I, O>
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
/**
* Write buffer size detector.
*/
private transient BufferSizeDetector detector;
/**
* Write buffer as buckets for a checkpoint. The key is bucket ID.
*/
@@ -232,7 +223,6 @@ public class StreamWriteFunction<K, I, O>
// -------------------------------------------------------------------------
private void initBuffer() {
this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BUFFER_SIZE));
this.buckets = new LinkedHashMap<>();
}
@@ -259,49 +249,18 @@ public class StreamWriteFunction<K, I, O>
/**
* Data bucket.
*/
private static class DataBucket implements Comparable<DataBucket> {
private static class DataBucket {
private final List<HoodieRecord> records;
private final BucketSizeTracer tracer;
private final BufferSizeDetector detector;
private DataBucket(Double batchSize) {
this.records = new ArrayList<>();
this.tracer = new BucketSizeTracer(batchSize);
this.detector = new BufferSizeDetector(batchSize);
}
public void reset() {
this.records.clear();
this.tracer.reset();
}
@Override
public int compareTo(@NotNull DataBucket other) {
return Double.compare(tracer.threshold, other.tracer.threshold);
}
}
/**
* Tool to detect if to flush out the existing bucket.
*/
private static class BucketSizeTracer {
private final double threshold;
private long totalSize = 0L;
BucketSizeTracer(double bucketSizeMb) {
this.threshold = bucketSizeMb * 1024 * 1024;
}
/**
* Trace the bucket size with given record size,
* returns true if the bucket size exceeds specified threshold.
*/
boolean trace(long recordSize) {
totalSize += recordSize;
return totalSize > this.threshold;
}
void reset() {
this.totalSize = 0L;
this.detector.reset();
}
}
@@ -313,13 +272,13 @@ public class StreamWriteFunction<K, I, O>
private final Random random = new Random(47);
private static final int DENOMINATOR = 100;
private final double threshold;
private final double batchSizeBytes;
private long lastRecordSize = -1L;
private long totalSize = 0L;
BufferSizeDetector(double batchSizeMb) {
this.threshold = batchSizeMb * 1024 * 1024;
this.batchSizeBytes = batchSizeMb * 1024 * 1024;
}
boolean detect(Object record) {
@@ -327,7 +286,7 @@ public class StreamWriteFunction<K, I, O>
lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
}
totalSize += lastRecordSize;
return totalSize > this.threshold;
return totalSize > this.batchSizeBytes;
}
boolean sampling() {
@@ -339,10 +298,6 @@ public class StreamWriteFunction<K, I, O>
this.lastRecordSize = -1L;
this.totalSize = 0L;
}
public void countDown(long bucketSize) {
this.totalSize -= bucketSize;
}
}
/**
@@ -357,49 +312,19 @@ public class StreamWriteFunction<K, I, O>
/**
* Buffers the given record.
*
* <p>Flush the data bucket first if one of the condition meets:
*
* <ul>
* <li>The bucket size is greater than the configured value {@link FlinkOptions#WRITE_BUCKET_SIZE}.</li>
* <li>Flush half of the data buckets if the whole buffer size
* exceeds the configured threshold {@link FlinkOptions#WRITE_BUFFER_SIZE}.</li>
* </ul>
* <p>Flush the data bucket first if the bucket records size is greater than
* the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
*
* @param value HoodieRecord
*/
private void bufferRecord(I value) {
boolean flushBuffer = detector.detect(value);
if (flushBuffer) {
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.filter(b -> b.records.size() > 0)
.sorted(Comparator.comparingLong(b -> b.tracer.totalSize))
.collect(Collectors.toList());
// flush half bytes size of buckets to avoid flushing too small buckets
// which cause small files.
long totalSize = detector.totalSize;
long flushedBytes = 0;
for (DataBucket bucket : sortedBuckets) {
final long bucketSize = bucket.tracer.totalSize;
flushBucket(bucket);
detector.countDown(bucketSize);
bucket.reset();
flushedBytes += bucketSize;
if (flushedBytes > detector.totalSize / 2) {
break;
}
}
LOG.info("Flush {} bytes data buckets because the total buffer size {} bytes exceeds the threshold {} bytes",
flushedBytes, totalSize, detector.threshold);
}
final String bucketID = getBucketID(value);
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BUCKET_SIZE)));
boolean flushBucket = bucket.tracer.trace(detector.lastRecordSize);
if (flushBucket) {
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
boolean needFlush = bucket.detector.detect(value);
if (needFlush) {
flushBucket(bucket);
detector.countDown(bucket.tracer.totalSize);
bucket.reset();
}
bucket.records.add((HoodieRecord<?>) value);
@@ -465,7 +390,6 @@ public class StreamWriteFunction<K, I, O>
.build();
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.detector.reset();
this.currentInstant = "";
}
}

View File

@@ -44,6 +44,8 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* Function that transforms RowData to HoodieRecord.
@@ -80,6 +82,12 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
*/
private final Configuration config;
/**
* Rate limit per second for this task.
The task sleeps for a short while when the consuming rate exceeds the threshold.
*/
private transient RateLimiter rateLimiter;
public RowDataToHoodieFunction(RowType rowType, Configuration config) {
this.rowType = rowType;
this.config = config;
@@ -92,12 +100,30 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
this.converter = RowDataToAvroConverters.createConverter(this.rowType);
this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
this.payloadCreation = PayloadCreation.instance(config);
long totalLimit = this.config.getLong(FlinkOptions.WRITE_RATE_LIMIT);
if (totalLimit > 0) {
this.rateLimiter = new RateLimiter(totalLimit / getRuntimeContext().getNumberOfParallelSubtasks());
}
}
@SuppressWarnings("unchecked")
@Override
public O map(I i) throws Exception {
return (O) toHoodieRecord(i);
if (rateLimiter != null) {
final O hoodieRecord;
if (rateLimiter.sampling()) {
long startTime = System.currentTimeMillis();
hoodieRecord = (O) toHoodieRecord(i);
long endTime = System.currentTimeMillis();
rateLimiter.processTime(endTime - startTime);
} else {
hoodieRecord = (O) toHoodieRecord(i);
}
rateLimiter.sleepIfNeeded();
return hoodieRecord;
} else {
return (O) toHoodieRecord(i);
}
}
/**
@@ -165,4 +191,43 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
}
}
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
/**
 * Tool to decide whether to limit the record processing rate.
 *
 * <p>Timing every record would be too costly, so the process time is
 * sampled: the first record is always timed, afterwards roughly 1 in
 * {@code DENOMINATOR} (100) records is timed. The task then sleeps the
 * remaining time budget so the configured per-second rate is not exceeded.
 */
private static class RateLimiter {
  // Fixed seed keeps the sampling sequence deterministic across runs.
  private final Random random = new Random(47);

  private static final int DENOMINATOR = 100;

  // Minimum time budget in ms that each record should take so the
  // per-second rate limit is honored (1000 ms / rate).
  // NOTE(review): integer division — a rate above 1000 records/sec yields 0,
  // which effectively disables the limiter; confirm this is intended.
  private final long maxProcessTime;

  // Last sampled per-record process time in ms; -1 means no sample taken yet.
  private long processTime = -1L;

  // Time in ms to sleep after the current record; values <= 0 mean no sleep.
  private long timeToSleep = -1;

  RateLimiter(long rate) {
    ValidationUtils.checkArgument(rate > 0, "rate should be positive");
    this.maxProcessTime = 1000 / rate;
  }

  /**
   * Records a sampled process time and derives how long to sleep so the
   * effective consuming rate stays at or below the configured limit.
   *
   * @param processTime Measured processing time of one record, in milliseconds
   */
  void processTime(long processTime) {
    this.processTime = processTime;
    this.timeToSleep = maxProcessTime - processTime;
  }

  /**
   * Returns true if the next record should be timed: always for the very
   * first record (no sample yet), then with ~1% probability.
   */
  boolean sampling() {
    // 0.01 sampling percentage
    return processTime == -1 || random.nextInt(DENOMINATOR) == 1;
  }

  /**
   * Sleeps the remaining time budget computed from the last sample, if any.
   */
  void sleepIfNeeded() throws Exception {
    if (timeToSleep > 0) {
      TimeUnit.MILLISECONDS.sleep(timeToSleep);
    }
  }
}
}

View File

@@ -80,7 +80,7 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
// them to the executor. This state is used to ensure that only one read task is in that splits queue at a time, so that
// read tasks do not accumulate ahead of checkpoint tasks. When there is a read task in the queue, this is set to RUNNING.
// When there are no more files to read, this will be set to IDLE.
private transient SplitState currentSplitState;
private transient volatile SplitState currentSplitState;
private StreamReadOperator(MergeOnReadInputFormat format, ProcessingTimeService timeService,
MailboxExecutor mailboxExecutor) {
@@ -141,7 +141,7 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
private void enqueueProcessSplits() {
if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
currentSplitState = SplitState.RUNNING;
executor.execute(this::processSplits, this.getClass().getSimpleName());
executor.execute(this::processSplits, "process input split");
}
}
@@ -155,8 +155,8 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
// This log is important to indicate the consuming process, there is only one log message for one data bucket.
LOG.info("Processing input split : {}", split);
format.open(split);
try {
format.open(split);
RowData nextElement = null;
while (!format.reachedEnd()) {
nextElement = format.nextRecord(nextElement);