From f1b7afad2143cba6a87e03207e0e45c7cfc8ef10 Mon Sep 17 00:00:00 2001
From: prazanna
Date: Tue, 21 Mar 2017 17:36:46 -0700
Subject: [PATCH] Add config for index parallelism and make clean public (#109)

* Add config for index parallelism and make clean public

* Review comments on clean api modification
---
 .../com/uber/hoodie/HoodieWriteClient.java    | 21 +++++++++++++++++--
 .../hoodie/config/HoodieCompactionConfig.java | 10 +++++++++
 .../uber/hoodie/config/HoodieIndexConfig.java | 10 +++++++++
 .../uber/hoodie/config/HoodieWriteConfig.java |  8 +++++++
 .../uber/hoodie/index/HoodieBloomIndex.java   |  5 ++++-
 5 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index c1359f5c7..c96217522 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -367,8 +367,13 @@ public class HoodieWriteClient implements Seriali
             // Save was a success
             // We cannot have unbounded commit files. Archive commits if we have to archive
             archiveLog.archiveIfRequired();
-            // Call clean to cleanup if there is anything to cleanup after the commit,
-            clean(commitTime);
+            if (config.isAutoClean()) {
+                // Call clean to cleanup if there is anything to cleanup after the commit,
+                logger.info("Auto cleaning is enabled. Running cleaner now");
+                clean(commitTime);
+            } else {
+                logger.info("Auto cleaning is not enabled. Not running cleaner now");
+            }
             if (writeContext != null) {
                 long durationInMs = metrics.getDurationInMs(writeContext.stop());
                 metrics.updateCommitMetrics(
@@ -698,6 +703,18 @@ public class HoodieWriteClient implements Seriali
 
     /**
      * Clean up any stale/old files/data lying around (either on file storage or index storage)
+     * based on the configurations and CleaningPolicy used. (typically files that no longer can be used
+     * by a running query can be cleaned)
+     */
+    public void clean() throws HoodieIOException {
+        String startCleanTime = startCommit();
+        clean(startCleanTime);
+    }
+
+    /**
+     * Clean up any stale/old files/data lying around (either on file storage or index storage)
+     * based on the configurations and CleaningPolicy used. (typically files that no longer can be used
+     * by a running query can be cleaned)
      */
     private void clean(String startCleanTime) throws HoodieIOException {
         try {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
index a431adf4a..c1a257b46 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
@@ -34,6 +34,9 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
     private static final String DEFAULT_CLEANER_POLICY =
         HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
 
+    public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
+    private static final String DEFAULT_AUTO_CLEAN = "true";
+
     public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP =
         "hoodie.cleaner.fileversions.retained";
     private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
@@ -94,6 +97,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
             }
         }
 
+        public Builder withAutoClean(Boolean autoClean) {
+            props.setProperty(AUTO_CLEAN_PROP, String.valueOf(autoClean));
+            return this;
+        }
+
         public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
             props.setProperty(CLEANER_POLICY_PROP, policy.name());
             return this;
@@ -143,6 +151,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
 
         public HoodieCompactionConfig build() {
             HoodieCompactionConfig config = new HoodieCompactionConfig(props);
+            setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP),
+                AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
             setDefaultOnCondition(props, !props.containsKey(CLEANER_POLICY_PROP),
                 CLEANER_POLICY_PROP, DEFAULT_CLEANER_POLICY);
             setDefaultOnCondition(props, !props.containsKey(CLEANER_FILE_VERSIONS_RETAINED_PROP),
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
index 40f4f4521..a392ad8a7 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
@@ -39,6 +39,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
     public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
     public final static String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
     public final static String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
+    public static final String BLOOM_INDEX_PARALLELISM_PROP = "hoodie.bloom.index.parallelism";
+    // Disable explicit bloom index parallelism setting by default - hoodie auto computes
+    public static final String DEFAULT_BLOOM_INDEX_PARALLELISM = "0";
 
     private HoodieIndexConfig(Properties props) {
         super(props);
@@ -91,6 +94,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
             return this;
         }
 
+        public Builder bloomIndexParallelism(int parallelism) {
+            props.setProperty(BLOOM_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
+            return this;
+        }
+
         public HoodieIndexConfig build() {
             HoodieIndexConfig config = new HoodieIndexConfig(props);
             setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP),
@@ -99,6 +107,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
                 BLOOM_FILTER_NUM_ENTRIES, DEFAULT_BLOOM_FILTER_NUM_ENTRIES);
             setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_FPP),
                 BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_FPP);
+            setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_PARALLELISM_PROP),
+                BLOOM_INDEX_PARALLELISM_PROP, DEFAULT_BLOOM_INDEX_PARALLELISM);
             // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
             HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
             return config;
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index f6058ab5f..c1b82d957 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -144,6 +144,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
         return Integer.parseInt(props.getProperty(HoodieCompactionConfig.CLEANER_PARALLELISM));
     }
 
+    public boolean isAutoClean() {
+        return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP));
+    }
+
     /**
      * index properties
      **/
@@ -171,6 +175,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
         return props.getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP);
     }
 
+    public int getBloomIndexParallelism() {
+        return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
+    }
+
     /**
      * storage properties
      **/
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java
index 248979b4a..e7153adb6 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java
@@ -255,8 +255,11 @@ public class HoodieBloomIndex extends HoodieIndex
         for (long subparts : subpartitionCountMap.values()) {
             totalSubparts += (int) subparts;
         }
-        int joinParallelism = Math.max(totalSubparts, inputParallelism);
+        // If bloom index parallelism is set, use it to check against the input parallelism and take the max
+        int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
+        int joinParallelism = Math.max(totalSubparts, indexParallelism);
         logger.info("InputParallelism: ${" + inputParallelism + "}, " +
+            "IndexParallelism: ${" + config.getBloomIndexParallelism() + "}, " +
             "TotalSubParts: ${" + totalSubparts + "}, " +
             "Join Parallelism set to : " + joinParallelism);
         return joinParallelism;