1
0

Adding a new config to separate bulk-insert shuffle parallelism from write parallelism

This commit is contained in:
Nishith Agarwal
2017-07-26 11:49:22 -07:00
committed by vinoth chandar
parent b1cf097b0c
commit e484e91807
2 changed files with 13 additions and 2 deletions

View File

@@ -233,7 +233,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String
.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
}, true, config.getBulkInsertShuffleParallelism());
JavaRDD<WriteStatus> writeStatusRDD = sortedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table), true)
.flatMap(writeStatuses -> writeStatuses.iterator());

View File

@@ -42,6 +42,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String TABLE_NAME = "hoodie.table.name";
private static final String DEFAULT_PARALLELISM = "200";
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
@@ -54,7 +55,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
private HoodieWriteConfig(Properties props) {
super(props);
}
@@ -82,6 +82,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP));
}
/**
 * Returns the shuffle parallelism configured for bulk-insert operations,
 * i.e. the value of the {@code hoodie.bulkinsert.shuffle.parallelism} property.
 */
public int getBulkInsertShuffleParallelism() {
  final String parallelism = props.getProperty(BULKINSERT_PARALLELISM);
  return Integer.parseInt(parallelism);
}
public int getInsertShuffleParallelism() {
return Integer.parseInt(props.getProperty(INSERT_PARALLELISM));
}
@@ -303,6 +307,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
/**
 * Sets the shuffle parallelism to use for bulk-insert operations
 * ({@code hoodie.bulkinsert.shuffle.parallelism}).
 *
 * @param bulkInsertParallelism number of shuffle partitions for bulk insert
 * @return this builder, for call chaining
 */
public Builder withBulkInsertParallelism(int bulkInsertParallelism) {
  props.setProperty(BULKINSERT_PARALLELISM, Integer.toString(bulkInsertParallelism));
  return this;
}
public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
props.setProperty(INSERT_PARALLELISM, String.valueOf(insertShuffleParallelism));
props.setProperty(UPSERT_PARALLELISM, String.valueOf(upsertShuffleParallelism));
@@ -360,6 +369,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
Preconditions.checkArgument(config.getBasePath() != null);
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP),