1
0

Down the reader mem check

This commit is contained in:
v-zhangjc9
2022-06-30 19:13:51 +08:00
parent 215a794fd3
commit 6be03ca56a

View File

@@ -325,10 +325,10 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
private final double maxBufferSize; private final double maxBufferSize;
TotalSizeTracer(Configuration conf) { TotalSizeTracer(Configuration conf) {
long mergeReaderMem = 100; // constant 100MB long mergeReaderMem = 10; // constant 100MB
long mergeMapMaxMem = conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY); long mergeMapMaxMem = conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
this.maxBufferSize = (conf.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE) - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024; this.maxBufferSize = (conf.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE) - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024;
final String errMsg = String.format("'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)", final String errMsg = String.format("'%s' should be at least greater than '%s' plus merge reader memory(constant 50MB now)",
FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key()); FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
ValidationUtils.checkState(this.maxBufferSize > 0, errMsg); ValidationUtils.checkState(this.maxBufferSize > 0, errMsg);
} }