1
0

[MINOR][HUDI-3460] Fix HoodieDataSourceITCase

close #4959
This commit is contained in:
Bo
2022-03-06 10:55:04 +08:00
committed by yuzhao.cyz
parent 34bc752853
commit b6bdb46f7f
7 changed files with 1 additions and 32 deletions

View File

@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -482,12 +481,6 @@ public class FlinkOptions extends HoodieConfig {
   // Compaction Options
   // ------------------------------------------------------------------------
-  public static final ConfigOption<Double> COMPACTION_MEMORY_FRACTION_PROP = ConfigOptions
-      .key(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP)
-      .doubleType()
-      .defaultValue(0.1)
-      .withDescription("Compaction memory fraction of Task Manager managed memory size, default 0.1.");
   public static final ConfigOption<Boolean> COMPACTION_SCHEDULE_ENABLED = ConfigOptions
       .key("compaction.schedule.enabled")
       .booleanType()

View File

@@ -109,10 +109,6 @@ public class FlinkCompactionConfig extends Configuration {
       description = "Min compaction interval of async compaction service, default 10 minutes")
   public Integer minCompactionIntervalSeconds = 600;
-  @Parameter(names = {"--compaction-memory-fraction-prop"},
-      description = "Compaction memory fraction of Task Manager managed memory size, default 0.1")
-  public double compactionMemoryFractionProp = 0.1;
   /**
    * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}.
    * The latter is more suitable for the table APIs. It reads all the properties
@@ -133,7 +129,6 @@ public class FlinkCompactionConfig extends Configuration {
     conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
     conf.setInteger(FlinkOptions.COMPACTION_TASKS, config.compactionTasks);
     conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
-    conf.setDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP, config.compactionMemoryFractionProp);
     // use synchronous compaction always
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
     conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, config.schedule);

View File

@@ -250,10 +250,6 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB")
   public Long compactionTargetIo = 512000L;
-  @Parameter(names = {"--compaction-memory-fraction-prop"},
-      description = "Compaction memory fraction of Task Manager managed memory size, default 0.1")
-  public double compactionMemoryFractionProp = 0.1;
   @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
   public Boolean cleanAsyncEnabled = true;
@@ -388,7 +384,6 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
     conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
     conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
-    conf.setDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP, config.compactionMemoryFractionProp);
     conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnabled);
     conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits);
     conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits);

View File

@@ -18,8 +18,6 @@
 package org.apache.hudi.util;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -516,14 +514,6 @@ public class StreamerUtil {
    * Returns the max compaction memory in bytes with given conf.
    */
   public static long getMaxCompactionMemoryInBytes(Configuration conf) {
-    if (conf.contains(FlinkOptions.COMPACTION_MAX_MEMORY)) {
-      return conf.get(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
-    }
-    return (long) Math
-        .ceil(conf.getDouble(FlinkOptions.COMPACTION_MEMORY_FRACTION_PROP)
-            * TaskExecutorProcessUtils.processSpecFromConfig(
-                TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
-                    conf, TaskManagerOptions.TOTAL_PROCESS_MEMORY))
-                .getManagedMemorySize().getBytes());
+    return conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024;
   }
 }

View File

@@ -83,7 +83,6 @@ public class TestHoodieTableFactory {
     this.conf = new Configuration();
     this.conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath());
     this.conf.setString(FlinkOptions.TABLE_NAME, "t1");
-    this.conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
     StreamerUtil.initTableIfNotExists(this.conf);
   }

View File

@@ -64,7 +64,6 @@ public class TestHoodieTableSource {
   void beforeEach() throws Exception {
     final String path = tempFile.getAbsolutePath();
     conf = TestConfigurations.getDefaultConf(path);
-    conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
   }
@@ -123,7 +122,6 @@ public class TestHoodieTableSource {
     final String path = tempFile.getAbsolutePath();
     conf = TestConfigurations.getDefaultConf(path);
     conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
-    conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
     HoodieTableSource tableSource = new HoodieTableSource(
         TestConfigurations.TABLE_SCHEMA,

View File

@@ -73,7 +73,6 @@ public class TestInputFormat {
     conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction
-    conf.set(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
     options.forEach((key, value) -> conf.setString(key, value));
     StreamerUtil.initTableIfNotExists(conf);