[HUDI-437] Support user-defined index (#1408)
* [hotfix] set default value for index class config
* class config takes precedence over `hoodie.index.type`
This commit is contained in:
@@ -37,6 +37,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
|||||||
public static final String INDEX_TYPE_PROP = "hoodie.index.type";
|
public static final String INDEX_TYPE_PROP = "hoodie.index.type";
|
||||||
public static final String DEFAULT_INDEX_TYPE = HoodieIndex.IndexType.BLOOM.name();
|
public static final String DEFAULT_INDEX_TYPE = HoodieIndex.IndexType.BLOOM.name();
|
||||||
|
|
||||||
|
/**
 * Property key naming a user-defined index class. When the value is non-empty,
 * HoodieIndex.createIndex instantiates that class instead of consulting
 * {@link #INDEX_TYPE_PROP}.
 */
public static final String INDEX_CLASS_PROP = "hoodie.index.class";
|
||||||
|
/** Default for {@link #INDEX_CLASS_PROP}: the empty string, i.e. no user-defined index class is configured. */
public static final String DEFAULT_INDEX_CLASS = "";
|
||||||
|
|
||||||
// ***** Bloom Index configs *****
|
// ***** Bloom Index configs *****
|
||||||
public static final String BLOOM_FILTER_NUM_ENTRIES = "hoodie.index.bloom.num_entries";
|
public static final String BLOOM_FILTER_NUM_ENTRIES = "hoodie.index.bloom.num_entries";
|
||||||
public static final String DEFAULT_BLOOM_FILTER_NUM_ENTRIES = "60000";
|
public static final String DEFAULT_BLOOM_FILTER_NUM_ENTRIES = "60000";
|
||||||
@@ -117,6 +120,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Stores the given class name under INDEX_CLASS_PROP ("hoodie.index.class") in this builder's props.
 *
 * @param indexClass name of the index class to record; the test in this change passes
 *                   {@code SomeIndex.class.getName()}.
 *                   NOTE(review): props appears to be a java.util.Properties, in which case a
 *                   null value would throw NullPointerException — confirm.
 * @return this builder, so calls can be chained
 */
public Builder withIndexClass(String indexClass) {
|
||||||
|
props.setProperty(INDEX_CLASS_PROP, indexClass);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder withHBaseIndexConfig(HoodieHBaseIndexConfig hBaseIndexConfig) {
|
public Builder withHBaseIndexConfig(HoodieHBaseIndexConfig hBaseIndexConfig) {
|
||||||
props.putAll(hBaseIndexConfig.getProps());
|
props.putAll(hBaseIndexConfig.getProps());
|
||||||
return this;
|
return this;
|
||||||
@@ -195,6 +203,7 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
|||||||
public HoodieIndexConfig build() {
|
public HoodieIndexConfig build() {
|
||||||
HoodieIndexConfig config = new HoodieIndexConfig(props);
|
HoodieIndexConfig config = new HoodieIndexConfig(props);
|
||||||
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
|
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(INDEX_CLASS_PROP), INDEX_CLASS_PROP, DEFAULT_INDEX_CLASS);
|
||||||
setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_NUM_ENTRIES), BLOOM_FILTER_NUM_ENTRIES,
|
setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_NUM_ENTRIES), BLOOM_FILTER_NUM_ENTRIES,
|
||||||
DEFAULT_BLOOM_FILTER_NUM_ENTRIES);
|
DEFAULT_BLOOM_FILTER_NUM_ENTRIES);
|
||||||
setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_FPP), BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_FPP);
|
setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_FPP), BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_FPP);
|
||||||
|
|||||||
@@ -309,6 +309,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return HoodieIndex.IndexType.valueOf(props.getProperty(HoodieIndexConfig.INDEX_TYPE_PROP));
|
return HoodieIndex.IndexType.valueOf(props.getProperty(HoodieIndexConfig.INDEX_TYPE_PROP));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Returns the value stored under HoodieIndexConfig.INDEX_CLASS_PROP, without any validation.
 *
 * @return the configured index class name; HoodieIndexConfig.Builder#build() defaults the
 *         property to the empty string, and HoodieIndex.createIndex treats a null or empty
 *         value as "no custom index class"
 */
public String getIndexClass() {
|
||||||
|
return props.getProperty(HoodieIndexConfig.INDEX_CLASS_PROP);
|
||||||
|
}
|
||||||
|
|
||||||
/**
 * Reads HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES from props and parses it as an int.
 * Integer.parseInt throws NumberFormatException if the stored value is missing or not a
 * valid integer.
 */
