1
0

[HUDI-3799] Fixing not deleting empty instants w/o archiving (#5261)

This commit is contained in:
Sivabalan Narayanan
2022-04-11 21:02:43 -07:00
committed by GitHub
parent 3d8fc78c66
commit f91e9e63e1
7 changed files with 88 additions and 22 deletions

View File

@@ -588,19 +588,16 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) {
try {
if (table.getActiveTimeline().isEmpty(hoodieInstant)
&& (
hoodieInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION)
|| (hoodieInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && hoodieInstant.isCompleted())
)
) {
table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);
deleteAnyLeftOverMarkers(context, hoodieInstant);
// in local FS and HDFS, there could be empty completed instants due to crash.
if (table.getActiveTimeline().isEmpty(hoodieInstant) && hoodieInstant.isCompleted()) {
// let's add an entry to the archival, even if not for the plan.
records.add(createAvroRecordFromEmptyInstant(hoodieInstant));
} else {
deleteAnyLeftOverMarkers(context, hoodieInstant);
records.add(convertToAvroRecord(hoodieInstant));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
}
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
} catch (Exception e) {
LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e);
@@ -637,4 +634,8 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
throws IOException {
return MetadataConversionUtils.createMetaWrapper(hoodieInstant, metaClient);
}
/**
 * Produces the archival record for an instant whose timeline file is empty.
 *
 * <p>Delegates entirely to
 * {@link MetadataConversionUtils#createMetaWrapperForEmptyInstant(HoodieInstant)}.
 *
 * @param hoodieInstant the instant to build an archival record for
 * @return the record returned by the conversion utility
 * @throws IOException propagated from the conversion utility's signature
 */
private IndexedRecord createAvroRecordFromEmptyInstant(HoodieInstant hoodieInstant) throws IOException {
  final IndexedRecord emptyInstantRecord =
      MetadataConversionUtils.createMetaWrapperForEmptyInstant(hoodieInstant);
  return emptyInstantRecord;
}
}

View File

@@ -125,6 +125,46 @@ public class MetadataConversionUtils {
return archivedMetaWrapper;
}
/**
 * Builds an archived-meta entry for an instant using only the instant itself.
 *
 * <p>Only three fields are populated on the returned entry: the commit time (from the
 * instant's timestamp), the action state (the name of the instant's state) and the
 * action type (derived from the instant's action). No other field is set here.
 *
 * @param hoodieInstant the instant to describe
 * @return a new entry carrying the commit time, action state and action type
 * @throws UnsupportedOperationException if the instant's action is not one of clean,
 *         commit, delta commit, replace commit, rollback, savepoint or compaction
 * @throws IOException declared on the signature for callers; not raised explicitly
 *         by this body
 */
public static HoodieArchivedMetaEntry createMetaWrapperForEmptyInstant(HoodieInstant hoodieInstant) throws IOException {
  final HoodieArchivedMetaEntry entry = new HoodieArchivedMetaEntry();
  entry.setCommitTime(hoodieInstant.getTimestamp());
  entry.setActionState(hoodieInstant.getState().name());

  // Resolve the archived action-type name first, then apply it with a single setter call.
  final String actionTypeName;
  switch (hoodieInstant.getAction()) {
    case HoodieTimeline.CLEAN_ACTION:
      actionTypeName = ActionType.clean.name();
      break;
    case HoodieTimeline.COMMIT_ACTION:
      actionTypeName = ActionType.commit.name();
      break;
    case HoodieTimeline.DELTA_COMMIT_ACTION:
      actionTypeName = ActionType.deltacommit.name();
      break;
    case HoodieTimeline.REPLACE_COMMIT_ACTION:
      actionTypeName = ActionType.replacecommit.name();
      break;
    case HoodieTimeline.ROLLBACK_ACTION:
      actionTypeName = ActionType.rollback.name();
      break;
    case HoodieTimeline.SAVEPOINT_ACTION:
      actionTypeName = ActionType.savepoint.name();
      break;
    case HoodieTimeline.COMPACTION_ACTION:
      actionTypeName = ActionType.compaction.name();
      break;
    default:
      // Any action outside the list above is rejected rather than archived without a type.
      throw new UnsupportedOperationException("Action not fully supported yet");
  }
  entry.setActionType(actionTypeName);
  return entry;
}
public static Option<HoodieCommitMetadata> getInflightReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant) throws IOException {
Option<byte[]> inflightContent = metaClient.getActiveTimeline().getInstantDetails(instant);
if (!inflightContent.isPresent() || inflightContent.get().length == 0) {