From 0a4902eccece1df959946fcb7379a94fc5fe0784 Mon Sep 17 00:00:00 2001 From: leesf <490081539@qq.com> Date: Wed, 18 Mar 2020 10:27:40 +0800 Subject: [PATCH] [HUDI-437] Support user-defined index (#1408) * [hotfix] set default value for index class config * class config takes precedence over `hoodie.index.type` --- .../apache/hudi/config/HoodieIndexConfig.java | 9 ++ .../apache/hudi/config/HoodieWriteConfig.java | 4 + .../org/apache/hudi/index/HoodieIndex.java | 10 +++ .../apache/hudi/index/TestHoodieIndex.java | 84 +++++++++++++++++++ 4 files changed, 107 insertions(+) diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index db834980a..00c7605b1 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -37,6 +37,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { public static final String INDEX_TYPE_PROP = "hoodie.index.type"; public static final String DEFAULT_INDEX_TYPE = HoodieIndex.IndexType.BLOOM.name(); + public static final String INDEX_CLASS_PROP = "hoodie.index.class"; + public static final String DEFAULT_INDEX_CLASS = ""; + // ***** Bloom Index configs ***** public static final String BLOOM_FILTER_NUM_ENTRIES = "hoodie.index.bloom.num_entries"; public static final String DEFAULT_BLOOM_FILTER_NUM_ENTRIES = "60000"; @@ -117,6 +120,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } + public Builder withIndexClass(String indexClass) { + props.setProperty(INDEX_CLASS_PROP, indexClass); + return this; + } + public Builder withHBaseIndexConfig(HoodieHBaseIndexConfig hBaseIndexConfig) { props.putAll(hBaseIndexConfig.getProps()); return this; @@ -195,6 +203,7 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { public HoodieIndexConfig build() { HoodieIndexConfig config = new HoodieIndexConfig(props); setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE); + setDefaultOnCondition(props, !props.containsKey(INDEX_CLASS_PROP), INDEX_CLASS_PROP, DEFAULT_INDEX_CLASS); setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_NUM_ENTRIES), BLOOM_FILTER_NUM_ENTRIES, DEFAULT_BLOOM_FILTER_NUM_ENTRIES); setDefaultOnCondition(props, !props.containsKey(BLOOM_FILTER_FPP), BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_FPP); diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index f88d96a95..04c1dfdb5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -309,6 +309,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return HoodieIndex.IndexType.valueOf(props.getProperty(HoodieIndexConfig.INDEX_TYPE_PROP)); } + public String getIndexClass() { + return props.getProperty(HoodieIndexConfig.INDEX_CLASS_PROP); + } + public int getBloomFilterNumEntries() { return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES)); } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java index b18cf4595..8305a7b40 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java @@ -24,6 +24,8 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; @@ -51,6 +53,14 @@ public abstract class HoodieIndex implements Seri public static HoodieIndex createIndex(HoodieWriteConfig config, JavaSparkContext jsc) throws HoodieIndexException { + // first use index class config to create index. + if (!StringUtils.isNullOrEmpty(config.getIndexClass())) { + Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config); + if (!(instance instanceof HoodieIndex)) { + throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex"); + } + return (HoodieIndex) instance; + } switch (config.getIndexType()) { case HBASE: return new HBaseIndex<>(config); diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java index 91435f8fd..46239db38 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java @@ -18,20 +18,33 @@ package org.apache.hudi.index; +import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.HoodieClientTestHarness; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieHBaseIndexConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.index.bloom.HoodieBloomIndex; import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; import org.apache.hudi.index.hbase.HBaseIndex; +import org.apache.hudi.table.HoodieTable; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestHoodieIndex extends HoodieClientTestHarness { @@ -67,5 +80,76 @@ public class TestHoodieIndex extends HoodieClientTestHarness { config = clientConfigBuilder.withPath(basePath) .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build(); assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HoodieGlobalBloomIndex); + + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexClass(DummyHoodieIndex.class.getName()).build()).build(); + assertTrue(HoodieIndex.createIndex(config, jsc) instanceof DummyHoodieIndex); + + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithConstructor.class.getName()).build()).build(); + try { + HoodieIndex.createIndex(config, jsc); + fail("exception is expected"); + } catch (HoodieIndexException e) { + assertTrue(e.getMessage().contains("is not a subclass of HoodieIndex")); + } + + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithoutConstructor.class.getName()).build()).build(); + try { + HoodieIndex.createIndex(config, jsc); + fail("exception is expected"); + } catch (HoodieException e) { + assertTrue(e.getMessage().contains("Unable to instantiate class")); + } + } + + public static class DummyHoodieIndex extends HoodieIndex { + public DummyHoodieIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public JavaPairRDD>> fetchRecordLocation(JavaRDD hoodieKeys, JavaSparkContext jsc, HoodieTable hoodieTable) { + return null; + } + + @Override + public JavaRDD> tagLocation(JavaRDD> recordRDD, JavaSparkContext jsc, HoodieTable hoodieTable) throws HoodieIndexException { + return null; + } + + @Override + public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, HoodieTable hoodieTable) throws HoodieIndexException { + return null; + } + + @Override + public boolean rollbackCommit(String commitTime) { + return false; + } + + @Override + public boolean isGlobal() { + return false; + } + + @Override + public boolean canIndexLogFiles() { + return false; + } + + @Override + public boolean isImplicitWithStorage() { + return false; + } + } + + public static class IndexWithConstructor { + public IndexWithConstructor(HoodieWriteConfig config) {} + } + + public static class IndexWithoutConstructor { + } }