1
0

Making ExternalSpillableMap generic for any datatype

- Introduced concept of converters to be able to serde generic datatype for SpillableMap
	- Fixed/Added configs to Hoodie Configs
	- Changed HoodieMergeHandle to start using SpillableMap
This commit is contained in:
Nishith Agarwal
2018-03-15 00:20:16 -07:00
committed by vinoth chandar
parent fa787ab5ab
commit 987f5d6b96
24 changed files with 984 additions and 281 deletions

View File

@@ -99,11 +99,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
// used to merge records written to log file
public static final String DEFAULT_PAYLOAD_CLASS = HoodieAvroPayload.class.getName();
public static final String PAYLOAD_CLASS = "hoodie.compaction.payload.class";
public static final String MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP = "hoodie.compaction.spill.threshold";
// Default memory size per compaction, excess spills to disk
public static final String DEFAULT_MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES = String.valueOf(1024*1024*1024L); //1GB
public static final String PAYLOAD_CLASS_PROP = "hoodie.compaction.payload.class";
// used to choose a trade off between IO vs Memory when performing compaction process
// Depending on outputfile_size and memory provided, choose true to avoid OOM for large file size + small memory
@@ -212,7 +208,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
}
public Builder withPayloadClass(String payloadClassName) {
props.setProperty(PAYLOAD_CLASS, payloadClassName);
props.setProperty(PAYLOAD_CLASS_PROP, payloadClassName);
return this;
}
@@ -222,12 +218,6 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
return this;
}
public Builder withMaxMemorySizePerCompactionInBytes(long maxMemorySizePerCompactionInBytes) {
props.setProperty(MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP,
String.valueOf(maxMemorySizePerCompactionInBytes));
return this;
}
public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) {
props.setProperty(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP,
String.valueOf(maxNumDeltaCommitsBeforeCompaction));
@@ -277,12 +267,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
CLEANER_PARALLELISM, DEFAULT_CLEANER_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(COMPACTION_STRATEGY_PROP),
COMPACTION_STRATEGY_PROP, DEFAULT_COMPACTION_STRATEGY);
setDefaultOnCondition(props, !props.containsKey(PAYLOAD_CLASS),
PAYLOAD_CLASS, DEFAULT_PAYLOAD_CLASS);
setDefaultOnCondition(props, !props.containsKey(PAYLOAD_CLASS_PROP),
PAYLOAD_CLASS_PROP, DEFAULT_PAYLOAD_CLASS);
setDefaultOnCondition(props, !props.containsKey(TARGET_IO_PER_COMPACTION_IN_MB_PROP),
TARGET_IO_PER_COMPACTION_IN_MB_PROP, DEFAULT_TARGET_IO_PER_COMPACTION_IN_MB);
setDefaultOnCondition(props, !props.containsKey(MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP),
MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP, DEFAULT_MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES);
setDefaultOnCondition(props, !props.containsKey(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP),
COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED);
setDefaultOnCondition(props, !props.containsKey(COMPACTION_REVERSE_LOG_READ_ENABLED_PROP),

View File

@@ -0,0 +1,147 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.config;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import javax.annotation.concurrent.Immutable;
import org.apache.spark.SparkEnv;
import org.apache.spark.util.Utils;
/**
* Memory related config
*/
@Immutable
public class HoodieMemoryConfig extends DefaultHoodieConfig {
// This fraction is multiplied with the spark.memory.fraction to get a final fraction of heap space to use during merge
// This makes it easier to scale this value as one increases the spark.executor.memory
public static final String MAX_MEMORY_FRACTION_FOR_MERGE_PROP = "hoodie.memory.merge.fraction";
// Default max memory fraction during hash-merge, excess spills to disk
public static final String DEFAULT_MAX_MEMORY_FRACTION_FOR_MERGE = String.valueOf(0.6);
public static final String MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP = "hoodie.memory.compaction.fraction";
// Default max memory fraction during compaction, excess spills to disk
public static final String DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION = String.valueOf(0.6);
// Default memory size per compaction (used if SparkEnv is absent), excess spills to disk
public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES =
1024 * 1024 * 1024L; // 1GB
// Property to set the max memory for merge
public static final String MAX_MEMORY_FOR_MERGE_PROP = "hoodie.memory.merge.max.size";
// Property to set the max memory for compaction
public static final String MAX_MEMORY_FOR_COMPACTION_PROP = "hoodie.memory.compaction.max.size";
private HoodieMemoryConfig(Properties props) {
super(props);
}
public static HoodieMemoryConfig.Builder newBuilder() {
return new Builder();
}
public static class Builder {
private final Properties props = new Properties();
public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
try {
this.props.load(reader);
return this;
} finally {
reader.close();
}
}
public Builder fromProperties(Properties props) {
this.props.putAll(props);
return this;
}
public Builder withMaxMemoryFractionPerPartitionMerge(long maxMemoryFractionPerPartitionMerge) {
props.setProperty(MAX_MEMORY_FRACTION_FOR_MERGE_PROP,
String.valueOf(maxMemoryFractionPerPartitionMerge));
return this;
}
public Builder withMaxMemoryFractionPerCompaction(long maxMemoryFractionPerCompaction) {
props.setProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP,
String.valueOf(maxMemoryFractionPerCompaction));
return this;
}
/**
* Dynamic calculation of max memory to use for for spillable map. user.available.memory =
* spark.executor.memory * (1 - spark.memory.fraction) spillable.available.memory =
* user.available.memory * hoodie.memory.fraction. Anytime the spark.executor.memory or the
* spark.memory.fraction is changed, the memory used for spillable map changes accordingly
*/
private long getMaxMemoryAllowedForMerge(String maxMemoryFraction) {
final String SPARK_EXECUTOR_MEMORY_PROP = "spark.executor.memory";
final String SPARK_EXECUTOR_MEMORY_FRACTION_PROP = "spark.memory.fraction";
// This is hard-coded in spark code {@link https://github.com/apache/spark/blob/576c43fb4226e4efa12189b41c3bc862019862c6/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala#L231}
// so have to re-define this here
final String DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION = "0.6";
// This is hard-coded in spark code {@link https://github.com/apache/spark/blob/576c43fb4226e4efa12189b41c3bc862019862c6/core/src/main/scala/org/apache/spark/SparkContext.scala#L471}
// so have to re-define this here
final String DEFAULT_SPARK_EXECUTOR_MEMORY_MB = "1024"; // in MB
if (SparkEnv.get() != null) {
// 1 GB is the default conf used by Spark, look at SparkContext.scala
long executorMemoryInBytes = Long.valueOf(
Utils.memoryStringToMb(SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_PROP,
DEFAULT_SPARK_EXECUTOR_MEMORY_MB)) * 1024
* 1024L);
// 0.6 is the default value used by Spark,
// look at {@link https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkConf.scala#L507}
double memoryFraction = Double
.valueOf(SparkEnv.get().conf().get(SPARK_EXECUTOR_MEMORY_FRACTION_PROP,
DEFAULT_SPARK_EXECUTOR_MEMORY_FRACTION));
double maxMemoryFractionForMerge = Double.valueOf(maxMemoryFraction);
double userAvailableMemory = executorMemoryInBytes * (1 - memoryFraction);
long maxMemoryForMerge = (long) Math
.floor(userAvailableMemory * maxMemoryFractionForMerge);
return maxMemoryForMerge;
} else {
return DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
}
}
public HoodieMemoryConfig build() {
HoodieMemoryConfig config = new HoodieMemoryConfig(props);
setDefaultOnCondition(props,
!props.containsKey(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP),
MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP,
DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION);
setDefaultOnCondition(props,
!props.containsKey(MAX_MEMORY_FRACTION_FOR_MERGE_PROP),
MAX_MEMORY_FRACTION_FOR_MERGE_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_MERGE);
setDefaultOnCondition(props,
!props.containsKey(MAX_MEMORY_FOR_MERGE_PROP),
MAX_MEMORY_FOR_MERGE_PROP, String.valueOf(
getMaxMemoryAllowedForMerge(props.getProperty(MAX_MEMORY_FRACTION_FOR_MERGE_PROP))));
setDefaultOnCondition(props,
!props.containsKey(MAX_MEMORY_FOR_COMPACTION_PROP),
MAX_MEMORY_FOR_COMPACTION_PROP, String.valueOf(
getMaxMemoryAllowedForMerge(props.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP))));
return config;
}
}
}

