From e484e91807caeda5afd905b649c97902ec0cdde5 Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Wed, 26 Jul 2017 11:49:22 -0700 Subject: [PATCH] Add new config to separate shuffle and write parallelism --- .../java/com/uber/hoodie/HoodieWriteClient.java | 2 +- .../com/uber/hoodie/config/HoodieWriteConfig.java | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 85cba85ac..c24227d39 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -233,7 +233,7 @@ public class HoodieWriteClient implements Seriali // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions return String .format("%s+%s", record.getPartitionPath(), record.getRecordKey()); - }, true, config.getInsertShuffleParallelism()); + }, true, config.getBulkInsertShuffleParallelism()); JavaRDD writeStatusRDD = sortedRecords .mapPartitionsWithIndex(new BulkInsertMapFunction(commitTime, config, table), true) .flatMap(writeStatuses -> writeStatuses.iterator()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 08e6a89ab..26704dcaa 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -42,6 +42,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String TABLE_NAME = "hoodie.table.name"; private static final String DEFAULT_PARALLELISM = "200"; private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism"; + private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"; private static final String 
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"; private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert"; private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false"; @@ -54,7 +55,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning"; private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false"; - private HoodieWriteConfig(Properties props) { super(props); } @@ -82,6 +82,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP)); } + public int getBulkInsertShuffleParallelism() { + return Integer.parseInt(props.getProperty(BULKINSERT_PARALLELISM)); + } + public int getInsertShuffleParallelism() { return Integer.parseInt(props.getProperty(INSERT_PARALLELISM)); } @@ -303,6 +307,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withBulkInsertParallelism(int bulkInsertParallelism) { + props.setProperty(BULKINSERT_PARALLELISM, String.valueOf(bulkInsertParallelism)); + return this; + } + public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) { props.setProperty(INSERT_PARALLELISM, String.valueOf(insertShuffleParallelism)); props.setProperty(UPSERT_PARALLELISM, String.valueOf(upsertShuffleParallelism)); @@ -360,6 +369,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { Preconditions.checkArgument(config.getBasePath() != null); setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM, + DEFAULT_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM); setDefaultOnCondition(props, 
!props.containsKey(COMBINE_BEFORE_INSERT_PROP),