[HUDI-2297] Estimate available memory size for spillable map accurately. (#3455)
This commit is contained in:
@@ -32,20 +32,24 @@ import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_
|
||||
|
||||
public class IOUtils {
|
||||
/**
|
||||
* Dynamic calculation of max memory to use for for spillable map. user.available.memory = executor.memory *
|
||||
* (1 - memory.fraction) spillable.available.memory = user.available.memory * hoodie.memory.fraction. Anytime
|
||||
* the engine memory fractions/total memory is changed, the memory used for spillable map changes
|
||||
* accordingly
|
||||
* Dynamic calculation of max memory to use for spillable map. There is always more than one task
|
||||
* running on an executor, and each task maintains a spillable map.
|
||||
* user.available.memory = executor.memory * (1 - memory.fraction)
|
||||
* spillable.available.memory = user.available.memory * hoodie.memory.fraction / executor.cores.
|
||||
* Anytime the engine memory fractions/total memory is changed, the memory used for spillable map
|
||||
* changes accordingly.
|
||||
*/
|
||||
public static long getMaxMemoryAllowedForMerge(TaskContextSupplier context, String maxMemoryFraction) {
|
||||
Option<String> totalMemoryOpt = context.getProperty(EngineProperty.TOTAL_MEMORY_AVAILABLE);
|
||||
Option<String> memoryFractionOpt = context.getProperty(EngineProperty.MEMORY_FRACTION_IN_USE);
|
||||
Option<String> totalCoresOpt = context.getProperty(EngineProperty.TOTAL_CORES_PER_EXECUTOR);
|
||||
|
||||
if (totalMemoryOpt.isPresent() && memoryFractionOpt.isPresent()) {
|
||||
if (totalMemoryOpt.isPresent() && memoryFractionOpt.isPresent() && totalCoresOpt.isPresent()) {
|
||||
long executorMemoryInBytes = Long.parseLong(totalMemoryOpt.get());
|
||||
double memoryFraction = Double.parseDouble(memoryFractionOpt.get());
|
||||
double maxMemoryFractionForMerge = Double.parseDouble(maxMemoryFraction);
|
||||
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
|
||||
long executorCores = Long.parseLong(totalCoresOpt.get());
|
||||
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction) / executorCores;
|
||||
long maxMemoryForMerge = (long) Math.floor(userAvailableMemory * maxMemoryFractionForMerge);
|
||||
return Math.max(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, maxMemoryForMerge);
|
||||
} else {
|
||||
|
||||
@@ -78,6 +78,14 @@ public class SparkTaskContextSupplier extends TaskContextSupplier implements Ser
|
||||
.get(SPARK_EXECUTOR_MEMORY_FRACTION_PROP, DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION));
|
||||
}
|
||||
return Option.empty();
|
||||
} else if (prop == EngineProperty.TOTAL_CORES_PER_EXECUTOR) {
|
||||
final String DEFAULT_SPARK_EXECUTOR_CORES = "1";
|
||||
final String SPARK_EXECUTOR_EXECUTOR_CORES_PROP = "spark.executor.cores";
|
||||
if (SparkEnv.get() != null) {
|
||||
return Option.ofNullable(SparkEnv.get().conf()
|
||||
.get(SPARK_EXECUTOR_EXECUTOR_CORES_PROP, DEFAULT_SPARK_EXECUTOR_CORES));
|
||||
}
|
||||
return Option.empty();
|
||||
}
|
||||
throw new HoodieException("Unknown engine property :" + prop);
|
||||
}
|
||||
|
||||
@@ -26,8 +26,9 @@ public enum EngineProperty {
|
||||
EMBEDDED_SERVER_HOST,
|
||||
// Pool/queue to use to run compaction.
|
||||
COMPACTION_POOL_NAME,
|
||||
TOTAL_CORES_PER_EXECUTOR,
|
||||
// Amount of total memory available to each engine executor
|
||||
TOTAL_MEMORY_AVAILABLE,
|
||||
// Fraction of that memory, that is already in use by the engine
|
||||
MEMORY_FRACTION_IN_USE,
|
||||
MEMORY_FRACTION_IN_USE
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user