View File

@@ -48,7 +48,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4*1024*1024);
private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4 * 1024 * 1024);
private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
private static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
@@ -108,7 +108,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
}
public int getWriteBufferLimitBytes() {
return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
return Integer
.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
}
public boolean shouldCombineBeforeInsert() {
@@ -217,17 +218,18 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
.parseLong(props.getProperty(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB_PROP));
}
public Long getMaxMemorySizePerCompactionInBytes() {
return Long
.parseLong(props.getProperty(HoodieCompactionConfig.MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP));
}
public Boolean getCompactionLazyBlockReadEnabled() {
return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP));
return Boolean
.valueOf(props.getProperty(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP));
}
public Boolean getCompactionReverseLogReadEnabled() {
return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP));
return Boolean.valueOf(
props.getProperty(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP));
}
public String getPayloadClass() {
return props.getProperty(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
}
/**
@@ -283,7 +285,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
}
public StorageLevel getBloomIndexInputStorageLevel() {
return StorageLevel.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
return StorageLevel
.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
}
/**
@@ -302,7 +305,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
}
public int getLogFileDataBlockMaxSize() {
return Integer.parseInt(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES));
return Integer
.parseInt(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES));
}
public int getLogFileMaxSize() {
@@ -313,7 +317,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Double.valueOf(props.getProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO));
}
/**
* metrics properties
**/
@@ -342,6 +345,28 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return new Builder();
}
/**
* memory configs
*/
public Double getMaxMemoryFractionPerPartitionMerge() {
return Double.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE_PROP));
}
public Double getMaxMemoryFractionPerCompaction() {
return Double
.valueOf(
props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP));
}
public Long getMaxMemoryPerPartitionMerge() {
return Long.valueOf(props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE_PROP));
}
public Long getMaxMemoryPerCompaction() {
return Long
.valueOf(
props.getProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP));
}
public static class Builder {
@@ -351,6 +376,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private boolean isCompactionConfigSet = false;
private boolean isMetricsConfigSet = false;
private boolean isAutoCommit = true;
private boolean isMemoryConfigSet = false;
public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
@@ -501,9 +527,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE),
HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE);
HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE,
DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE);
setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE),
HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE);
HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE,
DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
@@ -516,6 +544,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HoodieCompactionConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isMetricsConfigSet,
HoodieMetricsConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isMemoryConfigSet,
HoodieMemoryConfig.newBuilder().fromProperties(props).build());
return config;
}
}

