diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index 8dac00573..416d2905c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -17,11 +17,13 @@ package com.uber.hoodie; import com.google.common.base.Optional; +import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.util.CompactionUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; @@ -32,6 +34,8 @@ import java.io.Serializable; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -183,4 +187,17 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable { throws HoodieIndexException { return index.tagLocation(hoodieRecords, jsc, hoodieTable); } + + /** + * Return all pending compactions with instant time for clients to decide what to compact next. + * @return pending compactions as (compaction instant time, compaction plan) pairs + */ + public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), + hoodieTable.getMetaClient().getBasePath(), true); + return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream() + .map(instantWorkloadPair -> + Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue())) + .collect(Collectors.toList()); + } }
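The new read-side API gives external schedulers visibility into queued compactions. A minimal caller sketch (the surrounding wiring — jsc, basePath and the HoodieReadClient construction — is assumed here; only getPendingCompactions() itself is defined by this diff):

    // Poll pending compactions so an external process can decide what to compact next
    HoodieReadClient readClient = new HoodieReadClient(jsc, basePath);
    for (Pair<String, HoodieCompactionPlan> pending : readClient.getPendingCompactions()) {
      HoodieCompactionPlan plan = pending.getValue();
      int numOps = (plan.getOperations() == null) ? 0 : plan.getOperations().size();
      System.out.println("Pending compaction " + pending.getKey() + " with " + numOps + " operations");
    }
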
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 8ff78619e..84bceac82 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -19,8 +19,10 @@ package com.uber.hoodie; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.uber.hoodie.avro.model.HoodieCleanMetadata; +import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.avro.model.HoodieRollbackMetadata; import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.HoodieCleanStat; @@ -97,6 +99,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable { private final transient HoodieMetrics metrics; private final transient HoodieIndex<T> index; private transient Timer.Context writeContext = null; + private transient Timer.Context compactionTimer; /** * @param jsc */ @@ -478,13 +481,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable { * Commit changes performed at the given commitTime marker */ public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses, - Optional<HashMap<String, String>> extraMetadata) { + Optional<Map<String, String>> extraMetadata) { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true); return commit(commitTime, writeStatuses, extraMetadata, metaClient.getCommitActionType()); } private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses, - Optional<HashMap<String, String>> extraMetadata, String actionType) { + Optional<Map<String, String>> extraMetadata, String actionType) { logger.info("Commiting " + commitTime); // Create a Hoodie table which encapsulated the commits and files visible @@ -525,7 +528,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable { // Do a inline compaction if enabled if (config.isInlineCompaction()) { metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true"); - forceCompact(); + forceCompact(extraMetadata); } else { metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false"); } @@ -685,7 +688,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable { * Delete a compaction request that is pending. * * NOTE - This is an Admin operation. - * With async compaction, this is expected to be called with async compaction and ingestion shutdown. + * With async compaction, this is expected to be called with async compaction and write shutdown. * Otherwise, async compactor could fail with errors * * @param compactionTime - delete the compaction time @@ -729,7 +732,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable { new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - // Rollback to savepoint is expected to be a manual operation and no concurrent ingestion or compaction is expected + // Rollback to savepoint is expected to be a manual operation and no concurrent write or compaction is expected // to be running. Rollback to savepoint also removes any pending compaction actions that are generated after // savepoint time. Allowing pending compaction to be retained is not safe as those workload could be referencing // file-slices that will be rolled-back as part of this operation @@ -811,8 +814,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable { // Make sure only the last n commits are being rolled back // If there is a commit in-between or after that is not rolled back, then abort - String lastCommit = commitsToRollback.get(commitsToRollback.size() - 1); - if (!commitTimeline.empty() && !commitTimeline + String lastCommit = null; + if (!commitsToRollback.isEmpty()) { + lastCommit = commitsToRollback.get(commitsToRollback.size() - 1); + } + + if ((lastCommit != null) && !commitTimeline.empty() && !commitTimeline .findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) { throw new HoodieRollbackException( "Found commits after time :" + lastCommit + ", please rollback greater commits first"); } @@ -820,7 +827,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable { List<String> inflights = inflightCommitTimeline.getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); - if (!inflights.isEmpty() && inflights.indexOf(lastCommit) != inflights.size() - 1) { + if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) { throw new HoodieRollbackException("Found in-flight commits after time :" + lastCommit + ", please rollback greater commits first"); } @@ -940,141 +947,109 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable { return commitTime; } - public void startCommitWithTime(String commitTime) { + public void startCommitWithTime(String instantTime) { if (rollbackInFlight) { // Only rollback inflight commit/delta-commits.
Do not touch compaction commits rollbackInflightCommits(); } - logger.info("Generate a new commit time " + commitTime); - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + logger.info("Generate a new instant time " + instantTime); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath()); + // if there are pending compactions, their instantTime must not be greater than that of this instant time + metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> { + Preconditions.checkArgument( + HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER), + "Latest pending compaction instant time must be earlier " + + "than this instant time. Latest Compaction :" + latestPending + ", Ingesting at " + instantTime); + }); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); String commitActionType = table.getMetaClient().getCommitActionType(); - activeTimeline.createInflight(new HoodieInstant(true, commitActionType, commitTime)); + activeTimeline.createInflight(new HoodieInstant(true, commitActionType, instantTime)); + } + + + /** + * Schedules a new compaction instant + * @param extraMetadata Extra metadata to be stored in the compaction plan + * @return compaction instant time if a compaction got scheduled + */ + public Optional<String> scheduleCompaction(Optional<Map<String, String>> extraMetadata) throws IOException { + String instantTime = HoodieActiveTimeline.createNewCommitTime(); + logger.info("Generate a new instant time " + instantTime); + boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata); + return notEmpty ? Optional.of(instantTime) : Optional.empty(); } /** - * Provides a new commit time for a compaction (commit) operation + * Schedules a new compaction instant with passed-in instant time + * @param instantTime Compaction Instant Time + * @param extraMetadata Extra Metadata to be stored */ - public String startCompaction() { - String commitTime = HoodieActiveTimeline.createNewCommitTime(); - logger.info("Generate a new commit time " + commitTime); - startCompactionWithTime(commitTime); - return commitTime; + public boolean scheduleCompactionAtInstant(String instantTime, Optional<Map<String, String>> extraMetadata) + throws IOException { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), + config.getBasePath(), true); + // if there are inflight writes, their instantTime must not be less than that of compaction instant time + metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight -> { + Preconditions.checkArgument( + HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER), + "Earliest write inflight instant time must be later " + + "than compaction time. Earliest :" + earliestInflight + ", Compaction scheduled at " + instantTime); + }); + // Committed and pending compaction instants should have strictly lower timestamps + List<HoodieInstant> conflictingInstants = + metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants().filter(instant -> + HoodieTimeline.compareTimestamps(instant.getTimestamp(), instantTime, + HoodieTimeline.GREATER_OR_EQUAL)).collect(Collectors.toList()); + Preconditions.checkArgument(conflictingInstants.isEmpty(), + "Following instants have timestamps >= compactionInstant.
Instants :" + + conflictingInstants); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieCompactionPlan workload = table.scheduleCompaction(jsc, instantTime); + if (workload != null && (workload.getOperations() != null) && (!workload.getOperations().isEmpty())) { + extraMetadata.ifPresent(workload::setExtraMetadata); + HoodieInstant compactionInstant = + new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime); + metaClient.getActiveTimeline().saveToCompactionRequested(compactionInstant, + AvroUtils.serializeCompactionPlan(workload)); + return true; + } + return false; } /** - * Since MOR tableType default to {@link HoodieTimeline#DELTA_COMMIT_ACTION}, we need to - * explicitly set to {@link HoodieTimeline#COMMIT_ACTION} for compaction + * Performs Compaction for the workload stored in instant-time + * @param compactionInstantTime Compaction Instant Time + * @return RDD of Write Status + * @throws IOException */ - public void startCompactionWithTime(String commitTime) { - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); - HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - String commitActionType = HoodieTimeline.COMMIT_ACTION; - activeTimeline.createInflight(new HoodieInstant(true, commitActionType, commitTime)); + public JavaRDD<WriteStatus> compact(String compactionInstantTime) throws IOException { + return compact(compactionInstantTime, config.shouldAutoCommit()); } /** - * Performs a compaction operation on a dataset. WARNING: Compaction operation cannot be executed - * asynchronously. Please always use this serially before or after an insert/upsert action. + * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file. */ - public JavaRDD<WriteStatus> compact(String commitTime) throws IOException { - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); - // TODO : Fix table.getActionType for MOR table type to return different actions based on delta or compaction - writeContext = metrics.getCommitCtx(); - JavaRDD<WriteStatus> statuses = table.compact(jsc, commitTime); - // Trigger the insert and collect statuses - statuses = statuses.persist(config.getWriteStatusStorageLevel()); - String actionType = HoodieActiveTimeline.COMMIT_ACTION; - commitOnAutoCommit(commitTime, statuses, actionType); - return statuses; - } - - /** - * Commit a compaction operation - */ - public void commitCompaction(String commitTime, JavaRDD<WriteStatus> writeStatuses, - Optional<HashMap<String, String>> extraMetadata) { - String commitCompactionActionType = HoodieActiveTimeline.COMMIT_ACTION; - commit(commitTime, writeStatuses, extraMetadata, commitCompactionActionType); - } - - /** - * Commit a compaction operation - */ - public void commitCompaction(String commitTime, JavaRDD<WriteStatus> writeStatuses) { - String commitCompactionActionType = HoodieActiveTimeline.COMMIT_ACTION; - commit(commitTime, writeStatuses, Optional.empty(), commitCompactionActionType); - } - - /** - * Performs a compaction operation on a dataset. WARNING: Compaction operation cannot be executed - * asynchronously. Please always use this serially before or after an insert/upsert action.
- */ - private void forceCompact(String compactionCommitTime) throws IOException { - // Create a Hoodie table which encapsulated the commits and files visible + public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses, + Optional<Map<String, String>> extraMetadata) throws IOException { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - // TODO : Fix table.getActionType for MOR table type to return different actions based on delta or compaction and - // then use getTableAndInitCtx - Timer.Context writeContext = metrics.getCommitCtx(); - JavaRDD<WriteStatus> compactedStatuses = table.compact(jsc, compactionCommitTime); - if (!compactedStatuses.isEmpty()) { - HoodieCommitMetadata metadata = commitForceCompaction(compactedStatuses, metaClient, compactionCommitTime); - long durationInMs = metrics.getDurationInMs(writeContext.stop()); - try { - metrics - .updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(), - durationInMs, metadata, HoodieActiveTimeline.COMMIT_ACTION); - } catch (ParseException e) { - throw new HoodieCommitException( - "Commit time is not of valid format.Failed to commit " + config.getBasePath() - + " at time " + compactionCommitTime, e); + HoodieActiveTimeline timeline = metaClient.getActiveTimeline(); + HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( + timeline.getInstantAuxiliaryDetails(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get()); + // Merge extra meta-data passed by user with the one already in inflight compaction + Optional<Map<String, String>> mergedMetaData = extraMetadata.map(m -> { + Map<String, String> merged = new HashMap<>(); + Map<String, String> extraMetaDataFromInstantFile = compactionPlan.getExtraMetadata(); + if (extraMetaDataFromInstantFile != null) { + merged.putAll(extraMetaDataFromInstantFile); } - logger.info("Compacted successfully on commit " + compactionCommitTime); - } else { - logger.info("Compaction did not run for commit " + compactionCommitTime); - } - } - - /** - * Performs a compaction operation on a dataset. WARNING: Compaction operation cannot be executed - * asynchronously. Please always use this serially before or after an insert/upsert action.
- */ - private String forceCompact() throws IOException { - String compactionCommitTime = startCompaction(); - forceCompact(compactionCommitTime); - return compactionCommitTime; - } - - private HoodieCommitMetadata commitForceCompaction(JavaRDD<WriteStatus> writeStatuses, - HoodieTableMetaClient metaClient, String compactionCommitTime) { - List<HoodieWriteStat> updateStatusMap = writeStatuses.map(writeStatus -> writeStatus.getStat()) - .collect(); - - HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); - for (HoodieWriteStat stat : updateStatusMap) { - metadata.addWriteStat(stat.getPartitionPath(), stat); - } - - logger.info("Compaction finished with result " + metadata); - - logger.info("Committing Compaction " + compactionCommitTime); - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - - try { - activeTimeline.saveAsComplete( - new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, compactionCommitTime), - Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - } catch (IOException e) { - throw new HoodieCompactionException( - "Failed to commit " + metaClient.getBasePath() + " at time " + compactionCommitTime, e); - } - return metadata; + // Overwrite/Merge with the user-passed meta-data + merged.putAll(m); + return Optional.of(merged); + }).orElseGet(() -> Optional.ofNullable(compactionPlan.getExtraMetadata())); + commitCompaction(writeStatuses, table, compactionInstantTime, true, mergedMetaData); } /** @@ -1127,4 +1102,158 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable { } return table; } + + /** + * Compaction specific private methods + */ + + /** + * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time + * @param compactionInstantTime Compaction Instant Time + * @return RDD of Write Status + * @throws IOException + */ + private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean autoCommit) throws IOException { + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), + config.getBasePath(), true); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline(); + HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); + if (pendingCompactionTimeline.containsInstant(inflightInstant)) { + //inflight compaction - Needs to rollback first deleting new parquet files before we run compaction.
+ rollbackInflightCompaction(inflightInstant, table); + // refresh table + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true); + table = HoodieTable.getHoodieTable(metaClient, config, jsc); + pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline(); + } + + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + if (pendingCompactionTimeline.containsInstant(instant)) { + return runCompaction(instant, metaClient.getActiveTimeline(), autoCommit); + } else { + throw new IllegalStateException("No Compaction request available at " + compactionInstantTime + + " to run compaction"); + } + } + + /** + * Perform compaction operations as specified in the compaction commit file + * + * @param compactionInstant Compaction Instant time + * @param activeTimeline Active Timeline + * @param autoCommit Commit after compaction + * @return RDD of Write Status + */ + private JavaRDD<WriteStatus> runCompaction( + HoodieInstant compactionInstant, HoodieActiveTimeline activeTimeline, boolean autoCommit) throws IOException { + HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( + activeTimeline.getInstantAuxiliaryDetails(compactionInstant).get()); + // Mark instant as compaction inflight + activeTimeline.transitionCompactionRequestedToInflight(compactionInstant); + compactionTimer = metrics.getCompactionCtx(); + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), + config.getBasePath(), true); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstant.getTimestamp(), compactionPlan); + // Force compaction action + statuses.persist(config.getWriteStatusStorageLevel()); + // pass extra-metadata so that it gets stored in commit file automatically + commitCompaction(statuses, table, compactionInstant.getTimestamp(), autoCommit, + Optional.ofNullable(compactionPlan.getExtraMetadata())); + return statuses; + } + + /** + * Commit Compaction and track metrics + * + * @param compactedStatuses Compaction Write status + * @param table Hoodie Table + * @param compactionCommitTime Compaction Commit Time + * @param autoCommit Auto Commit + * @param extraMetadata Extra Metadata to store + */ + protected void commitCompaction(JavaRDD<WriteStatus> compactedStatuses, HoodieTable<T> table, + String compactionCommitTime, boolean autoCommit, Optional<Map<String, String>> extraMetadata) { + if (autoCommit) { + HoodieCommitMetadata metadata = + doCompactionCommit(compactedStatuses, table.getMetaClient(), compactionCommitTime, extraMetadata); + if (compactionTimer != null) { + long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); + try { + metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(), + durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION); + } catch (ParseException e) { + throw new HoodieCommitException( + "Commit time is not of valid format.Failed to commit compaction " + config.getBasePath() + + " at time " + compactionCommitTime, e); + } + } + logger.info("Compacted successfully on commit " + compactionCommitTime); + } else { + logger.info("Compaction did not run for commit " + compactionCommitTime); + } + } + + /** + * Rollback partial compactions + * @param inflightInstant Inflight Compaction Instant + * @param table Hoodie Table + */ + private void
rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable<T> table) throws IOException { + table.rollback(jsc, ImmutableList.copyOf(new String[] { inflightInstant.getTimestamp() })); + // Revert instant state file + table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); + } + + private HoodieCommitMetadata doCompactionCommit(JavaRDD<WriteStatus> writeStatuses, + HoodieTableMetaClient metaClient, String compactionCommitTime, Optional<Map<String, String>> extraMetadata) { + List<HoodieWriteStat> updateStatusMap = writeStatuses.map(writeStatus -> writeStatus.getStat()) + .collect(); + + HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); + for (HoodieWriteStat stat : updateStatusMap) { + metadata.addWriteStat(stat.getPartitionPath(), stat); + } + + // Copy extraMetadata + extraMetadata.ifPresent(m -> { + m.entrySet().stream().forEach(e -> { + metadata.addMetadata(e.getKey(), e.getValue()); + }); + }); + + logger.info("Compaction finished with result " + metadata); + + logger.info("Committing Compaction " + compactionCommitTime); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + try { + activeTimeline.transitionCompactionInflightToComplete( + new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime), + Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } catch (IOException e) { + throw new HoodieCompactionException( + "Failed to commit " + metaClient.getBasePath() + " at time " + compactionCommitTime, e); + } + return metadata; + } + + /** + * Performs a compaction operation on a dataset. WARNING: Compaction operation cannot be executed + * asynchronously. Please always use this serially before or after an insert/upsert action. + */ + private Optional<String> forceCompact(Optional<Map<String, String>> extraMetadata) throws IOException { + Optional<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata); + compactionInstantTimeOpt.ifPresent(compactionInstantTime -> { + try { + compact(compactionInstantTime); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + }); + return compactionInstantTimeOpt; + } }
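Taken together, the write-client changes split compaction into schedule / execute / commit phases. A sketch of the intended end-to-end flow (the driver setup around it is assumed; the three client calls are the APIs added above):

    // Schedule a compaction; an instant time is returned only if the plan is non-empty
    Optional<String> instantOpt = client.scheduleCompaction(Optional.empty());
    if (instantOpt.isPresent()) {
      // Execute the stored plan; with auto-commit enabled this also commits the result
      JavaRDD<WriteStatus> statuses = client.compact(instantOpt.get());
      // With auto-commit disabled, commit explicitly, optionally attaching extra metadata
      client.commitCompaction(instantOpt.get(), statuses, Optional.empty());
    }
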
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionInstantWithPlan.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionInstantWithPlan.java deleted file mode 100644 index ac0b5b331..000000000 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionInstantWithPlan.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.uber.hoodie.io.compact; - -import com.uber.hoodie.avro.model.HoodieCompactionPlan; - -/** - * Contains Hoodie Compaction instant along with workload - */ -public class HoodieCompactionInstantWithPlan { - - private final String compactionInstantTime; - private final HoodieCompactionPlan compactionPlan; - - public HoodieCompactionInstantWithPlan(String compactionInstantTime, - HoodieCompactionPlan compactionPlan) { - this.compactionInstantTime = compactionInstantTime; - this.compactionPlan = compactionPlan; - } - - public String getCompactionInstantTime() { - return compactionInstantTime; - } - - public HoodieCompactionPlan getCompactionPlan() { - return compactionPlan; - } -}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java index ad1c18e2f..27f04c426 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java @@ -22,6 +22,7 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; import java.io.IOException; import java.io.Serializable; +import java.util.Set; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -30,15 +31,6 @@ import org.apache.spark.api.java.JavaSparkContext; */ public interface HoodieCompactor extends Serializable { - /** - * Compact the delta files with the data files - * - * @deprecated : Will be removed in next PR - */ - @Deprecated - JavaRDD<WriteStatus> compact(JavaSparkContext jsc, final HoodieWriteConfig config, - HoodieTable hoodieTable, String compactionCommitTime) throws Exception; - /** * Generate a new compaction plan for scheduling * @@ -50,7 +42,8 @@ public interface HoodieCompactor extends Serializable { * @throws IOException when encountering errors */ HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, - HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime) + HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime, + Set<String> fileIdsWithPendingCompactions) throws IOException; @@ -58,5 +51,5 @@ */ JavaRDD<WriteStatus> compact(JavaSparkContext jsc, HoodieCompactionPlan compactionPlan, HoodieTable hoodieTable, HoodieWriteConfig config, - String compactionCommitTime) throws IOException; + String compactionInstantTime) throws IOException; } \ No newline at end of file
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 90a24b6d9..ef97abff8 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -45,6 +45,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.avro.Schema; @@ -73,31 +74,22 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { // Accumulator to keep track of total log file slices for a dataset private AccumulatorV2<Long, Long> totalFileSlices; - @Override - public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, HoodieWriteConfig config, - HoodieTable hoodieTable, String
compactionCommitTime) throws IOException { - - HoodieCompactionPlan compactionPlan = generateCompactionPlan(jsc, hoodieTable, config, - compactionCommitTime); - List<HoodieCompactionOperation> operations = compactionPlan.getOperations(); - if ((operations == null) || (operations.isEmpty())) { - return jsc.emptyRDD(); - } - return compact(jsc, compactionPlan, hoodieTable, config, compactionCommitTime); - } - @Override public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, HoodieCompactionPlan compactionPlan, HoodieTable hoodieTable, HoodieWriteConfig config, - String compactionCommitTime) throws IOException { + String compactionInstantTime) throws IOException { + if (compactionPlan == null || (compactionPlan.getOperations() == null) + || (compactionPlan.getOperations().isEmpty())) { + return jsc.emptyRDD(); + } HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); // Compacting is very similar to applying updates to existing file HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); - List<CompactionOperation> operations = compactionPlan.getOperations().stream() - .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); + List<CompactionOperation> operations = compactionPlan.getOperations().stream().map( + CompactionOperation::convertFromAvroRecordInstance).collect(toList()); log.info("Compactor compacting " + operations + " files"); return jsc.parallelize(operations, operations.size()) - .map(s -> compact(table, metaClient, config, s, compactionCommitTime)) + .map(s -> compact(table, metaClient, config, s, compactionInstantTime)) .flatMap(writeStatusesItr -> writeStatusesItr.iterator()); } @@ -164,8 +156,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { @Override public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, - HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime) - throws IOException { + HoodieTable hoodieTable, HoodieWriteConfig config, String compactionCommitTime, + Set<String> fileIdsWithPendingCompactions) throws IOException { totalLogFiles = new LongAccumulator(); totalFileSlices = new LongAccumulator(); @@ -191,7 +183,9 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { List<HoodieCompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size()) .flatMap((FlatMapFunction<String, HoodieCompactionOperation>) partitionPath -> fileSystemView - .getLatestFileSlices(partitionPath).map( + .getLatestFileSlices(partitionPath) + .filter(slice -> !fileIdsWithPendingCompactions.contains(slice.getFileId())) + .map( s -> { List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile .getBaseInstantAndLogVersionComparator().reversed()).collect(Collectors.toList()); @@ -211,13 +205,16 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { log.info("Total number of latest files slices " + totalFileSlices.value()); log.info("Total number of log files " + totalLogFiles.value()); log.info("Total number of file slices " + totalFileSlices.value()); - // Filter the compactions with the passed in filter. This lets us choose most effective // compactions only - // TODO: In subsequent PRs, pending Compaction plans will be wired in.
Strategy can look at pending compaction - plans to schedule next compaction plan HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList())); + Preconditions.checkArgument(compactionPlan.getOperations().stream() + .filter(op -> fileIdsWithPendingCompactions.contains(op.getFileId())).count() == 0, + "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. " + + "Please fix your strategy implementation." + + "FileIdsWithPendingCompactions :" + fileIdsWithPendingCompactions + + ", Selected workload :" + compactionPlan); if (compactionPlan.getOperations().isEmpty()) { log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); }
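The precondition above pins down the contract for strategy implementations: a file id may appear in at most one pending compaction. A strategy honoring that contract would drop already-planned file ids before selecting, roughly as follows (illustrative only; the candidate and pending-plan collections are assumed inputs, names follow the diff):

    // File ids already claimed by earlier pending compaction plans
    Set<String> pendingFileIds = pendingCompactionPlans.stream()
        .filter(plan -> plan.getOperations() != null)
        .flatMap(plan -> plan.getOperations().stream())
        .map(HoodieCompactionOperation::getFileId)
        .collect(Collectors.toSet());
    // Only operations on unclaimed file ids may be selected into the new plan
    List<HoodieCompactionOperation> selected = candidateOperations.stream()
        .filter(op -> !pendingFileIds.contains(op.getFileId()))
        .collect(Collectors.toList());
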
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java index 901bd236e..ead65d70d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java @@ -38,6 +38,7 @@ public class HoodieMetrics { public String commitTimerName = null; public String deltaCommitTimerName = null; public String finalizeTimerName = null; + public String compactionTimerName = null; private HoodieWriteConfig config = null; private String tableName = null; private Timer rollbackTimer = null; @@ -45,6 +46,7 @@ public class HoodieMetrics { private Timer commitTimer = null; private Timer deltaCommitTimer = null; private Timer finalizeTimer = null; + private Timer compactionTimer = null; public HoodieMetrics(HoodieWriteConfig config, String tableName) { this.config = config; @@ -56,6 +58,7 @@ public class HoodieMetrics { this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION); this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION); this.finalizeTimerName = getMetricsName("timer", "finalize"); + this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION); } } @@ -70,6 +73,13 @@ public class HoodieMetrics { return rollbackTimer == null ? null : rollbackTimer.time(); } + public Timer.Context getCompactionCtx() { + if (config.isMetricsOn() && compactionTimer == null) { + compactionTimer = createTimer(compactionTimerName); + } + return compactionTimer == null ? null : compactionTimer.time(); + } + public Timer.Context getCleanCtx() { if (config.isMetricsOn() && cleanTimer == null) { cleanTimer = createTimer(cleanTimerName);
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index b05ba602f..de9d84000 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -167,11 +167,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T> { throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table"); } - @Override - public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String commitTime) { - throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table"); - } - @Override public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime, HoodieCompactionPlan compactionPlan) {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index ef2ed7f0e..3ead5904c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -36,6 +36,7 @@ import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBl import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCompactionException; @@ -51,6 +52,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -148,22 +150,14 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieCopyOnWriteTable<T> { logger.info("Compacting merge on read table " + config.getBasePath()); HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor(); try { - return compactor.generateCompactionPlan(jsc, this, config, instantTime); + return compactor.generateCompactionPlan(jsc, this, config, instantTime, + new HashSet<>(((HoodieTableFileSystemView)getRTFileSystemView()) + .getFileIdToPendingCompaction().keySet())); } catch (IOException e) { throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e); } } - @Override - public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionCommitTime) { - HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor(); - try { - return compactor.compact(jsc, config, this, compactionCommitTime); - } catch (IOException e) { - throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e); - } - } - @Override public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime, HoodieCompactionPlan compactionPlan) { @@ -185,7 +179,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieCopyOnWriteTable<T> { } Map<String, HoodieInstant> commitsAndCompactions = this.getActiveTimeline() .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, - HoodieActiveTimeline.DELTA_COMMIT_ACTION)).getInstants() + HoodieActiveTimeline.DELTA_COMMIT_ACTION,
HoodieActiveTimeline.COMPACTION_ACTION)).getInstants() .filter(i -> commits.contains(i.getTimestamp())) .collect(Collectors.toMap(i -> i.getTimestamp(), i -> i)); @@ -219,6 +213,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieCopyOnWriteTable<T> { switch (instant.getAction()) { case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.COMPACTION_ACTION: try { Map<FileStatus, Boolean> results = super .deleteCleanedFiles(partitionPath, Arrays.asList(commit));
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 3e83ca1a3..4ef33f9b1 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -223,13 +223,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable { /** * Run Compaction on the table. Compaction arranges the data so that it is optimized for data * access - * @deprecated Will be replaced with newer APIs - */ - @Deprecated - public abstract JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String commitTime); - - /** - * Run Compaction on the table. Compaction arranges the data so that it is optimized for data access * * @param jsc Spark Context * @param compactionInstantTime Instant Time
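At the table level, the contract is now two-phase: scheduleCompaction produces a HoodieCompactionPlan (which HoodieWriteClient persists as a requested instant), and compact later consumes exactly that plan. A sketch (the table construction and instant time are assumed):

    // Phase 1: plan the compaction; Phase 2: execute the persisted plan
    HoodieCompactionPlan plan = table.scheduleCompaction(jsc, compactionInstantTime);
    JavaRDD<WriteStatus> statuses = table.compact(jsc, compactionInstantTime, plan);
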
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java new file mode 100644 index 000000000..f1a1b8ea8 --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java @@ -0,0 +1,508 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie; + +import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.uber.hoodie.avro.model.HoodieCompactionOperation; +import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.common.HoodieClientTestUtils; +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.FileSlice; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; +import com.uber.hoodie.common.util.AvroUtils; +import com.uber.hoodie.common.util.CompactionUtils; +import com.uber.hoodie.config.HoodieCompactionConfig; +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieStorageConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.table.HoodieTable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.FileStatus; +import org.apache.spark.api.java.JavaRDD; +import org.junit.Test; + +/** + * Test Cases for Async Compaction and Ingestion interaction + */ +public class TestAsyncCompaction extends TestHoodieClientBase { + + private HoodieWriteConfig getConfig(Boolean autoCommit) { + return getConfigBuilder(autoCommit).build(); + } + + private HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withAutoCommit(autoCommit).withAssumeDatePartitioning(true).withCompactionConfig( + HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024).withInlineCompaction(false) + .withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build()) + .forTable("test-trip-table") + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); + } + + @Test + public void testRollbackInflightIngestionWithPendingCompaction() throws Exception { + // Rollback inflight ingestion when there is pending compaction + HoodieWriteConfig cfg = getConfig(false); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true); + + String firstInstantTime = "001"; + String secondInstantTime = "004"; + String compactionInstantTime = "005"; + String inflightInstantTime = "006"; + String nextInflightInstantTime = "007"; + + int numRecs = 2000; + + List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); + records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), + records, cfg, true, new ArrayList<>()); + + // Schedule compaction but do not run them + scheduleCompaction(compactionInstantTime, client, cfg); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath()); + createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true); + + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieInstant pendingCompactionInstant = + metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); + assertTrue("Pending Compaction instant has expected instant time", + pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); + HoodieInstant inflightInstant = + metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get(); + assertTrue("inflight instant has expected instant time", + inflightInstant.getTimestamp().equals(inflightInstantTime)); + + //This should rollback + client.startCommitWithTime(nextInflightInstantTime); + + //Validate + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + inflightInstant = + metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get(); + assertTrue("inflight instant has expected instant time", + inflightInstant.getTimestamp().equals(nextInflightInstantTime)); + assertTrue("Expect only one inflight instant", + metaClient.getActiveTimeline().filterInflightsExcludingCompaction().getInstants().count() == 1); + //Expect pending Compaction to be present + pendingCompactionInstant = + metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); + assertTrue("Pending Compaction instant has expected instant time", + pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); + } + + @Test + public void testInflightCompaction() throws Exception { + // There is inflight compaction. Subsequent compaction run must work correctly + HoodieWriteConfig cfg = getConfig(true); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true); + + String firstInstantTime = "001"; + String secondInstantTime = "004"; + String compactionInstantTime = "005"; + String thirdInstantTime = "006"; + String fourthInstantTime = "007"; + + int numRecs = 2000; + + List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); + records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), + records, cfg, true, new ArrayList<>()); + + // Schedule and mark compaction instant as inflight + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + scheduleCompaction(compactionInstantTime, client, cfg); + moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg); + + // Complete ingestions + runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), + records, cfg, false, Arrays.asList(compactionInstantTime)); + + // execute inflight compaction + executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true); + } + + @Test + public void testScheduleIngestionBeforePendingCompaction() throws Exception { + // Case: Failure case.
Latest pending compaction instant time must be earlier than this instant time + HoodieWriteConfig cfg = getConfig(false); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true); + + String firstInstantTime = "001"; + String secondInstantTime = "004"; + String failedInstantTime = "005"; + String compactionInstantTime = "006"; + int numRecs = 2000; + + List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); + records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), + records, cfg, true, new ArrayList<>()); + + // Schedule compaction but do not run them + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + scheduleCompaction(compactionInstantTime, client, cfg); + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieInstant pendingCompactionInstant = + metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); + assertTrue("Pending Compaction instant has expected instant time", + pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); + + boolean gotException = false; + try { + runNextDeltaCommits(client, Arrays.asList(failedInstantTime), + records, cfg, false, Arrays.asList(compactionInstantTime)); + } catch (IllegalArgumentException iex) { + // Latest pending compaction instant time must be earlier than this instant time. Should fail here + gotException = true; + } + assertTrue("Latest pending compaction instant time must be earlier than this instant time", gotException); + } + + @Test + public void testScheduleCompactionAfterPendingIngestion() throws Exception { + // Case: Failure case. Earliest ingestion inflight instant time must be later than compaction time + + HoodieWriteConfig cfg = getConfig(false); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true); + + String firstInstantTime = "001"; + String secondInstantTime = "004"; + String inflightInstantTime = "005"; + String compactionInstantTime = "006"; + int numRecs = 2000; + + List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); + records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), + records, cfg, true, new ArrayList<>()); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true); + + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieInstant inflightInstant = + metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get(); + assertTrue("inflight instant has expected instant time", + inflightInstant.getTimestamp().equals(inflightInstantTime)); + + boolean gotException = false; + try { + // Schedule compaction but do not run them + scheduleCompaction(compactionInstantTime, client, cfg); + } catch (IllegalArgumentException iex) { + // Earliest ingestion inflight instant time must be later than compaction time. Should fail here + gotException = true; + } + assertTrue("Earliest ingestion inflight instant time must be later than compaction time", gotException); + } + + @Test + public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception { + // Case: Failure case.
Compaction instant to be scheduled cannot have an older or equal timestamp than committed or pending instants + + HoodieWriteConfig cfg = getConfig(false); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true); + + String firstInstantTime = "001"; + String secondInstantTime = "004"; + String compactionInstantTime = "002"; + int numRecs = 2000; + + List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); + records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), + records, cfg, true, new ArrayList<>()); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + boolean gotException = false; + try { + // Schedule compaction but do not run them + scheduleCompaction(compactionInstantTime, client, cfg); + } catch (IllegalArgumentException iex) { + gotException = true; + } + assertTrue("Compaction Instant to be scheduled cannot have older timestamp", gotException); + + // Schedule with timestamp same as that of committed instant + gotException = false; + String dupCompactionInstantTime = secondInstantTime; + try { + // Schedule compaction but do not run them + scheduleCompaction(dupCompactionInstantTime, client, cfg); + } catch (IllegalArgumentException iex) { + gotException = true; + } + assertTrue("Compaction Instant to be scheduled cannot have same timestamp as committed instant", + gotException); + + compactionInstantTime = "006"; + scheduleCompaction(compactionInstantTime, client, cfg); + gotException = false; + try { + // Schedule compaction with the same times as a pending compaction + scheduleCompaction(dupCompactionInstantTime, client, cfg); + } catch (IllegalArgumentException iex) { + gotException = true; + } + assertTrue("Compaction Instant to be scheduled cannot have same timestamp as a pending compaction", + gotException); + } + + @Test + public void testCompactionAfterTwoDeltaCommits() throws Exception { + // No Delta Commits after compaction request + HoodieWriteConfig cfg = getConfig(true); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true); + + String firstInstantTime = "001"; + String secondInstantTime = "004"; + String compactionInstantTime = "005"; + int numRecs = 2000; + + List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); + records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), + records, cfg, true, new ArrayList<>()); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false); + } + + @Test + public void testInterleavedCompaction() throws Exception { + //Case: Two delta commits before and after compaction schedule + HoodieWriteConfig cfg = getConfig(true); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true); + + String firstInstantTime = "001"; + String secondInstantTime = "004"; + String compactionInstantTime = "005"; + String thirdInstantTime = "006"; + String fourthInstantTime = "007"; + + int numRecs = 2000; + + List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs); + records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), + records, cfg, true, new ArrayList<>()); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg,
jsc); + scheduleCompaction(compactionInstantTime, client, cfg); + + runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime), + records, cfg, false, Arrays.asList(compactionInstantTime)); + executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true); + } + + /** + * HELPER METHODS FOR TESTING + **/ + + private void validateDeltaCommit(String latestDeltaCommit, + final Map<String, Pair<String, HoodieCompactionOperation>> fileIdToCompactionOperation, + HoodieWriteConfig cfg) throws IOException { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg); + fileSliceList.forEach(fileSlice -> { + Pair<String, HoodieCompactionOperation> opPair = fileIdToCompactionOperation.get(fileSlice.getFileId()); + if (opPair != null) { + System.out.println("FileSlice :" + fileSlice); + assertTrue("Expect baseInstant to match compaction Instant", + fileSlice.getBaseInstantTime().equals(opPair.getKey())); + assertTrue("Expect atleast one log file to be present where the latest delta commit was written", + fileSlice.getLogFiles().count() > 0); + assertFalse("Expect no data-file to be present", fileSlice.getDataFile().isPresent()); + } else { + assertTrue("Expect baseInstant to be less than or equal to latestDeltaCommit", + fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0); + } + }); + } + + private List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, List<String> deltaInstants, + List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, + List<String> expPendingCompactionInstants) throws Exception { + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + List<Pair<HoodieInstant, HoodieCompactionPlan>> pendingCompactions = + CompactionUtils.getAllPendingCompactionPlans(metaClient); + List<String> gotPendingCompactionInstants = + pendingCompactions.stream().map(pc -> pc.getKey().getTimestamp()).sorted().collect(Collectors.toList()); + assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants); + + Map<String, Pair<String, HoodieCompactionOperation>> fileIdToCompactionOperation = + CompactionUtils.getAllPendingCompactionOperations(metaClient); + + if (insertFirst) { + // Use first instant for inserting records + String firstInstant = deltaInstants.get(0); + deltaInstants = deltaInstants.subList(1, deltaInstants.size()); + JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1); + client.startCommitWithTime(firstInstant); + JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, firstInstant); + List<WriteStatus> statusList = statuses.collect(); + + if (!cfg.shouldAutoCommit()) { + client.commit(firstInstant, statuses); + } + TestHoodieClientBase.assertNoWriteErrors(statusList); + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + List<HoodieDataFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg); + assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit", + dataFilesToRead.stream().findAny().isPresent()); + validateDeltaCommit(firstInstant, fileIdToCompactionOperation, cfg); + } + + int numRecords = records.size(); + for (String instantTime : deltaInstants) { + records = dataGen.generateUpdates(instantTime, numRecords); + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false); + validateDeltaCommit(instantTime, fileIdToCompactionOperation, cfg); + } + return records; + }
+
+  // Transition the requested compaction to inflight on the timeline and assert the state change took effect.
+  private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteClient client,
+      HoodieWriteConfig cfg) throws IOException {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+    HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+    // Deserialize the plan to ensure the requested compaction's auxiliary file is present and readable
+    HoodieCompactionPlan workload = AvroUtils.deserializeCompactionPlan(
+        metaClient.getActiveTimeline().getInstantAuxiliaryDetails(compactionInstant).get());
+    metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
+    HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
+        .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
+    assertTrue("Instant must be marked inflight", instant.isInflight());
+  }
+
+  private void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg)
+      throws IOException {
+    client.scheduleCompactionAtInstant(compactionInstantTime, Optional.empty());
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+    HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
+    assertEquals("Last compaction instant must be the one set",
+        instant.getTimestamp(), compactionInstantTime);
+  }
+
+  private void scheduleAndExecuteCompaction(String compactionInstantTime,
+      HoodieWriteClient client, HoodieTable table, HoodieWriteConfig cfg, int expectedNumRecs,
+      boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
+    scheduleCompaction(compactionInstantTime, client, cfg);
+    executeCompaction(compactionInstantTime, client, table, cfg, expectedNumRecs, hasDeltaCommitAfterPendingCompaction);
+  }
+
+  // Execute the scheduled compaction and verify the resulting file-slices and the commit timeline.
+  private void executeCompaction(String compactionInstantTime,
+      HoodieWriteClient client, HoodieTable table, HoodieWriteConfig cfg, int expectedNumRecs,
+      boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
+
+    client.compact(compactionInstantTime);
+    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
+    assertTrue("Ensure latest file-slices are not empty", fileSliceList.stream().findAny().isPresent());
+    assertFalse("Verify all file-slices have base-instant same as compaction instant",
+        fileSliceList.stream().filter(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime))
+            .findAny().isPresent());
+    assertFalse("Verify all file-slices have data-files",
+        fileSliceList.stream().filter(fs -> !fs.getDataFile().isPresent()).findAny().isPresent());
+
+    if (hasDeltaCommitAfterPendingCompaction) {
+      assertFalse("Verify all file-slices have at least one log-file",
+          fileSliceList.stream().filter(fs -> fs.getLogFiles().count() == 0).findAny().isPresent());
+    } else {
+      assertFalse("Verify all file-slices have no log-files",
+          fileSliceList.stream().filter(fs -> fs.getLogFiles().count() > 0).findAny().isPresent());
+    }
+
+    // verify that there is a commit
+    table = HoodieTable.getHoodieTable(
+        new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true), cfg, jsc);
+    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
+    String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
+    assertEquals("Expect compaction instant time to be the latest commit time",
+        latestCompactionCommitTime, compactionInstantTime);
+    assertEquals("Must contain expected records", expectedNumRecs,
+        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count());
+  }
+
+  private List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records,
+      HoodieWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    client.startCommitWithTime(instantTime);
+
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
+    List<WriteStatus> statusList = statuses.collect();
+    TestHoodieClientBase.assertNoWriteErrors(statusList);
+    if (!cfg.shouldAutoCommit() && !skipCommit) {
+      client.commit(instantTime, statuses);
+    }
+
+    Optional<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().reload().getDeltaCommitTimeline()
+        .filterCompletedInstants().lastInstant();
+    if (skipCommit && !cfg.shouldAutoCommit()) {
+      assertTrue("Delta commit should not be latest instant",
+          deltaCommit.get().getTimestamp().compareTo(instantTime) < 0);
+    } else {
+      assertTrue(deltaCommit.isPresent());
+      assertEquals("Delta commit should be latest instant", instantTime, deltaCommit.get().getTimestamp());
+    }
+    return statusList;
+  }
+
+  private List<HoodieDataFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
+    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
+    HoodieTableFileSystemView view =
+        new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitTimeline(), allFiles);
+    List<HoodieDataFile> dataFilesToRead = view.getLatestDataFiles().collect(Collectors.toList());
+    return dataFilesToRead;
+  }
+
+  private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
+    HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
+        table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
+    List<FileSlice> fileSliceList =
+        Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).stream().flatMap(partition ->
+            view.getLatestFileSlices(partition)).collect(Collectors.toList());
+    return fileSliceList;
+  }
+
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
index 0840f7ddc..5545992fa 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
@@ -26,6 +26,7 @@ import com.uber.hoodie.common.HoodieClientTestUtils;
 import com.uber.hoodie.common.HoodieTestDataGenerator;
 import com.uber.hoodie.common.model.HoodiePartitionMetadata;
 import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
@@ -48,6 +49,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -71,6 +73,7 @@ public class TestHoodieClientBase implements Serializable {
   public void init() throws IOException {
     // Initialize a local spark env
     jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieClient"));
+
jsc.setLogLevel("ERROR"); //SQLContext stuff sqlContext = new SQLContext(jsc); @@ -80,7 +83,14 @@ public class TestHoodieClientBase implements Serializable { folder.create(); basePath = folder.getRoot().getAbsolutePath(); fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); - HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath); + if (fs instanceof LocalFileSystem) { + LocalFileSystem lfs = (LocalFileSystem) fs; + // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream + // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open + // So, for the tests, we enforce checksum verification to circumvent the problem + lfs.setVerifyChecksum(true); + } + HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, getTableType()); dataGen = new HoodieTestDataGenerator(); } @@ -112,7 +122,7 @@ public class TestHoodieClientBase implements Serializable { * * @param statuses List of Write Status */ - void assertNoWriteErrors(List statuses) { + static void assertNoWriteErrors(List statuses) { // Verify there are no errors for (WriteStatus status : statuses) { assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()); @@ -433,4 +443,8 @@ public class TestHoodieClientBase implements Serializable { R apply(T1 v1, T2 v2, T3 v3) throws IOException; } + + protected HoodieTableType getTableType() { + return HoodieTableType.COPY_ON_WRITE; + } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index 8fd0d31dc..e1a6d66dc 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -35,6 +35,7 @@ import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieMemoryConfig; import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieNotSupportedException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.index.bloom.HoodieBloomIndex; import com.uber.hoodie.io.compact.HoodieCompactor; @@ -90,7 +91,9 @@ public class TestHoodieCompactor { } private HoodieWriteConfig getConfig() { - return getConfigBuilder().build(); + return getConfigBuilder() + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .build(); } private HoodieWriteConfig.Builder getConfigBuilder() { @@ -103,12 +106,14 @@ public class TestHoodieCompactor { .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = HoodieNotSupportedException.class) public void testCompactionOnCopyOnWriteFail() throws Exception { HoodieTestUtils.initTableType(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc); - compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime()); + String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime(); + table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime)); } @Test @@ -123,8 +128,9 @@ public class TestHoodieCompactor { JavaRDD recordsRDD = 
     writeClient.insert(recordsRDD, newCommitTime).collect();
 
-    JavaRDD<WriteStatus> result = compactor
-        .compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
+    String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
+    JavaRDD<WriteStatus> result =
+        table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
     assertTrue("If there is nothing to compact, result will be empty", result.isEmpty());
   }
 
@@ -171,8 +177,9 @@ public class TestHoodieCompactor {
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
-    JavaRDD<WriteStatus> result = compactor
-        .compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
+    String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
+    JavaRDD<WriteStatus> result =
+        table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
 
     // Verify that all partition paths are present in the WriteStatus result
     for (String partitionPath : dataGen.getPartitionPaths()) {
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
index 980b3d077..fbcdac258 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
@@ -41,6 +41,7 @@ import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.TableFileSystemView;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
 import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieIndexConfig;
@@ -202,7 +203,7 @@ public class TestMergeOnReadTable {
     commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
 
-    String compactionCommitTime = client.startCompaction();
+    String compactionCommitTime = client.scheduleCompaction(Optional.empty()).get().toString();
     client.compact(compactionCommitTime);
 
     allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
@@ -522,14 +523,15 @@ public class TestMergeOnReadTable {
 
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
 
-    String compactionCommit = client.startCompaction();
-    JavaRDD<WriteStatus> writeStatus = client.compact(compactionCommit);
-    client.commitCompaction(compactionCommit, writeStatus);
+    String compactionInstantTime = client.scheduleCompaction(Optional.empty()).get().toString();
+    JavaRDD<WriteStatus> ws = client.compact(compactionInstantTime);
+    client.commitCompaction(compactionInstantTime, ws, Optional.empty());
 
     allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
     hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
     roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+    List<HoodieDataFile> dataFiles2 = roView.getLatestDataFiles().collect(Collectors.toList());
 
     final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get()
         .getTimestamp();
@@ -679,12 +681,13 @@ public class TestMergeOnReadTable {
       }
     }
 
-    // Do a compaction
-    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    // Mark 2nd delta-instant as completed
+    metaClient.getActiveTimeline().saveAsComplete(
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Optional.empty());
 
-    String commitTime = writeClient.startCompaction();
-    JavaRDD<WriteStatus> result = writeClient.compact(commitTime);
+    // Do a compaction
+    String compactionInstantTime = writeClient.scheduleCompaction(Optional.empty()).get().toString();
+    JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
 
     // Verify that recently written compacted data file has no log file
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
@@ -741,9 +744,9 @@ public class TestMergeOnReadTable {
     Assert.assertTrue(totalUpsertTime > 0);
 
     // Do a compaction
-    String commitTime = writeClient.startCompaction();
-    statuses = writeClient.compact(commitTime);
-    writeClient.commitCompaction(commitTime, statuses);
+    String compactionInstantTime = writeClient.scheduleCompaction(Optional.empty()).get().toString();
+    statuses = writeClient.compact(compactionInstantTime);
+    writeClient.commitCompaction(compactionInstantTime, statuses, Optional.empty());
     // total time taken for scanning log files should be greater than 0
     long timeTakenForScanner = statuses.map(writeStatus -> writeStatus.getStat().getRuntimeStats().getTotalScanTime())
         .reduce((a, b) -> a + b).longValue();
@@ -782,11 +785,11 @@ public class TestMergeOnReadTable {
     Assert.assertTrue(numLogFiles > 0);
 
     // Do a compaction
-    String commitTime = writeClient.startCompaction();
+    String commitTime = writeClient.scheduleCompaction(Optional.empty()).get().toString();
     statuses = writeClient.compact(commitTime);
     Assert.assertTrue(statuses.map(status -> status.getStat().getPath().contains("parquet")).count() == numLogFiles);
     Assert.assertEquals(statuses.count(), numLogFiles);
-    writeClient.commitCompaction(commitTime, statuses);
+    writeClient.commitCompaction(commitTime, statuses, Optional.empty());
   }
 
   @Test
@@ -824,6 +827,8 @@ public class TestMergeOnReadTable {
     writeClient.commit(newCommitTime, statuses);
 
     // rollback a successful commit
+    // Sleep for small interval to force a new rollback start time.
+    Thread.sleep(5);
     writeClient.rollback(newCommitTime);
     final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -875,12 +880,12 @@ public class TestMergeOnReadTable {
     Assert.assertTrue(numLogFiles > 0);
 
     // Do a compaction
-    newCommitTime = writeClient.startCompaction();
+    newCommitTime = writeClient.scheduleCompaction(Optional.empty()).get().toString();
     statuses = writeClient.compact(newCommitTime);
     // Ensure all log files have been compacted into parquet files
     Assert.assertTrue(statuses.map(status -> status.getStat().getPath().contains("parquet")).count() == numLogFiles);
     Assert.assertEquals(statuses.count(), numLogFiles);
-    writeClient.commitCompaction(newCommitTime, statuses);
+    writeClient.commitCompaction(newCommitTime, statuses, Optional.empty());
     // Trigger a rollback of compaction
     writeClient.rollback(newCommitTime);
     for (String partitionPath : dataGen.getPartitionPaths()) {
diff --git a/hoodie-client/src/test/resources/log4j-surefire.properties b/hoodie-client/src/test/resources/log4j-surefire.properties
index daf8d28c1..23ded09c3 100644
--- a/hoodie-client/src/test/resources/log4j-surefire.properties
+++ b/hoodie-client/src/test/resources/log4j-surefire.properties
@@ -14,12 +14,8 @@
 # limitations under the License.
 #
 log4j.rootLogger=WARN, A1
-log4j.category.com.uber=INFO
-log4j.category.com.uber.hoodie.common.utils=WARN
-log4j.category.com.uber.hoodie.io=WARN
-log4j.category.com.uber.hoodie.common=WARN
-log4j.category.com.uber.hoodie.table.log=WARN
-log4j.category.org.apache.parquet.hadoop=WARN
+log4j.category.com.uber=WARN
+log4j.category.org.apache.parquet.hadoop=ERROR
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender
 # A1 uses PatternLayout.
diff --git a/hoodie-common/src/test/resources/log4j-surefire.properties b/hoodie-common/src/test/resources/log4j-surefire.properties
index ea3e93545..23ded09c3 100644
--- a/hoodie-common/src/test/resources/log4j-surefire.properties
+++ b/hoodie-common/src/test/resources/log4j-surefire.properties
@@ -14,10 +14,8 @@
 # limitations under the License.
 #
 log4j.rootLogger=WARN, A1
-log4j.category.com.uber=INFO
-log4j.category.com.uber.hoodie.table.log=WARN
-log4j.category.com.uber.hoodie.common.util=WARN
-log4j.category.org.apache.parquet.hadoop=WARN
+log4j.category.com.uber=WARN
+log4j.category.org.apache.parquet.hadoop=ERROR
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender
 # A1 uses PatternLayout.
diff --git a/hoodie-hadoop-mr/src/test/resources/log4j-surefire.properties b/hoodie-hadoop-mr/src/test/resources/log4j-surefire.properties
index 3613e7d12..23ded09c3 100644
--- a/hoodie-hadoop-mr/src/test/resources/log4j-surefire.properties
+++ b/hoodie-hadoop-mr/src/test/resources/log4j-surefire.properties
@@ -14,8 +14,8 @@
 # limitations under the License.
 #
 log4j.rootLogger=WARN, A1
-log4j.category.com.uber=INFO
-log4j.category.org.apache.parquet.hadoop=WARN
+log4j.category.com.uber=WARN
+log4j.category.org.apache.parquet.hadoop=ERROR
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender
 # A1 uses PatternLayout.
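The same log4j consolidation repeats below for hoodie-hive and hoodie-spark. For reference, the effective levels in these properties files amount to roughly the following programmatic log4j 1.x configuration (an illustrative sketch only; the class and method names are hypothetical and not part of this patch):

    import org.apache.log4j.Level;
    import org.apache.log4j.LogManager;

    // Hypothetical helper, equivalent in effect to the surefire properties above:
    // everything under com.uber logs at WARN, parquet-hadoop only at ERROR.
    public class SurefireLogLevels {
      public static void apply() {
        LogManager.getRootLogger().setLevel(Level.WARN);                        // log4j.rootLogger=WARN
        LogManager.getLogger("com.uber").setLevel(Level.WARN);                  // log4j.category.com.uber=WARN
        LogManager.getLogger("org.apache.parquet.hadoop").setLevel(Level.ERROR);
      }
    }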
diff --git a/hoodie-hive/src/test/resources/log4j-surefire.properties b/hoodie-hive/src/test/resources/log4j-surefire.properties
index 8027f04d8..1d7678513 100644
--- a/hoodie-hive/src/test/resources/log4j-surefire.properties
+++ b/hoodie-hive/src/test/resources/log4j-surefire.properties
@@ -14,9 +14,9 @@
 # limitations under the License.
 #
 log4j.rootLogger=WARN, A1
-log4j.category.com.uber=INFO
-log4j.category.org.apache.parquet.hadoop=WARN
-log4j.category.parquet.hadoop=WARN
+log4j.category.com.uber=WARN
+log4j.category.org.apache.parquet.hadoop=ERROR
+log4j.category.parquet.hadoop=ERROR
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender
 # A1 uses PatternLayout.
diff --git a/hoodie-spark/src/test/resources/log4j-surefire.properties b/hoodie-spark/src/test/resources/log4j-surefire.properties
index daf8d28c1..23ded09c3 100644
--- a/hoodie-spark/src/test/resources/log4j-surefire.properties
+++ b/hoodie-spark/src/test/resources/log4j-surefire.properties
@@ -14,12 +14,8 @@
 # limitations under the License.
 #
 log4j.rootLogger=WARN, A1
-log4j.category.com.uber=INFO
-log4j.category.com.uber.hoodie.common.utils=WARN
-log4j.category.com.uber.hoodie.io=WARN
-log4j.category.com.uber.hoodie.common=WARN
-log4j.category.com.uber.hoodie.table.log=WARN
-log4j.category.org.apache.parquet.hadoop=WARN
+log4j.category.com.uber=WARN
+log4j.category.org.apache.parquet.hadoop=ERROR
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender
 # A1 uses PatternLayout.
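Taken together, the test changes above replace the old single-call startCompaction() flow with an explicit schedule/execute/commit lifecycle. A minimal sketch of that flow, assuming an already-constructed write client (only the three client calls are confirmed by the diffs; the wrapper class and method are illustrative):

    import com.uber.hoodie.HoodieWriteClient;
    import com.uber.hoodie.WriteStatus;
    import java.util.Optional;
    import org.apache.spark.api.java.JavaRDD;

    // Illustrative sketch of the compaction lifecycle exercised by the tests above.
    public class CompactionFlowSketch {
      public static void compactOnce(HoodieWriteClient client) {
        // 1. Schedule: request a compaction and obtain its instant time
        //    (mirrors client.scheduleCompaction(Optional.empty()).get().toString() in the tests)
        String compactionInstantTime = client.scheduleCompaction(Optional.empty()).get().toString();
        // 2. Execute: compact the scheduled file-slices into base (parquet) files
        JavaRDD<WriteStatus> statuses = client.compact(compactionInstantTime);
        // 3. Commit: finalize the compaction, optionally attaching extra metadata (empty here)
        client.commitCompaction(compactionInstantTime, statuses, Optional.empty());
      }
    }

Splitting scheduling from execution is what allows the requested instant to sit on the timeline (compaction.requested -> inflight -> commit) while delta commits continue to land on the same file groups, which is exactly the interleaving the MERGE_ON_READ tests above validate.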