commit ba7c258c61 — Add more options in HoodieWriteConfig

Author: Jian Xu
Date: 2018-03-06 11:25:26 -08:00
Committed by: vinoth chandar
Parent: 7f079632a6

5 changed files with 20 additions and 7 deletions

View File

@@ -44,6 +44,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
   public static final String DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES = "true";
   public static final String BLOOM_INDEX_USE_CACHING_PROP = "hoodie.bloom.index.use.caching";
   public static final String DEFAULT_BLOOM_INDEX_USE_CACHING = "true";
+  public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage.level";
+  public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
 
   // ***** HBase Index Configs *****
   public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
@@ -143,6 +145,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withBloomIndexInputStorageLevel(String level) {
+      props.setProperty(BLOOM_INDEX_INPUT_STORAGE_LEVEL, level);
+      return this;
+    }
+
     public HoodieIndexConfig build() {
       HoodieIndexConfig config = new HoodieIndexConfig(props);
       setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP),
@@ -161,6 +168,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
           HBASE_GET_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
       setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP),
           HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
+      setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL),
+          BLOOM_INDEX_INPUT_STORAGE_LEVEL, DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL);
       // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
       HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
       return config;

View File

@@ -68,7 +68,7 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
       return this;
     }
 
-    public Builder limitFileSize(int maxFileSize) {
+    public Builder limitFileSize(long maxFileSize) {
       props.setProperty(PARQUET_FILE_MAX_BYTES, String.valueOf(maxFileSize));
       return this;
     }

View File

@@ -275,11 +275,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(HoodieIndexConfig.BUCKETED_INDEX_NUM_BUCKETS_PROP));
   }
 
+  public StorageLevel getBloomIndexInputStorageLevel() {
+    return StorageLevel.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
+  }
+
   /**
    * storage properties
    **/
-  public int getParquetMaxFileSize() {
-    return Integer.parseInt(props.getProperty(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES));
+  public long getParquetMaxFileSize() {
+    return Long.parseLong(props.getProperty(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES));
   }
 
   public int getParquetBlockSize() {

View File

@@ -75,7 +75,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
 
     // Step 0: cache the input record RDD
     if (config.getBloomIndexUseCaching()) {
-      recordRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
+      recordRDD.persist(config.getBloomIndexInputStorageLevel());
     }
 
     // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey)

View File

@@ -26,11 +26,11 @@ public class HoodieParquetConfig {
   private CompressionCodecName compressionCodecName;
   private int blockSize;
   private int pageSize;
-  private int maxFileSize;
+  private long maxFileSize;
   private Configuration hadoopConf;
 
   public HoodieParquetConfig(HoodieAvroWriteSupport writeSupport,
-      CompressionCodecName compressionCodecName, int blockSize, int pageSize, int maxFileSize,
+      CompressionCodecName compressionCodecName, int blockSize, int pageSize, long maxFileSize,
       Configuration hadoopConf) {
     this.writeSupport = writeSupport;
     this.compressionCodecName = compressionCodecName;
@@ -56,7 +56,7 @@ public class HoodieParquetConfig {
     return pageSize;
   }
 
-  public int getMaxFileSize() {
+  public long getMaxFileSize() {
     return maxFileSize;
   }