diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 1be906036..d8b0dc0ce 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; @@ -481,6 +482,12 @@ public class FlinkOptions extends HoodieConfig { // Compaction Options // ------------------------------------------------------------------------ + public static final ConfigOption COMPACTION_MEMORY_FRACTION_PROP = ConfigOptions + .key(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP) + .doubleType() + .defaultValue(0.1) + .withDescription("Compaction memory fraction of Task Manager managed memory size, default 0.1."); + public static final ConfigOption COMPACTION_SCHEDULE_ENABLED = ConfigOptions .key("compaction.schedule.enabled") .booleanType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java index 4f3faadb9..41d317998 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -109,6 +109,10 @@ public class FlinkCompactionConfig extends Configuration { description = "Min compaction interval of async compaction service, default 10 minutes") public Integer minCompactionIntervalSeconds = 600; + @Parameter(names = {"--compaction-memory-fraction-prop"}, + description = "Compaction memory fraction of Task Manager managed memory size, default 0.1") + public double compactionMemoryFractionProp = 0.1; + /** * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. * The latter is more suitable for the table APIs. It reads all the properties @@ -129,6 +133,7 @@ public class FlinkCompactionConfig extends Configuration { conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo); conf.setInteger(FlinkOptions.COMPACTION_TASKS, config.compactionTasks); conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable); + conf.setDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP, config.compactionMemoryFractionProp); // use synchronous compaction always conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, config.schedule); diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 1d7111f49..0892fc5c4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -250,6 +250,10 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB") public Long compactionTargetIo = 512000L; + @Parameter(names = {"--compaction-memory-fraction-prop"}, + description = "Compaction memory fraction of Task Manager managed memory size, default 0.1") + public double compactionMemoryFractionProp = 0.1; + @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default") public Boolean cleanAsyncEnabled = true; @@ -384,6 +388,7 @@ public class FlinkStreamerConfig extends Configuration { conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds); conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory); conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo); + conf.setDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP, config.compactionMemoryFractionProp); conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled); conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits); conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits); diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 3efd1d561..d00eb3e3e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -75,7 +75,6 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,7 +92,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes; import static org.apache.hudi.table.format.FormatUtils.getParquetConf; /** @@ -159,7 +157,7 @@ public class HoodieTableSource implements this.filters = filters == null ? Collections.emptyList() : filters; this.hadoopConf = StreamerUtil.getHadoopConf(); this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf); - this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf)); + this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf); } @Override diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index fa404cc21..666dc3a73 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -32,7 +32,6 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; -import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.avro.Schema; @@ -43,7 +42,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.types.RowKind; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hudi.util.StreamerUtil; import java.util.ArrayList; import java.util.Arrays; @@ -190,9 +189,10 @@ public class FormatUtils { public BoundedMemoryRecords( MergeOnReadInputSplit split, Schema logSchema, - Configuration hadoopConf) { + Configuration hadoopConf, + org.apache.flink.configuration.Configuration flinkConf) { this.executor = new BoundedInMemoryExecutor<>( - HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(new JobConf(hadoopConf)), + StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), getParallelProducers(), Option.empty(), Function.identity(), diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 4404e15ea..4ea202a70 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -403,7 +403,7 @@ public class MergeOnReadInputFormat final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema); final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); - final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf); + final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf, conf); final Iterator> recordsIterator = records.getRecordsIterator(); return new ClosableIterator() { diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 45d23f2ff..8861b4388 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -18,6 +18,8 @@ package org.apache.hudi.util; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.common.HoodieFlinkEngineContext; @@ -509,4 +511,19 @@ public class StreamerUtil { public static boolean haveSuccessfulCommits(HoodieTableMetaClient metaClient) { return !metaClient.getCommitsTimeline().filterCompletedInstants().empty(); } + + /** + * Returns the max compaction memory in bytes with given conf. + */ + public static long getMaxCompactionMemoryInBytes(Configuration conf) { + if (conf.contains(FlinkOptions.COMPACTION_MAX_MEMORY)) { + return conf.get(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024; + } + return (long)Math + .ceil(conf.getDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP) + * TaskExecutorProcessUtils.processSpecFromConfig( + TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption( + conf, TaskManagerOptions.TOTAL_PROCESS_MEMORY)) + .getManagedMemorySize().getBytes()); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index a76e00816..b3b76d0d7 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -83,6 +83,7 @@ public class TestHoodieTableFactory { this.conf = new Configuration(); this.conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath()); this.conf.setString(FlinkOptions.TABLE_NAME, "t1"); + this.conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024); StreamerUtil.initTableIfNotExists(this.conf); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index 8ee18a960..c41040a56 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -64,6 +64,7 @@ public class TestHoodieTableSource { void beforeEach() throws Exception { final String path = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(path); + conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024); TestData.writeData(TestData.DATA_SET_INSERT, conf); } @@ -122,6 +123,7 @@ public class TestHoodieTableSource { final String path = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(path); conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true); + conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024); HoodieTableSource tableSource = new HoodieTableSource( TestConfigurations.TABLE_SCHEMA, diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index 6fbbab81f..1e6029a2d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -73,6 +73,7 @@ public class TestInputFormat { conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setString(FlinkOptions.TABLE_TYPE, tableType.name()); conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction + conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024); options.forEach((key, value) -> conf.setString(key, value)); StreamerUtil.initTableIfNotExists(conf);