From 57612c5c323a2caab5e9c25d2b6617b0105d38e8 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 15 Apr 2022 14:47:53 -0400 Subject: [PATCH] [HUDI-3848] Fixing restore with cleaned up commits (#5288) --- .../ListingBasedRollbackStrategy.java | 10 ++- ...stHoodieSparkMergeOnReadTableRollback.java | 88 +++++++++++++++++++ 2 files changed, 97 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java index ed3779860..e3159abad 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java @@ -239,7 +239,15 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu SerializablePathFilter pathFilter = getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp()); Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath); - return fs.listStatus(filePaths, pathFilter); + return fs.listStatus(Arrays.stream(filePaths).filter(entry -> { + try { + return fs.exists(entry); + } catch (IOException e) { + LOG.error("Exists check failed for " + entry.toString(), e); + } + // if IOException is thrown, do not ignore. lets try to add the file of interest to be deleted. we can't miss any files to be rolled back. 
+ return true; + }).toArray(Path[]::new), pathFilter); } private FileStatus[] fetchFilesFromListFiles(HoodieInstant instantToRollback, String partitionPath, String basePath, diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 339e9e119..043697f66 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -500,6 +500,94 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction } } + @Test + void testRestoreWithCleanedUpCommits() throws Exception { + boolean populateMetaFields = true; + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false) + // Timeline-server-based markers are not used for multi-rollback tests + .withMarkersType(MarkerType.DIRECT.name()); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg = cfgBuilder.build(); + + Properties properties = populateMetaFields ? 
new Properties() : getPropertiesForKeyGen(); + properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); + + try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + + /* + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc().parallelize(records, 1); + JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); + List statuses = writeStatusJavaRDD.collect(); + assertNoWriteErrors(statuses); + client.commit(newCommitTime, jsc().parallelize(statuses)); + + upsertRecords(client, "002", records, dataGen); + + client.savepoint("002","user1","comment1"); + + upsertRecords(client, "003", records, dataGen); + upsertRecords(client, "004", records, dataGen); + + // Compaction commit + String compactionInstantTime = "006"; + client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()); + HoodieWriteMetadata> compactionMetadata = client.compact(compactionInstantTime); + client.commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); + + upsertRecords(client, "007", records, dataGen); + upsertRecords(client, "008", records, dataGen); + + // Compaction commit + String compactionInstantTime1 = "009"; + client.scheduleCompactionAtInstant(compactionInstantTime1, Option.empty()); + HoodieWriteMetadata> compactionMetadata1 = client.compact(compactionInstantTime1); + client.commitCompaction(compactionInstantTime1, compactionMetadata1.getCommitMetadata().get(), Option.empty()); + + upsertRecords(client, "010", records, dataGen); + + // trigger clean. 
creating a new client with aggressive cleaner configs so that clean will kick in immediately. + cfgBuilder = getConfigBuilder(false) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).build()) + // Timeline-server-based markers are not used for multi-rollback tests + .withMarkersType(MarkerType.DIRECT.name()); + addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); + HoodieWriteConfig cfg1 = cfgBuilder.build(); + final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg1); + client1.clean(); + client1.close(); + + metaClient = HoodieTableMetaClient.reload(metaClient); + upsertRecords(client, "011", records, dataGen); + + // Rollback to 002 + client.restoreToInstant("002", cfg.isMetadataTableEnabled()); + + // verify that no files are present after 002. every data file should have been cleaned up + HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); + HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + Stream dataFilesToRead = tableView.getLatestBaseFiles(); + assertFalse(dataFilesToRead.anyMatch(file -> HoodieTimeline.compareTimestamps("002", HoodieTimeline.GREATER_THAN, file.getCommitTime()))); + } + } + + private void upsertRecords(SparkRDDWriteClient client, String commitTime, List records, HoodieTestDataGenerator dataGen) throws IOException { + client.startCommitWithTime(commitTime); + List copyOfRecords = new ArrayList<>(records); + copyOfRecords = dataGen.generateUpdates(commitTime, copyOfRecords); + List statuses = client.upsert(jsc().parallelize(copyOfRecords, 1), commitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + client.commit(commitTime, jsc().parallelize(statuses)); + } + private long getTotalRecordsWritten(HoodieCommitMetadata commitMetadata) { return 
commitMetadata.getPartitionToWriteStats().values().stream() .flatMap(Collection::stream)