[HUDI-1644] Do not delete older rollback instants as part of rollback. Archival can take care of removing old instants cleanly (#2610)
This commit is contained in:
@@ -21,7 +21,6 @@ package org.apache.hudi.table.action.restore;
|
|||||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
@@ -96,17 +95,6 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
|
|||||||
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
|
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
|
||||||
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
|
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
|
||||||
LOG.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + restoreInstantTime);
|
LOG.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + restoreInstantTime);
|
||||||
|
|
||||||
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
|
|
||||||
LOG.info("Cleaning up older restore meta files");
|
|
||||||
// Cleanup of older cleaner meta files
|
|
||||||
// TODO - make the commit archival generic and archive rollback metadata
|
|
||||||
FSUtils.deleteOlderRollbackMetaFiles(
|
|
||||||
table.getMetaClient().getFs(),
|
|
||||||
table.getMetaClient().getMetaPath(),
|
|
||||||
table.getActiveTimeline().getRestoreTimeline().getInstants()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return restoreMetadata;
|
return restoreMetadata;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
|
|||||||
import org.apache.hudi.common.HoodieRollbackStat;
|
import org.apache.hudi.common.HoodieRollbackStat;
|
||||||
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
|
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
|
||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
@@ -38,7 +37,6 @@ import org.apache.hudi.exception.HoodieRollbackException;
|
|||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
import org.apache.hudi.table.MarkerFiles;
|
import org.apache.hudi.table.MarkerFiles;
|
||||||
import org.apache.hudi.table.action.BaseActionExecutor;
|
import org.apache.hudi.table.action.BaseActionExecutor;
|
||||||
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@@ -201,12 +199,6 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
|
|||||||
new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, instantTime),
|
new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, instantTime),
|
||||||
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
|
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
|
||||||
LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
|
LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
|
||||||
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
|
|
||||||
LOG.info("Cleaning up older rollback meta files");
|
|
||||||
FSUtils.deleteOlderRollbackMetaFiles(table.getMetaClient().getFs(),
|
|
||||||
table.getMetaClient().getMetaPath(),
|
|
||||||
table.getActiveTimeline().getRollbackTimeline().getInstants());
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieIOException("Error executing rollback at instant " + instantTime, e);
|
throw new HoodieIOException("Error executing rollback at instant " + instantTime, e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -505,30 +505,6 @@ public class FSUtils {
|
|||||||
return recovered;
|
return recovered;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void deleteOlderCleanMetaFiles(FileSystem fs, String metaPath, Stream<HoodieInstant> instants) {
|
|
||||||
// TODO - this should be archived when archival is made general for all meta-data
|
|
||||||
// skip MIN_CLEAN_TO_KEEP and delete rest
|
|
||||||
instants.skip(MIN_CLEAN_TO_KEEP).forEach(s -> {
|
|
||||||
try {
|
|
||||||
fs.delete(new Path(metaPath, s.getFileName()), false);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new HoodieIOException("Could not delete clean meta files" + s.getFileName(), e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void deleteOlderRollbackMetaFiles(FileSystem fs, String metaPath, Stream<HoodieInstant> instants) {
|
|
||||||
// TODO - this should be archived when archival is made general for all meta-data
|
|
||||||
// skip MIN_ROLLBACK_TO_KEEP and delete rest
|
|
||||||
instants.skip(MIN_ROLLBACK_TO_KEEP).forEach(s -> {
|
|
||||||
try {
|
|
||||||
fs.delete(new Path(metaPath, s.getFileName()), false);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new HoodieIOException("Could not delete rollback meta files " + s.getFileName(), e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void deleteInstantFile(FileSystem fs, String metaPath, HoodieInstant instant) {
|
public static void deleteInstantFile(FileSystem fs, String metaPath, HoodieInstant instant) {
|
||||||
try {
|
try {
|
||||||
LOG.warn("try to delete instant file: " + instant);
|
LOG.warn("try to delete instant file: " + instant);
|
||||||
|
|||||||
@@ -18,22 +18,18 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.fs;
|
package org.apache.hudi.common.fs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hudi.common.model.HoodieLogFile;
|
import org.apache.hudi.common.model.HoodieLogFile;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
|
||||||
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.contrib.java.lang.system.EnvironmentVariables;
|
import org.junit.contrib.java.lang.system.EnvironmentVariables;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
@@ -48,7 +44,6 @@ import java.util.stream.Stream;
|
|||||||
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
|
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
@@ -273,51 +268,6 @@ public class TestFSUtils extends HoodieCommonTestHarness {
|
|||||||
return "." + String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
|
return "." + String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDeleteOlderRollbackFiles() throws Exception {
|
|
||||||
String[] instantTimes = new String[]{"20160501010101", "20160501020101", "20160501030101", "20160501040101",
|
|
||||||
"20160502020601", "20160502030601", "20160502040601", "20160502050601", "20160506030611",
|
|
||||||
"20160506040611", "20160506050611", "20160506060611"};
|
|
||||||
List<HoodieInstant> hoodieInstants = new ArrayList<>();
|
|
||||||
// create rollback files
|
|
||||||
for (String instantTime : instantTimes) {
|
|
||||||
Files.createFile(Paths.get(basePath,
|
|
||||||
HoodieTableMetaClient.METAFOLDER_NAME,
|
|
||||||
instantTime + HoodieTimeline.ROLLBACK_EXTENSION));
|
|
||||||
hoodieInstants.add(new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, instantTime));
|
|
||||||
}
|
|
||||||
|
|
||||||
String metaPath = Paths.get(basePath, ".hoodie").toString();
|
|
||||||
FSUtils.deleteOlderRollbackMetaFiles(FSUtils.getFs(basePath, new Configuration()),
|
|
||||||
metaPath, hoodieInstants.stream());
|
|
||||||
File[] rollbackFiles = new File(metaPath).listFiles((dir, name)
|
|
||||||
-> name.contains(HoodieTimeline.ROLLBACK_EXTENSION));
|
|
||||||
assertNotNull(rollbackFiles);
|
|
||||||
assertEquals(rollbackFiles.length, minRollbackToKeep);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDeleteOlderCleanMetaFiles() throws Exception {
|
|
||||||
String[] instantTimes = new String[]{"20160501010101", "20160501020101", "20160501030101", "20160501040101",
|
|
||||||
"20160502020601", "20160502030601", "20160502040601", "20160502050601", "20160506030611",
|
|
||||||
"20160506040611", "20160506050611", "20160506060611"};
|
|
||||||
List<HoodieInstant> hoodieInstants = new ArrayList<>();
|
|
||||||
// create rollback files
|
|
||||||
for (String instantTime : instantTimes) {
|
|
||||||
Files.createFile(Paths.get(basePath,
|
|
||||||
HoodieTableMetaClient.METAFOLDER_NAME,
|
|
||||||
instantTime + HoodieTimeline.CLEAN_EXTENSION));
|
|
||||||
hoodieInstants.add(new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, instantTime));
|
|
||||||
}
|
|
||||||
String metaPath = Paths.get(basePath, ".hoodie").toString();
|
|
||||||
FSUtils.deleteOlderCleanMetaFiles(FSUtils.getFs(basePath, new Configuration()),
|
|
||||||
metaPath, hoodieInstants.stream());
|
|
||||||
File[] cleanFiles = new File(metaPath).listFiles((dir, name)
|
|
||||||
-> name.contains(HoodieTimeline.CLEAN_EXTENSION));
|
|
||||||
assertNotNull(cleanFiles);
|
|
||||||
assertEquals(cleanFiles.length, minCleanToKeep);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFileNameRelatedFunctions() throws Exception {
|
public void testFileNameRelatedFunctions() throws Exception {
|
||||||
String instantTime = "20160501010101";
|
String instantTime = "20160501010101";
|
||||||
|
|||||||
Reference in New Issue
Block a user