1
0

[HUDI-1937] Rollback unfinished replace commit to allow updates (#3869)

* [HUDI-1937] Rollback unfinished replace commit to allow updates while clustering

* Revert and delete requested replacecommit too

* Rollback pending clustering instants transactionally

* No double locking and add a config to enable rollback

* Update config to be clear about rollback only on conflict
This commit is contained in:
Sagar Sumit
2021-11-23 07:29:03 +05:30
committed by GitHub
parent 0d1e7ecdab
commit e22150fe15
7 changed files with 132 additions and 28 deletions

View File

@@ -191,6 +191,15 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.10.0")
.withDocumentation("Enable data skipping by collecting statistics once layout optimization is complete.");
// Opt-in flag (default false): when an incoming upsert conflicts with a file
// group that is pending clustering, roll back that clustering instant instead
// of failing the write. See the withDocumentation text for the race-condition
// caveat when clustering runs very frequently.
public static final ConfigProperty<Boolean> ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT = ConfigProperty
.key("hoodie.clustering.rollback.pending.replacecommit.on.conflict")
.defaultValue(false)
.sinceVersion("0.10.0")
.withDocumentation("If updates are allowed to file groups pending clustering, then set this config to rollback failed or pending clustering instants. "
+ "Pending clustering will be rolled back ONLY IF there is conflict between incoming upsert and filegroup to be clustered. "
+ "Please exercise caution while setting this config, especially when clustering is done very frequently. This could lead to race condition in "
+ "rare scenarios, for example, when the clustering completes after instants are fetched but before rollback completed.");
/**
* @deprecated Use {@link #PLAN_STRATEGY_CLASS_NAME} and its methods instead
*/
@@ -404,6 +413,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
/**
 * Enables/disables rollback of pending clustering instants when an incoming
 * update conflicts with a file group under clustering.
 *
 * @param rollbackPendingClustering value stored under
 *     {@link #ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT}
 * @return this builder for chaining
 */
public Builder withRollbackPendingClustering(Boolean rollbackPendingClustering) {
clusteringConfig.setValue(ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT, String.valueOf(rollbackPendingClustering));
return this;
}
public Builder withSpaceFillingCurveDataOptimizeEnable(Boolean enable) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_ENABLE, String.valueOf(enable));
return this;

View File

@@ -1166,6 +1166,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return inlineClusteringEnabled() || isAsyncClusteringEnabled();
}
/**
 * Whether pending clustering instants should be rolled back on an update
 * conflict; reads {@code HoodieClusteringConfig.ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT}.
 */
public boolean isRollbackPendingClustering() {
return getBoolean(HoodieClusteringConfig.ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT);
}
public int getInlineClusterMaxCommits() {
return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS);
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.cluster.strategy;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import java.util.Set;
@@ -41,8 +42,8 @@ public abstract class UpdateStrategy<T extends HoodieRecordPayload<T>, I> {
* Check the update records to the file group in clustering.
* @param taggedRecordsRDD the records to write, tagged with target file id,
* future can update tagged records location to a different fileId.
* @return the recordsRDD strategy updated
* @return the recordsRDD strategy updated and a set of file groups to be updated while pending clustering.
*/
public abstract I handleUpdate(I taggedRecordsRDD);
public abstract Pair<I, Set<HoodieFileGroupId>> handleUpdate(I taggedRecordsRDD);
}

View File

@@ -22,10 +22,15 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.spark.api.java.JavaRDD;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Allow ingestion commits during clustering job.
@@ -37,8 +42,19 @@ public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>> extends
super(engineContext, fileGroupsInPendingClustering);
}
// Collects the distinct file group ids targeted by update records, i.e.
// records that carry a current location (tagged to an existing file group);
// untagged inserts are filtered out.
private List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
List<HoodieFileGroupId> fileGroupIdsWithUpdates = inputRecords
.filter(record -> record.getCurrentLocation() != null)
.map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collect();
return fileGroupIdsWithUpdates;
}
@Override
public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
return taggedRecordsRDD;
public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
// File groups touched by incoming updates.
List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
// Intersect with file groups pending clustering: these are the conflicting
// groups the caller may choose to roll back.
Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
.filter(f -> fileGroupsInPendingClustering.contains(f))
.collect(Collectors.toSet());
// Records pass through unchanged; conflicts are reported alongside them.
return Pair.of(taggedRecordsRDD, fileGroupIdsWithUpdatesAndPendingClustering);
}
}

