diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 513bac14b..e12c8bc29 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -35,10 +35,12 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.util.HoodieTimer; @@ -857,6 +859,62 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { validateMetadata(unsyncedClient); } + /** + * Test that failure to perform deltacommit on the metadata table does not lead to missed sync. + */ + @Test + public void testMetdataTableCommitFailure() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { + // Write 1 + String newCommitTime = "001"; + List records = dataGen.generateInserts(newCommitTime, 20); + client.startCommitWithTime(newCommitTime); + List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + + // Write 2 + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateInserts(newCommitTime, 20); + writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + } + + // At this time both commits 001 and 002 must be synced to the metadata table + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + HoodieActiveTimeline timeline = metadataMetaClient.getActiveTimeline(); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); + + // Delete the 002 deltacommit completed instant to make it inflight + FileCreateUtils.deleteDeltaCommit(metadataTableBasePath, "002"); + timeline = metadataMetaClient.reloadActiveTimeline(); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))); + assertTrue(timeline.containsInstant(new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); + + // In this commit deltacommit "002" will be rolled back and attempted again. + String latestCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { + String newCommitTime = "003"; + List records = dataGen.generateInserts(newCommitTime, 20); + client.startCommitWithTime(newCommitTime); + client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + + records = dataGen.generateInserts(latestCommitTime, 20); + client.startCommitWithTime(latestCommitTime); + List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), latestCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + } + + timeline = metadataMetaClient.reloadActiveTimeline(); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002"))); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, latestCommitTime))); + assertTrue(timeline.getRollbackTimeline().countInstants() == 1); + } /** * Validate the metadata tables contents to ensure it matches what is on the file system. diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 1f0338368..5234c4d89 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -299,9 +299,11 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { return Collections.EMPTY_LIST; } - // All instants on the data timeline, which are greater than the last instant on metadata timeline - // are candidates for sync. - String latestMetadataInstantTime = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp(); + // All instants on the data timeline, which are greater than the last deltacommit instant on metadata timeline + // are candidates for sync. We only consider delta-commit instants as each actions on dataset leads to a + // deltacommit on the metadata table. + String latestMetadataInstantTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants() + .lastInstant().get().getTimestamp(); HoodieDefaultTimeline candidateTimeline = datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime, Integer.MAX_VALUE); Option earliestIncompleteInstant = ignoreIncompleteInstants ? Option.empty() : candidateTimeline.filterInflightsAndRequested().firstInstant(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 6bf2e9fab..960ed2813 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -274,6 +274,30 @@ public class FileCreateUtils { return markerFilePath.toAbsolutePath().toString(); } + private static void removeMetaFile(String basePath, String instantTime, String suffix) throws IOException { + Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME); + Path metaFilePath = parentPath.resolve(instantTime + suffix); + if (Files.exists(metaFilePath)) { + Files.delete(metaFilePath); + } + } + + public static void deleteCommit(String basePath, String instantTime) throws IOException { + removeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION); + } + + public static void deleteRequestedCommit(String basePath, String instantTime) throws IOException { + removeMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMMIT_EXTENSION); + } + + public static void deleteInflightCommit(String basePath, String instantTime) throws IOException { + removeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION); + } + + public static void deleteDeltaCommit(String basePath, String instantTime) throws IOException { + removeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION); + } + public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException { Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); if (Files.notExists(parentPath)) {