From 98ec21507932ee942ca643c88ba9049746d44059 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Sat, 8 Jan 2022 10:34:47 -0500 Subject: [PATCH] [HUDI-3178] Fixing metadata table compaction so as to not include uncommitted data (#4530) - There is a chance that the actual write eventually failed in data table but commit was successful in Metadata table, and if compaction was triggered in MDT, compaction could have included the uncommitted data. But once compacted, it may never be ignored while reading from metadata table. So, this patch fixes the bug. Metadata table compaction is triggered before applying the commit to metadata table to circumvent this issue. --- .../HoodieBackedTableMetadataWriter.java | 2 +- .../SparkHoodieBackedTableMetadataWriter.java | 8 ++- .../functional/TestHoodieBackedMetadata.java | 23 +++---- .../hudi/io/TestHoodieTimelineArchiveLog.java | 66 ++++++++++--------- ...arkCopyOnWriteTableArchiveWithReplace.java | 6 +- 5 files changed, 59 insertions(+), 46 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index f8a6154b6..a379b75ea 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -689,7 +689,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant() .get().getTimestamp(); List pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested() - .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList()); + .findInstantsBefore(instantTime).getInstants().collect(Collectors.toList()); if (!pendingInstants.isEmpty()) { LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s", diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 5329751b2..ccb258a8c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -128,6 +128,13 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad JavaRDD recordRDD = prepRecords(records, partitionName, 1); try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) { + if (canTriggerTableService) { + // trigger compaction before doing the delta commit. this is to ensure, if this delta commit succeeds in metadata table, but failed in data table, + // we would have compacted metadata table and so could have included uncommitted data which will never be ignored while reading from metadata + // table (since reader will filter out only from delta commits) + compactIfNecessary(writeClient, instantTime); + } + if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) { // if this is a new commit being applied to metadata for the first time writeClient.startCommitWithTime(instantTime); @@ -153,7 +160,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad // reload timeline metadataMetaClient.reloadActiveTimeline(); if (canTriggerTableService) { - compactIfNecessary(writeClient, instantTime); cleanIfNecessary(writeClient, instantTime); writeClient.archive(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 2051696ab..e3db3914a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -289,15 +289,15 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { for (; i <= 2; i++) { doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT); } - // expected num commits = 1 (bootstrap) + 2 (writes) + 1 compaction. + // expected num commits = 1 (bootstrap) + 2 (writes) HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4); + assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3); // trigger a async table service, archival should not kick in, even though conditions are met. doCluster(testTable, "000000" + commitTime.getAndIncrement()); metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 5); + assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4); // trigger a regular write operation. archival should kick in. doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT); @@ -371,7 +371,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // this should have triggered compaction in metadata table tableMetadata = metadata(writeConfig, context); assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); - assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001"); + assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000003001"); } @@ -402,7 +402,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { HoodieTableMetadata tableMetadata = metadata(writeConfig, context); assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); - assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001"); + assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000003001"); HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig); @@ -447,6 +447,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // this new write is expected to trigger metadata table compaction String commitInstant = "0000002"; doWriteOperation(testTable, commitInstant, INSERT); + doWriteOperation(testTable, "0000003", INSERT); HoodieTableMetadata tableMetadata = metadata(writeConfig, context); String metadataCompactionInstant = commitInstant + "001"; @@ -467,7 +468,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { if (simulateFailedCompaction) { // this should retry the compaction in metadata table. - doWriteOperation(testTable, "0000003", INSERT); + doWriteOperation(testTable, "0000004", INSERT); } else { // let the compaction succeed in metadata and validation should succeed. FileCreateUtils.renameTempToMetaFile(tempFilePath, metaFilePath); @@ -476,8 +477,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(testTable); // add few more write and validate - doWriteOperation(testTable, "0000004", INSERT); - doWriteOperation(testTable, "0000005", UPSERT); + doWriteOperation(testTable, "0000005", INSERT); + doWriteOperation(testTable, "0000006", UPSERT); validateMetadata(testTable); if (simulateFailedCompaction) { @@ -496,13 +497,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(testTable); // this should retry the failed compaction in metadata table. - doWriteOperation(testTable, "0000006", INSERT); + doWriteOperation(testTable, "0000007", INSERT); validateMetadata(testTable); // add few more write and validate - doWriteOperation(testTable, "0000007", INSERT); - doWriteOperation(testTable, "0000008", UPSERT); + doWriteOperation(testTable, "0000008", INSERT); + doWriteOperation(testTable, "0000009", UPSERT); validateMetadata(testTable); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index faa06210d..64ad2d6f7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -301,21 +301,25 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); List originalCommits = commitsList.getKey(); List commitsAfterArchival = commitsList.getValue(); - if (i != 6) { + if (enableMetadata) { assertEquals(originalCommits, commitsAfterArchival); } else { - // on 6th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight. - assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1); - for (int j = 1; j <= 6; j++) { - if (j == 1) { - // first commit should be archived - assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))); - } else if (j == 2) { - // 2nd compaction should not be archived - assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + j))); - } else { - // every other commit should not be archived - assertTrue(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))); + if (i != 6) { + assertEquals(originalCommits, commitsAfterArchival); + } else { + // on 7th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight. + assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1); + for (int j = 1; j <= 6; j++) { + if (j == 1) { + // first commit should be archived + assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))); + } else if (j == 2) { + // 2nd compaction should not be archived + assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + j))); + } else { + // every other commit should not be archived + assertTrue(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))); + } } } } @@ -578,37 +582,39 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { assertEquals(originalCommits, commitsAfterArchival); } - // one more commit will trigger compaction in metadata table and will let archival move forward. + // two more commits will trigger compaction in metadata table and will let archival move forward. testTable.doWriteOperation("00000006", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + testTable.doWriteOperation("00000007", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); // trigger archival Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); List originalCommits = commitsList.getKey(); List commitsAfterArchival = commitsList.getValue(); - // before archival 1,2,3,4,5,6 - // after archival 5,6 - assertEquals(originalCommits.size() - commitsAfterArchival.size(), 4); - verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004")), getActiveCommitInstants(Arrays.asList("00000005", "00000006")), commitsAfterArchival); + // before archival 1,2,3,4,5,6,7 + // after archival 6,7 + assertEquals(originalCommits.size() - commitsAfterArchival.size(), 5); + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005")), + getActiveCommitInstants(Arrays.asList("00000006", "00000007")), commitsAfterArchival); - // 3 more commits, 5 and 6 will be archived. but will not move after 6 since compaction has to kick in in metadata table. - testTable.doWriteOperation("00000007", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + // 3 more commits, 6 and 7 will be archived. but will not move after 6 since compaction has to kick in metadata table. testTable.doWriteOperation("00000008", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + testTable.doWriteOperation("00000009", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); // trigger archival commitsList = archiveAndGetCommitsList(writeConfig); originalCommits = commitsList.getKey(); commitsAfterArchival = commitsList.getValue(); assertEquals(originalCommits, commitsAfterArchival); - // ideally, this will archive commits 5, 6, 7, but since compaction in metadata is until 6, only 5 and 6 will get archived, - testTable.doWriteOperation("00000009", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + // ideally, this will archive commits 6, 7, 8 but since compaction in metadata is until 6, only 6 will get archived, + testTable.doWriteOperation("00000010", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); commitsList = archiveAndGetCommitsList(writeConfig); originalCommits = commitsList.getKey(); commitsAfterArchival = commitsList.getValue(); - assertEquals(originalCommits.size() - commitsAfterArchival.size(), 2); + assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1); verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000006")), - getActiveCommitInstants(Arrays.asList("00000007", "00000008", "00000009")), commitsAfterArchival); + getActiveCommitInstants(Arrays.asList("00000007", "00000008", "00000009", "00000010")), commitsAfterArchival); // and then 2nd compaction will take place at 12th commit - for (int i = 10; i < 13; i++) { + for (int i = 11; i < 14; i++) { testTable.doWriteOperation("000000" + i, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); // trigger archival commitsList = archiveAndGetCommitsList(writeConfig); @@ -618,16 +624,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { } // one more commit will trigger compaction in metadata table and will let archival move forward. - testTable.doWriteOperation("00000013", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + testTable.doWriteOperation("00000014", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); // trigger archival commitsList = archiveAndGetCommitsList(writeConfig); originalCommits = commitsList.getKey(); commitsAfterArchival = commitsList.getValue(); - // before archival 5,6,7,8,9,10,11,12,13 - // after archival 12,13 - assertEquals(originalCommits.size() - commitsAfterArchival.size(), 5); + // before archival 7,8,9,10,11,12,13,14 + // after archival 13,14 + assertEquals(originalCommits.size() - commitsAfterArchival.size(), 6); verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000006", "00000007", "00000008", - "00000009", "00000010", "00000011")), getActiveCommitInstants(Arrays.asList("00000012", "00000013")), commitsAfterArchival); + "00000009", "00000010", "00000011", "00000012")), getActiveCommitInstants(Arrays.asList("00000013", "00000014")), commitsAfterArchival); } private Pair, List> archiveAndGetCommitsList(HoodieWriteConfig writeConfig) throws IOException { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java index 1c6602348..acd7e835e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java @@ -79,8 +79,8 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4); - // 2nd write batch; 3 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit - for (int i = 5; i < 8; i++) { + // 2nd write batch; 4 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit + for (int i = 5; i < 9; i++) { String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000); client.startCommitWithTime(instantTime); client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime); @@ -96,7 +96,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie // verify records final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants(); - assertEquals(4, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()), + assertEquals(5, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()), "should only have the 4 records from the 3rd partition."); } }