[HUDI-283] : Ensure a sane minimum for merge buffer memory (#964)
- Some environments e.g spark-shell provide 0 for memory size - This causes unnecessary performance degradation
This commit is contained in:
@@ -40,8 +40,10 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
|
|||||||
public static final String MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP = "hoodie.memory.compaction.fraction";
|
public static final String MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP = "hoodie.memory.compaction.fraction";
|
||||||
// Default max memory fraction during compaction, excess spills to disk
|
// Default max memory fraction during compaction, excess spills to disk
|
||||||
public static final String DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION = String.valueOf(0.6);
|
public static final String DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION = String.valueOf(0.6);
|
||||||
// Default memory size per compaction (used if SparkEnv is absent), excess spills to disk
|
// Default memory size (1GB) per compaction (used if SparkEnv is absent), excess spills to disk
|
||||||
public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 1024 * 1024 * 1024L; // 1GB
|
public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 1024 * 1024 * 1024L;
|
||||||
|
// Minimum memory size (100MB) for the spillable map.
|
||||||
|
public static final long DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 100 * 1024 * 1024L;
|
||||||
// Property to set the max memory for merge
|
// Property to set the max memory for merge
|
||||||
public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size";
|
public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size";
|
||||||
// Property to set the max memory for compaction
|
// Property to set the max memory for compaction
|
||||||
@@ -91,6 +93,12 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withMaxMemoryMaxSize(long mergeMaxSize, long compactionMaxSize) {
|
||||||
|
props.setProperty(MAX_MEMORY_FOR_MERGE_PROP, String.valueOf(mergeMaxSize));
|
||||||
|
props.setProperty(MAX_MEMORY_FOR_COMPACTION_PROP, String.valueOf(compactionMaxSize));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder withMaxMemoryFractionPerCompaction(double maxMemoryFractionPerCompaction) {
|
public Builder withMaxMemoryFractionPerCompaction(double maxMemoryFractionPerCompaction) {
|
||||||
props.setProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, String.valueOf(maxMemoryFractionPerCompaction));
|
props.setProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, String.valueOf(maxMemoryFractionPerCompaction));
|
||||||
return this;
|
return this;
|
||||||
@@ -136,7 +144,7 @@ public class HoodieMemoryConfig extends DefaultHoodieConfig {
|
|||||||
double maxMemoryFractionForMerge = Double.valueOf(maxMemoryFraction);
|
double maxMemoryFractionForMerge = Double.valueOf(maxMemoryFraction);
|
||||||
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
|
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
|
||||||
long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
|
long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
|
||||||
return maxMemoryForMerge;
|
return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge);
|
||||||
} else {
|
} else {
|
||||||
return DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
|
return DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -193,9 +193,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
|
|||||||
private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
|
private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
|
||||||
try {
|
try {
|
||||||
// Load the new records in a map
|
// Load the new records in a map
|
||||||
logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
|
long memoryForMerge = config.getMaxMemoryPerPartitionMerge();
|
||||||
this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(),
|
logger.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
|
||||||
config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));
|
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge,
|
||||||
|
config.getSpillableMapBasePath(), new DefaultSizeEstimator(),
|
||||||
|
new HoodieRecordSizeEstimator(originalSchema));
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
|
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user