Add config for index parallelism and make clean public (#109)
* Add a config option for bloom index parallelism and make the clean API public
* Address review comments on the clean API modification
This commit is contained in:
@@ -367,8 +367,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
// Save was a success
|
||||
// We cannot have unbounded commit files. Archive commits if we have to archive
|
||||
archiveLog.archiveIfRequired();
|
||||
// Call clean to cleanup if there is anything to cleanup after the commit,
|
||||
clean(commitTime);
|
||||
if(config.isAutoClean()) {
|
||||
// Call clean to cleanup if there is anything to cleanup after the commit,
|
||||
logger.info("Auto cleaning is enabled. Running cleaner now");
|
||||
clean(commitTime);
|
||||
} else {
|
||||
logger.info("Auto cleaning is not enabled. Not running cleaner now");
|
||||
}
|
||||
if (writeContext != null) {
|
||||
long durationInMs = metrics.getDurationInMs(writeContext.stop());
|
||||
metrics.updateCommitMetrics(
|
||||
@@ -698,6 +703,18 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
|
||||
/**
|
||||
* Clean up any stale/old files/data lying around (either on file storage or index storage)
|
||||
* based on the configurations and CleaningPolicy used. (typically files that no longer can be used
|
||||
* by a running query can be cleaned)
|
||||
*/
|
||||
public void clean() throws HoodieIOException {
|
||||
String startCleanTime = startCommit();
|
||||
clean(startCleanTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up any stale/old files/data lying around (either on file storage or index storage)
|
||||
* based on the configurations and CleaningPolicy used. (typically files that no longer can be used
|
||||
* by a running query can be cleaned)
|
||||
*/
|
||||
private void clean(String startCleanTime) throws HoodieIOException {
|
||||
try {
|
||||
|
||||
@@ -34,6 +34,9 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
||||
private static final String DEFAULT_CLEANER_POLICY =
|
||||
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
|
||||
|
||||
public static final String AUTO_CLEAN_PROP = "hoodie.clean.automatic";
|
||||
private static final String DEFAULT_AUTO_CLEAN = "true";
|
||||
|
||||
public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP =
|
||||
"hoodie.cleaner.fileversions.retained";
|
||||
private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
|
||||
@@ -94,6 +97,11 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
||||
}
|
||||
}
|
||||
|
||||
public Builder withAutoClean(Boolean autoClean) {
|
||||
props.setProperty(AUTO_CLEAN_PROP, String.valueOf(Boolean.TRUE));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
|
||||
props.setProperty(CLEANER_POLICY_PROP, policy.name());
|
||||
return this;
|
||||
@@ -143,6 +151,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
||||
|
||||
public HoodieCompactionConfig build() {
|
||||
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
|
||||
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP),
|
||||
AUTO_CLEAN_PROP, DEFAULT_AUTO_CLEAN);
|
||||
setDefaultOnCondition(props, !props.containsKey(CLEANER_POLICY_PROP),
|
||||
CLEANER_POLICY_PROP, DEFAULT_CLEANER_POLICY);
|
||||
setDefaultOnCondition(props, !props.containsKey(CLEANER_FILE_VERSIONS_RETAINED_PROP),
|
||||
|
||||
@@ -39,6 +39,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
||||
public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
|
||||
public final static String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
|
||||
public final static String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
|
||||
public static final String BLOOM_INDEX_PARALLELISM_PROP = "hoodie.bloom.index.parallelism";
|
||||
// Disable explicit bloom index parallelism setting by default - hoodie auto computes
|
||||
public static final String DEFAULT_BLOOM_INDEX_PARALLELISM = "0";
|
||||
|
||||
private HoodieIndexConfig(Properties props) {
|
||||
super(props);
|
||||
@@ -91,6 +94,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder bloomIndexParallelism(int parallelism) {
|
||||
props.setProperty(BLOOM_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieIndexConfig build() {
|
||||
HoodieIndexConfig config = new HoodieIndexConfig(props);
|
||||
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP),
|
||||
@@ -99,6 +107,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
||||
BLOOM_FILTER_NUM_ENTRIES, DEFAULT_BLOOM_FILTER_NUM_ENTRIES);
|
||||
setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_FPP),
|
||||
BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_FPP);
|
||||
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_PARALLELISM_PROP),
|
||||
BLOOM_INDEX_PARALLELISM_PROP, DEFAULT_BLOOM_INDEX_PARALLELISM);
|
||||
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
|
||||
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
|
||||
return config;
|
||||
|
||||
@@ -144,6 +144,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return Integer.parseInt(props.getProperty(HoodieCompactionConfig.CLEANER_PARALLELISM));
|
||||
}
|
||||
|
||||
public boolean isAutoClean() {
|
||||
return Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.AUTO_CLEAN_PROP));
|
||||
}
|
||||
|
||||
/**
|
||||
* index properties
|
||||
**/
|
||||
@@ -171,6 +175,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return props.getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP);
|
||||
}
|
||||
|
||||
public int getBloomIndexParallelism() {
|
||||
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
|
||||
}
|
||||
|
||||
/**
|
||||
* storage properties
|
||||
**/
|
||||
|
||||
@@ -255,8 +255,11 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
||||
for (long subparts : subpartitionCountMap.values()) {
|
||||
totalSubparts += (int) subparts;
|
||||
}
|
||||
int joinParallelism = Math.max(totalSubparts, inputParallelism);
|
||||
// If bloom index parallelism is set, use it to check against the input parallelism and take the max
|
||||
int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
|
||||
int joinParallelism = Math.max(totalSubparts, indexParallelism);
|
||||
logger.info("InputParallelism: ${" + inputParallelism + "}, " +
|
||||
"IndexParallelism: ${" + config.getBloomIndexParallelism() + "}, " +
|
||||
"TotalSubParts: ${" + totalSubparts + "}, " +
|
||||
"Join Parallelism set to : " + joinParallelism);
|
||||
return joinParallelism;
|
||||
|
||||
Reference in New Issue
Block a user