public int getBloomFilterNumEntries() {
|
public int getBloomFilterNumEntries() {
|
||||||
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES));
|
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,6 +24,8 @@ import org.apache.hudi.common.model.HoodieKey;
|
|||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.ReflectionUtils;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieIndexException;
|
import org.apache.hudi.exception.HoodieIndexException;
|
||||||
@@ -51,6 +53,14 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
|
|||||||
|
|
||||||
public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(HoodieWriteConfig config,
|
public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(HoodieWriteConfig config,
|
||||||
JavaSparkContext jsc) throws HoodieIndexException {
|
JavaSparkContext jsc) throws HoodieIndexException {
|
||||||
|
// first use index class config to create index.
|
||||||
|
if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
|
||||||
|
Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config);
|
||||||
|
if (!(instance instanceof HoodieIndex)) {
|
||||||
|
throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
|
||||||
|
}
|
||||||
|
return (HoodieIndex) instance;
|
||||||
|
}
|
||||||
switch (config.getIndexType()) {
|
switch (config.getIndexType()) {
|
||||||
case HBASE:
|
case HBASE:
|
||||||
return new HBaseIndex<>(config);
|
return new HBaseIndex<>(config);
|
||||||
|
|||||||
@@ -18,20 +18,33 @@
|
|||||||
|
|
||||||
package org.apache.hudi.index;
|
package org.apache.hudi.index;
|
||||||
|
|
||||||
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.common.HoodieClientTestHarness;
|
import org.apache.hudi.common.HoodieClientTestHarness;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.config.HoodieHBaseIndexConfig;
|
import org.apache.hudi.config.HoodieHBaseIndexConfig;
|
||||||
import org.apache.hudi.config.HoodieIndexConfig;
|
import org.apache.hudi.config.HoodieIndexConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
import org.apache.hudi.exception.HoodieIndexException;
|
||||||
import org.apache.hudi.index.HoodieIndex.IndexType;
|
import org.apache.hudi.index.HoodieIndex.IndexType;
|
||||||
import org.apache.hudi.index.bloom.HoodieBloomIndex;
|
import org.apache.hudi.index.bloom.HoodieBloomIndex;
|
||||||
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
|
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
|
||||||
import org.apache.hudi.index.hbase.HBaseIndex;
|
import org.apache.hudi.index.hbase.HBaseIndex;
|
||||||
|
|
||||||
|
import org.apache.hudi.table.HoodieTable;
|
||||||
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class TestHoodieIndex extends HoodieClientTestHarness {
|
public class TestHoodieIndex extends HoodieClientTestHarness {
|
||||||
|
|
||||||
@@ -67,5 +80,76 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
|
|||||||
config = clientConfigBuilder.withPath(basePath)
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
.withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build();
|
.withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build();
|
||||||
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HoodieGlobalBloomIndex);
|
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HoodieGlobalBloomIndex);
|
||||||
|
|
||||||
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
|
.withIndexConfig(indexConfigBuilder.withIndexClass(DummyHoodieIndex.class.getName()).build()).build();
|
||||||
|
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof DummyHoodieIndex);
|
||||||
|
|
||||||
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
|
.withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithConstructor.class.getName()).build()).build();
|
||||||
|
try {
|
||||||
|
HoodieIndex.createIndex(config, jsc);
|
||||||
|
fail("exception is expected");
|
||||||
|
} catch (HoodieIndexException e) {
|
||||||
|
assertTrue(e.getMessage().contains("is not a subclass of HoodieIndex"));
|
||||||
|
}
|
||||||
|
|
||||||
|
config = clientConfigBuilder.withPath(basePath)
|
||||||
|
.withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithoutConstructor.class.getName()).build()).build();
|
||||||
|
try {
|
||||||
|
HoodieIndex.createIndex(config, jsc);
|
||||||
|
fail("exception is expected");
|
||||||
|
} catch (HoodieException e) {
|
||||||
|
assertTrue(e.getMessage().contains("Unable to instantiate class"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Minimal HoodieIndex subclass used as a test fixture: it exposes the
 * (HoodieWriteConfig) constructor and overrides every method with a stub that
 * returns null or false. The test only checks that HoodieIndex.createIndex
 * returns an instance of this class when it is configured via withIndexClass;
 * none of the stubbed methods are exercised there.
 */
public static class DummyHoodieIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||||
|
public DummyHoodieIndex(HoodieWriteConfig config) {
|
||||||
|
super(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys, JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable) throws HoodieIndexException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable) throws HoodieIndexException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean rollbackCommit(String commitTime) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isGlobal() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean canIndexLogFiles() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isImplicitWithStorage() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Test fixture that has the (HoodieWriteConfig) constructor but does NOT extend
 * HoodieIndex. The test expects HoodieIndex.createIndex to instantiate it and then
 * reject it with a HoodieIndexException whose message contains
 * "is not a subclass of HoodieIndex".
 */
public static class IndexWithConstructor {
|
||||||
|
public IndexWithConstructor(HoodieWriteConfig config) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Test fixture that neither extends HoodieIndex nor declares a (HoodieWriteConfig)
 * constructor. The test expects HoodieIndex.createIndex to fail for it with a
 * HoodieException whose message contains "Unable to instantiate class".
 */
public static class IndexWithoutConstructor {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user