diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 73c66d803..6d3a170f4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -191,6 +191,15 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.10.0")
       .withDocumentation("Enable data skipping by collecting statistics once layout optimization is complete.");
 
+  public static final ConfigProperty<Boolean> ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT = ConfigProperty
+      .key("hoodie.clustering.rollback.pending.replacecommit.on.conflict")
+      .defaultValue(false)
+      .sinceVersion("0.10.0")
+      .withDocumentation("If updates are allowed to file groups pending clustering, set this config to roll back failed or pending clustering instants. "
+          + "A pending clustering instant is rolled back ONLY IF there is a conflict between the incoming upsert and a file group to be clustered. "
+          + "Please exercise caution while setting this config, especially when clustering runs very frequently: in rare scenarios this could lead to a "
+          + "race condition, for example, when the clustering completes after instants are fetched but before the rollback completes.");
+
   /**
    * @deprecated Use {@link #PLAN_STRATEGY_CLASS_NAME} and its methods instead
    */
@@ -404,6 +413,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withRollbackPendingClustering(Boolean rollbackPendingClustering) {
+      clusteringConfig.setValue(ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT, String.valueOf(rollbackPendingClustering));
+      return this;
+    }
+
     public Builder withSpaceFillingCurveDataOptimizeEnable(Boolean enable) {
       clusteringConfig.setValue(LAYOUT_OPTIMIZE_ENABLE, String.valueOf(enable));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7d96ec388..d4966104d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1166,6 +1166,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return inlineClusteringEnabled() || isAsyncClusteringEnabled();
  }
 
+  public boolean isRollbackPendingClustering() {
+    return getBoolean(HoodieClusteringConfig.ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT);
+  }
+
   public int getInlineClusterMaxCommits() {
     return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS);
   }
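For reference, a minimal sketch of how a writer might opt into the new behavior through the builder added above. This is illustrative, not part of the patch; the table path and the pairing with SparkAllowUpdateStrategy are assumptions (the config only has an effect when updates to file groups pending clustering are allowed).

```java
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;

// Hedged sketch: enable conflict-triggered rollback of pending clustering.
// The path is a placeholder; other write settings are omitted.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample_table")
    .withClusteringConfig(HoodieClusteringConfig.newBuilder()
        // allow upserts to touch file groups that are pending clustering
        .withClusteringUpdatesStrategy(
            "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
        // roll back a pending clustering instant when an incoming upsert conflicts with it
        .withRollbackPendingClustering(true)
        .build())
    .build();
```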
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
index 3ce4f04e3..009790812 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.cluster.strategy;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.Set;
 
@@ -41,8 +42,8 @@ public abstract class UpdateStrategy<T extends HoodieRecordPayload, I> {
    * Check the update records to the file group in clustering.
    * @param taggedRecordsRDD the records to write, tagged with target file id,
    *                         future can update tagged records location to a different fileId.
-   * @return the recordsRDD strategy updated
+   * @return the recordsRDD updated by the strategy, and the set of file groups receiving updates while pending clustering.
    */
-  public abstract I handleUpdate(I taggedRecordsRDD);
+  public abstract Pair<I, Set<HoodieFileGroupId>> handleUpdate(I taggedRecordsRDD);
 
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
index 1547e8c24..403a0c2e1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
@@ -22,10 +22,15 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Allow ingestion commits during clustering job.
@@ -37,8 +42,19 @@ public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>> extends
     super(engineContext, fileGroupsInPendingClustering);
   }
 
+  private List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
+    List<HoodieFileGroupId> fileGroupIdsWithUpdates = inputRecords
+        .filter(record -> record.getCurrentLocation() != null)
+        .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collect();
+    return fileGroupIdsWithUpdates;
+  }
+
   @Override
-  public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
-    return taggedRecordsRDD;
+  public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
+    List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
+    Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
+        .filter(f -> fileGroupsInPendingClustering.contains(f))
+        .collect(Collectors.toSet());
+    return Pair.of(taggedRecordsRDD, fileGroupIdsWithUpdatesAndPendingClustering);
   }
 }
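With the signature change, callers now receive the conflicting file groups alongside the records instead of just the records. A minimal sketch of how a caller consumes the new contract; variable names are illustrative, and the executor diff further below does exactly this:

```java
// Hedged sketch: consuming the Pair returned by the new handleUpdate contract
// (fragment; assumes the generic type T and an updateStrategy instance from context).
Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> result =
    updateStrategy.handleUpdate(taggedRecordsRDD);
JavaRDD<HoodieRecord<T>> recordsToWrite = result.getLeft();
// file groups that both receive updates and are pending clustering
Set<HoodieFileGroupId> conflicting = result.getRight();
if (!conflicting.isEmpty()) {
  // the caller decides: reject the write, or roll back the pending clustering instants
}
```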
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
index 134e49024..b12d9ad43 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
@@ -22,14 +22,18 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieClusteringUpdateException;
 import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Update strategy based on following.
@@ -50,7 +54,7 @@ public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends
   }
 
   @Override
-  public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
+  public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
     List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
     fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
       if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
@@ -61,7 +65,7 @@ public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends
         throw new HoodieClusteringUpdateException(msg);
       }
     });
-    return taggedRecordsRDD;
+    return Pair.of(taggedRecordsRDD, Collections.emptySet());
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 2bcd6d787..af819bc9c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -77,6 +77,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.Map;
 
+import static org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans;
+
 public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload<T>> extends
     BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata> {
 
@@ -118,7 +120,27 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload<T>> extends
           .map(entry -> entry.getKey()).collect(Collectors.toSet());
       UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils
           .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
-      return (JavaRDD<HoodieRecord<T>>) updateStrategy.handleUpdate(inputRecordsRDD);
+      Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
+          (Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>) updateStrategy.handleUpdate(inputRecordsRDD);
+      Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
+      if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
+        return recordsAndPendingClusteringFileGroups.getLeft();
+      }
+      // there are file groups that are both pending clustering and receiving updates, so roll back the pending clustering instants
+      // there could be a race condition, for example, if the clustering completes after instants are fetched but before the rollback completes
+      if (config.isRollbackPendingClustering()) {
+        Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
+            .filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
+            .map(Map.Entry::getValue)
+            .collect(Collectors.toSet());
+        pendingClusteringInstantsToRollback.forEach(instant -> {
+          String commitTime = HoodieActiveTimeline.createNewInstantTime();
+          table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers());
+          table.rollback(context, commitTime, instant, true, true);
+        });
+        table.getMetaClient().reloadActiveTimeline();
+      }
+      return recordsAndPendingClusteringFileGroups.getLeft();
     } else {
       return inputRecordsRDD;
     }
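Because the rollback path is inherently racy (the config documentation above calls this out), it can be useful to verify afterwards which file groups are still pending clustering. A hedged sketch using the same ClusteringUtils helper the executor imports; LOG is an assumed log4j logger, and table/metaClient come from the surrounding context:

```java
// Hedged sketch: inspect pending clustering state after a write.
HoodieTableMetaClient metaClient = table.getMetaClient();
metaClient.reloadActiveTimeline();
Map<HoodieFileGroupId, HoodieInstant> stillPending =
    ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(metaClient);
stillPending.forEach((fileGroupId, instant) ->
    LOG.info("File group " + fileGroupId + " still pending clustering at instant " + instant.getTimestamp()));
```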
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 86d18fe28..7e3f8f329 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -129,6 +129,14 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
 import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
@@ -586,16 +594,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false);
     List<HoodieInstant> instants = activeTimeline.getCommitTimeline().getInstants().collect(Collectors.toList());
     assertEquals(5, instants.size());
-    assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001"),
+    assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
         instants.get(0));
-    assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "004"),
+    assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "004"),
         instants.get(1));
     // New Format should have all states of instants
-    assertEquals(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "006"),
+    assertEquals(new HoodieInstant(REQUESTED, COMMIT_ACTION, "006"),
         instants.get(2));
-    assertEquals(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "006"),
+    assertEquals(new HoodieInstant(INFLIGHT, COMMIT_ACTION, "006"),
         instants.get(3));
-    assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "006"),
+    assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "006"),
         instants.get(4));
 
     final HoodieWriteConfig cfg = hoodieWriteConfig;
@@ -1403,7 +1411,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieInstant pendingClusteringInstant = pendingClusteringPlans.get(0).getLeft();
 
     // complete another commit after pending clustering
-    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER);
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(EAGER);
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig config = cfgBuilder.build();
     SparkRDDWriteClient client = getHoodieWriteClient(config);
@@ -1419,6 +1427,41 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     assertEquals(0, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testInflightClusteringRollbackWhenUpdatesAllowed(boolean rollbackPendingClustering) throws Exception {
+    // setup clustering config with update strategy to allow updates during ingestion
+    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
+        .withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0)
+        .withClusteringUpdatesStrategy("org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
+        .withRollbackPendingClustering(rollbackPendingClustering)
+        .withInlineClustering(true).withInlineClusteringNumCommits(1).build();
+
+    // start clustering, but don't commit; keep it inflight
+    List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, true, false);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
+    List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans =
+        ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+    assertEquals(1, pendingClusteringPlans.size());
+    HoodieInstant pendingClusteringInstant = pendingClusteringPlans.get(0).getLeft();
+    assertEquals(INFLIGHT, pendingClusteringInstant.getState());
+
+    // make an update to a file group within the partition that is pending clustering
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(EAGER);
+    addConfigsForPopulateMetaFields(cfgBuilder, true);
+    cfgBuilder.withClusteringConfig(clusteringConfig);
+    HoodieWriteConfig config = cfgBuilder.build();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+    String commitTime = HoodieActiveTimeline.createNewInstantTime();
+    allRecords.addAll(dataGen.generateUpdates(commitTime, 200));
+    writeAndVerifyBatch(client, allRecords, commitTime, true);
+
+    // verify the inflight clustering was rolled back only when the config is enabled
+    metaClient.reloadActiveTimeline();
+    pendingClusteringPlans = ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+    assertEquals(config.isRollbackPendingClustering() ? 0 : 1, pendingClusteringPlans.size());
+  }
+
   @Test
   public void testClusteringWithFailingValidator() throws Exception {
     // setup clustering config.
@@ -1622,7 +1665,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
     // Do Insert Overwrite
     String commitTime2 = "002";
-    client.startCommitWithTime(commitTime2, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    client.startCommitWithTime(commitTime2, REPLACE_COMMIT_ACTION);
     List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, batch2RecordsCount);
     List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
     insertsAndUpdates2.addAll(inserts2);
@@ -1678,7 +1721,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
-    client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    client.startCommitWithTime(commitTime, REPLACE_COMMIT_ACTION);
     HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
     Set<String> deletePartitionReplaceFileIds = writeResult.getPartitionToReplaceFileIds().entrySet()
@@ -2124,7 +2167,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
 
     assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
-        CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 0);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0);
     assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2);
     assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 1);
     // Await till enough time passes such that the first 2 failed commits heartbeats are expired
@@ -2143,26 +2186,26 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     if (cleaningPolicy.isLazy()) {
       assertTrue(
           timeline
-              .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION))
+              .getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
               .countInstants() == 2);
       // Since we write rollbacks not clean, there should be no clean action on the timeline
       assertTrue(
           timeline
-              .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION))
+              .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
               .countInstants() == 0);
       assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
     } else if (cleaningPolicy.isNever()) {
       assertTrue(
           timeline
-              .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION))
+              .getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
              .countInstants() == 0);
       // There should be no clean or rollback action on the timeline
       assertTrue(
           timeline
-              .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION))
+              .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
              .countInstants() == 0);
       assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
@@ -2173,7 +2216,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   @MethodSource("populateMetaFieldsParams")
   public void testRollbackFailedCommitsToggleCleaningPolicy(boolean populateMetaFields) throws Exception {
     HoodieTestUtils.init(hadoopConf, basePath);
-    HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
+    HoodieFailedWritesCleaningPolicy cleaningPolicy = EAGER;
     SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     // Perform 1 failed writes to table
     writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
@@ -2202,7 +2245,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     client.clean();
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
     assertTrue(timeline.getTimelineOfActions(
-        CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 3);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
     // Perform 2 failed commits
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
@@ -2215,12 +2258,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         0, false);
     client.close();
     // Toggle cleaning policy to EAGER
-    cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
+    cleaningPolicy = EAGER;
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
     client.startCommit();
     timeline = metaClient.getActiveTimeline().reload();
     assertTrue(timeline.getTimelineOfActions(
-        CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 5);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 5);
     assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 0);
   }
@@ -2250,7 +2293,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
 
     assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
-        CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 0);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0);
     assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2);
     assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 1);
     client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
@@ -2268,10 +2311,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     clean1.get();
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
     assertTrue(timeline.getTimelineOfActions(
-        CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 2);
+        CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 2);
     // Since we write rollbacks not clean, there should be no clean action on the timeline
     assertTrue(timeline.getTimelineOfActions(
-        CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).countInstants() == 0);
+        CollectionUtils.createSet(CLEAN_ACTION)).countInstants() == 0);
     assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
   }
@@ -2432,7 +2475,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieClusteringPlan clusteringPlan =
         ClusteringUtils.createClusteringPlan(EXECUTION_STRATEGY_CLASS_NAME.defaultValue(), STRATEGY_PARAMS, fileSlices, Collections.emptyMap());
 
-    HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
+    HoodieInstant clusteringInstant = new HoodieInstant(REQUESTED, REPLACE_COMMIT_ACTION, clusterTime);
     HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
         .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
     metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
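Taken together, the flow the new test exercises can be summarized in a hedged end-to-end sketch. Here writeConfig is the builder sketch from the top of this patch, while jsc, dataGen, context, and assertNoWriteErrors are assumed test fixtures like those in the test class above:

```java
// Hedged sketch: an upsert touching a file group under pending clustering.
// With withRollbackPendingClustering(true) the inflight replacecommit is rolled back;
// with SparkRejectUpdateStrategy it would instead fail with HoodieClusteringUpdateException.
SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig);
String commitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(commitTime);
JavaRDD<HoodieRecord> updates = jsc.parallelize(dataGen.generateUpdates(commitTime, 200), 1);
List<WriteStatus> statuses = client.upsert(updates, commitTime).collect();
assertNoWriteErrors(statuses);
```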