[HUDI-3460] Add reader merge memory option for flink (#4911)
* Flink TaskManager (TM) memory optimization
This commit is contained in:
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
|
|||||||
import org.apache.hudi.config.HoodieIndexConfig;
|
import org.apache.hudi.config.HoodieIndexConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
|
||||||
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
|
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
|
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
|
||||||
@@ -481,6 +482,12 @@ public class FlinkOptions extends HoodieConfig {
|
|||||||
// Compaction Options
|
// Compaction Options
|
||||||
// ------------------------------------------------------------------------
|
// ------------------------------------------------------------------------
|
||||||
|
|
||||||
|
public static final ConfigOption<Double> COMPACTION_MEMORY_FRACTION_PROP = ConfigOptions
|
||||||
|
.key(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP)
|
||||||
|
.doubleType()
|
||||||
|
.defaultValue(0.1)
|
||||||
|
.withDescription("Compaction memory fraction of Task Manager managed memory size, default 0.1.");
|
||||||
|
|
||||||
public static final ConfigOption<Boolean> COMPACTION_SCHEDULE_ENABLED = ConfigOptions
|
public static final ConfigOption<Boolean> COMPACTION_SCHEDULE_ENABLED = ConfigOptions
|
||||||
.key("compaction.schedule.enabled")
|
.key("compaction.schedule.enabled")
|
||||||
.booleanType()
|
.booleanType()
|
||||||
|
|||||||
@@ -109,6 +109,10 @@ public class FlinkCompactionConfig extends Configuration {
|
|||||||
description = "Min compaction interval of async compaction service, default 10 minutes")
|
description = "Min compaction interval of async compaction service, default 10 minutes")
|
||||||
public Integer minCompactionIntervalSeconds = 600;
|
public Integer minCompactionIntervalSeconds = 600;
|
||||||
|
|
||||||
|
@Parameter(names = {"--compaction-memory-fraction-prop"},
|
||||||
|
description = "Compaction memory fraction of Task Manager managed memory size, default 0.1")
|
||||||
|
public double compactionMemoryFractionProp = 0.1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}.
|
* Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}.
|
||||||
* The latter is more suitable for the table APIs. It reads all the properties
|
* The latter is more suitable for the table APIs. It reads all the properties
|
||||||
@@ -129,6 +133,7 @@ public class FlinkCompactionConfig extends Configuration {
|
|||||||
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
|
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
|
||||||
conf.setInteger(FlinkOptions.COMPACTION_TASKS, config.compactionTasks);
|
conf.setInteger(FlinkOptions.COMPACTION_TASKS, config.compactionTasks);
|
||||||
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
|
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
|
||||||
|
conf.setDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP, config.compactionMemoryFractionProp);
|
||||||
// use synchronous compaction always
|
// use synchronous compaction always
|
||||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
|
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
|
||||||
conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, config.schedule);
|
conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, config.schedule);
|
||||||
|
|||||||
@@ -250,6 +250,10 @@ public class FlinkStreamerConfig extends Configuration {
|
|||||||
@Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB")
|
@Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB")
|
||||||
public Long compactionTargetIo = 512000L;
|
public Long compactionTargetIo = 512000L;
|
||||||
|
|
||||||
|
@Parameter(names = {"--compaction-memory-fraction-prop"},
|
||||||
|
description = "Compaction memory fraction of Task Manager managed memory size, default 0.1")
|
||||||
|
public double compactionMemoryFractionProp = 0.1;
|
||||||
|
|
||||||
@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
|
@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
|
||||||
public Boolean cleanAsyncEnabled = true;
|
public Boolean cleanAsyncEnabled = true;
|
||||||
|
|
||||||
@@ -384,6 +388,7 @@ public class FlinkStreamerConfig extends Configuration {
|
|||||||
conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
|
conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
|
||||||
conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
|
conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
|
||||||
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
|
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
|
||||||
|
conf.setDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP, config.compactionMemoryFractionProp);
|
||||||
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled);
|
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled);
|
||||||
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
|
conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
|
||||||
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
|
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);
|
||||||
|
|||||||
@@ -75,7 +75,6 @@ import org.apache.flink.table.types.DataType;
|
|||||||
import org.apache.flink.table.types.logical.RowType;
|
import org.apache.flink.table.types.logical.RowType;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -93,7 +92,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes;
|
|
||||||
import static org.apache.hudi.table.format.FormatUtils.getParquetConf;
|
import static org.apache.hudi.table.format.FormatUtils.getParquetConf;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -159,7 +157,7 @@ public class HoodieTableSource implements
|
|||||||
this.filters = filters == null ? Collections.emptyList() : filters;
|
this.filters = filters == null ? Collections.emptyList() : filters;
|
||||||
this.hadoopConf = StreamerUtil.getHadoopConf();
|
this.hadoopConf = StreamerUtil.getHadoopConf();
|
||||||
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
|
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
|
||||||
this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf));
|
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -32,7 +32,6 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
|
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
|
||||||
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
|
|
||||||
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
|
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
@@ -43,7 +42,7 @@ import org.apache.flink.table.data.RowData;
|
|||||||
import org.apache.flink.types.RowKind;
|
import org.apache.flink.types.RowKind;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@@ -190,9 +189,10 @@ public class FormatUtils {
|
|||||||
public BoundedMemoryRecords(
|
public BoundedMemoryRecords(
|
||||||
MergeOnReadInputSplit split,
|
MergeOnReadInputSplit split,
|
||||||
Schema logSchema,
|
Schema logSchema,
|
||||||
Configuration hadoopConf) {
|
Configuration hadoopConf,
|
||||||
|
org.apache.flink.configuration.Configuration flinkConf) {
|
||||||
this.executor = new BoundedInMemoryExecutor<>(
|
this.executor = new BoundedInMemoryExecutor<>(
|
||||||
HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(new JobConf(hadoopConf)),
|
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf),
|
||||||
getParallelProducers(),
|
getParallelProducers(),
|
||||||
Option.empty(),
|
Option.empty(),
|
||||||
Function.identity(),
|
Function.identity(),
|
||||||
|
|||||||
@@ -403,7 +403,7 @@ public class MergeOnReadInputFormat
|
|||||||
final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
|
final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
|
||||||
final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
|
final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
|
||||||
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
|
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
|
||||||
final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf);
|
final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf, conf);
|
||||||
final Iterator<HoodieRecord<?>> recordsIterator = records.getRecordsIterator();
|
final Iterator<HoodieRecord<?>> recordsIterator = records.getRecordsIterator();
|
||||||
|
|
||||||
return new ClosableIterator<RowData>() {
|
return new ClosableIterator<RowData>() {
|
||||||
|
|||||||
@@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hudi.util;
|
package org.apache.hudi.util;
|
||||||
|
|
||||||
|
import org.apache.flink.configuration.TaskManagerOptions;
|
||||||
|
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
|
||||||
import org.apache.hudi.client.FlinkTaskContextSupplier;
|
import org.apache.hudi.client.FlinkTaskContextSupplier;
|
||||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||||
@@ -509,4 +511,19 @@ public class StreamerUtil {
|
|||||||
public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
|
public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) {
|
||||||
return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
|
return !metaClient.getCommitsTimeline().filterCompletedInstants().empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the max compaction memory in bytes with given conf.
|
||||||
|
*/
|
||||||
|
public static long getMaxCompactionMemoryInBytes(Configuration conf) {
|
||||||
|
if (conf.contains(FlinkOptions.COMPACTION_MAX_MEMORY)) {
|
||||||
|
return conf.get(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
|
||||||
|
}
|
||||||
|
return (long)Math
|
||||||
|
.ceil(conf.getDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP)
|
||||||
|
* TaskExecutorProcessUtils.processSpecFromConfig(
|
||||||
|
TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
|
||||||
|
conf, TaskManagerOptions.TOTAL_PROCESS_MEMORY))
|
||||||
|
.getManagedMemorySize().getBytes());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -83,6 +83,7 @@ public class TestHoodieTableFactory {
|
|||||||
this.conf = new Configuration();
|
this.conf = new Configuration();
|
||||||
this.conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath());
|
this.conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath());
|
||||||
this.conf.setString(FlinkOptions.TABLE_NAME, "t1");
|
this.conf.setString(FlinkOptions.TABLE_NAME, "t1");
|
||||||
|
this.conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
|
||||||
StreamerUtil.initTableIfNotExists(this.conf);
|
StreamerUtil.initTableIfNotExists(this.conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -64,6 +64,7 @@ public class TestHoodieTableSource {
|
|||||||
void beforeEach() throws Exception {
|
void beforeEach() throws Exception {
|
||||||
final String path = tempFile.getAbsolutePath();
|
final String path = tempFile.getAbsolutePath();
|
||||||
conf = TestConfigurations.getDefaultConf(path);
|
conf = TestConfigurations.getDefaultConf(path);
|
||||||
|
conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
|
||||||
TestData.writeData(TestData.DATA_SET_INSERT, conf);
|
TestData.writeData(TestData.DATA_SET_INSERT, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -122,6 +123,7 @@ public class TestHoodieTableSource {
|
|||||||
final String path = tempFile.getAbsolutePath();
|
final String path = tempFile.getAbsolutePath();
|
||||||
conf = TestConfigurations.getDefaultConf(path);
|
conf = TestConfigurations.getDefaultConf(path);
|
||||||
conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
|
conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
|
||||||
|
conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
|
||||||
|
|
||||||
HoodieTableSource tableSource = new HoodieTableSource(
|
HoodieTableSource tableSource = new HoodieTableSource(
|
||||||
TestConfigurations.TABLE_SCHEMA,
|
TestConfigurations.TABLE_SCHEMA,
|
||||||
|
|||||||
@@ -73,6 +73,7 @@ public class TestInputFormat {
|
|||||||
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||||
conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
|
conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
|
||||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction
|
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction
|
||||||
|
conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
|
||||||
options.forEach((key, value) -> conf.setString(key, value));
|
options.forEach((key, value) -> conf.setString(key, value));
|
||||||
|
|
||||||
StreamerUtil.initTableIfNotExists(conf);
|
StreamerUtil.initTableIfNotExists(conf);
|
||||||
|
|||||||
Reference in New Issue
Block a user