[HUDI-2143] Tweak the default compaction target IO to 500GB when flink async compaction is off (#3238)
This commit is contained in:
@@ -152,8 +152,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
|||||||
}
|
}
|
||||||
// hoodie key about options
|
// hoodie key about options
|
||||||
setupHoodieKeyOptions(conf, table);
|
setupHoodieKeyOptions(conf, table);
|
||||||
// cleaning options
|
// compaction options
|
||||||
setupCleaningOptions(conf);
|
setupCompactionOptions(conf);
|
||||||
// infer avro schema from physical DDL schema
|
// infer avro schema from physical DDL schema
|
||||||
inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
|
inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
|
||||||
}
|
}
|
||||||
@@ -192,9 +192,9 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets up the cleaning options from the table definition.
|
* Sets up the compaction options from the table definition.
|
||||||
*/
|
*/
|
||||||
private static void setupCleaningOptions(Configuration conf) {
|
private static void setupCompactionOptions(Configuration conf) {
|
||||||
int commitsToRetain = conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS);
|
int commitsToRetain = conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS);
|
||||||
int minCommitsToKeep = conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS);
|
int minCommitsToKeep = conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS);
|
||||||
if (commitsToRetain >= minCommitsToKeep) {
|
if (commitsToRetain >= minCommitsToKeep) {
|
||||||
@@ -205,6 +205,12 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
|
|||||||
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10);
|
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10);
|
||||||
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20);
|
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20);
|
||||||
}
|
}
|
||||||
|
if (conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED)
|
||||||
|
&& !conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
|
||||||
|
&& FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.COMPACTION_TARGET_IO)) {
|
||||||
|
// if compaction schedule is on, tweak the target io to 500GB
|
||||||
|
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 500 * 1024L);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user