1
0

[HUDI-3848] Fixing restore with cleaned up commits (#5288)

This commit is contained in:
Sivabalan Narayanan
2022-04-15 14:47:53 -04:00
committed by GitHub
parent 9e8664f4d2
commit 57612c5c32
2 changed files with 97 additions and 1 deletion

View File

@@ -239,7 +239,15 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
SerializablePathFilter pathFilter = getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath);
return fs.listStatus(filePaths, pathFilter);
return fs.listStatus(Arrays.stream(filePaths).filter(entry -> {
try {
return fs.exists(entry);
} catch (IOException e) {
LOG.error("Exists check failed for " + entry.toString(), e);
}
// if IOException is thrown, do not ignore. lets try to add the file of interest to be deleted. we can't miss any files to be rolled back.
return false;
}).toArray(Path[]::new), pathFilter);
}
private FileStatus[] fetchFilesFromListFiles(HoodieInstant instantToRollback, String partitionPath, String basePath,

View File

@@ -500,6 +500,94 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
}
}
// Reproduces HUDI-3848: restoring a MOR table to a savepoint must succeed even
// after the cleaner has already deleted data files belonging to the commits
// that the restore has to roll back.
@Test
void testRestoreWithCleanedUpCommits() throws Exception {
boolean populateMetaFields = true;
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
// Timeline-server-based markers are not used for multi-rollback tests
.withMarkersType(MarkerType.DIRECT.name());
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();
Properties properties = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
/*
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
client.commit(newCommitTime, jsc().parallelize(statuses));
upsertRecords(client, "002", records, dataGen);
// Savepoint at 002 — the instant the table is restored to at the end of the test.
client.savepoint("002","user1","comment1");
upsertRecords(client, "003", records, dataGen);
upsertRecords(client, "004", records, dataGen);
// Compaction commit
String compactionInstantTime = "006";
client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(compactionInstantTime);
client.commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
upsertRecords(client, "007", records, dataGen);
upsertRecords(client, "008", records, dataGen);
// Second compaction commit
String compactionInstantTime1 = "009";
client.scheduleCompactionAtInstant(compactionInstantTime1, Option.empty());
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata1 = client.compact(compactionInstantTime1);
client.commitCompaction(compactionInstantTime1, compactionMetadata1.getCommitMetadata().get(), Option.empty());
upsertRecords(client, "010", records, dataGen);
// trigger clean. creating a new client with aggressive cleaner configs
// (retain only 1 commit) so that clean will kick in immediately and remove
// data files of older commits — the precondition HUDI-3848 is about.
cfgBuilder = getConfigBuilder(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).build())
// Timeline-server-based markers are not used for multi-rollback tests
.withMarkersType(MarkerType.DIRECT.name());
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg1 = cfgBuilder.build();
final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg1);
client1.clean();
client1.close();
// Reload so the meta client picks up the clean action just performed.
metaClient = HoodieTableMetaClient.reload(metaClient);
upsertRecords(client, "011", records, dataGen);
// Restore to the savepointed instant 002; the rollbacks of 003..011 must
// tolerate data files that the cleaner has already deleted.
client.restoreToInstant("002", cfg.isMetadataTableEnabled());
// verify that no files are present after 002. every data file should have been cleaned up
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.anyMatch(file -> HoodieTimeline.compareTimestamps("002", HoodieTimeline.GREATER_THAN, file.getCommitTime())));
}
}
/**
 * Generates updates for the given records under {@code commitTime}, upserts them
 * through {@code client}, asserts there were no write errors, and commits.
 *
 * @param client     write client used for the upsert and commit
 * @param commitTime instant time to start and commit
 * @param records    base records to derive updates from (left unmodified)
 * @param dataGen    generator producing the update payloads
 */
private void upsertRecords(SparkRDDWriteClient client, String commitTime, List<HoodieRecord> records, HoodieTestDataGenerator dataGen) throws IOException {
  client.startCommitWithTime(commitTime);
  // Copy first so the caller's record list is never mutated by the generator.
  List<HoodieRecord> updates = dataGen.generateUpdates(commitTime, new ArrayList<>(records));
  JavaRDD<HoodieRecord> updateRdd = jsc().parallelize(updates, 1);
  List<WriteStatus> writeStatuses = client.upsert(updateRdd, commitTime).collect();
  // The upsert itself must be error-free before we commit the instant.
  assertNoWriteErrors(writeStatuses);
  client.commit(commitTime, jsc().parallelize(writeStatuses));
}
private long getTotalRecordsWritten(HoodieCommitMetadata commitMetadata) {
return commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream)