[HUDI-2675] Fix the exception 'Not an Avro data file' when archive and clean (#4016)
This commit is contained in:
@@ -156,7 +156,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
|
||||
private Stream<HoodieInstant> getCommitInstantsToArchive() {
|
||||
// TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
|
||||
// with logic above to avoid Stream.concats
|
||||
// with logic above to avoid Stream.concat
|
||||
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
|
||||
|
||||
Option<HoodieInstant> oldestPendingCompactionAndReplaceInstant = table.getActiveTimeline()
|
||||
@@ -176,7 +176,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
// Actually do the commits
|
||||
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
|
||||
.filter(s -> {
|
||||
// if no savepoint present, then dont filter
|
||||
// if no savepoint present, then don't filter
|
||||
return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
|
||||
}).filter(s -> {
|
||||
// Ensure commits >= oldest pending compaction commit are retained
|
||||
@@ -233,9 +233,9 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
|
||||
LOG.info("Deleting instants " + archivedInstants);
|
||||
boolean success = true;
|
||||
List<String> instantFiles = archivedInstants.stream().map(archivedInstant -> {
|
||||
return new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
|
||||
}).map(Path::toString).collect(Collectors.toList());
|
||||
List<String> instantFiles = archivedInstants.stream().map(archivedInstant ->
|
||||
new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
|
||||
).map(Path::toString).collect(Collectors.toList());
|
||||
|
||||
context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
|
||||
Map<String, Boolean> resultDeleteInstantFiles = FSUtils.parallelizeFilesProcess(context,
|
||||
@@ -265,7 +265,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
|| (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
|
||||
LOG.info("Latest Committed Instant=" + latestCommitted);
|
||||
if (latestCommitted.isPresent()) {
|
||||
success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get());
|
||||
success &= deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get());
|
||||
}
|
||||
return success;
|
||||
}
|
||||
@@ -277,7 +277,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
* @return true if all eligible files were deleted successfully
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) throws IOException {
|
||||
private boolean deleteAllInstantsOlderOrEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) throws IOException {
|
||||
List<HoodieInstant> instants = null;
|
||||
boolean success = true;
|
||||
try {
|
||||
@@ -291,12 +291,12 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
* On some FSs deletion of all files in the directory can auto remove the directory itself.
|
||||
* GCS is one example, as it doesn't have real directories and subdirectories. When client
|
||||
* removes all the files from a "folder" on GCS it has to create a special "/" to keep the folder
|
||||
* around. If this doesn't happen (timeout, misconfigured client, ...) folder will be deleted and
|
||||
* around. If this doesn't happen (timeout, misconfigured client, ...) folder will be deleted and
|
||||
* in this case we should not break when aux folder is not found.
|
||||
* GCS information: (https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork)
|
||||
*/
|
||||
LOG.warn("Aux path not found. Skipping: " + metaClient.getMetaAuxiliaryPath());
|
||||
return success;
|
||||
return true;
|
||||
}
|
||||
|
||||
List<HoodieInstant> instantsToBeDeleted =
|
||||
@@ -308,7 +308,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName());
|
||||
if (metaClient.getFs().exists(metaFile)) {
|
||||
success &= metaClient.getFs().delete(metaFile, false);
|
||||
LOG.info("Deleted instant file in auxiliary metapath : " + metaFile);
|
||||
LOG.info("Deleted instant file in auxiliary meta path : " + metaFile);
|
||||
}
|
||||
}
|
||||
return success;
|
||||
@@ -321,10 +321,19 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
List<IndexedRecord> records = new ArrayList<>();
|
||||
for (HoodieInstant hoodieInstant : instants) {
|
||||
try {
|
||||
deleteAnyLeftOverMarkers(context, hoodieInstant);
|
||||
records.add(convertToAvroRecord(hoodieInstant));
|
||||
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
|
||||
writeToFile(wrapperSchema, records);
|
||||
if (table.getActiveTimeline().isEmpty(hoodieInstant)
|
||||
&& (
|
||||
hoodieInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION)
|
||||
|| (hoodieInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && hoodieInstant.isCompleted())
|
||||
)
|
||||
) {
|
||||
table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);
|
||||
} else {
|
||||
deleteAnyLeftOverMarkers(context, hoodieInstant);
|
||||
records.add(convertToAvroRecord(hoodieInstant));
|
||||
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
|
||||
writeToFile(wrapperSchema, records);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e);
|
||||
|
||||
@@ -230,11 +230,15 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
|
||||
.filterInflightsAndRequested().getInstants().collect(Collectors.toList());
|
||||
if (pendingCleanInstants.size() > 0) {
|
||||
pendingCleanInstants.forEach(hoodieInstant -> {
|
||||
LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
|
||||
try {
|
||||
cleanMetadataList.add(runPendingClean(table, hoodieInstant));
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
|
||||
if (table.getCleanTimeline().isEmpty(hoodieInstant)) {
|
||||
table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);
|
||||
} else {
|
||||
LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
|
||||
try {
|
||||
cleanMetadataList.add(runPendingClean(table, hoodieInstant));
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
table.getMetaClient().reloadActiveTimeline();
|
||||
|
||||
@@ -146,11 +146,15 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
if (config.incrementalCleanerModeEnabled()) {
|
||||
Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
|
||||
if (lastClean.isPresent()) {
|
||||
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
|
||||
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
|
||||
if ((cleanMetadata.getEarliestCommitToRetain() != null)
|
||||
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
|
||||
return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
|
||||
if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
|
||||
hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
|
||||
} else {
|
||||
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
|
||||
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
|
||||
if ((cleanMetadata.getEarliestCommitToRetain() != null)
|
||||
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
|
||||
return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user