[HUDI-1118] Cleanup rollback files residing in .hoodie folder (#2205)
This commit is contained in:
@@ -48,6 +48,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
|||||||
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
||||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||||
import org.apache.hudi.common.util.CleanerUtils;
|
import org.apache.hudi.common.util.CleanerUtils;
|
||||||
|
import org.apache.hudi.common.util.CollectionUtils;
|
||||||
import org.apache.hudi.common.util.CompactionUtils;
|
import org.apache.hudi.common.util.CompactionUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
@@ -62,7 +63,6 @@ import java.io.FileNotFoundException;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -147,7 +147,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
|||||||
|
|
||||||
private Stream<HoodieInstant> getCleanInstantsToArchive() {
|
private Stream<HoodieInstant> getCleanInstantsToArchive() {
|
||||||
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
|
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
|
||||||
.getTimelineOfActions(Collections.singleton(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
|
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants();
|
||||||
return cleanAndRollbackTimeline.getInstants()
|
return cleanAndRollbackTimeline.getInstants()
|
||||||
.collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream()
|
.collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream()
|
||||||
.map(hoodieInstants -> {
|
.map(hoodieInstants -> {
|
||||||
@@ -187,7 +187,6 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private Stream<HoodieInstant> getInstantsToArchive() {
|
private Stream<HoodieInstant> getInstantsToArchive() {
|
||||||
// TODO: Handle ROLLBACK_ACTION in future
|
|
||||||
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
|
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
|
||||||
|
|
||||||
// For archiving and cleaning instants, we need to include intermediate state files if they exist
|
// For archiving and cleaning instants, we need to include intermediate state files if they exist
|
||||||
|
|||||||
Reference in New Issue
Block a user