diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java index 18bc9be41..a0b4be835 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java @@ -118,10 +118,6 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl return config; } - public Option getTimelineServer() { - return timelineServer; - } - protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) { return ClientUtils.createMetaClient(jsc, config, loadActiveTimelineOnLoad); } diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 6008fe4bc..ea5ed9f61 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -140,9 +140,9 @@ public abstract class AbstractHoodieWriteClient e private boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String actionType) { - LOG.info("Commiting " + instantTime); + LOG.info("Committing " + instantTime); // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); @@ -221,7 +221,7 @@ public abstract class AbstractHoodieWriteClient e // TODO : make sure we cannot rollback / archive last commit file try { // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); // 0.
All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW otherwise // there may be race conditions HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType); @@ -272,7 +272,7 @@ public abstract class AbstractHoodieWriteClient e setWriteSchemaFromLastInstant(metaClient); } // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { writeContext = metrics.getCommitCtx(); } else { @@ -321,8 +321,7 @@ public abstract class AbstractHoodieWriteClient e // Create a Hoodie table which encapsulated the commits and files visible try { // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable( - createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); Option rollbackInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() .filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback)) @@ -341,8 +340,7 @@ public abstract class AbstractHoodieWriteClient e protected List doRollbackAndGetStats(final HoodieInstant instantToRollback) throws IOException { final String commitToRollback = instantToRollback.getTimestamp(); - HoodieTable table = HoodieTable.getHoodieTable( - createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline(); HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); // Check if any of the commits is a savepoint - do not allow rollback on those commits @@ -391,7 +389,7 @@ public abstract class AbstractHoodieWriteClient e private void finishRollback(final Timer.Context context, List rollbackStats, List commitsToRollback, final String startRollbackTime) throws IOException { - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); Option durationInMs = Option.empty(); long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); if (context != null) { diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java deleted file mode 100644 index d622f7099..000000000 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieCleanClient.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.client; - -import org.apache.hudi.avro.model.HoodieCleanMetadata; -import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.client.embedded.EmbeddedTimelineService; -import org.apache.hudi.common.HoodieCleanStat; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieInstant.State; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; -import org.apache.hudi.common.util.CleanerUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.metrics.HoodieMetrics; -import org.apache.hudi.table.HoodieTable; - -import com.codahale.metrics.Timer; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaSparkContext; - -import java.io.IOException; -import java.util.List; - -public class HoodieCleanClient extends AbstractHoodieClient { - - private static final long serialVersionUID = 1L; - private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class); - private final transient HoodieMetrics metrics; - - public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) { - this(jsc, clientConfig, metrics, Option.empty()); - } - - public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics, - Option timelineService) { - super(jsc, clientConfig, timelineService); - this.metrics = metrics; - } - - /** - * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the - * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be - * cleaned) - */ - public void clean() throws HoodieIOException { - String startCleanTime = HoodieActiveTimeline.createNewInstantTime(); - clean(startCleanTime); - } - - /** - * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the - * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be - * cleaned) - * - * @param startCleanTime Cleaner Instant Timestamp - * @throws HoodieIOException in case of any IOException - */ - public HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException { - // Create a Hoodie table which encapsulated the commits and files visible - final HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); - - // If there are inflight(failed) or previously requested clean operation, first perform them - table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> { - LOG.info("There were previously unfinished cleaner operations. 
Finishing Instant=" + hoodieInstant); - try { - runClean(table, hoodieInstant); - } catch (Exception e) { - LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e); - } - }); - - Option cleanerPlanOpt = scheduleClean(startCleanTime); - - if (cleanerPlanOpt.isPresent()) { - HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get(); - if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) - && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) { - final HoodieTable hoodieTable = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); - return runClean(hoodieTable, HoodieTimeline.getCleanRequestedInstant(startCleanTime), cleanerPlan); - } - } - return null; - } - - /** - * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file. - * - * @param startCleanTime Cleaner Instant Time - * @return Cleaner Plan if generated - */ - protected Option scheduleClean(String startCleanTime) { - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); - - HoodieCleanerPlan cleanerPlan = table.scheduleClean(jsc); - - if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) - && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) { - - HoodieInstant cleanInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime); - // Save to both aux and timeline folder - try { - table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); - LOG.info("Requesting Cleaning with instant time " + cleanInstant); - } catch (IOException e) { - LOG.error("Got exception when saving cleaner requested file", e); - throw new HoodieIOException(e.getMessage(), e); - } - return Option.of(cleanerPlan); - } - return Option.empty(); - } - - /** - * Executes the Cleaner plan stored in the instant metadata. 
- * - * @param table Hoodie Table - * @param cleanInstant Cleaner Instant - */ - public HoodieCleanMetadata runClean(HoodieTable table, HoodieInstant cleanInstant) { - try { - HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant); - return runClean(table, cleanInstant, cleanerPlan); - } catch (IOException e) { - throw new HoodieIOException(e.getMessage(), e); - } - } - - private HoodieCleanMetadata runClean(HoodieTable table, HoodieInstant cleanInstant, - HoodieCleanerPlan cleanerPlan) { - ValidationUtils.checkArgument( - cleanInstant.getState().equals(State.REQUESTED) || cleanInstant.getState().equals(State.INFLIGHT)); - - try { - LOG.info("Cleaner started"); - final Timer.Context context = metrics.getCleanCtx(); - - if (!cleanInstant.isInflight()) { - // Mark as inflight first - cleanInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant, - TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); - } - - List cleanStats = table.clean(jsc, cleanInstant, cleanerPlan); - - if (cleanStats.isEmpty()) { - return HoodieCleanMetadata.newBuilder().build(); - } - - // Emit metrics (duration, numFilesDeleted) if needed - Option durationInMs = Option.empty(); - if (context != null) { - durationInMs = Option.of(metrics.getDurationInMs(context.stop())); - LOG.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60)); - } - - HoodieTableMetaClient metaClient = createMetaClient(true); - // Create the metadata and save it - HoodieCleanMetadata metadata = - CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats); - LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain()); - metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted()); - - table.getActiveTimeline().transitionCleanInflightToComplete( - new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()), - TimelineMetadataUtils.serializeCleanMetadata(metadata)); - LOG.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete"); - return metadata; - } catch (IOException e) { - throw new HoodieIOException("Failed to clean up after commit", e); - } - } -} diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java index 2dedfaf3f..4b97c4ae1 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java @@ -96,7 +96,7 @@ public class HoodieReadClient implements Serializ final String basePath = clientConfig.getBasePath(); // Create a Hoodie table which encapsulated the commits and files visible HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); - this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc); + this.hoodieTable = HoodieTable.create(metaClient, clientConfig, jsc); this.index = HoodieIndex.createIndex(clientConfig, jsc); this.sqlContextOpt = Option.empty(); } diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 2a25a769f..e69eef271 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ 
-98,7 +98,6 @@ public class HoodieWriteClient extends AbstractHo private static final String LOOKUP_STR = "lookup"; private final boolean rollbackPending; private final transient HoodieMetrics metrics; - private final transient HoodieCleanClient cleanClient; private transient Timer.Context compactionTimer; /** @@ -139,7 +138,6 @@ public class HoodieWriteClient extends AbstractHo super(jsc, index, clientConfig, timelineService); this.metrics = new HoodieMetrics(config, config.getTableName()); this.rollbackPending = rollbackPending; - this.cleanClient = new HoodieCleanClient<>(jsc, config, metrics, timelineService); } /** @@ -161,7 +159,7 @@ public class HoodieWriteClient extends AbstractHo */ public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); Timer.Context indexTimer = metrics.getIndexCtx(); JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, jsc, table); metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); @@ -539,7 +537,7 @@ public class HoodieWriteClient extends AbstractHo * @return true if the savepoint was created successfully */ public boolean savepoint(String user, String comment) { - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); if (table.getCompletedCommitsTimeline().empty()) { throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty"); } @@ -567,7 +565,7 @@ public class HoodieWriteClient extends AbstractHo * @return true if the savepoint was created successfully */ public boolean savepoint(String instantTime, String user, String comment) { - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types"); } @@ -628,7 +626,7 @@ public class HoodieWriteClient extends AbstractHo * @return true if the savepoint was deleted successfully */ public void deleteSavepoint(String savepointTime) { - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types"); } @@ -655,7 +653,7 @@ public class HoodieWriteClient extends AbstractHo * @param compactionTime - delete the compaction time */ private void deleteRequestedCompaction(String compactionTime) { - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieInstant compactionRequestedInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime); @@ -682,7 +680,7 @@ public class HoodieWriteClient extends AbstractHo * @return true if the savepoint was rolled back to successfully */ public boolean rollbackToSavepoint(String savepointTime) { - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = 
HoodieTable.create(config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); // Rollback to savepoint is expected to be a manual operation and no concurrent write or compaction is expected @@ -737,7 +735,7 @@ public class HoodieWriteClient extends AbstractHo public void restoreToInstant(final String instantTime) throws HoodieRollbackException { // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); // Get all the commits on the timeline after the provided commit time List instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline() .getReverseOrderedInstants() @@ -788,7 +786,7 @@ public class HoodieWriteClient extends AbstractHo private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats, List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException { - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); Option durationInMs = Option.empty(); long numFilesDeleted = 0L; for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) { @@ -821,7 +819,6 @@ public class HoodieWriteClient extends AbstractHo public void close() { // Stop timeline-server if running super.close(); - this.cleanClient.close(); } /** @@ -829,20 +826,25 @@ public class HoodieWriteClient extends AbstractHo * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be * cleaned) */ - public void clean() throws HoodieIOException { - cleanClient.clean(); + public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException { + LOG.info("Cleaner started"); + final Timer.Context context = metrics.getCleanCtx(); + + HoodieCleanMetadata metadata = HoodieTable.create(config, jsc).clean(jsc, cleanInstantTime); + + if (context != null) { + long durationMs = metrics.getDurationInMs(context.stop()); + metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); + LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files" + + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + + " cleanerElapsedMs" + durationMs); + } + + return metadata; } - /** - * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the - * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be - * cleaned) - * - * @param startCleanTime Cleaner Instant Timestamp - * @throws HoodieIOException in case of any IOException - */ - protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException { - return cleanClient.clean(startCleanTime); + public HoodieCleanMetadata clean() { + return clean(HoodieActiveTimeline.createNewInstantTime()); } /** @@ -882,7 +884,7 @@ public class HoodieWriteClient extends AbstractHo HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER), "Latest pending compaction instant time must be earlier than this instant time.
Latest Compaction :" + latestPending + ", Ingesting at " + instantTime)); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); String commitActionType = table.getMetaClient().getCommitActionType(); activeTimeline.createNewInstant(new HoodieInstant(State.REQUESTED, commitActionType, instantTime)); @@ -924,7 +926,7 @@ public class HoodieWriteClient extends AbstractHo ValidationUtils.checkArgument(conflictingInstants.isEmpty(), "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :" + conflictingInstants); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime); if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) { extraMetadata.ifPresent(workload::setExtraMetadata); @@ -957,7 +959,7 @@ public class HoodieWriteClient extends AbstractHo public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException { HoodieTableMetaClient metaClient = createMetaClient(true); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan( timeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get()); @@ -1020,7 +1022,7 @@ public class HoodieWriteClient extends AbstractHo * Cleanup all pending commits.
*/ private void rollbackPendingCommits() { - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.create(config, jsc); HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); List commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); @@ -1038,7 +1040,7 @@ public class HoodieWriteClient extends AbstractHo private JavaRDD compact(String compactionInstantTime, boolean autoCommit) throws IOException { // Create a Hoodie table which encapsulated the commits and files visible HoodieTableMetaClient metaClient = createMetaClient(true); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); if (pendingCompactionTimeline.containsInstant(inflightInstant)) { @@ -1046,7 +1048,7 @@ public class HoodieWriteClient extends AbstractHo rollbackInflightCompaction(inflightInstant, table); // refresh table metaClient = createMetaClient(true); - table = HoodieTable.getHoodieTable(metaClient, config, jsc); + table = HoodieTable.create(metaClient, config, jsc); pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline(); } @@ -1076,7 +1078,7 @@ public class HoodieWriteClient extends AbstractHo activeTimeline.transitionCompactionRequestedToInflight(compactionInstant); compactionTimer = metrics.getCompactionCtx(); // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); JavaRDD statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan); // Force compaction action statuses.persist(SparkConfigUtils.getWriteStatusStorageLevel(config.getProps())); diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java index e8fdb774f..635e96b2b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java @@ -138,7 +138,7 @@ public class HoodieCommitArchiveLog { int maxCommitsToKeep = config.getMaxCommitsToKeep(); int minCommitsToKeep = config.getMinCommitsToKeep(); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); // GroupBy each action and limit each action timeline to maxCommitsToKeep // TODO: Handle ROLLBACK_ACTION in future diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index de43900b9..f01cdaa01 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -18,22 +18,22 @@ package org.apache.hudi.table; -import org.apache.hudi.avro.model.HoodieActionInstant; -import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import 
org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.ParquetReaderIterator; -import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRollingStatMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; @@ -45,20 +45,15 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable; import org.apache.hudi.execution.SparkBoundedInMemoryExecutor; import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieMergeHandle; +import org.apache.hudi.table.action.clean.CleanActionExecutor; import org.apache.hudi.table.rollback.RollbackHelper; import org.apache.hudi.table.rollback.RollbackRequest; - -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroParquetReader; @@ -67,9 +62,9 @@ import org.apache.parquet.hadoop.ParquetReader; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -81,9 +76,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import org.apache.spark.api.java.function.PairFunction; -import scala.Tuple2; - /** * Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with * zero read amplification. 
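To keep the recurring mechanical change in these hunks readable: callers no longer build a `HoodieTableMetaClient` before obtaining a table, because the new `HoodieTable.create(config, jsc)` constructs one internally. A minimal sketch of the two factory shapes introduced by this patch; the Spark context is assumed to exist, and the base path is a hypothetical placeholder:

```java
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;

class TableFactoryExample {
  static void example(JavaSparkContext jsc) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table") // hypothetical base path
        .build();

    // New overload: the factory builds the meta client itself, wiring in the
    // consistency guard config and timeline layout version from the write config.
    HoodieTable table = HoodieTable.create(config, jsc);

    // Callers that already hold a meta client keep the explicit overload,
    // as HoodieReadClient does above.
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true);
    HoodieTable sameTable = HoodieTable.create(metaClient, config, jsc);
  }
}
```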
@@ -97,49 +89,8 @@ public class HoodieCopyOnWriteTable extends Hoodi private static final Logger LOG = LogManager.getLogger(HoodieCopyOnWriteTable.class); - public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc) { - super(config, jsc); - } - - private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat> deleteFilesFunc( - HoodieTable table) { - return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) iter -> { - Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>(); - - FileSystem fs = table.getMetaClient().getFs(); - Path basePath = new Path(table.getMetaClient().getBasePath()); - while (iter.hasNext()) { - Tuple2<String, String> partitionDelFileTuple = iter.next(); - String partitionPath = partitionDelFileTuple._1(); - String delFileName = partitionDelFileTuple._2(); - Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName); - String deletePathStr = deletePath.toString(); - Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); - if (!partitionCleanStatMap.containsKey(partitionPath)) { - partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath)); - } - PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath); - partitionCleanStat.addDeleteFilePatterns(deletePath.getName()); - partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult); - } - return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue())) - .collect(Collectors.toList()).iterator(); - }; - } - - private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { - Path deletePath = new Path(deletePathStr); - LOG.debug("Working on delete path :" + deletePath); - try { - boolean deleteResult = fs.delete(deletePath, false); - if (deleteResult) { - LOG.debug("Cleaned file at path :" + deletePath); - } - return deleteResult; - } catch (FileNotFoundException fio) { - // With cleanPlan being used for retried cleaning operations, its possible to clean a file twice - return false; - } + public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) { + super(config, jsc, metaClient); } @Override @@ -278,77 +229,9 @@ public class HoodieCopyOnWriteTable extends Hoodi return handleUpsertPartition(instantTime, partition, recordItr, partitioner); } - /** - * Generates List of files to be cleaned. - * - * @param jsc JavaSparkContext - * @return Cleaner Plan - */ @Override - public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) { - try { - CleanHelper cleaner = new CleanHelper(this, config); - Option<HoodieInstant> earliestInstant = cleaner.getEarliestCommitToRetain(); - - List<String> partitionsToClean = cleaner.getPartitionPathsToClean(earliestInstant); - - if (partitionsToClean.isEmpty()) { - LOG.info("Nothing to clean here.
It is already clean"); - return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build(); - } - LOG.info( - "Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy()); - int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); - LOG.info("Using cleanerParallelism: " + cleanerParallelism); - - Map> cleanOps = jsc.parallelize(partitionsToClean, cleanerParallelism) - .map(partitionPathToClean -> Pair.of(partitionPathToClean, cleaner.getDeletePaths(partitionPathToClean))) - .collect().stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - return new HoodieCleanerPlan(earliestInstant - .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null), - config.getCleanerPolicy().name(), cleanOps, 1); - } catch (IOException e) { - throw new HoodieIOException("Failed to schedule clean operation", e); - } - } - - /** - * Performs cleaning of partition paths according to cleaning policy and returns the number of files cleaned. Handles - * skews in partitions to clean by making files to clean as the unit of task distribution. - * - * @throws IllegalArgumentException if unknown cleaning policy is provided - */ - @Override - public List clean(JavaSparkContext jsc, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) { - int cleanerParallelism = Math.min( - (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()), - config.getCleanerParallelism()); - LOG.info("Using cleanerParallelism: " + cleanerParallelism); - List> partitionCleanStats = jsc - .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream() - .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y))) - .collect(Collectors.toList()), cleanerParallelism) - .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey(PartitionCleanStat::merge).collect(); - - Map partitionCleanStatsMap = - partitionCleanStats.stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); - - // Return PartitionCleanStat for each partition passed. - return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { - PartitionCleanStat partitionCleanStat = - (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap.get(partitionPath) - : new PartitionCleanStat(partitionPath); - HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain(); - return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) - .withEarliestCommitRetained(Option.ofNullable( - actionInstant != null - ? 
new HoodieInstant(State.valueOf(actionInstant.getState()), - actionInstant.getAction(), actionInstant.getTimestamp()) - : null)) - .withDeletePathPattern(partitionCleanStat.deletePathPatterns) - .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles) - .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build(); - }).collect(Collectors.toList()); + public HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime) { + return new CleanActionExecutor(jsc, config, this, cleanInstantTime).execute(); } @Override @@ -446,40 +329,7 @@ public class HoodieCopyOnWriteTable extends Hoodi } } - private static class PartitionCleanStat implements Serializable { - private final String partitionPath; - private final List deletePathPatterns = new ArrayList<>(); - private final List successDeleteFiles = new ArrayList<>(); - private final List failedDeleteFiles = new ArrayList<>(); - - private PartitionCleanStat(String partitionPath) { - this.partitionPath = partitionPath; - } - - private void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) { - if (deletedFileResult) { - successDeleteFiles.add(deletePathStr); - } else { - failedDeleteFiles.add(deletePathStr); - } - } - - private void addDeleteFilePatterns(String deletePathStr) { - deletePathPatterns.add(deletePathStr); - } - - private PartitionCleanStat merge(PartitionCleanStat other) { - if (!this.partitionPath.equals(other.partitionPath)) { - throw new RuntimeException( - String.format("partitionPath is not a match: (%s, %s)", partitionPath, other.partitionPath)); - } - successDeleteFiles.addAll(other.successDeleteFiles); - deletePathPatterns.addAll(other.deletePathPatterns); - failedDeleteFiles.addAll(other.failedDeleteFiles); - return this; - } - } /** * Helper class for a small file's location and its actual size on disk. 
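Taken together with the client changes earlier in this diff, the clean path is now a single chain: `HoodieWriteClient.clean()` → `HoodieTable.clean(jsc, instantTime)` → `CleanActionExecutor.execute()`. A hedged end-to-end sketch follows; the local Spark master, table path, and payload type are illustrative assumptions, not part of the patch:

```java
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;

public class CleanDriverExample {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "clean-example"); // assumed local run
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table") // hypothetical existing table
        .build();
    // One call now plans and executes the action (previously split across
    // HoodieCleanClient.scheduleClean/runClean) and returns the result metadata.
    try (HoodieWriteClient<HoodieAvroPayload> client = new HoodieWriteClient<>(jsc, config)) {
      HoodieCleanMetadata metadata = client.clean();
      System.out.println("Deleted " + metadata.getTotalFilesDeleted() + " files");
    }
    jsc.stop();
  }
}
```

The executor also finishes any inflight or requested clean instants left over from earlier failures before planning a new one (see CleanActionExecutor.execute() below), which is the behavior the removed HoodieCleanClient implemented inline.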
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index 7e96eb687..d7783cd48 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -84,8 +85,8 @@ public class HoodieMergeOnReadTable extends Hoodi // UpsertPartitioner for MergeOnRead table type private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner; - public HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc) { - super(config, jsc); + HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) { + super(config, jsc, metaClient); } @Override diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index e3c134c10..50ec45f63 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -18,13 +18,14 @@ package org.apache.hudi.table; -import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.utils.ClientUtils; -import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.ConsistencyGuard; @@ -39,22 +40,20 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.index.HoodieIndex; - -import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; @@ -87,12 +86,12 @@ public abstract class HoodieTable implements Seri protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier(); - protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc) { + protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) { this.config = config; this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration()); this.viewManager = FileSystemViewManager.createViewManager(new SerializableConfiguration(jsc.hadoopConfiguration()), config.getViewStorageConfig()); - this.metaClient = ClientUtils.createMetaClient(jsc, config, true); + this.metaClient = metaClient; this.index = HoodieIndex.createIndex(config, jsc); } @@ -103,13 +102,25 @@ public abstract class HoodieTable implements Seri return viewManager; } - public static HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, - HoodieWriteConfig config, JavaSparkContext jsc) { + public static HoodieTable create(HoodieWriteConfig config, JavaSparkContext jsc) { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient( + jsc.hadoopConfiguration(), + config.getBasePath(), + true, + config.getConsistencyGuardConfig(), + Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())) + ); + return HoodieTable.create(metaClient, config, jsc); + } + + public static HoodieTable create(HoodieTableMetaClient metaClient, + HoodieWriteConfig config, + JavaSparkContext jsc) { switch (metaClient.getTableType()) { case COPY_ON_WRITE: - return new HoodieCopyOnWriteTable<>(config, jsc); + return new HoodieCopyOnWriteTable<>(config, jsc, metaClient); case MERGE_ON_READ: - return new HoodieMergeOnReadTable<>(config, jsc); + return new HoodieMergeOnReadTable<>(config, jsc, metaClient); default: throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } @@ -280,23 +291,11 @@ public abstract class HoodieTable implements Seri HoodieCompactionPlan compactionPlan); /** - * Generates list of files that are eligible for cleaning. - * - * @param jsc Java Spark Context - * @return Cleaner Plan containing list of files to be deleted. + * Executes a new clean action. + * + * @return information on cleaned file slices */ - public abstract HoodieCleanerPlan scheduleClean(JavaSparkContext jsc); - - /** - * Cleans the files listed in the cleaner plan associated with clean instant. - * - * @param jsc Java Spark Context - * @param cleanInstant Clean Instant - * @param cleanerPlan Cleaner Plan - * @return list of Clean Stats - */ - public abstract List clean(JavaSparkContext jsc, HoodieInstant cleanInstant, - HoodieCleanerPlan cleanerPlan); + public abstract HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime); /** * Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) Atomically unpublish diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java new file mode 100644 index 000000000..e6ac2e9b0 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action; + +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.spark.api.java.JavaSparkContext; + +public abstract class BaseActionExecutor<R> { + + protected final JavaSparkContext jsc; + + protected final HoodieWriteConfig config; + + protected final HoodieTable table; + + protected final String instantTime; + + public BaseActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table, String instantTime) { + this.jsc = jsc; + this.config = config; + this.table = table; + this.instantTime = instantTime; + } + + public abstract R execute(); +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java new file mode 100644 index 000000000..48fe71da4 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.table.action.clean; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieActionInstant; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.common.HoodieCleanStat; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.CleanerUtils; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseActionExecutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import scala.Tuple2; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata> { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class); + + /** + * Generates List of files to be cleaned. + * + * @param jsc JavaSparkContext + * @return Cleaner Plan + */ + HoodieCleanerPlan requestClean(JavaSparkContext jsc) { + try { + CleanPlanner planner = new CleanPlanner<>(table, config); + Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain(); + List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant); + + if (partitionsToClean.isEmpty()) { + LOG.info("Nothing to clean here.
It is already clean"); + return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build(); + } + LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy()); + int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); + LOG.info("Using cleanerParallelism: " + cleanerParallelism); + + Map> cleanOps = jsc + .parallelize(partitionsToClean, cleanerParallelism) + .map(partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean))) + .collect().stream() + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + return new HoodieCleanerPlan(earliestInstant + .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null), + config.getCleanerPolicy().name(), cleanOps, 1); + } catch (IOException e) { + throw new HoodieIOException("Failed to schedule clean operation", e); + } + } + + private static PairFlatMapFunction>, String, PartitionCleanStat> deleteFilesFunc( + HoodieTable table) { + return (PairFlatMapFunction>, String, PartitionCleanStat>) iter -> { + Map partitionCleanStatMap = new HashMap<>(); + + FileSystem fs = table.getMetaClient().getFs(); + Path basePath = new Path(table.getMetaClient().getBasePath()); + while (iter.hasNext()) { + Tuple2 partitionDelFileTuple = iter.next(); + String partitionPath = partitionDelFileTuple._1(); + String delFileName = partitionDelFileTuple._2(); + Path deletePath = FSUtils.getPartitionPath(FSUtils.getPartitionPath(basePath, partitionPath), delFileName); + String deletePathStr = deletePath.toString(); + Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); + if (!partitionCleanStatMap.containsKey(partitionPath)) { + partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath)); + } + PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath); + partitionCleanStat.addDeleteFilePatterns(deletePath.getName()); + partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult); + } + return partitionCleanStatMap.entrySet().stream().map(e -> new Tuple2<>(e.getKey(), e.getValue())) + .collect(Collectors.toList()).iterator(); + }; + } + + private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { + Path deletePath = new Path(deletePathStr); + LOG.debug("Working on delete path :" + deletePath); + try { + boolean deleteResult = fs.delete(deletePath, false); + if (deleteResult) { + LOG.debug("Cleaned file at path :" + deletePath); + } + return deleteResult; + } catch (FileNotFoundException fio) { + // With cleanPlan being used for retried cleaning operations, its possible to clean a file twice + return false; + } + } + + /** + * Performs cleaning of partition paths according to cleaning policy and returns the number of files cleaned. Handles + * skews in partitions to clean by making files to clean as the unit of task distribution. 
+ * + * @throws IllegalArgumentException if unknown cleaning policy is provided + */ + List<HoodieCleanStat> clean(JavaSparkContext jsc, HoodieCleanerPlan cleanerPlan) { + int cleanerParallelism = Math.min( + (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()), + config.getCleanerParallelism()); + LOG.info("Using cleanerParallelism: " + cleanerParallelism); + List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc + .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream() + .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y))) + .collect(Collectors.toList()), cleanerParallelism) + .mapPartitionsToPair(deleteFilesFunc(table)) + .reduceByKey(PartitionCleanStat::merge).collect(); + + Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.stream() + .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); + + // Return PartitionCleanStat for each partition passed. + return cleanerPlan.getFilesToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { + PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath) + ? partitionCleanStatsMap.get(partitionPath) + : new PartitionCleanStat(partitionPath); + HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain(); + return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) + .withEarliestCommitRetained(Option.ofNullable( + actionInstant != null + ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), + actionInstant.getAction(), actionInstant.getTimestamp()) + : null)) + .withDeletePathPattern(partitionCleanStat.deletePathPatterns()) + .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles()) + .withFailedDeletes(partitionCleanStat.failedDeleteFiles()) + .build(); + }).collect(Collectors.toList()); + } + + public CleanActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table, String instantTime) { + super(jsc, config, table, instantTime); + } + + /** + * Creates a Cleaner plan if there are files to be cleaned and stores it in the instant file. + * + * @param startCleanTime Cleaner Instant Time + * @return Cleaner Plan if generated + */ + Option<HoodieCleanerPlan> requestClean(String startCleanTime) { + final HoodieCleanerPlan cleanerPlan = requestClean(jsc); + if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) + && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) { + + final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime); + // Save to both aux and timeline folder + try { + table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); + LOG.info("Requesting Cleaning with instant time " + cleanInstant); + } catch (IOException e) { + LOG.error("Got exception when saving cleaner requested file", e); + throw new HoodieIOException(e.getMessage(), e); + } + return Option.of(cleanerPlan); + } + return Option.empty(); + } + + /** + * Executes the Cleaner plan stored in the instant metadata.
+ */ + void runPendingClean(HoodieTable table, HoodieInstant cleanInstant) { + try { + HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant); + runClean(table, cleanInstant, cleanerPlan); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + } + + private HoodieCleanMetadata runClean(HoodieTable table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) { + ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED) + || cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT)); + + try { + final HoodieInstant inflightInstant; + final HoodieTimer timer = new HoodieTimer(); + timer.startTimer(); + if (cleanInstant.isRequested()) { + inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant, + TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); + } else { + inflightInstant = cleanInstant; + } + + List<HoodieCleanStat> cleanStats = clean(jsc, cleanerPlan); + if (cleanStats.isEmpty()) { + return HoodieCleanMetadata.newBuilder().build(); + } + + table.getMetaClient().reloadActiveTimeline(); + HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata( + inflightInstant.getTimestamp(), + Option.of(timer.endTimer()), + cleanStats + ); + + table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant, + TimelineMetadataUtils.serializeCleanMetadata(metadata)); + LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete"); + return metadata; + } catch (IOException e) { + throw new HoodieIOException("Failed to clean up after commit", e); + } + } + + @Override + public HoodieCleanMetadata execute() { + // If there are inflight(failed) or previously requested clean operations, first perform them + List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline() + .filterInflightsAndRequested().getInstants().collect(Collectors.toList()); + if (pendingCleanInstants.size() > 0) { + pendingCleanInstants.forEach(hoodieInstant -> { + LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant); + try { + runPendingClean(table, hoodieInstant); + } catch (Exception e) { + LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e); + } + }); + table.getMetaClient().reloadActiveTimeline(); + } + + // Plan and execute a new clean action + Option<HoodieCleanerPlan> cleanerPlanOpt = requestClean(instantTime); + if (cleanerPlanOpt.isPresent()) { + table.getMetaClient().reloadActiveTimeline(); + HoodieCleanerPlan cleanerPlan = cleanerPlanOpt.get(); + if ((cleanerPlan.getFilesToBeDeletedPerPartition() != null) && !cleanerPlan.getFilesToBeDeletedPerPartition().isEmpty()) { + return runClean(table, HoodieTimeline.getCleanRequestedInstant(instantTime), cleanerPlan); + } + } + return null; + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java similarity index 97% rename from hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 00029cf08..62203a382 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -16,7 +16,7 @@ * limitations under the License.
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
similarity index 97%
rename from hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java
rename to hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 00029cf08..62203a382 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/CleanHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
-package org.apache.hudi.table;
+package org.apache.hudi.table.action.clean;

 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.common.fs.FSUtils;
@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;

 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -56,12 +57,10 @@ import java.util.stream.Collectors;
  * 1) It provides sufficient time for existing queries running on older versions, to close
  *
  * 2) It bounds the growth of the files in the file system
- *
- * TODO: Should all cleaning be done based on {@link HoodieCommitMetadata}
  */
-public class CleanHelper<T extends HoodieRecordPayload<T>> implements Serializable {
+public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializable {

-  private static final Logger LOG = LogManager.getLogger(CleanHelper.class);
+  private static final Logger LOG = LogManager.getLogger(CleanPlanner.class);

   private final SyncableFileSystemView fileSystemView;
   private final HoodieTimeline commitTimeline;
@@ -69,7 +68,7 @@ public class CleanHelper<T extends HoodieRecordPayload<T>> implements Serializab
   private HoodieTable<T> hoodieTable;
   private HoodieWriteConfig config;

-  public CleanHelper(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
+  public CleanPlanner(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
     this.hoodieTable = hoodieTable;
     this.fileSystemView = hoodieTable.getHoodieView();
     this.commitTimeline = hoodieTable.getCompletedCommitTimeline();
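The retention behavior described in that javadoc is driven purely by configuration. A hedged sketch of the knobs involved; the builder calls come from Hudi's existing config API as exercised in the tests further down, not from this diff:

// Sketch: retain enough commit history that long-running queries on older file versions
// can finish before the cleaner removes those versions, while still bounding file growth.
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath(basePath) // basePath assumed from the surrounding test harness
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
        .retainCommits(24)
        .build())
    .build();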
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java
new file mode 100644
index 000000000..3493ad610
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/PartitionCleanStat.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+class PartitionCleanStat implements Serializable {
+
+  private final String partitionPath;
+  private final List<String> deletePathPatterns = new ArrayList<>();
+  private final List<String> successDeleteFiles = new ArrayList<>();
+  private final List<String> failedDeleteFiles = new ArrayList<>();
+
+  PartitionCleanStat(String partitionPath) {
+    this.partitionPath = partitionPath;
+  }
+
+  void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) {
+    if (deletedFileResult) {
+      successDeleteFiles.add(deletePathStr);
+    } else {
+      failedDeleteFiles.add(deletePathStr);
+    }
+  }
+
+  void addDeleteFilePatterns(String deletePathStr) {
+    deletePathPatterns.add(deletePathStr);
+  }
+
+  PartitionCleanStat merge(PartitionCleanStat other) {
+    if (!this.partitionPath.equals(other.partitionPath)) {
+      throw new RuntimeException(
+          String.format("partitionPath is not a match: (%s, %s)", partitionPath, other.partitionPath));
+    }
+    successDeleteFiles.addAll(other.successDeleteFiles);
+    deletePathPatterns.addAll(other.deletePathPatterns);
+    failedDeleteFiles.addAll(other.failedDeleteFiles);
+    return this;
+  }
+
+  public List<String> deletePathPatterns() {
+    return deletePathPatterns;
+  }
+
+  public List<String> successDeleteFiles() {
+    return successDeleteFiles;
+  }
+
+  public List<String> failedDeleteFiles() {
+    return failedDeleteFiles;
+  }
+}
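This merge contract is what makes the Spark-side reduceByKey in CleanActionExecutor.clean(...) safe. A small local illustration, with made-up file names:

PartitionCleanStat left = new PartitionCleanStat("2016/03/15");
left.addDeleteFilePatterns("file1.parquet");
left.addDeletedFileResult("file1.parquet", true);   // deletion succeeded
PartitionCleanStat right = new PartitionCleanStat("2016/03/15");
right.addDeleteFilePatterns("file2.parquet");
right.addDeletedFileResult("file2.parquet", false); // deletion failed
// Same partitionPath, so the merge succeeds; mismatched paths throw a RuntimeException.
PartitionCleanStat merged = left.merge(right);
// merged.successDeleteFiles() -> [file1.parquet]; merged.failedDeleteFiles() -> [file2.parquet]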
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java index d2f2bb919..f71ab9518 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/compact/HoodieMergeOnReadTableCompactor.java @@ -90,7 +90,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor { } HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); // Compacting is very similar to applying updates to existing file - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); + HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc, metaClient); List operations = compactionPlan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); LOG.info("Compactor compacting " + operations + " files"); diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 0b682619a..4b3a9edf1 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -99,7 +99,7 @@ public class TestClientRollback extends TestHoodieClientBase { List partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning()); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); + HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc); final BaseFileOnlyView view1 = table.getBaseFileOnlyView(); List dataFiles = partitionPaths.stream().flatMap(s -> { @@ -124,7 +124,7 @@ public class TestClientRollback extends TestHoodieClientBase { assertNoWriteErrors(statuses); metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); + table = HoodieTable.create(metaClient, getConfig(), jsc); final BaseFileOnlyView view2 = table.getBaseFileOnlyView(); dataFiles = partitionPaths.stream().flatMap(s -> view2.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList()); @@ -143,7 +143,7 @@ public class TestClientRollback extends TestHoodieClientBase { client.rollbackToSavepoint(savepoint.getTimestamp()); metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); + table = HoodieTable.create(metaClient, getConfig(), jsc); final BaseFileOnlyView view3 = table.getBaseFileOnlyView(); dataFiles = partitionPaths.stream().flatMap(s -> { return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002")); diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java index b0dca5f81..c5b52fa3c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java @@ -42,7 +42,6 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; -import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.fs.FileSystem; @@ -85,10 +84,6 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { cleanupResources(); } - protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) { - return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName())); - } - protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) { return getHoodieWriteClient(cfg, false); } @@ -161,7 +156,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { } protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) { - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); ((SyncableFileSystemView) (table.getSliceView())).reset(); return table; } @@ -255,7 +250,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); List records = recordGenFunction.apply(commit, numRecords); final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); + HoodieTable table = HoodieTable.create(metaClient, writeConfig, jsc); JavaRDD taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table); return taggedRecords.collect(); }; @@ -276,7 +271,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); List records = keyGenFunction.apply(numRecords); final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); + HoodieTable table = HoodieTable.create(metaClient, writeConfig, jsc); JavaRDD recordsToDelete = jsc.parallelize(records, 1) .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); JavaRDD taggedRecords =
index.tagLocation(recordsToDelete, jsc, table); diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index f3b9c65aa..49311f7e1 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -800,7 +800,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable table = HoodieTable.create(metaClient, cfg, jsc); String instantTime = "000"; client.startCommitWithTime(instantTime); diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index 0cfad6cfa..1b93ae8d5 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -31,11 +31,11 @@ import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.table.HoodieCopyOnWriteTable; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hudi.table.HoodieTable; import org.apache.parquet.avro.AvroReadSupport; import org.junit.After; import org.junit.Assert; @@ -67,7 +67,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness { public void testSchemaEvolutionOnUpdate() throws Exception { // Create a bunch of records with a old version of schema final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt"); - final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); + final HoodieTable table = HoodieTable.create(config, jsc); final List statuses = jsc.parallelize(Arrays.asList(1)).map(x -> { String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," @@ -102,7 +102,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness { final WriteStatus insertResult = statuses.get(0); String fileId = insertResult.getFileId(); - final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config2, jsc); + final HoodieTable table2 = HoodieTable.create(config2, jsc); Assert.assertEquals(1, jsc.parallelize(Arrays.asList(1)).map(x -> { // New content with values for the newly added field String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java index 8cde8b78b..803f1be63 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java @@ -142,7 +142,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { HBaseIndex index = new HBaseIndex(config); try (HoodieWriteClient writeClient = getWriteClient(config);) { 
metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc); // Test tagLocation without any entries in index JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); @@ -162,7 +162,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { writeClient.commit(newCommitTime, writeStatues); // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); - hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + hoodieTable = HoodieTable.create(metaClient, config, jsc); javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); @@ -183,7 +183,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); writeClient.startCommitWithTime(newCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc); JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); JavaRDD javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable); @@ -201,7 +201,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { writeClient.commit(newCommitTime, writeStatues); // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); - hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + hoodieTable = HoodieTable.create(metaClient, config, jsc); JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); assertEquals(10, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size()); assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); @@ -227,7 +227,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { // commit this upsert writeClient.commit(newCommitTime, writeStatues); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc); // Now tagLocation for these records, hbaseIndex should tag them JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 200); @@ -242,7 +242,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { // Rollback the last commit writeClient.rollback(newCommitTime); - hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + hoodieTable = HoodieTable.create(metaClient, config, jsc); // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled // back commit javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); @@ -271,7 +271,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { List records = dataGen.generateInserts(newCommitTime, 250); JavaRDD writeRecords = jsc.parallelize(records, 1); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc); // Insert 250 records 
JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); @@ -296,7 +296,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { List records = dataGen.generateInserts(newCommitTime, 250); JavaRDD writeRecords = jsc.parallelize(records, 1); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc); // Insert 200 records JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); @@ -408,7 +408,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { HBaseIndex index = new HBaseIndex(config); try (HoodieWriteClient writeClient = getWriteClient(config);) { metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc); // Test tagLocation without any entries in index JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); @@ -428,7 +428,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { writeClient.commit(newCommitTime, writeStatues); // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); - hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + hoodieTable = HoodieTable.create(metaClient, config, jsc); javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); @@ -448,7 +448,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { HBaseIndex index = new HBaseIndex(config); try (HoodieWriteClient writeClient = getWriteClient(config);) { metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc); // Test tagLocation without any entries in index JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); @@ -462,7 +462,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); - hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + hoodieTable = HoodieTable.create(metaClient, config, jsc); javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); @@ -499,7 +499,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness { assertTrue(index.canIndexLogFiles()); try { - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc); index.fetchRecordLocation(jsc.parallelize(new ArrayList(), 1), jsc, hoodieTable); fail("HbaseIndex supports fetchRecordLocation"); } catch (UnsupportedOperationException ex) { diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index 1339b44ed..ce29a92c5 100644 --- 
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -160,7 +160,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { List partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12"); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); List> filesList = index.loadInvolvedFiles(partitions, jsc, table); // Still 0, as no valid commit assertEquals(filesList.size(), 0); @@ -170,7 +170,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { new File(basePath + "/.hoodie/20160401010101.commit").createNewFile(); new File(basePath + "/.hoodie/20150312101010.commit").createNewFile(); - table = HoodieTable.getHoodieTable(metaClient, config, jsc); + table = HoodieTable.create(metaClient, config, jsc); filesList = index.loadInvolvedFiles(partitions, jsc, table); assertEquals(filesList.size(), 4); @@ -284,7 +284,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // Also create the metadata and config HoodieWriteConfig config = makeConfig(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); @@ -324,7 +324,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // Also create the metadata and config HoodieWriteConfig config = makeConfig(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); @@ -345,7 +345,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // We do the tag again metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.getHoodieTable(metaClient, config, jsc); + table = HoodieTable.create(metaClient, config, jsc); taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); @@ -394,7 +394,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // Also create the metadata and config HoodieWriteConfig config = makeConfig(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); @@ -416,7 +416,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // We do the tag again metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.getHoodieTable(metaClient, config, jsc); + table = HoodieTable.create(metaClient, config, jsc); taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table); // Check results @@ -465,7 +465,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2)); HoodieWriteConfig config = makeConfig(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); HoodieBloomIndex 
bloomIndex = new HoodieBloomIndex(config); JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index 67832d23a..1daf33610 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -129,7 +129,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up List partitions = Arrays.asList("2016/01/21", "2016/04/01"); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); // partitions will NOT be respected by this loadInvolvedFiles(...) call List> filesList = index.loadInvolvedFiles(partitions, jsc, table); // Still 0, as no valid commit @@ -140,7 +140,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { new File(basePath + "/.hoodie/20160401010101.commit").createNewFile(); new File(basePath + "/.hoodie/20150312101010.commit").createNewFile(); - table = HoodieTable.getHoodieTable(metaClient, config, jsc); + table = HoodieTable.create(metaClient, config, jsc); filesList = index.loadInvolvedFiles(partitions, jsc, table); assertEquals(filesList.size(), 4); @@ -259,7 +259,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); // Add some commits new File(basePath + "/.hoodie").mkdirs(); @@ -344,7 +344,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); // Add some commits new File(basePath + "/.hoodie").mkdirs(); diff --git a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java index a1492dd43..50e8467b0 100755 --- a/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/storage/TestHoodieStorageWriterFactory.java @@ -44,7 +44,7 @@ public class TestHoodieStorageWriterFactory extends TestHoodieClientBase { final String instantTime = "100"; final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"); final HoodieWriteConfig cfg = getConfig(); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable table = HoodieTable.create(metaClient, cfg, jsc); SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); HoodieStorageWriter parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); diff --git 
a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index a23d0e0dd..d8b315d47 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -21,7 +21,6 @@ package org.apache.hudi.table; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.HoodieCleanClient; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.TestHoodieClientBase; import org.apache.hudi.client.WriteStatus; @@ -104,7 +103,7 @@ public class TestCleaner extends TestHoodieClientBase { * @param insertFn Insertion API for testing * @throws Exception in case of error */ - private String insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, HoodieWriteClient client, + private void insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, HoodieWriteClient client, Function2, String, Integer> recordGenFunction, Function3, HoodieWriteClient, JavaRDD, String> insertFn) throws Exception { @@ -126,7 +125,7 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); // Should have 100 records in table (check using Index), all in locations marked at commit - HoodieTable table = HoodieTable.getHoodieTable(metaClient, client.getConfig(), jsc); + HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), jsc); assertFalse(table.getCompletedCommitsTimeline().empty()); String instantTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp(); @@ -137,7 +136,6 @@ public class TestCleaner extends TestHoodieClientBase { HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect(); checkTaggedRecords(taggedRecords, newCommitTime); - return newCommitTime; } /** @@ -207,7 +205,7 @@ public class TestCleaner extends TestHoodieClientBase { Map compactionFileIdToLatestFileSlice = new HashMap<>(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); + HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc); for (String partitionPath : dataGen.getPartitionPaths()) { TableFileSystemView fsView = table.getFileSystemView(); Option added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> { @@ -244,7 +242,7 @@ public class TestCleaner extends TestHoodieClientBase { assertNoWriteErrors(statuses); metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); + table = HoodieTable.create(metaClient, getConfig(), jsc); HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline(); TableFileSystemView fsView = table.getFileSystemView(); @@ -376,7 +374,7 @@ public class TestCleaner extends TestHoodieClientBase { assertNoWriteErrors(statuses); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable table1 = HoodieTable.create(metaClient, cfg, jsc); HoodieTimeline activeTimeline = 
table1.getCompletedCommitsTimeline(); Option earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1); Set acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet()); @@ -412,7 +410,7 @@ * * @param config HoodieWriteConfig */ - private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) { + private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws IOException { return runCleaner(config, false); } @@ -421,9 +419,8 @@ * * @param config HoodieWriteConfig */ - private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) { - HoodieCleanClient writeClient = getHoodieCleanClient(config); - + private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException { + HoodieWriteClient writeClient = getHoodieWriteClient(config); String cleanInstantTs = getNextInstant(); HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs); @@ -432,18 +429,16 @@ } if (simulateRetryFailure) { - metaClient.reloadActiveTimeline() - .revertToInflight(new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs)); - final HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - HoodieCleanMetadata cleanMetadata2 = writeClient.runClean(table, - HoodieTimeline.getCleanInflightInstant(cleanInstantTs)); + HoodieInstant completedCleanInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, cleanInstantTs); + metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant); + HoodieCleanMetadata cleanMetadata2 = writeClient.clean(getNextInstant()); Assert.assertEquals(cleanMetadata1.getEarliestCommitToRetain(), cleanMetadata2.getEarliestCommitToRetain()); Assert.assertEquals(new Integer(0), cleanMetadata2.getTotalFilesDeleted()); - Assert.assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), - cleanMetadata2.getPartitionMetadata().keySet()); + Assert.assertEquals(cleanMetadata1.getPartitionMetadata().keySet(), cleanMetadata2.getPartitionMetadata().keySet()); + final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant); cleanMetadata1.getPartitionMetadata().keySet().forEach(k -> { HoodieCleanPartitionMetadata p1 = cleanMetadata1.getPartitionMetadata().get(k); - HoodieCleanPartitionMetadata p2 = cleanMetadata2.getPartitionMetadata().get(k); + HoodieCleanPartitionMetadata p2 = retriedCleanMetadata.getPartitionMetadata().get(k); Assert.assertEquals(p1.getDeletePathPatterns(), p2.getDeletePathPatterns()); Assert.assertEquals(p1.getSuccessDeleteFiles(), p2.getFailedDeleteFiles()); Assert.assertEquals(p1.getPartitionPath(), p2.getPartitionPath()); @@ -636,8 +631,11 @@ Collections.singletonList(fileName2))); newExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2)); - HoodieCleanMetadata metadata = - CleanerUtils.convertCleanMetadata(metaClient, instantTime, Option.of(0L), Arrays.asList(cleanStat1, cleanStat2)); + HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata( + instantTime, + Option.of(0L), + Arrays.asList(cleanStat1, cleanStat2) + ); metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1); // Now upgrade and check @@ -896,7 +894,7 @@ public class TestCleaner extends TestHoodieClientBase { HoodieWriteConfig
config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000")); table.getActiveTimeline().transitionRequestedToInflight( @@ -988,7 +986,7 @@ public class TestCleaner extends TestHoodieClientBase { * Test clean previous corrupted cleanFiles. */ @Test - public void testCleanPreviousCorruptedCleanFiles() { + public void testCleanPreviousCorruptedCleanFiles() throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder() .withPath(basePath).withAssumeDatePartitioning(true) @@ -1042,7 +1040,7 @@ public class TestCleaner extends TestHoodieClientBase { if (j == i && j <= maxNumFileIdsForCompaction) { expFileIdToPendingCompaction.put(fileId, compactionInstants[j]); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); FileSlice slice = table.getSliceView().getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) .filter(fs -> fs.getFileId().equals(fileId)).findFirst().get(); @@ -1084,7 +1082,7 @@ public class TestCleaner extends TestHoodieClientBase { // Test for safety final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient); - final HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + final HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc); expFileIdToPendingCompaction.forEach((fileId, value) -> { String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java index f670b868f..11e94e474 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java @@ -98,7 +98,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { String instantTime = HoodieTestUtils.makeNewCommitTime(); HoodieWriteConfig config = makeHoodieClientConfig(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); Pair newPathWithWriteToken = jsc.parallelize(Arrays.asList(1)).map(x -> { HoodieRecord record = mock(HoodieRecord.class); @@ -132,7 +132,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { metaClient = HoodieTableMetaClient.reload(metaClient); String partitionPath = "/2016/01/31"; - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); // Get some records belong to the same partition (2016/01/31) String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," @@ -207,7 +207,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { Thread.sleep(1000); String newCommitTime = HoodieTestUtils.makeNewCommitTime(); metaClient = HoodieTableMetaClient.reload(metaClient); - final HoodieCopyOnWriteTable newTable = new HoodieCopyOnWriteTable(config, jsc); + final 
HoodieCopyOnWriteTable newTable = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); List statuses = jsc.parallelize(Arrays.asList(1)).map(x -> { return newTable.handleUpdate(newCommitTime, updatedRecord1.getPartitionPath(), updatedRecord1.getCurrentLocation().getFileId(), updatedRecords.iterator()); @@ -274,7 +274,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { String firstCommitTime = HoodieTestUtils.makeNewCommitTime(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); // Get some records belong to the same partition (2016/01/31) String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," @@ -309,8 +309,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { public void testInsertRecords() throws Exception { HoodieWriteConfig config = makeHoodieClientConfig(); String instantTime = HoodieTestUtils.makeNewCommitTime(); - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); // Case 1: // 10 records for partition 1, 1 record for partition 2. @@ -363,7 +363,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { .limitFileSize(64 * 1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build(); String instantTime = HoodieTestUtils.makeNewCommitTime(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); List records = new ArrayList<>(); // Approx 1150 records are written for block size of 64KB @@ -400,7 +400,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { HoodieClientTestUtils.fakeCommitFile(basePath, "001"); HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); List insertRecords = dataGenerator.generateInserts("001", numInserts); @@ -472,7 +472,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { HoodieWriteConfig config = makeHoodieClientConfigBuilder() .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build(); metaClient = HoodieTableMetaClient.reload(metaClient); - final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); + final HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc); String instantTime = "000"; // Perform inserts of 100 records to test CreateHandle and BufferedExecutor final List inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100); @@ -483,9 +483,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness { WriteStatus writeStatus = ws.get(0).get(0); String fileId = writeStatus.getFileId(); metaClient.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close(); - final 
HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config, jsc); - final List updates = - dataGen.generateUpdatesWithHoodieAvroPayload(instantTime, inserts); + final List updates = dataGen.generateUpdatesWithHoodieAvroPayload(instantTime, inserts); String partitionPath = updates.get(0).getPartitionPath(); long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count(); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java index b48ad3f16..2c2722c02 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java @@ -120,7 +120,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertNoWriteErrors(statuses); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); assertTrue(deltaCommit.isPresent()); @@ -228,7 +228,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertNoWriteErrors(statuses); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); assertTrue(deltaCommit.isPresent()); @@ -337,7 +337,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { client.rollback(newCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); @@ -369,7 +369,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertNoWriteErrors(statuses); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); assertTrue(deltaCommit.isPresent()); @@ -450,7 +450,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { .filter(file -> file.getPath().getName().contains(commitTime2)).count(), 0); metaClient = HoodieTableMetaClient.reload(metaClient); - hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + hoodieTable = HoodieTable.create(metaClient, cfg, jsc); roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); @@ -516,7 +516,7 @@ public class TestMergeOnReadTable extends 
HoodieClientTestHarness { assertNoWriteErrors(statuses); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); assertTrue(deltaCommit.isPresent()); @@ -679,7 +679,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertNoWriteErrors(statuses); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); assertTrue(deltaCommit.isPresent()); @@ -769,7 +769,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { // Verify that all data file has one log file metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state ((SyncableFileSystemView) (table.getSliceView())).reset(); @@ -793,7 +793,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { // Verify that recently written compacted data file has no log file metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.getHoodieTable(metaClient, config, jsc); + table = HoodieTable.create(metaClient, config, jsc); HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); assertTrue("Compaction commit should be > than last insert", HoodieTimeline @@ -826,7 +826,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { writeClient.commit(newCommitTime, statuses); HoodieTable table = - HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc); + HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc); SliceView tableRTFileSystemView = table.getSliceView(); long numLogFiles = 0; @@ -902,7 +902,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { writeClient.rollback(newCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); SliceView tableRTFileSystemView = table.getSliceView(); long numLogFiles = 0; @@ -939,7 +939,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { statuses.collect(); HoodieTable table = - HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc); + HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc); SliceView tableRTFileSystemView = table.getSliceView(); long numLogFiles = 0; @@ -960,7 +960,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { writeClient.commitCompaction(newCommitTime, statuses, Option.empty()); // Trigger a rollback of compaction writeClient.rollback(newCommitTime); - table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc); + table = HoodieTable.create(new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc); tableRTFileSystemView = table.getSliceView(); ((SyncableFileSystemView) tableRTFileSystemView).reset(); Option lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant(); @@ -981,7 +981,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); try (HoodieWriteClient client = getWriteClient(cfg);) { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable table = HoodieTable.create(metaClient, cfg, jsc); // Create a commit without rolling stats in metadata to test backwards compatibility HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); @@ -1002,10 +1002,9 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertTrue("Commit should succeed", client.commit(instantTime, statuses)); // Read from commit file - table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + table = HoodieTable.create(cfg, jsc); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes( - table.getActiveTimeline() - .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), + table.getActiveTimeline().getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), HoodieCommitMetadata.class); HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes( metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), @@ -1027,7 +1026,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertTrue("Commit should succeed", client.commit(instantTime, statuses)); // Read from commit file - table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + table = HoodieTable.create(cfg, jsc); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1051,7 +1050,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { client.rollback(instantTime); // Read from commit file - table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + table = HoodieTable.create(cfg, jsc); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1078,7 +1077,6 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { */ @Test public void testRollingStatsWithSmallFileHandling() throws Exception { - HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); try (HoodieWriteClient client = getWriteClient(cfg);) { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); @@ -1095,7 +1093,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertTrue("Commit should succeed", client.commit(instantTime, statuses)); // Read from commit file - HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable table = HoodieTable.create(cfg, jsc); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1124,7 +1122,7 @@ public class 
TestMergeOnReadTable extends HoodieClientTestHarness { assertTrue("Commit should succeed", client.commit(instantTime, statuses)); // Read from commit file - table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + table = HoodieTable.create(cfg, jsc); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1155,7 +1153,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { client.commitCompaction(instantTime, statuses, Option.empty()); // Read from commit file - table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + table = HoodieTable.create(cfg, jsc); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(), @@ -1183,7 +1181,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertTrue("Commit should succeed", client.commit(instantTime, statuses)); // Read from commit file - table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + table = HoodieTable.create(cfg, jsc); metadata = HoodieCommitMetadata.fromBytes( table.getActiveTimeline() .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(), @@ -1229,7 +1227,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness { assertNoWriteErrors(statuses); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, jsc); Option deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); assertTrue(deltaCommit.isPresent()); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java index 2c706bae0..b5151b303 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java @@ -119,7 +119,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { // Reload and rollback inflight compaction metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc); // hoodieTable.rollback(jsc, // new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java index b335c17e8..d77d3f83f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java @@ -100,7 +100,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness { @Test(expected = HoodieNotSupportedException.class) public void testCompactionOnCopyOnWriteFail() throws Exception { metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); + HoodieTable table = 
HoodieTable.create(metaClient, getConfig(), jsc); String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime)); } @@ -109,7 +109,7 @@ public void testCompactionEmpty() throws Exception { HoodieWriteConfig config = getConfig(); metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); try (HoodieWriteClient writeClient = getWriteClient(config);) { String newCommitTime = writeClient.startCommit(); @@ -138,7 +138,7 @@ // Update all the 100 records metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTable table = HoodieTable.create(metaClient, config, jsc); newCommitTime = "101"; writeClient.startCommitWithTime(newCommitTime); @@ -154,7 +154,7 @@ // Verify that all data file has one log file metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.getHoodieTable(metaClient, config, jsc); + table = HoodieTable.create(metaClient, config, jsc); for (String partitionPath : dataGen.getPartitionPaths()) { List groupedLogFiles = table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList()); @@ -165,7 +165,7 @@ // Do a compaction metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.getHoodieTable(metaClient, config, jsc); + table = HoodieTable.create(metaClient, config, jsc); String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); JavaRDD result = diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index 3d76e503a..96ac4ca41 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -39,8 +39,9 @@ public class CleanerUtils { public static final Integer CLEAN_METADATA_VERSION_2 = CleanV2MigrationHandler.VERSION; public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2; - public static HoodieCleanMetadata convertCleanMetadata(HoodieTableMetaClient metaClient, - String startCleanTime, Option<Long> durationInMs, List<HoodieCleanStat> cleanStats) { + public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime, + Option<Long> durationInMs, + List<HoodieCleanStat> cleanStats) { Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = new HashMap<>(); int totalDeleted = 0; String earliestCommitToRetain = null;
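With the metaClient parameter dropped, clean metadata is now assembled from the stats alone. A sketch of the new call shape, mirroring the test call sites below (instantTime, cleanStat1, and cleanStat2 are the identifiers used in those tests):

HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
    instantTime,                              // start-of-clean timestamp
    Option.of(0L),                            // optional duration in ms
    Arrays.asList(cleanStat1, cleanStat2));   // per-partition HoodieCleanStat entries
// Serialization is unchanged: TimelineMetadataUtils.serializeCleanMetadata(metadata)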
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java index 4e8ff7489..86b2303fd 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java @@ -328,7 +328,7 @@ public class HoodieTestUtils { // Create the clean metadata HoodieCleanMetadata cleanMetadata = - CleanerUtils.convertCleanMetadata(metaClient, instantTime, Option.of(0L), Collections.singletonList(cleanStats)); + CleanerUtils.convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats)); // Write empty clean metadata os.write(TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata).get()); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index 7a273a263..e6c45a248 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -414,8 +414,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { HoodieInstant cleanInflightInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant); metaClient.getActiveTimeline().createNewInstant(cleanInflightInstant); - HoodieCleanMetadata cleanMetadata = CleanerUtils - .convertCleanMetadata(metaClient, cleanInstant, Option.empty(), cleanStats); + HoodieCleanMetadata cleanMetadata = CleanerUtils.convertCleanMetadata(cleanInstant, Option.empty(), cleanStats); metaClient.getActiveTimeline().saveAsComplete(cleanInflightInstant, TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata)); } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala index a9e7389d6..f0891e3eb 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -54,7 +54,7 @@ class IncrementalRelation(val sqlContext: SQLContext, throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables") } // TODO : Figure out a valid HoodieWriteConfig - private val hoodieTable = HoodieTable.getHoodieTable(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(), + private val hoodieTable = HoodieTable.create(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(), sqlContext.sparkContext) val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants() if (commitTimeline.empty()) {