[HUDI-3556] Re-use rollback instant for rolling back of clustering and compaction if rollback failed mid-way (#4971)
This commit is contained in:
committed by
GitHub
parent
e8918b6c2c
commit
e7bb0413af
@@ -957,8 +957,16 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
|||||||
return inflightTimelineExcludeClusteringCommit;
|
return inflightTimelineExcludeClusteringCommit;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
|
protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
|
||||||
return getPendingRollbackInfos(metaClient).getOrDefault(commitToRollback, Option.empty());
|
return getPendingRollbackInfo(metaClient, commitToRollback, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
|
||||||
|
return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
|
||||||
|
return getPendingRollbackInfos(metaClient, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -966,21 +974,26 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
|||||||
* @param metaClient instance of {@link HoodieTableMetaClient} to use.
|
* @param metaClient instance of {@link HoodieTableMetaClient} to use.
|
||||||
* @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair.
|
* @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair.
|
||||||
*/
|
*/
|
||||||
protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
|
protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
|
||||||
List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
|
List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
|
||||||
Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
|
Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
|
||||||
for (HoodieInstant instant : instants) {
|
for (HoodieInstant instant : instants) {
|
||||||
try {
|
try {
|
||||||
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
|
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
|
||||||
String action = rollbackPlan.getInstantToRollback().getAction();
|
String action = rollbackPlan.getInstantToRollback().getAction();
|
||||||
|
if (ignoreCompactionAndClusteringInstants) {
|
||||||
if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
|
if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
|
||||||
boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
|
boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
|
||||||
&& ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent();
|
&& ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(),
|
||||||
|
rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
|
||||||
if (!isClustering) {
|
if (!isClustering) {
|
||||||
String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
|
String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
|
||||||
infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
|
infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
|
LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
|
||||||
}
|
}
|
||||||
@@ -1211,7 +1224,8 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
|
protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
|
||||||
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
|
||||||
|
String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
|
||||||
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
|
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
|
||||||
table.rollback(context, commitTime, inflightInstant, false, false);
|
table.rollback(context, commitTime, inflightInstant, false, false);
|
||||||
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
|
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan;
|
|||||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
||||||
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
|
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
|
||||||
|
import org.apache.hudi.common.HoodiePendingRollbackInfo;
|
||||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
@@ -89,6 +90,7 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@@ -519,14 +521,19 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
|||||||
String restoreInstantTime,
|
String restoreInstantTime,
|
||||||
String instantToRestore);
|
String instantToRestore);
|
||||||
|
|
||||||
|
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
|
||||||
|
rollbackInflightCompaction(inflightInstant, s -> Option.empty());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file
|
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file
|
||||||
* to the .requested file.
|
* to the .requested file.
|
||||||
*
|
*
|
||||||
* @param inflightInstant Inflight Compaction Instant
|
* @param inflightInstant Inflight Compaction Instant
|
||||||
*/
|
*/
|
||||||
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
|
public void rollbackInflightCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
|
||||||
String commitTime = HoodieActiveTimeline.createNewInstantTime();
|
final String commitTime = getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
|
||||||
|
-> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
|
||||||
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
|
scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
|
||||||
rollback(context, commitTime, inflightInstant, false, false);
|
rollback(context, commitTime, inflightInstant, false, false);
|
||||||
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
|
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
|
||||||
|
|||||||
@@ -325,7 +325,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
|||||||
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
|
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
|
||||||
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
|
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
|
||||||
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
|
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
|
||||||
table.rollbackInflightCompaction(inflightInstant);
|
table.rollbackInflightCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
|
||||||
table.getMetaClient().reloadActiveTimeline();
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
}
|
}
|
||||||
compactionTimer = metrics.getCompactionCtx();
|
compactionTimer = metrics.getCompactionCtx();
|
||||||
|
|||||||
@@ -56,6 +56,8 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
|||||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
|
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
|
||||||
|
import org.apache.hudi.common.testutils.ClusteringTestUtils;
|
||||||
|
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestTable;
|
import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||||
@@ -1438,9 +1440,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
|||||||
testInsertAndClustering(clusteringConfig, populateMetaFields, true, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
|
testInsertAndClustering(clusteringConfig, populateMetaFields, true, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@Test
|
||||||
@MethodSource("populateMetaFieldsParams")
|
public void testPendingClusteringRollback() throws Exception {
|
||||||
public void testPendingClusteringRollback(boolean populateMetaFields) throws Exception {
|
boolean populateMetaFields = true;
|
||||||
// setup clustering config.
|
// setup clustering config.
|
||||||
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
|
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
|
||||||
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
|
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true).build();
|
||||||
@@ -1467,6 +1469,33 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
|||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
// verify there are no pending clustering instants
|
// verify there are no pending clustering instants
|
||||||
assertEquals(0, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
|
assertEquals(0, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
|
||||||
|
|
||||||
|
// Delete the rollback.completed instant to mimic a failed rollback of clustering, then trigger the rollback of clustering again; the same rollback instant should be re-used.
|
||||||
|
HoodieInstant rollbackInstant = metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
|
||||||
|
FileCreateUtils.deleteRollbackCommit(metaClient.getBasePath(), rollbackInstant.getTimestamp());
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
|
||||||
|
// create replace commit requested meta file so that rollback will not throw FileNotFoundException
|
||||||
|
// create file slice with instantTime 001 and build clustering plan including this created 001 file slice.
|
||||||
|
HoodieClusteringPlan clusteringPlan = ClusteringTestUtils.createClusteringPlan(metaClient, pendingClusteringInstant.getTimestamp(), "1");
|
||||||
|
// create requested replace commit
|
||||||
|
HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
|
||||||
|
.setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
|
||||||
|
|
||||||
|
FileCreateUtils.createRequestedReplaceCommit(metaClient.getBasePath(), pendingClusteringInstant.getTimestamp(), Option.of(requestedReplaceMetadata));
|
||||||
|
|
||||||
|
// trigger clustering again. no new rollback instants should be generated.
|
||||||
|
try {
|
||||||
|
client.cluster(pendingClusteringInstant.getTimestamp(), false);
|
||||||
|
// The new replace commit metadata generated is a fake one, so clustering will fail; but the intention of the test is to check for duplicate rollback instants.
|
||||||
|
} catch (Exception e) {
|
||||||
|
//ignore.
|
||||||
|
}
|
||||||
|
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
// verify that there is no new rollback instant generated
|
||||||
|
HoodieInstant newRollbackInstant = metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
|
||||||
|
assertEquals(rollbackInstant.getTimestamp(), newRollbackInstant.getTimestamp());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||||
|
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||||
@@ -180,6 +181,61 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRepeatedRollbackOfCompaction() throws Exception {
|
||||||
|
boolean scheduleInlineCompaction = false;
|
||||||
|
HoodieFileFormat fileFormat = HoodieFileFormat.PARQUET;
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), fileFormat.toString());
|
||||||
|
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
|
||||||
|
|
||||||
|
HoodieWriteConfig cfg = getConfigBuilder(false)
|
||||||
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
|
||||||
|
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
|
||||||
|
.build();
|
||||||
|
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
|
||||||
|
|
||||||
|
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
|
||||||
|
/*
|
||||||
|
* Write 1 (only inserts)
|
||||||
|
*/
|
||||||
|
String newCommitTime = "001";
|
||||||
|
client.startCommitWithTime(newCommitTime);
|
||||||
|
|
||||||
|
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||||
|
Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime, true);
|
||||||
|
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Write 2 (updates)
|
||||||
|
*/
|
||||||
|
newCommitTime = "004";
|
||||||
|
client.startCommitWithTime(newCommitTime);
|
||||||
|
records = dataGen.generateUpdates(newCommitTime, 100);
|
||||||
|
updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, true);
|
||||||
|
|
||||||
|
Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
|
||||||
|
client.compact(compactionInstant.get());
|
||||||
|
|
||||||
|
// trigger compaction again.
|
||||||
|
client.compact(compactionInstant.get());
|
||||||
|
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
// Fetch the latest rollback instant and delete its completed rollback file to mimic a rollback that failed mid-way.
|
||||||
|
HoodieInstant rollbackInstant = metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
|
||||||
|
FileCreateUtils.deleteRollbackCommit(metaClient.getBasePath().substring(metaClient.getBasePath().indexOf(":") + 1),
|
||||||
|
rollbackInstant.getTimestamp());
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
|
||||||
|
// trigger compaction again.
|
||||||
|
client1.compact(compactionInstant.get());
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
// verify that there is no new rollback instant generated
|
||||||
|
HoodieInstant newRollbackInstant = metaClient.getActiveTimeline().getRollbackTimeline().lastInstant().get();
|
||||||
|
assertEquals(rollbackInstant.getTimestamp(), newRollbackInstant.getTimestamp());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ValueSource(booleans = {true, false})
|
@ValueSource(booleans = {true, false})
|
||||||
public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws Exception {
|
public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws Exception {
|
||||||
|
|||||||
Reference in New Issue
Block a user