1
0

[HUDI-870] Remove spark context in ClientUtils and HoodieIndex (#1609)

This commit is contained in:
Shen Hong
2020-05-11 19:05:36 +08:00
committed by GitHub
parent 8d0e23173b
commit 6dac10115c
9 changed files with 21 additions and 22 deletions

View File

@@ -119,6 +119,6 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
}
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
return ClientUtils.createMetaClient(jsc, config, loadActiveTimelineOnLoad);
return ClientUtils.createMetaClient(jsc.hadoopConfiguration(), config, loadActiveTimelineOnLoad);
}
}

View File

@@ -97,7 +97,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
this.hoodieTable = HoodieTable.create(metaClient, clientConfig, jsc);
this.index = HoodieIndex.createIndex(clientConfig, jsc);
this.index = HoodieIndex.createIndex(clientConfig);
this.sqlContextOpt = Option.empty();
}

View File

@@ -98,7 +98,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param rollbackPending whether need to cleanup pending commits
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending) {
this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig, jsc));
this(jsc, clientConfig, rollbackPending, HoodieIndex.createIndex(clientConfig));
}
HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index) {

View File

@@ -18,25 +18,24 @@
package org.apache.hudi.client.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;
public class ClientUtils {
/**
* Create Consistency Aware MetaClient.
*
* @param jsc JavaSparkContext
* @param hadoopConf Configuration
* @param config HoodieWriteConfig
* @param loadActiveTimelineOnLoad early loading of timeline
*/
public static HoodieTableMetaClient createMetaClient(JavaSparkContext jsc, HoodieWriteConfig config,
public static HoodieTableMetaClient createMetaClient(Configuration hadoopConf, HoodieWriteConfig config,
boolean loadActiveTimelineOnLoad) {
return new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), loadActiveTimelineOnLoad,
return new HoodieTableMetaClient(hadoopConf, config.getBasePath(), loadActiveTimelineOnLoad,
config.getConsistencyGuardConfig(),
Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
}

View File

@@ -51,8 +51,8 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
this.config = config;
}
public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(HoodieWriteConfig config,
JavaSparkContext jsc) throws HoodieIndexException {
public static <T extends HoodieRecordPayload> HoodieIndex<T> createIndex(
HoodieWriteConfig config) throws HoodieIndexException {
// first use index class config to create index.
if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config);

View File

@@ -95,7 +95,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
this.viewManager = FileSystemViewManager.createViewManager(new SerializableConfiguration(jsc.hadoopConfiguration()),
config.getViewStorageConfig());
this.metaClient = metaClient;
this.index = HoodieIndex.createIndex(config, jsc);
this.index = HoodieIndex.createIndex(config);
}
private synchronized FileSystemViewManager getViewManager() {

View File

@@ -89,7 +89,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
}
protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, jsc));
return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg));
}
protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
@@ -247,7 +247,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
return (commit, numRecords) -> {
final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
final HoodieIndex index = HoodieIndex.createIndex(writeConfig);
List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
HoodieTable table = HoodieTable.create(metaClient, writeConfig, jsc);
@@ -268,7 +268,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
return (numRecords) -> {
final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
final HoodieIndex index = HoodieIndex.createIndex(writeConfig);
List<HoodieKey> records = keyGenFunction.apply(numRecords);
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
HoodieTable table = HoodieTable.create(metaClient, writeConfig, jsc);

View File

@@ -73,20 +73,20 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE)
.withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder().build()).build())
.build();
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HBaseIndex);
assertTrue(HoodieIndex.createIndex(config) instanceof HBaseIndex);
config = clientConfigBuilder.withPath(basePath)
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof InMemoryHashIndex);
assertTrue(HoodieIndex.createIndex(config) instanceof InMemoryHashIndex);
config = clientConfigBuilder.withPath(basePath)
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HoodieBloomIndex);
assertTrue(HoodieIndex.createIndex(config) instanceof HoodieBloomIndex);
config = clientConfigBuilder.withPath(basePath)
.withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build();
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HoodieGlobalBloomIndex);
assertTrue(HoodieIndex.createIndex(config) instanceof HoodieGlobalBloomIndex);
config = clientConfigBuilder.withPath(basePath)
.withIndexConfig(indexConfigBuilder.withIndexClass(DummyHoodieIndex.class.getName()).build()).build();
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof DummyHoodieIndex);
assertTrue(HoodieIndex.createIndex(config) instanceof DummyHoodieIndex);
}
@Test
@@ -94,14 +94,14 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
final HoodieWriteConfig config1 = clientConfigBuilder.withPath(basePath)
.withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithConstructor.class.getName()).build()).build();
final Throwable thrown1 = assertThrows(HoodieException.class, () -> {
HoodieIndex.createIndex(config1, jsc);
HoodieIndex.createIndex(config1);
}, "exception is expected");
assertTrue(thrown1.getMessage().contains("is not a subclass of HoodieIndex"));
final HoodieWriteConfig config2 = clientConfigBuilder.withPath(basePath)
.withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithoutConstructor.class.getName()).build()).build();
final Throwable thrown2 = assertThrows(HoodieException.class, () -> {
HoodieIndex.createIndex(config2, jsc);
HoodieIndex.createIndex(config2);
}, "exception is expected");
assertTrue(thrown2.getMessage().contains("Unable to instantiate class"));
}

View File

@@ -138,7 +138,7 @@ public class TestCleaner extends TestHoodieClientBase {
"The clean instant should be the same as the commit instant");
}
HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
HoodieIndex index = HoodieIndex.createIndex(cfg);
List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
checkTaggedRecords(taggedRecords, newCommitTime);
}