[HUDI-1878] Add max memory option for flink writer task (#2920)
Also removes the rate limiter, because the new max-memory option provides similar functionality, and modifies the create and merge handles so that retry files are cleaned up automatically.
This commit is contained in:
@@ -267,18 +267,19 @@ public class FlinkOptions {
|
||||
.defaultValue(4)
|
||||
.withDescription("Parallelism of tasks that do actual write, default is 4");
|
||||
|
||||
public static final ConfigOption<Double> WRITE_TASK_MAX_SIZE = ConfigOptions
|
||||
.key("write.task.max.size")
|
||||
.doubleType()
|
||||
.defaultValue(1024D) // 1GB
|
||||
.withDescription("Maximum memory in MB for a write task, when the threshold hits,\n"
|
||||
+ "it flushes the max size data bucket to avoid OOM, default 1GB");
|
||||
|
||||
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
|
||||
.key("write.batch.size")
|
||||
.doubleType()
|
||||
.defaultValue(64D) // 64MB
|
||||
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB");
|
||||
|
||||
public static final ConfigOption<Long> WRITE_RATE_LIMIT = ConfigOptions
|
||||
.key("write.rate.limit")
|
||||
.longType()
|
||||
.defaultValue(-1L) // default no limit
|
||||
.withDescription("Write records rate limit per second to reduce risk of OOM, default -1 (no limit)");
|
||||
|
||||
public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE = ConfigOptions
|
||||
.key("write.log_block.size")
|
||||
.intType()
|
||||
|
||||
@@ -33,6 +33,7 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.annotation.VisibleForTesting;
|
||||
import org.apache.flink.api.common.state.CheckpointListener;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
|
||||
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
||||
@@ -52,6 +53,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Sink function to write the data to the underneath filesystem.
|
||||
@@ -59,7 +61,8 @@ import java.util.function.BiFunction;
|
||||
* <p><h2>Work Flow</h2>
|
||||
*
|
||||
* <p>The function firstly buffers the data as a batch of {@link HoodieRecord}s,
|
||||
 * It flushes(write) the records batch when the batch size exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE}
 * or the total buffer size exceeds the configured size {@link FlinkOptions#WRITE_TASK_MAX_SIZE}
|
||||
* or a Flink checkpoint starts. After a batch has been written successfully,
|
||||
* the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
|
||||
*
|
||||
@@ -91,7 +94,7 @@ import java.util.function.BiFunction;
|
||||
*/
|
||||
public class StreamWriteFunction<K, I, O>
|
||||
extends KeyedProcessFunction<K, I, O>
|
||||
implements CheckpointedFunction {
|
||||
implements CheckpointedFunction, CheckpointListener {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@@ -134,6 +137,11 @@ public class StreamWriteFunction<K, I, O>
|
||||
*/
|
||||
private transient String actionType;
|
||||
|
||||
/**
|
||||
* Total size tracer.
|
||||
*/
|
||||
private transient TotalSizeTracer tracer;
|
||||
|
||||
/**
|
||||
* Constructs a StreamingSinkFunction.
|
||||
*
|
||||
@@ -150,6 +158,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
this.actionType = CommitUtils.getCommitActionType(
|
||||
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
|
||||
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
|
||||
this.tracer = new TotalSizeTracer(this.config);
|
||||
initBuffer();
|
||||
initWriteFunction();
|
||||
}
|
||||
@@ -168,7 +177,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) throws Exception {
|
||||
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) {
|
||||
bufferRecord(value);
|
||||
}
|
||||
|
||||
@@ -180,6 +189,11 @@ public class StreamWriteFunction<K, I, O>
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyCheckpointComplete(long checkpointId) {
|
||||
this.writeClient.cleanHandles();
|
||||
}
|
||||
|
||||
/**
|
||||
* End input action for batch source.
|
||||
*/
|
||||
@@ -294,6 +308,44 @@ public class StreamWriteFunction<K, I, O>
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tool to trace the total buffer size. It computes the maximum buffer size,
|
||||
* if current buffer size is greater than the maximum buffer size, the data bucket
|
||||
* flush triggers.
|
||||
*/
|
||||
private static class TotalSizeTracer {
|
||||
private long bufferSize = 0L;
|
||||
private final double maxBufferSize;
|
||||
|
||||
TotalSizeTracer(Configuration conf) {
|
||||
long mergeReaderMem = 100; // constant 100MB
|
||||
long mergeMapMaxMem = conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
|
||||
this.maxBufferSize = (conf.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE) - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024;
|
||||
final String errMsg = String.format("'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)",
|
||||
FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
|
||||
ValidationUtils.checkState(this.maxBufferSize > 0, errMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Trace the given record size {@code recordSize}.
|
||||
*
|
||||
* @param recordSize The record size
|
||||
* @return true if the buffer size exceeds the maximum buffer size
|
||||
*/
|
||||
boolean trace(long recordSize) {
|
||||
this.bufferSize += recordSize;
|
||||
return this.bufferSize > this.maxBufferSize;
|
||||
}
|
||||
|
||||
void countDown(long size) {
|
||||
this.bufferSize -= size;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
this.bufferSize = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the bucket ID with the given value {@code value}.
|
||||
*/
|
||||
@@ -309,6 +361,9 @@ public class StreamWriteFunction<K, I, O>
|
||||
* <p>Flush the data bucket first if the bucket records size is greater than
|
||||
* the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
|
||||
*
|
||||
* <p>Flush the max size data bucket if the total buffer size exceeds the configured
|
||||
* threshold {@link FlinkOptions#WRITE_TASK_MAX_SIZE}.
|
||||
*
|
||||
* @param value HoodieRecord
|
||||
*/
|
||||
private void bufferRecord(I value) {
|
||||
@@ -316,10 +371,21 @@ public class StreamWriteFunction<K, I, O>
|
||||
|
||||
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
|
||||
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)));
|
||||
boolean needFlush = bucket.detector.detect(value);
|
||||
if (needFlush) {
|
||||
boolean flushBucket = bucket.detector.detect(value);
|
||||
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
|
||||
if (flushBucket) {
|
||||
flushBucket(bucket);
|
||||
this.tracer.countDown(bucket.detector.totalSize);
|
||||
bucket.reset();
|
||||
} else if (flushBuffer) {
|
||||
// find the max size bucket and flush it out
|
||||
List<DataBucket> sortedBuckets = this.buckets.values().stream()
|
||||
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
|
||||
.collect(Collectors.toList());
|
||||
final DataBucket bucketToFlush = sortedBuckets.get(0);
|
||||
flushBucket(bucketToFlush);
|
||||
this.tracer.countDown(bucketToFlush.detector.totalSize);
|
||||
bucketToFlush.reset();
|
||||
}
|
||||
bucket.records.add((HoodieRecord<?>) value);
|
||||
}
|
||||
@@ -384,7 +450,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
.build();
|
||||
this.eventGateway.sendEventToCoordinator(event);
|
||||
this.buckets.clear();
|
||||
this.writeClient.cleanHandles();
|
||||
this.tracer.reset();
|
||||
this.currentInstant = "";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,8 +44,6 @@ import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Function that transforms RowData to HoodieRecord.
|
||||
@@ -82,12 +80,6 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
|
||||
*/
|
||||
private final Configuration config;
|
||||
|
||||
/**
|
||||
* Rate limit per second for this task.
|
||||
* The task sleep a little while when the consuming rate exceeds the threshold.
|
||||
*/
|
||||
private transient RateLimiter rateLimiter;
|
||||
|
||||
public RowDataToHoodieFunction(RowType rowType, Configuration config) {
|
||||
this.rowType = rowType;
|
||||
this.config = config;
|
||||
@@ -100,30 +92,12 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
|
||||
this.converter = RowDataToAvroConverters.createConverter(this.rowType);
|
||||
this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
|
||||
this.payloadCreation = PayloadCreation.instance(config);
|
||||
long totalLimit = this.config.getLong(FlinkOptions.WRITE_RATE_LIMIT);
|
||||
if (totalLimit > 0) {
|
||||
this.rateLimiter = new RateLimiter(totalLimit / getRuntimeContext().getNumberOfParallelSubtasks());
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public O map(I i) throws Exception {
|
||||
if (rateLimiter != null) {
|
||||
final O hoodieRecord;
|
||||
if (rateLimiter.sampling()) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
hoodieRecord = (O) toHoodieRecord(i);
|
||||
long endTime = System.currentTimeMillis();
|
||||
rateLimiter.processTime(endTime - startTime);
|
||||
} else {
|
||||
hoodieRecord = (O) toHoodieRecord(i);
|
||||
}
|
||||
rateLimiter.sleepIfNeeded();
|
||||
return hoodieRecord;
|
||||
} else {
|
||||
return (O) toHoodieRecord(i);
|
||||
}
|
||||
return (O) toHoodieRecord(i);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -191,43 +165,4 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Inner Class
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Tool to decide whether the limit the processing rate.
|
||||
* Sampling the record to compute the process time with 0.01 percentage.
|
||||
*/
|
||||
private static class RateLimiter {
|
||||
private final Random random = new Random(47);
|
||||
private static final int DENOMINATOR = 100;
|
||||
|
||||
private final long maxProcessTime;
|
||||
|
||||
private long processTime = -1L;
|
||||
private long timeToSleep = -1;
|
||||
|
||||
RateLimiter(long rate) {
|
||||
ValidationUtils.checkArgument(rate > 0, "rate should be positive");
|
||||
this.maxProcessTime = 1000 / rate;
|
||||
}
|
||||
|
||||
void processTime(long processTime) {
|
||||
this.processTime = processTime;
|
||||
this.timeToSleep = maxProcessTime - processTime;
|
||||
}
|
||||
|
||||
boolean sampling() {
|
||||
// 0.01 sampling percentage
|
||||
return processTime == -1 || random.nextInt(DENOMINATOR) == 1;
|
||||
}
|
||||
|
||||
void sleepIfNeeded() throws Exception {
|
||||
if (timeToSleep > 0) {
|
||||
TimeUnit.MILLISECONDS.sleep(timeToSleep);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user