View File

@@ -22,14 +22,18 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieClusteringUpdateException;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Update strategy based on following.
@@ -50,7 +54,7 @@ public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends
}
@Override
public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
@@ -61,7 +65,7 @@ public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends
throw new HoodieClusteringUpdateException(msg);
}
});
return taggedRecordsRDD;
return Pair.of(taggedRecordsRDD, Collections.emptySet());
}
}

View File

@@ -77,6 +77,8 @@ import java.util.List;
import java.util.Set;
import java.util.Map;
import static org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans;
public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata> {
@@ -118,7 +120,27 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
return (JavaRDD<HoodieRecord<T>>)updateStrategy.handleUpdate(inputRecordsRDD);
Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
(Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>)updateStrategy.handleUpdate(inputRecordsRDD);
Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
return recordsAndPendingClusteringFileGroups.getLeft();
}
// there are filegroups pending clustering and receiving updates, so rollback the pending clustering instants
// there could be race condition, for example, if the clustering completes after instants are fetched but before rollback completed
if (config.isRollbackPendingClustering()) {
Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
.filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
pendingClusteringInstantsToRollback.forEach(instant -> {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers());
table.rollback(context, commitTime, instant, true, true);
});
table.getMetaClient().reloadActiveTimeline();
}
return recordsAndPendingClusteringFileGroups.getLeft();
} else {
return inputRecordsRDD;
}

View File

