diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 455cb644c..961965353 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -999,7 +999,6 @@ public abstract class BaseHoodieWriteClient getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) { + public Option getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) { return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty()); } @@ -1375,14 +1374,6 @@ public abstract class BaseHoodieWriteClient pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false); - String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime()); - table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers()); - table.rollback(context, commitTime, inflightInstant, false, false); - table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant); - } - /** * Finalize Write operation. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java index d006b52b3..a394c6d90 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java @@ -18,8 +18,6 @@ package org.apache.hudi.client; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -44,6 +42,9 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.action.compact.OperationResult; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -172,7 +173,7 @@ public class CompactionAdminClient extends BaseHoodieClient { Path inflightPath = new Path(metaClient.getMetaPath(), inflight.getFileName()); if (metaClient.getFs().exists(inflightPath)) { // revert if in inflight state - metaClient.getActiveTimeline().revertCompactionInflightToRequested(inflight); + metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight); } // Overwrite compaction plan with updated info metaClient.getActiveTimeline().saveToCompactionRequested( diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 56526d23d..b6541ac66 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -18,11 +18,6 @@ package org.apache.hudi.table; -import org.apache.avro.Schema; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; @@ -65,6 +60,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -82,6 +78,12 @@ import org.apache.hudi.table.marker.WriteMarkers; import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.storage.HoodieLayoutFactory; import org.apache.hudi.table.storage.HoodieStorageLayout; + +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -545,12 +547,37 @@ public abstract class HoodieTable implem * * @param inflightInstant Inflight Compaction Instant */ - public void rollbackInflightCompaction(HoodieInstant inflightInstant, Function> getPendingRollbackInstantFunc) { + public void rollbackInflightCompaction(HoodieInstant inflightInstant, + Function> getPendingRollbackInstantFunc) { + ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); + rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc); + } + + /** + * Rollback inflight clustering instant to requested clustering instant + * + * @param inflightInstant Inflight clustering instant + * @param getPendingRollbackInstantFunc Function to get rollback instant + */ + public void rollbackInflightClustering(HoodieInstant inflightInstant, + Function> getPendingRollbackInstantFunc) { + ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)); + rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc); + } + + /** + * Rollback inflight instant to requested instant + * + * @param inflightInstant Inflight instant + * @param getPendingRollbackInstantFunc Function to get rollback instant + */ + private void rollbackInflightInstant(HoodieInstant inflightInstant, + Function> getPendingRollbackInstantFunc) { final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime()); scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers()); rollback(context, commitTime, inflightInstant, false, false); - getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); + getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index fe6ea975e..bdf478a8f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -355,7 +355,7 @@ public class SparkRDDWriteClient extends HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant); if (pendingClusteringTimeline.containsInstant(inflightInstant)) { - rollbackInflightClustering(inflightInstant, table); + table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); table.getMetaClient().reloadActiveTimeline(); } clusteringTimer = metrics.getClusteringCtx(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index c069e41ad..6e7f6a243 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -18,10 +18,6 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -32,6 +28,11 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -347,16 +348,15 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } /** - * Revert compaction State from inflight to requested. + * Revert instant state from inflight to requested. * * @param inflightInstant Inflight Instant * @return requested instant */ - public HoodieInstant revertCompactionInflightToRequested(HoodieInstant inflightInstant) { - ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); + public HoodieInstant revertInstantFromInflightToRequested(HoodieInstant inflightInstant) { ValidationUtils.checkArgument(inflightInstant.isInflight()); HoodieInstant requestedInstant = - new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp()); + new HoodieInstant(State.REQUESTED, inflightInstant.getAction(), inflightInstant.getTimestamp()); if (metaClient.getTimelineLayoutVersion().isNullVersion()) { // Pass empty data since it is read from the corresponding .aux/.compaction instant file transitionState(inflightInstant, requestedInstant, Option.empty()); @@ -514,26 +514,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return commitInstant; } - /** - * Revert replace requested State from inflight to requested. - * - * @param inflightInstant Inflight Instant - * @return requested instant - */ - public HoodieInstant revertReplaceCommitInflightToRequested(HoodieInstant inflightInstant) { - ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)); - ValidationUtils.checkArgument(inflightInstant.isInflight()); - HoodieInstant requestedInstant = - new HoodieInstant(State.REQUESTED, REPLACE_COMMIT_ACTION, inflightInstant.getTimestamp()); - if (metaClient.getTimelineLayoutVersion().isNullVersion()) { - // Pass empty data since it is read from the corresponding .aux/.compaction instant file - transitionState(inflightInstant, requestedInstant, Option.empty()); - } else { - deleteInflight(inflightInstant); - } - return requestedInstant; - } - private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data) { transitionState(fromInstant, toInstant, data, false); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 9ff17cdbd..55806bf1e 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -336,7 +336,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { timeline = timeline.reload(); assertFalse(timeline.containsInstant(compaction)); assertTrue(timeline.containsInstant(inflight)); - compaction = timeline.revertCompactionInflightToRequested(inflight); + compaction = timeline.revertInstantFromInflightToRequested(inflight); timeline = timeline.reload(); assertTrue(timeline.containsInstant(compaction)); assertFalse(timeline.containsInstant(inflight)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java index f7c361533..b8ba8e438 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java @@ -114,7 +114,8 @@ public class HoodieFlinkClusteringJob { HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp()); if (timeline.containsInstant(inflightInstant)) { LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]"); - writeClient.rollbackInflightClustering(inflightInstant, table); + table.rollbackInflightClustering(inflightInstant, + commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); table.getMetaClient().reloadActiveTimeline(); }