diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index 0bb4017c8..2b75b55b1 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -92,7 +92,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
     // archive
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
-    archiveLog.archiveIfRequired(jsc);
+    archiveLog.archiveIfRequired(hadoopConf);
   }
 
   @AfterEach
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index 47a0214c5..5e382f13c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.client;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.fs.FSUtils;
@@ -43,6 +44,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseable {
   protected final transient FileSystem fs;
   protected final transient JavaSparkContext jsc;
+  protected final transient Configuration hadoopConf;
   protected final HoodieWriteConfig config;
   protected final String basePath;
@@ -62,6 +64,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseable {
       Option<EmbeddedTimelineService> timelineServer) {
     this.fs = FSUtils.getFs(clientConfig.getBasePath(), jsc.hadoopConfiguration());
     this.jsc = jsc;
+    this.hadoopConf = jsc.hadoopConfiguration();
     this.basePath = clientConfig.getBasePath();
     this.config = clientConfig;
     this.timelineServer = timelineServer;
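The base-client hunk above is the anchor for everything that follows: the Hadoop configuration is captured once from the JavaSparkContext the client already receives, so all later table and metadata work has a Spark-free handle while staying backward compatible. A minimal mirror of that wiring (the class name here is illustrative, not part of the patch; only the two fields from this hunk are assumed):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.api.java.JavaSparkContext;

    // Illustrative sketch: mirrors the constructor logic in AbstractHoodieClient above.
    class ConfigurationCapture {
      final JavaSparkContext jsc;     // retained for RDD operations
      final Configuration hadoopConf; // Spark-free handle for table/metadata work

      ConfigurationCapture(JavaSparkContext jsc) {
        this.jsc = jsc;
        this.hadoopConf = jsc.hadoopConfiguration();
      }
    }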
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 8d730e25d..58e01a5c7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -99,7 +99,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
     LOG.info("Committing " + instantTime);
     // Create a Hoodie table which encapsulates the commits and files visible
-    HoodieTable table = HoodieTable.create(config, jsc);
+    HoodieTable table = HoodieTable.create(config, hadoopConf);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -180,7 +180,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
     // TODO : make sure we cannot rollback / archive last commit file
     try {
       // Create a Hoodie table which encapsulates the commits and files visible
-      HoodieTable table = HoodieTable.create(config, jsc);
+      HoodieTable table = HoodieTable.create(config, hadoopConf);
       // 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW
       // otherwise there may be race conditions
       HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);
@@ -231,7 +231,7 @@
       setWriteSchemaForDeletes(metaClient);
     }
     // Create a Hoodie table which encapsulates the commits and files visible
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
       writeContext = metrics.getCommitCtx();
     } else {
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index 1eee57add..b92eaa84e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.client;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
@@ -68,6 +69,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
   private HoodieTable hoodieTable;
   private transient Option<SQLContext> sqlContextOpt;
   private final transient JavaSparkContext jsc;
+  private final transient Configuration hadoopConf;
 
   /**
    * @param basePath path to Hoodie table
@@ -93,10 +95,11 @@
    */
   public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
     this.jsc = jsc;
+    this.hadoopConf = jsc.hadoopConfiguration();
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulates the commits and files visible
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-    this.hoodieTable = HoodieTable.create(metaClient, clientConfig, jsc);
+    this.hoodieTable = HoodieTable.create(metaClient, clientConfig, hadoopConf);
     this.index = HoodieIndex.createIndex(clientConfig);
     this.sqlContextOpt = Option.empty();
   }
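HoodieReadClient shows the intended call shape on the read path: the meta client and table are built from the captured configuration, and jsc is kept only for RDD work. A usage sketch under the same assumptions as the hunk above (openTable is a hypothetical helper; the boolean argument enables timeline loading as in this diff):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.spark.api.java.JavaSparkContext;

    // Hypothetical helper illustrating the read-side pattern after this change.
    static HoodieTable openTable(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
      Configuration conf = jsc.hadoopConfiguration();
      HoodieTableMetaClient metaClient =
          new HoodieTableMetaClient(conf, clientConfig.getBasePath(), true);
      return HoodieTable.create(metaClient, clientConfig, conf);
    }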
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 984769e5f..c781c69b1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -139,7 +139,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T> {
    */
  public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
     // Create a Hoodie table which encapsulates the commits and files visible
-    HoodieTable table = HoodieTable.create(config, jsc);
+    HoodieTable table = HoodieTable.create(config, hadoopConf);
     Timer.Context indexTimer = metrics.getIndexCtx();
     JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, jsc, table);
     metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
@@ -336,7 +336,7 @@
     }
     // We cannot have unbounded commit files. Archive commits if we have to archive
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
-    archiveLog.archiveIfRequired(jsc);
+    archiveLog.archiveIfRequired(hadoopConf);
     if (config.isAutoClean()) {
       // Call clean to cleanup if there is anything to cleanup after the commit,
       LOG.info("Auto cleaning is enabled. Running cleaner now");
@@ -356,7 +356,7 @@
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String user, String comment) {
-    HoodieTable table = HoodieTable.create(config, jsc);
+    HoodieTable table = HoodieTable.create(config, hadoopConf);
     if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
     }
@@ -380,7 +380,7 @@
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String instantTime, String user, String comment) {
-    HoodieTable table = HoodieTable.create(config, jsc);
+    HoodieTable table = HoodieTable.create(config, hadoopConf);
     table.savepoint(jsc, instantTime, user, comment);
   }
@@ -392,7 +392,7 @@
    * @return true if the savepoint was deleted successfully
    */
   public void deleteSavepoint(String savepointTime) {
-    HoodieTable table = HoodieTable.create(config, jsc);
+    HoodieTable table = HoodieTable.create(config, hadoopConf);
     SavepointHelpers.deleteSavepoint(table, savepointTime);
   }
@@ -407,7 +407,7 @@
    * @return true if the savepoint was restored to successfully
    */
   public void restoreToSavepoint(String savepointTime) {
-    HoodieTable table = HoodieTable.create(config, jsc);
+    HoodieTable table = HoodieTable.create(config, hadoopConf);
     SavepointHelpers.validateSavepointPresence(table, savepointTime);
     restoreToInstant(savepointTime);
     SavepointHelpers.validateSavepointRestore(table, savepointTime);
@@ -424,7 +424,7 @@
     final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
     final Timer.Context context = this.metrics.getRollbackCtx();
     try {
-      HoodieTable table = HoodieTable.create(config, jsc);
+      HoodieTable table = HoodieTable.create(config, hadoopConf);
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
           .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
           .findFirst());
@@ -455,7 +455,7 @@
     final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
     Timer.Context context = metrics.getRollbackCtx();
     try {
-      HoodieTable table = HoodieTable.create(config, jsc);
+      HoodieTable table = HoodieTable.create(config, hadoopConf);
       HoodieRestoreMetadata restoreMetadata = table.restore(jsc, restoreInstantTime, instantTime);
       if (context != null) {
         final long durationInMs = metrics.getDurationInMs(context.stop());
@@ -488,7 +488,7 @@
   public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
     LOG.info("Cleaner started");
     final Timer.Context context = metrics.getCleanCtx();
-    HoodieCleanMetadata metadata = HoodieTable.create(config, jsc).clean(jsc, cleanInstantTime);
+    HoodieCleanMetadata metadata = HoodieTable.create(config, hadoopConf).clean(jsc, cleanInstantTime);
     if (context != null) {
       long durationMs = metrics.getDurationInMs(context.stop());
       metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -540,7 +540,7 @@
             HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
         "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
             + latestPending + ", Ingesting at " + instantTime));
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     String commitActionType = table.getMetaClient().getCommitActionType();
     activeTimeline.createNewInstant(new HoodieInstant(State.REQUESTED, commitActionType, instantTime));
@@ -564,7 +564,7 @@
    */
   public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
     LOG.info("Scheduling compaction at instant time :" + instantTime);
-    Option<HoodieCompactionPlan> plan = HoodieTable.create(config, jsc)
+    Option<HoodieCompactionPlan> plan = HoodieTable.create(config, hadoopConf)
         .scheduleCompaction(jsc, instantTime, extraMetadata);
     return plan.isPresent();
   }
@@ -588,7 +588,7 @@
    */
   public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
                                Option<Map<String, String>> extraMetadata) throws IOException {
-    HoodieTable table = HoodieTable.create(config, jsc);
+    HoodieTable table = HoodieTable.create(config, hadoopConf);
     HoodieCommitMetadata metadata = CompactHelpers.createCompactionMetadata(
         table, compactionInstantTime, writeStatuses, config.getSchema());
     extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
@@ -634,7 +634,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T> {
    * Cleanup all pending commits.
    */
   private void rollbackPendingCommits() {
-    HoodieTable table = HoodieTable.create(config, jsc);
+    HoodieTable table = HoodieTable.create(config, hadoopConf);
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
@@ -650,7 +650,7 @@
    * @return RDD of Write Status
    */
   private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
-    HoodieTable table = HoodieTable.create(config, jsc);
+    HoodieTable table = HoodieTable.create(config, hadoopConf);
     HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index 73c1b23a8..ed2918080 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -86,8 +87,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T> {
 
   private static final Logger LOG = LogManager.getLogger(HoodieCopyOnWriteTable.class);
 
-  public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
-    super(config, jsc, metaClient);
+  public HoodieCopyOnWriteTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
+    super(config, hadoopConf, metaClient);
   }
 
   @Override
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index cacc1efc5..65981c2db 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.table;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -70,8 +71,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieCopyOnWriteTable<T> {
 
   private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class);
 
-  HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
-    super(config, jsc, metaClient);
+  HoodieMergeOnReadTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
+    super(config, hadoopConf, metaClient);
   }
 
   @Override
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index ef156ad82..8acd35114 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -89,10 +89,10 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable {
 
   protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
 
-  protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
+  protected HoodieTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
     this.config = config;
-    this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration());
-    this.viewManager = FileSystemViewManager.createViewManager(new SerializableConfiguration(jsc.hadoopConfiguration()),
+    this.hadoopConfiguration = new SerializableConfiguration(hadoopConf);
+    this.viewManager = FileSystemViewManager.createViewManager(new SerializableConfiguration(hadoopConf),
         config.getViewStorageConfig());
     this.metaClient = metaClient;
     this.index = HoodieIndex.createIndex(config);
@@ -105,25 +105,25 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable {
     return viewManager;
   }
 
-  public static HoodieTable create(HoodieWriteConfig config, JavaSparkContext jsc) {
+  public static HoodieTable create(HoodieWriteConfig config, Configuration hadoopConf) {
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
-        jsc.hadoopConfiguration(),
+        hadoopConf,
         config.getBasePath(),
         true,
         config.getConsistencyGuardConfig(),
         Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
     );
-    return HoodieTable.create(metaClient, config, jsc);
+    return HoodieTable.create(metaClient, config, hadoopConf);
   }
 
   public static HoodieTable create(HoodieTableMetaClient metaClient,
                                    HoodieWriteConfig config,
-                                   JavaSparkContext jsc) {
+                                   Configuration hadoopConf) {
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
-        return new HoodieCopyOnWriteTable<>(config, jsc, metaClient);
+        return new HoodieCopyOnWriteTable<>(config, hadoopConf, metaClient);
       case MERGE_ON_READ:
-        return new HoodieMergeOnReadTable<>(config, jsc, metaClient);
+        return new HoodieMergeOnReadTable<>(config, hadoopConf, metaClient);
       default:
         throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }
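HoodieTable.create is the heart of the patch: both overloads now key off a plain Hadoop Configuration, and the table type (COW vs. MOR) is resolved from the meta client rather than from anything Spark-specific. A usage sketch (both calls exactly as in the hunk above; config, metaClient and hadoopConf are assumed to be in scope):

    // Builds its own meta client from config.getBasePath():
    HoodieTable fromConfig = HoodieTable.create(config, hadoopConf);
    // Reuses an existing meta client:
    HoodieTable fromMetaClient = HoodieTable.create(metaClient, config, hadoopConf);

Note that Spark-bound operations are deliberately unchanged: clean, restore, and scheduleCompaction still take jsc explicitly at the call site, as the HoodieWriteClient hunks above show.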
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index cfd71c0d7..c94f30b3d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.table;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -53,7 +54,6 @@
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -112,9 +112,9 @@ public class HoodieTimelineArchiveLog {
   /**
    * Check if commits need to be archived. If yes, archive commits.
    */
-  public boolean archiveIfRequired(final JavaSparkContext jsc) throws IOException {
+  public boolean archiveIfRequired(final Configuration hadoopConf) throws IOException {
     try {
-      List<HoodieInstant> instantsToArchive = getInstantsToArchive(jsc).collect(Collectors.toList());
+      List<HoodieInstant> instantsToArchive = getInstantsToArchive(hadoopConf).collect(Collectors.toList());
       boolean success = true;
       if (!instantsToArchive.isEmpty()) {
@@ -133,13 +133,13 @@ public class HoodieTimelineArchiveLog {
     }
   }
 
-  private Stream<HoodieInstant> getInstantsToArchive(JavaSparkContext jsc) {
+  private Stream<HoodieInstant> getInstantsToArchive(Configuration hadoopConf) {
     // TODO : rename to max/minInstantsToKeep
     int maxCommitsToKeep = config.getMaxCommitsToKeep();
     int minCommitsToKeep = config.getMinCommitsToKeep();
 
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 
     // GroupBy each action and limit each action timeline to maxCommitsToKeep
     // TODO: Handle ROLLBACK_ACTION in future
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 93a655ea8..2daf9424e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -182,7 +182,7 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, R> {
     String actionType = table.getMetaClient().getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType);
     // Create a Hoodie table which encapsulates the commits and files visible
-    HoodieTable table = HoodieTable.create(config, jsc);
+    HoodieTable table = HoodieTable.create(config, jsc.hadoopConfiguration());
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
index 2c0afa9cb..a2ce958d6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
@@ -89,7 +89,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
     }
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     // Compacting is very similar to applying updates to existing file
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc, metaClient);
+    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc.hadoopConfiguration(), metaClient);
     List<CompactionOperation> operations = compactionPlan.getOperations().stream()
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
     LOG.info("Compactor compacting " + operations + " files");
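Where a component still holds only a JavaSparkContext, as BaseCommitActionExecutor and the compactor above do, the configuration is derived inline at the call site rather than stored in a new field. A sketch of that call-site pattern (config, jsc and archiveLog are assumed to be in scope, as in the hunks above):

    // Derive the Hadoop configuration on demand; no new field is needed.
    HoodieTable table = HoodieTable.create(config, jsc.hadoopConfiguration());
    archiveLog.archiveIfRequired(jsc.hadoopConfiguration());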
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index 7ac916b11..bdfd380bb 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -99,7 +99,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       List<String> partitionPaths =
           FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc);
+      HoodieTable table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
       final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
       List<HoodieBaseFile> dataFiles = partitionPaths.stream().flatMap(s -> {
@@ -124,7 +124,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       assertNoWriteErrors(statuses);
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.create(metaClient, getConfig(), jsc);
+      table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
       final BaseFileOnlyView view2 = table.getBaseFileOnlyView();
       dataFiles = partitionPaths.stream().flatMap(s ->
           view2.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
@@ -140,7 +140,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       client.restoreToSavepoint(savepoint.getTimestamp());
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.create(metaClient, getConfig(), jsc);
+      table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
       final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
       dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"))).collect(Collectors.toList());
       assertEquals(3, dataFiles.size(), "The data files for commit 002 be available");
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index 899c8686c..89f109aa9 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -156,7 +156,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
   }
 
   protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     ((SyncableFileSystemView) (table.getSliceView())).reset();
     return table;
   }
@@ -250,7 +250,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
       final HoodieIndex index = HoodieIndex.createIndex(writeConfig);
       List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
       final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.create(metaClient, writeConfig, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, writeConfig, hadoopConf);
       JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
       return taggedRecords.collect();
     };
@@ -271,7 +271,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
       final HoodieIndex index = HoodieIndex.createIndex(writeConfig);
       List<HoodieKey> records = keyGenFunction.apply(numRecords);
       final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.create(metaClient, writeConfig, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, writeConfig, hadoopConf);
       JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
           .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
       JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 239ea344e..246ba37e5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -809,7 +809,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-      HoodieTable table = HoodieTable.create(metaClient, cfg, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
       String instantTime = "000";
       client.startCommitWithTime(instantTime);
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 255243711..2e5a18a3b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -67,7 +67,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
   public void testSchemaEvolutionOnUpdate() throws Exception {
     // Create a bunch of records with an old version of schema
     final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt");
-    final HoodieTable table = HoodieTable.create(config, jsc);
+    final HoodieTable table = HoodieTable.create(config, hadoopConf);
     final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
       String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -102,7 +102,7 @@
     final WriteStatus insertResult = statuses.get(0);
     String fileId = insertResult.getFileId();
 
-    final HoodieTable table2 = HoodieTable.create(config2, jsc);
+    final HoodieTable table2 = HoodieTable.create(config2, hadoopConf);
     assertEquals(1, jsc.parallelize(Arrays.asList(1)).map(x -> {
       // New content with values for the newly added field
       String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 1312b23ee..62d5505d0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -50,6 +50,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
 
   protected transient JavaSparkContext jsc = null;
+  protected transient Configuration hadoopConf = null;
   protected transient SQLContext sqlContext;
   protected transient FileSystem fs;
   protected transient HoodieTestDataGenerator dataGen = null;
@@ -103,6 +104,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
     // Initialize a local spark env
     jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
     jsc.setLogLevel("ERROR");
+    hadoopConf = jsc.hadoopConfiguration();
 
     // SQLContext stuff
     sqlContext = new SQLContext(jsc);
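With the harness exposing hadoopConf alongside jsc, the test changes below reduce to one extra setup line plus Spark-free table creation. A sketch of that flow, assuming only the harness fields shown in the hunk above:

    // In test setup: capture the configuration right after the Spark context.
    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
    hadoopConf = jsc.hadoopConfiguration();

    // In test bodies: tables no longer need the Spark context itself.
    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);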
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 82dc8ace4..42a0b970f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -143,7 +143,7 @@
     HBaseIndex index = new HBaseIndex(config);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
       JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -163,7 +163,7 @@
       writeClient.commit(newCommitTime, writeStatues);
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
       javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
       assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
       assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -184,7 +184,7 @@
     HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
     writeClient.startCommitWithTime(newCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
     JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime);
     JavaRDD javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -202,7 +202,7 @@
     writeClient.commit(newCommitTime, writeStatues);
     // Now tagLocation for these records, hbaseIndex should tag them correctly
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
     assertEquals(10, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
     assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -228,7 +228,7 @@
     // commit this upsert
     writeClient.commit(newCommitTime, writeStatues);
-    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     // Now tagLocation for these records, hbaseIndex should tag them
     JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
     assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 200);
@@ -243,7 +243,7 @@
     // Rollback the last commit
     writeClient.rollback(newCommitTime);
-    hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
     // back commit
     javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -272,7 +272,7 @@
     List records = dataGen.generateInserts(newCommitTime, 250);
     JavaRDD writeRecords = jsc.parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
     // Insert 250 records
     JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -297,7 +297,7 @@
     List records = dataGen.generateInserts(newCommitTime, 250);
     JavaRDD writeRecords = jsc.parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
     // Insert 200 records
     JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -409,7 +409,7 @@
     HBaseIndex index = new HBaseIndex(config);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
       JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -429,7 +429,7 @@
       writeClient.commit(newCommitTime, writeStatues);
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
       javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
       assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
       assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -449,7 +449,7 @@
     HBaseIndex index = new HBaseIndex(config);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
       JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -463,7 +463,7 @@
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
       javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
       assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
       assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -500,7 +500,7 @@
     assertTrue(index.canIndexLogFiles());
     assertThrows(UnsupportedOperationException.class, () -> {
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
       index.fetchRecordLocation(jsc.parallelize(new ArrayList(), 1), jsc, hoodieTable);
     }, "HbaseIndex supports fetchRecordLocation");
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 1e8622506..a6f69b612 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -153,7 +153,7 @@
     List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12");
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
     // Still 0, as no valid commit
     assertEquals(0, filesList.size());
@@ -163,7 +163,7 @@
     Files.createFile(hoodieDir.resolve("20160401010101.commit"));
     Files.createFile(hoodieDir.resolve("20150312101010.commit"));
-    table = HoodieTable.create(metaClient, config, jsc);
+    table = HoodieTable.create(metaClient, config, hadoopConf);
     filesList = index.loadInvolvedFiles(partitions, jsc, table);
     assertEquals(4, filesList.size());
@@ -279,7 +279,7 @@
     // Also create the metadata and config
     HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -318,7 +318,7 @@
     // Also create the metadata and config
     HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -339,7 +339,7 @@
     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    table = HoodieTable.create(metaClient, config, jsc);
+    table = HoodieTable.create(metaClient, config, hadoopConf);
     taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
@@ -389,7 +389,7 @@
     // Also create the metadata and config
     HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -411,7 +411,7 @@
     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    table = HoodieTable.create(metaClient, config, jsc);
+    table = HoodieTable.create(metaClient, config, hadoopConf);
     taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
 
     // Check results
@@ -461,7 +461,7 @@
     JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
     HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
     JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index ccb0edc55..8953213a6 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -131,7 +131,7 @@
     // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
     List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     // partitions will NOT be respected by this loadInvolvedFiles(...) call
     List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
     // Still 0, as no valid commit
@@ -142,7 +142,7 @@
     Files.createFile(hoodieDir.resolve("20160401010101.commit"));
     Files.createFile(hoodieDir.resolve("20150312101010.commit"));
-    table = HoodieTable.create(metaClient, config, jsc);
+    table = HoodieTable.create(metaClient, config, hadoopConf);
     filesList = index.loadInvolvedFiles(partitions, jsc, table);
     assertEquals(4, filesList.size());
@@ -261,7 +261,7 @@
     // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 
     // Add some commits
     Files.createDirectories(Paths.get(basePath, ".hoodie"));
@@ -346,7 +346,7 @@
         .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 
     // Add some commits
     Files.createDirectories(Paths.get(basePath, ".hoodie"));
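The index tests highlight the boundary this patch draws: table construction is Spark-free, while record tagging still runs over RDDs and keeps jsc in its signature. A sketch, assuming writeRecords, index, metaClient and config as in the tests above:

    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
    // tagLocation is unchanged: it operates on a JavaRDD and therefore still takes jsc.
    JavaRDD<HoodieRecord> tagged = index.tagLocation(writeRecords, jsc, table);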
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index a7618f748..d56f29781 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -80,7 +80,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withParallelism(2, 2).forTable("test-trip-table").build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
-    boolean result = archiveLog.archiveIfRequired(jsc);
+    boolean result = archiveLog.archiveIfRequired(hadoopConf);
     assertTrue(result);
   }
@@ -159,7 +159,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
-    assertTrue(archiveLog.archiveIfRequired(jsc));
+    assertTrue(archiveLog.archiveIfRequired(hadoopConf));
     // reload the timeline and remove the remaining commits
     timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -248,7 +248,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired(jsc);
+    boolean result = archiveLog.archiveIfRequired(hadoopConf);
     assertTrue(result);
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
     assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5");
@@ -291,7 +291,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired(jsc);
+    boolean result = archiveLog.archiveIfRequired(hadoopConf);
     assertTrue(result);
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
     assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe");
@@ -318,7 +318,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired(jsc);
+    boolean result = archiveLog.archiveIfRequired(hadoopConf);
     assertTrue(result);
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
     assertEquals(5, timeline.countInstants(),
@@ -354,7 +354,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
     assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match");
-    boolean result = archiveLog.archiveIfRequired(jsc);
+    boolean result = archiveLog.archiveIfRequired(hadoopConf);
     assertTrue(result);
     timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline();
     assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")),
@@ -401,7 +401,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     HoodieTestDataGenerator.createCommitFile(basePath, "5", dfs.getConf());
HoodieTimeline.COMMIT_ACTION, "5"); - boolean result = archiveLog.archiveIfRequired(jsc); + boolean result = archiveLog.archiveIfRequired(hadoopConf); assertTrue(result); HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java index 027fa0043..5d1324cd0 100755 --- a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java @@ -44,7 +44,7 @@ public class TestHoodieStorageWriterFactory extends TestHoodieClientBase { final String instantTime = "100"; final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"); final HoodieWriteConfig cfg = getConfig(); - HoodieTable table = HoodieTable.create(metaClient, cfg, jsc); + HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf); SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); HoodieStorageWriter parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 5c62423c8..2efdeb5e9 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -125,7 +125,7 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit."); // Should have 100 records in table (check using Index), all in locations marked at commit - HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), jsc); + HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), hadoopConf); assertFalse(table.getCompletedCommitsTimeline().empty()); if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) { @@ -211,7 +211,7 @@ public class TestCleaner extends TestHoodieClientBase { Map compactionFileIdToLatestFileSlice = new HashMap<>(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc); + HoodieTable table = HoodieTable.create(metaClient, getConfig(), hadoopConf); for (String partitionPath : dataGen.getPartitionPaths()) { TableFileSystemView fsView = table.getFileSystemView(); Option added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> { @@ -248,7 +248,7 @@ public class TestCleaner extends TestHoodieClientBase { assertNoWriteErrors(statuses); metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.create(metaClient, getConfig(), jsc); + table = HoodieTable.create(metaClient, getConfig(), hadoopConf); HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline(); TableFileSystemView fsView = table.getFileSystemView(); @@ -381,7 +381,7 @@ public class TestCleaner extends TestHoodieClientBase { assertNoWriteErrors(statuses); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table1 = HoodieTable.create(metaClient, cfg, jsc); + HoodieTable table1 = HoodieTable.create(metaClient, cfg, hadoopConf); 
      HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
      // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
      // commit
@@ -894,7 +894,7 @@ public class TestCleaner extends TestHoodieClientBase {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"));
     table.getActiveTimeline().transitionRequestedToInflight(
@@ -1040,7 +1040,7 @@ public class TestCleaner extends TestHoodieClientBase {
         if (j == i && j <= maxNumFileIdsForCompaction) {
           expFileIdToPendingCompaction.put(fileId, compactionInstants[j]);
           metaClient = HoodieTableMetaClient.reload(metaClient);
-          HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+          HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
           FileSlice slice = table.getSliceView().getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
               .filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();
@@ -1082,7 +1082,7 @@ public class TestCleaner extends TestHoodieClientBase {
     // Test for safety
     final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
-    final HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    final HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
     expFileIdToPendingCompaction.forEach((fileId, value) -> {
       String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId);
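A recurring pattern in these tests is reload-then-recreate: after each commit the meta client is reloaded and the table is rebuilt from the plain configuration so its views reflect the new timeline. A sketch under the same assumptions as the hunks above:

    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
    // Reset the cached view when file changes were made without a commit.
    ((SyncableFileSystemView) table.getSliceView()).reset();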
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 309282b98..57f5edbf8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -153,7 +153,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     client.compact(compactionCommitTime);
 
     FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-    HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
     HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
     assertTrue(dataFilesToRead.findAny().isPresent());
@@ -308,7 +308,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
       assertTrue(deltaCommit.isPresent());
@@ -417,7 +417,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       client.rollback(newCommitTime);
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       HoodieTableFileSystemView roView =
           new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
@@ -449,7 +449,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
       assertTrue(deltaCommit.isPresent());
@@ -530,7 +530,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
           .filter(file -> file.getPath().getName().contains(commitTime2)).count());
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+      hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
       roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
       recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
@@ -596,7 +596,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
       assertTrue(deltaCommit.isPresent());
@@ -759,7 +759,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
 
       Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
       assertTrue(deltaCommit.isPresent());
@@ -849,7 +849,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // Verify that all data files have one log file
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
     ((SyncableFileSystemView) (table.getSliceView())).reset();
@@ -873,7 +873,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     // Verify that recently written compacted data file has no log file
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    table = HoodieTable.create(metaClient, config, jsc);
+    table = HoodieTable.create(metaClient, config, hadoopConf);
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
 
     assertTrue(HoodieTimeline
@@ -907,7 +907,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
     writeClient.commit(newCommitTime, statuses);
 
     HoodieTable table =
-        HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf); SliceView tableRTFileSystemView = table.getSliceView(); long numLogFiles = 0; @@ -981,7 +981,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { writeClient.rollback(newCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); SliceView tableRTFileSystemView = table.getSliceView(); long numLogFiles = 0; @@ -1017,7 +1017,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { statuses.collect(); HoodieTable table = - HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc); + HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf); SliceView tableRTFileSystemView = table.getSliceView(); long numLogFiles = 0; @@ -1038,7 +1038,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { writeClient.commitCompaction(newCommitTime, statuses, Option.empty()); // Trigger a rollback of compaction writeClient.rollback(newCommitTime); - table = HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc); + table = HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf); tableRTFileSystemView = table.getSliceView(); ((SyncableFileSystemView) tableRTFileSystemView).reset(); Option lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant(); @@ -1059,7 +1059,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); try (HoodieWriteClient client = getWriteClient(cfg);) { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTable table = HoodieTable.create(metaClient, cfg, jsc); + HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf); // Create a commit without rolling stats in metadata to test backwards compatibility HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); @@ -1080,7 +1080,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertTrue(client.commit(instantTime, statuses), "Commit should succeed"); // Read from commit file - table = HoodieTable.create(cfg, jsc); + table = HoodieTable.create(cfg, hadoopConf); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline().getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class); @@ -1104,7 +1104,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertTrue(client.commit(instantTime, statuses), "Commit should succeed"); // Read from commit file - table = HoodieTable.create(cfg, jsc); + table = HoodieTable.create(cfg, hadoopConf); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1128,7 +1128,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { client.rollback(instantTime); // Read from commit file - table = HoodieTable.create(cfg, jsc); + table = HoodieTable.create(cfg, hadoopConf); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() 
.getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1171,7 +1171,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertTrue(client.commit(instantTime, statuses), "Commit should succeed"); // Read from commit file - HoodieTable table = HoodieTable.create(cfg, jsc); + HoodieTable table = HoodieTable.create(cfg, hadoopConf); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1200,7 +1200,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertTrue(client.commit(instantTime, statuses), "Commit should succeed"); // Read from commit file - table = HoodieTable.create(cfg, jsc); + table = HoodieTable.create(cfg, hadoopConf); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1231,7 +1231,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { client.commitCompaction(instantTime, statuses, Option.empty()); // Read from commit file - table = HoodieTable.create(cfg, jsc); + table = HoodieTable.create(cfg, hadoopConf); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(), @@ -1259,7 +1259,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertTrue(client.commit(instantTime, statuses), "Commit should succeed"); // Read from commit file - table = HoodieTable.create(cfg, jsc); + table = HoodieTable.create(cfg, hadoopConf); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1305,7 +1305,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertNoWriteErrors(statuses); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, jsc); + HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, hadoopConf); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); assertTrue(deltaCommit.isPresent()); @@ -1398,7 +1398,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertNoWriteErrors(statuses); metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); assertTrue(deltaCommit.isPresent()); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 3fc8497fc..54fb555bb 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -98,7 +98,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness { String instantTime = 
HoodieTestUtils.makeNewCommitTime(); HoodieWriteConfig config = makeHoodieClientConfig(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); Pair newPathWithWriteToken = jsc.parallelize(Arrays.asList(1)).map(x -> { HoodieRecord record = mock(HoodieRecord.class); @@ -134,7 +134,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness { metaClient = HoodieTableMetaClient.reload(metaClient); String partitionPath = "2016/01/31"; - HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); // Get some records belong to the same partition (2016/01/31) String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," @@ -276,7 +276,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness { String firstCommitTime = HoodieTestUtils.makeNewCommitTime(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); // Get some records belong to the same partition (2016/01/31) String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," @@ -314,7 +314,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness { HoodieWriteConfig config = makeHoodieClientConfig(); String instantTime = HoodieTestUtils.makeNewCommitTime(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); // Case 1: // 10 records for partition 1, 1 record for partition 2. 
@@ -369,7 +369,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness { .limitFileSize(64 * 1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build(); String instantTime = HoodieTestUtils.makeNewCommitTime(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); List records = new ArrayList<>(); // Approx 1150 records are written for block size of 64KB @@ -403,7 +403,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness { HoodieWriteConfig config = makeHoodieClientConfigBuilder() .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build(); metaClient = HoodieTableMetaClient.reload(metaClient); - final HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); + final HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); String instantTime = "000"; // Perform inserts of 100 records to test CreateHandle and BufferedExecutor final List inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index ebdf0fd5f..46a91569e 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -77,7 +77,7 @@ public class TestUpsertPartitioner extends HoodieClientTestHarness { HoodieClientTestUtils.fakeCommitFile(basePath, "001"); HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); List insertRecords = dataGenerator.generateInserts("001", numInserts); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java index dd7b5058e..f348f0c92 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java @@ -119,7 +119,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { // Reload and rollback inflight compaction metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf); // hoodieTable.rollback(jsc, // new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java index 07b4207a9..0ebebed65 100644 --- 
a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java @@ -104,7 +104,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { @Test public void testCompactionOnCopyOnWriteFail() throws Exception { metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); - HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc); + HoodieTable table = HoodieTable.create(metaClient, getConfig(), hadoopConf); String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); assertThrows(HoodieNotSupportedException.class, () -> { table.scheduleCompaction(jsc, compactionInstantTime, Option.empty()); @@ -116,7 +116,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { public void testCompactionEmpty() throws Exception { HoodieWriteConfig config = getConfig(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); try (HoodieWriteClient writeClient = getWriteClient(config);) { String newCommitTime = writeClient.startCommit(); @@ -143,7 +143,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { writeClient.insert(recordsRDD, newCommitTime).collect(); // Update all the 100 records - HoodieTable table = HoodieTable.create(config, jsc); + HoodieTable table = HoodieTable.create(config, hadoopConf); newCommitTime = "101"; writeClient.startCommitWithTime(newCommitTime); @@ -157,7 +157,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords); // Verify that all data file has one log file - table = HoodieTable.create(config, jsc); + table = HoodieTable.create(config, hadoopConf); for (String partitionPath : dataGen.getPartitionPaths()) { List groupedLogFiles = table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList()); @@ -168,7 +168,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime); // Do a compaction - table = HoodieTable.create(config, jsc); + table = HoodieTable.create(config, hadoopConf); String compactionInstantTime = "102"; table.scheduleCompaction(jsc, compactionInstantTime, Option.empty()); table.getMetaClient().reloadActiveTimeline(); diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala index f0891e3eb..8bb460990 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -55,7 +55,7 @@ class IncrementalRelation(val sqlContext: SQLContext, } // TODO : Figure out a valid HoodieWriteConfig private val hoodieTable = HoodieTable.create(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(), - sqlContext.sparkContext) + sqlContext.sparkContext.hadoopConfiguration) val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants() if (commitTimeline.empty()) { throw new HoodieException("No instants to incrementally pull")
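For reviewers, a minimal sketch (not part of the patch) of the call-site pattern the hunks above migrate to: HoodieTable.create now takes a Hadoop Configuration, derived once from the JavaSparkContext, instead of the JavaSparkContext itself. The two create() overloads are those shown in the diff; the wrapper class and method names here are hypothetical, for illustration only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;

// Illustrative helper only: demonstrates the two create() overloads touched by this patch.
public class HoodieTableCreateSketch {

  // Overload without an explicit meta client; the base path is read from the config.
  public static HoodieTable createTable(JavaSparkContext jsc, HoodieWriteConfig config) {
    // Derive the Hadoop Configuration once from the Spark context,
    // mirroring the new hadoopConf field this patch adds to AbstractHoodieClient.
    Configuration hadoopConf = jsc.hadoopConfiguration();
    return HoodieTable.create(config, hadoopConf);
  }

  // Overload with an explicit, already-loaded meta client.
  public static HoodieTable createTable(HoodieTableMetaClient metaClient,
                                        HoodieWriteConfig config,
                                        Configuration hadoopConf) {
    return HoodieTable.create(metaClient, config, hadoopConf);
  }
}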