1
0

[HUDI-2796] Metadata table support for Restore action to first commit (#4039)

- Adding support for the metadata table to restore to first commit and
   take proper action for the bootstrap on subsequent commits.
This commit is contained in:
Manoj Govindassamy
2021-11-19 17:02:57 -08:00
committed by GitHub
parent c8617d9390
commit 0230d40b74
3 changed files with 59 additions and 18 deletions

View File

@@ -353,21 +353,11 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
return false; return false;
} }
boolean isRollbackAction = false; // Detect the commit gaps if any from the data and the metadata active timeline
List<String> rollbackedTimestamps = Collections.emptyList();
if (actionMetadata.isPresent() && actionMetadata.get() instanceof HoodieRollbackMetadata) {
isRollbackAction = true;
List<HoodieInstantInfo> rollbackedInstants =
((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback();
rollbackedTimestamps = rollbackedInstants.stream().map(instant -> {
return instant.getCommitTime().toString();
}).collect(Collectors.toList());
}
if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts( if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
latestMetadataInstant.get().getTimestamp()) latestMetadataInstant.get().getTimestamp())
&& (!isRollbackAction || !rollbackedTimestamps.contains(latestMetadataInstantTimestamp))) { && !isCommitRevertedByInFlightAction(actionMetadata, latestMetadataInstantTimestamp)) {
LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived." LOG.error("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
+ " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp() + " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
+ ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp()); + ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
return true; return true;
@@ -376,6 +366,55 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
return false; return false;
} }
/**
 * Is the latest metadata table instant reverted by the in-flight instant action?
 *
 * <p>Only {@link HoodieRollbackMetadata} and {@link HoodieRestoreMetadata} actions are inspected:
 * the instants they list are compared against {@code latestMetadataInstantTimestamp}. An absent
 * action, or an action of any other type, yields {@code false}.
 *
 * @param actionMetadata                 - In-flight instant action metadata
 * @param latestMetadataInstantTimestamp - Metadata table latest instant timestamp
 * @param <T>                            - ActionMetadata type
 * @return True if the latest instant action is reverted by the action
 */
private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(Option<T> actionMetadata,
                                                                                final String latestMetadataInstantTimestamp) {
  if (!actionMetadata.isPresent()) {
    return false;
  }

  // Branch on the metadata type directly; rollback and restore differ only in which getter
  // exposes the list of instants they revert.
  final T metadata = actionMetadata.get();
  final List<HoodieInstantInfo> affectedInstants;
  if (metadata instanceof HoodieRollbackMetadata) {
    affectedInstants = ((HoodieRollbackMetadata) metadata).getInstantsRollback();
  } else if (metadata instanceof HoodieRestoreMetadata) {
    affectedInstants = ((HoodieRestoreMetadata) metadata).getRestoreInstantInfo();
  } else {
    // Any other action type cannot revert a commit.
    return false;
  }

  // toString() is kept from the original code so the comparison is String-to-String
  // regardless of the declared type of the commit time accessor.
  return affectedInstants.stream()
      .map(instant -> instant.getCommitTime().toString())
      .anyMatch(timestamp -> timestamp.equals(latestMetadataInstantTimestamp));
}
/** /**
* Initialize the Metadata Table by listing files and partitions from the file system. * Initialize the Metadata Table by listing files and partitions from the file system.
* *

View File

@@ -81,6 +81,6 @@ public abstract class BaseActionExecutor<T extends HoodieRecordPayload, I, K, O,
* @param metadata restore metadata of interest. * @param metadata restore metadata of interest.
*/ */
protected final void writeTableMetadata(HoodieRestoreMetadata metadata) { protected final void writeTableMetadata(HoodieRestoreMetadata metadata) {
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime)); table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
} }
} }

View File

@@ -139,7 +139,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @ValueSource(booleans = {true, false})
void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception { void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE); HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
addConfigsForPopulateMetaFields(cfgBuilder, true); addConfigsForPopulateMetaFields(cfgBuilder, true);
HoodieWriteConfig cfg = cfgBuilder.build(); HoodieWriteConfig cfg = cfgBuilder.build();
@@ -294,7 +295,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
@Test @Test
void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
boolean populateMetaFields = true; boolean populateMetaFields = true;
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()); HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build(); HoodieWriteConfig cfg = cfgBuilder.build();
@@ -344,7 +346,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
newCommitTime = "002"; newCommitTime = "002";
// WriteClient with custom config (disable small file handling) // WriteClient with custom config (disable small file handling)
HoodieWriteConfig smallFileWriteConfig = getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields) HoodieWriteConfig smallFileWriteConfig = getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build(); .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
try (SparkRDDWriteClient nClient = getHoodieWriteClient(smallFileWriteConfig)) { try (SparkRDDWriteClient nClient = getHoodieWriteClient(smallFileWriteConfig)) {
nClient.startCommitWithTime(newCommitTime); nClient.startCommitWithTime(newCommitTime);