1
0

[HUDI-881] Replace part of spark context by hadoop configuration in AbstractHoodieClient and HoodieReadClient (#1620)

This commit is contained in:
Shen Hong
2020-05-13 00:33:29 +08:00
committed by GitHub
parent b54517aad0
commit e8ffc6f0aa
4 changed files with 11 additions and 7 deletions

View File

@@ -62,9 +62,9 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
   protected AbstractHoodieClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
       Option&lt;EmbeddedTimelineService&gt; timelineServer) {
-    this.fs = FSUtils.getFs(clientConfig.getBasePath(), jsc.hadoopConfiguration());
-    this.jsc = jsc;
     this.hadoopConf = jsc.hadoopConfiguration();
+    this.fs = FSUtils.getFs(clientConfig.getBasePath(), hadoopConf);
+    this.jsc = jsc;
     this.basePath = clientConfig.getBasePath();
     this.config = clientConfig;
     this.timelineServer = timelineServer;
@@ -99,7 +99,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
     if (!timelineServer.isPresent()) {
       // Run Embedded Timeline Server
       LOG.info("Starting Timeline service !!");
-      timelineServer = Option.of(new EmbeddedTimelineService(jsc.hadoopConfiguration(), jsc.getConf(),
+      timelineServer = Option.of(new EmbeddedTimelineService(hadoopConf, jsc.getConf(),
           config.getClientSpecifiedViewStorageConfig()));
       try {
         timelineServer.get().startServer();
@@ -122,6 +122,6 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
   }

   protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
-    return ClientUtils.createMetaClient(jsc.hadoopConfiguration(), config, loadActiveTimelineOnLoad);
+    return ClientUtils.createMetaClient(hadoopConf, config, loadActiveTimelineOnLoad);
   }
 }

View File

@@ -98,7 +98,7 @@ public class HoodieReadClient&lt;T extends HoodieRecordPayload&gt; implements Serializ
     this.hadoopConf = jsc.hadoopConfiguration();
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
     this.hoodieTable = HoodieTable.create(metaClient, clientConfig, hadoopConf);
     this.index = HoodieIndex.createIndex(clientConfig);
     this.sqlContextOpt = Option.empty();
@@ -197,7 +197,7 @@ public class HoodieReadClient&lt;T extends HoodieRecordPayload&gt; implements Serializ
    */
   public List&lt;Pair&lt;String, HoodieCompactionPlan&gt;&gt; getPendingCompactions() {
     HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), hoodieTable.getMetaClient().getBasePath(), true);
+        new HoodieTableMetaClient(hadoopConf, hoodieTable.getMetaClient().getBasePath(), true);
     return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
         .map(
             instantWorkloadPair -&gt; Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))

View File

@@ -19,6 +19,8 @@
 package org.apache.hudi.table.action;

 import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -26,6 +28,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 public abstract class BaseActionExecutor&lt;R&gt; implements Serializable {

   protected final transient JavaSparkContext jsc;
+  protected final transient Configuration hadoopConf;

   protected final HoodieWriteConfig config;
@@ -35,6 +38,7 @@ public abstract class BaseActionExecutor&lt;R&gt; implements Serializable {
   public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable&lt;?&gt; table, String instantTime) {
     this.jsc = jsc;
+    this.hadoopConf = jsc.hadoopConfiguration();
     this.config = config;
     this.table = table;
     this.instantTime = instantTime;

View File

@@ -182,7 +182,7 @@ public abstract class BaseCommitActionExecutor&lt;T extends HoodieRecordPayload&lt;T&gt;&gt;
     String actionType = table.getMetaClient().getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType);
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable&lt;T&gt; table = HoodieTable.create(config, jsc.hadoopConfiguration());
+    HoodieTable&lt;T&gt; table = HoodieTable.create(config, hadoopConf);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();