View File

@@ -19,7 +19,6 @@ package com.uber.hoodie.io;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -65,10 +64,12 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName));
}
public Path makeTempPath(String partitionPath, int taskPartitionId, String fileName, int stageId, long taskAttemptId) {
public Path makeTempPath(String partitionPath, int taskPartitionId, String fileName, int stageId,
long taskAttemptId) {
Path path = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
return new Path(path.toString(),
FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId, taskAttemptId));
FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId,
taskAttemptId));
}
/**

View File

@@ -25,17 +25,15 @@ import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
import com.uber.hoodie.common.util.collection.converter.StringConverter;
import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.io.storage.HoodieStorageWriter;
import com.uber.hoodie.io.storage.HoodieStorageWriterFactory;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
@@ -43,6 +41,11 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
@SuppressWarnings("Duplicates")
public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHandle<T> {
@@ -145,9 +148,14 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
* @return
*/
private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
// Load the new records in a map
// TODO (NA) instantiate a ExternalSpillableMap
this.keyToNewRecords = new HashMap<>();
try {
// Load the new records in a map
logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(),
Optional.empty(), new StringConverter(), new HoodieRecordConverter(schema, config.getPayloadClass()));
} catch(IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
String partitionPath = null;
while (newRecordsItr.hasNext()) {
HoodieRecord<T> record = newRecordsItr.next();
@@ -156,6 +164,15 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
// update the new location of the record, so we know where to find it next
record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
}
logger.debug("Number of entries in MemoryBasedMap => " +
((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
+ "Total size in bytes of MemoryBasedMap => " +
((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize()
+ "Number of entries in DiskBasedMap => " +
((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries()
+ "Size of file spilled to disk => " +
((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
return partitionPath;
}

View File

@@ -98,11 +98,12 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
.getTimelineOfActions(
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION,
HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
.filterCompletedInstants().lastInstant().get().getTimestamp();
log.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs,
metaClient.getBasePath(), operation.getDeltaFilePaths(), readerSchema, maxInstantTime,
config.getMaxMemorySizePerCompactionInBytes(), config.getCompactionLazyBlockReadEnabled(),
config.getMaxMemoryPerCompaction(), config.getCompactionLazyBlockReadEnabled(),
config.getCompactionReverseLogReadEnabled());
if (!scanner.iterator().hasNext()) {
return Lists.<WriteStatus>newArrayList();