From aa11989eade1bd268d83e6aafe15f2ea60be67d2 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Wed, 11 Aug 2021 07:39:48 -0700 Subject: [PATCH] [HUDI-2286] Handle the case of failed deltacommit on the metadata table. (#3428) A failed deltacommit on the metadata table will be automatically rolled back. Assuming the failed commit was "t10", the rollback will happen the next time at "t11". Post rollback, when we try to sync the dataset to the metadata table, we should look for all unsynced instants including t11. Current code ignores t11 since the latest commit timestamp on metadata table is t11 (due to rollback). --- .../functional/TestHoodieBackedMetadata.java | 58 +++++++++++++++++++ .../metadata/HoodieBackedTableMetadata.java | 8 ++- .../common/testutils/FileCreateUtils.java | 24 ++++++++ 3 files changed, 87 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 513bac14b..e12c8bc29 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -35,10 +35,12 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.testutils.FileCreateUtils; import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.util.HoodieTimer; @@ -857,6 +859,62 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { validateMetadata(unsyncedClient); } + /** + * Test that failure to perform deltacommit on the metadata table does not lead to missed sync. + */ + @Test + public void testMetdataTableCommitFailure() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { + // Write 1 + String newCommitTime = "001"; + List records = dataGen.generateInserts(newCommitTime, 20); + client.startCommitWithTime(newCommitTime); + List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + + // Write 2 + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateInserts(newCommitTime, 20); + writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + } + + // At this time both commits 001 and 002 must be synced to the metadata table + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + HoodieActiveTimeline timeline = metadataMetaClient.getActiveTimeline(); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); + + // Delete the 002 deltacommit completed instant to make it inflight + FileCreateUtils.deleteDeltaCommit(metadataTableBasePath, "002"); + timeline = metadataMetaClient.reloadActiveTimeline(); + 
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))); + assertTrue(timeline.containsInstant(new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); + + // In this commit deltacommit "002" will be rolled back and attempted again. + String latestCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { + String newCommitTime = "003"; + List records = dataGen.generateInserts(newCommitTime, 20); + client.startCommitWithTime(newCommitTime); + client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + + records = dataGen.generateInserts(latestCommitTime, 20); + client.startCommitWithTime(latestCommitTime); + List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), latestCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + } + + timeline = metadataMetaClient.reloadActiveTimeline(); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, latestCommitTime))); + assertTrue(timeline.getRollbackTimeline().countInstants() == 1); + } /** * Validate the metadata tables contents to ensure it matches what is on the file system. 
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 1f0338368..5234c4d89 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -299,9 +299,11 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { return Collections.EMPTY_LIST; } - // All instants on the data timeline, which are greater than the last instant on metadata timeline - // are candidates for sync. - String latestMetadataInstantTime = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp(); + // All instants on the data timeline, which are greater than the last deltacommit instant on metadata timeline + // are candidates for sync. We only consider delta-commit instants as each action on the dataset leads to a + // deltacommit on the metadata table. + String latestMetadataInstantTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants() + .lastInstant().get().getTimestamp(); HoodieDefaultTimeline candidateTimeline = datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime, Integer.MAX_VALUE); Option earliestIncompleteInstant = ignoreIncompleteInstants ? 
Option.empty() : candidateTimeline.filterInflightsAndRequested().firstInstant(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 6bf2e9fab..960ed2813 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -274,6 +274,30 @@ public class FileCreateUtils { return markerFilePath.toAbsolutePath().toString(); } + private static void removeMetaFile(String basePath, String instantTime, String suffix) throws IOException { + Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME); + Path metaFilePath = parentPath.resolve(instantTime + suffix); + if (Files.exists(metaFilePath)) { + Files.delete(metaFilePath); + } + } + + public static void deleteCommit(String basePath, String instantTime) throws IOException { + removeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION); + } + + public static void deleteRequestedCommit(String basePath, String instantTime) throws IOException { + removeMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMMIT_EXTENSION); + } + + public static void deleteInflightCommit(String basePath, String instantTime) throws IOException { + removeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION); + } + + public static void deleteDeltaCommit(String basePath, String instantTime) throws IOException { + removeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION); + } + public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException { Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); if (Files.notExists(parentPath)) {