@@ -129,6 +129,14 @@ import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
@@ -586,16 +594,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false);
List<HoodieInstant> instants = activeTimeline.getCommitTimeline().getInstants().collect(Collectors.toList());
assertEquals(5, instants.size());
assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001"),
assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
instants.get(0));
assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "004"),
assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "004"),
instants.get(1));
// New Format should have all states of instants
assertEquals(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "006"),
assertEquals(new HoodieInstant(REQUESTED, COMMIT_ACTION, "006"),
instants.get(2));
assertEquals(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "006"),
assertEquals(new HoodieInstant(INFLIGHT, COMMIT_ACTION, "006"),
instants.get(3));
assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "006"),
assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "006"),
instants.get(4));
final HoodieWriteConfig cfg = hoodieWriteConfig;
@@ -1403,7 +1411,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieInstant pendingClusteringInstant = pendingClusteringPlans.get(0).getLeft();
// complete another commit after pending clustering
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER);
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(EAGER);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig config = cfgBuilder.build();
SparkRDDWriteClient client = getHoodieWriteClient(config);
@@ -1419,6 +1427,41 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertEquals(0, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
}
// Verifies that an inflight clustering instant is rolled back (or kept) when a
// conflicting update arrives, depending on the rollback-on-conflict config.
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testInflightClusteringRollbackWhenUpdatesAllowed(boolean rollbackPendingClustering) throws Exception {
// setup clustering config with update strategy to allow updates during ingestion
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
.withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0)
.withClusteringUpdatesStrategy("org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
.withRollbackPendingClustering(rollbackPendingClustering)
.withInlineClustering(true).withInlineClusteringNumCommits(1).build();
// start clustering, but don't commit keep it inflight
List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, true, false);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans =
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
// exactly one clustering plan should be pending, and still inflight
assertEquals(1, pendingClusteringPlans.size());
HoodieInstant pendingClusteringInstant = pendingClusteringPlans.get(0).getLeft();
assertEquals(pendingClusteringInstant.getState(), INFLIGHT);
// make an update to a filegroup within the partition that is pending clustering
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(EAGER);
addConfigsForPopulateMetaFields(cfgBuilder, true);
cfgBuilder.withClusteringConfig(clusteringConfig);
HoodieWriteConfig config = cfgBuilder.build();
SparkRDDWriteClient client = getHoodieWriteClient(config);
String commitTime = HoodieActiveTimeline.createNewInstantTime();
allRecords.addAll(dataGen.generateUpdates(commitTime, 200));
writeAndVerifyBatch(client, allRecords, commitTime, true);
// verify inflight clustering was rolled back
metaClient.reloadActiveTimeline();
pendingClusteringPlans = ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
// rolled back (0 remaining) only when the rollback-on-conflict config is on
assertEquals(config.isRollbackPendingClustering() ? 0 : 1, pendingClusteringPlans.size());
}
@Test
public void testClusteringWithFailingValidator() throws Exception {
// setup clustering config.
@@ -1622,7 +1665,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// Do Insert Overwrite
String commitTime2 = "002";
client.startCommitWithTime(commitTime2, HoodieTimeline.REPLACE_COMMIT_ACTION);
client.startCommitWithTime(commitTime2, REPLACE_COMMIT_ACTION);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, batch2RecordsCount);
List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
insertsAndUpdates2.addAll(inserts2);
@@ -1678,7 +1721,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
client.startCommitWithTime(commitTime, REPLACE_COMMIT_ACTION);
HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
Set<String> deletePartitionReplaceFileIds =
writeResult.getPartitionToReplaceFileIds().entrySet()
@@ -2124,7 +2167,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 0);
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0);
assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2);
assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 1);
// Await till enough time passes such that the first 2 failed commits heartbeats are expired
@@ -2143,26 +2186,26 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
if (cleaningPolicy.isLazy()) {
assertTrue(
timeline
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION))
.getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
.countInstants()
== 2);
// Since we write rollbacks not clean, there should be no clean action on the timeline
assertTrue(
timeline
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION))
.getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
.countInstants()
== 0);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
} else if (cleaningPolicy.isNever()) {
assertTrue(
timeline
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION))
.getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION))
.countInstants()
== 0);
// There should be no clean or rollback action on the timeline
assertTrue(
timeline
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION))
.getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION))
.countInstants()
== 0);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
@@ -2173,7 +2216,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
@MethodSource("populateMetaFieldsParams")
public void testRollbackFailedCommitsToggleCleaningPolicy(boolean populateMetaFields) throws Exception {
HoodieTestUtils.init(hadoopConf, basePath);
HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
HoodieFailedWritesCleaningPolicy cleaningPolicy = EAGER;
SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Perform 1 failed writes to table
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
@@ -2202,7 +2245,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
client.clean();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 3);
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3);
// Perform 2 failed commits
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
@@ -2215,12 +2258,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
0, false);
client.close();
// Toggle cleaning policy to EAGER
cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
cleaningPolicy = EAGER;
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
client.startCommit();
timeline = metaClient.getActiveTimeline().reload();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 5);
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 5);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 0);
}
@@ -2250,7 +2293,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 0);
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0);
assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2);
assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 1);
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true));
@@ -2268,10 +2311,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
clean1.get();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 2);
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 2);
// Since we write rollbacks not clean, there should be no clean action on the timeline
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).countInstants() == 0);
CollectionUtils.createSet(CLEAN_ACTION)).countInstants() == 0);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
}
@@ -2432,7 +2475,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieClusteringPlan clusteringPlan =
ClusteringUtils.createClusteringPlan(EXECUTION_STRATEGY_CLASS_NAME.defaultValue(), STRATEGY_PARAMS, fileSlices, Collections.emptyMap());
HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
HoodieInstant clusteringInstant = new HoodieInstant(REQUESTED, REPLACE_COMMIT_ACTION, clusterTime);
HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
.setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));