diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f8a6154b6..a379b75ea 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -689,7 +689,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
         .get().getTimestamp();
     List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+        .findInstantsBefore(instantTime).getInstants().collect(Collectors.toList());
 
     if (!pendingInstants.isEmpty()) {
       LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 5329751b2..ccb258a8c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -128,6 +128,13 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
 
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
+      if (canTriggerTableService) {
+        // trigger compaction before applying the delta commit. Otherwise, if this delta commit succeeds in the metadata table
+        // but fails in the data table, a later compaction would bake the uncommitted data into a base file, where readers can
+        // never ignore it (readers filter out uncommitted data only from delta commits)
+        compactIfNecessary(writeClient, instantTime);
+      }
+
       if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
         // if this is a new commit being applied to metadata for the first time
         writeClient.startCommitWithTime(instantTime);
@@ -153,7 +160,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
       // reload timeline
       metadataMetaClient.reloadActiveTimeline();
       if (canTriggerTableService) {
-        compactIfNecessary(writeClient, instantTime);
         cleanIfNecessary(writeClient, instantTime);
         writeClient.archive();
       }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 2051696ab..e3db3914a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -289,15 +289,15 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     for (; i <= 2; i++) {
       doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
     }
-    // expected num commits = 1 (bootstrap) + 2 (writes) + 1 compaction.
+    // expected num commits = 1 (bootstrap) + 2 (writes)
    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
    HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);
 
     // trigger a async table service, archival should not kick in, even though conditions are met.
     doCluster(testTable, "000000" + commitTime.getAndIncrement());
     metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 5);
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
 
     // trigger a regular write operation. archival should kick in.
doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT); @@ -371,7 +371,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // this should have triggered compaction in metadata table tableMetadata = metadata(writeConfig, context); assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); - assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001"); + assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000003001"); } @@ -402,7 +402,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { HoodieTableMetadata tableMetadata = metadata(writeConfig, context); assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); - assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001"); + assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000003001"); HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig); @@ -447,6 +447,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // this new write is expected to trigger metadata table compaction String commitInstant = "0000002"; doWriteOperation(testTable, commitInstant, INSERT); + doWriteOperation(testTable, "0000003", INSERT); HoodieTableMetadata tableMetadata = metadata(writeConfig, context); String metadataCompactionInstant = commitInstant + "001"; @@ -467,7 +468,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { if (simulateFailedCompaction) { // this should retry the compaction in metadata table. - doWriteOperation(testTable, "0000003", INSERT); + doWriteOperation(testTable, "0000004", INSERT); } else { // let the compaction succeed in metadata and validation should succeed. FileCreateUtils.renameTempToMetaFile(tempFilePath, metaFilePath); @@ -476,8 +477,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(testTable); // add few more write and validate - doWriteOperation(testTable, "0000004", INSERT); - doWriteOperation(testTable, "0000005", UPSERT); + doWriteOperation(testTable, "0000005", INSERT); + doWriteOperation(testTable, "0000006", UPSERT); validateMetadata(testTable); if (simulateFailedCompaction) { @@ -496,13 +497,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(testTable); // this should retry the failed compaction in metadata table. 
- doWriteOperation(testTable, "0000006", INSERT); + doWriteOperation(testTable, "0000007", INSERT); validateMetadata(testTable); // add few more write and validate - doWriteOperation(testTable, "0000007", INSERT); - doWriteOperation(testTable, "0000008", UPSERT); + doWriteOperation(testTable, "0000008", INSERT); + doWriteOperation(testTable, "0000009", UPSERT); validateMetadata(testTable); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index faa06210d..64ad2d6f7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -301,21 +301,25 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); List originalCommits = commitsList.getKey(); List commitsAfterArchival = commitsList.getValue(); - if (i != 6) { + if (enableMetadata) { assertEquals(originalCommits, commitsAfterArchival); } else { - // on 6th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight. - assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1); - for (int j = 1; j <= 6; j++) { - if (j == 1) { - // first commit should be archived - assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))); - } else if (j == 2) { - // 2nd compaction should not be archived - assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + j))); - } else { - // every other commit should not be archived - assertTrue(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))); + if (i != 6) { + assertEquals(originalCommits, commitsAfterArchival); + } else { + // on 7th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight. + assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1); + for (int j = 1; j <= 6; j++) { + if (j == 1) { + // first commit should be archived + assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))); + } else if (j == 2) { + // 2nd compaction should not be archived + assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + j))); + } else { + // every other commit should not be archived + assertTrue(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))); + } } } } @@ -578,37 +582,39 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { assertEquals(originalCommits, commitsAfterArchival); } - // one more commit will trigger compaction in metadata table and will let archival move forward. + // two more commits will trigger compaction in metadata table and will let archival move forward. 
testTable.doWriteOperation("00000006", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + testTable.doWriteOperation("00000007", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); // trigger archival Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); List originalCommits = commitsList.getKey(); List commitsAfterArchival = commitsList.getValue(); - // before archival 1,2,3,4,5,6 - // after archival 5,6 - assertEquals(originalCommits.size() - commitsAfterArchival.size(), 4); - verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004")), getActiveCommitInstants(Arrays.asList("00000005", "00000006")), commitsAfterArchival); + // before archival 1,2,3,4,5,6,7 + // after archival 6,7 + assertEquals(originalCommits.size() - commitsAfterArchival.size(), 5); + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005")), + getActiveCommitInstants(Arrays.asList("00000006", "00000007")), commitsAfterArchival); - // 3 more commits, 5 and 6 will be archived. but will not move after 6 since compaction has to kick in in metadata table. - testTable.doWriteOperation("00000007", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + // 3 more commits, 6 and 7 will be archived. but will not move after 6 since compaction has to kick in metadata table. testTable.doWriteOperation("00000008", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + testTable.doWriteOperation("00000009", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); // trigger archival commitsList = archiveAndGetCommitsList(writeConfig); originalCommits = commitsList.getKey(); commitsAfterArchival = commitsList.getValue(); assertEquals(originalCommits, commitsAfterArchival); - // ideally, this will archive commits 5, 6, 7, but since compaction in metadata is until 6, only 5 and 6 will get archived, - testTable.doWriteOperation("00000009", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + // ideally, this will archive commits 6, 7, 8 but since compaction in metadata is until 6, only 6 will get archived, + testTable.doWriteOperation("00000010", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); commitsList = archiveAndGetCommitsList(writeConfig); originalCommits = commitsList.getKey(); commitsAfterArchival = commitsList.getValue(); - assertEquals(originalCommits.size() - commitsAfterArchival.size(), 2); + assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1); verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000006")), - getActiveCommitInstants(Arrays.asList("00000007", "00000008", "00000009")), commitsAfterArchival); + getActiveCommitInstants(Arrays.asList("00000007", "00000008", "00000009", "00000010")), commitsAfterArchival); // and then 2nd compaction will take place at 12th commit - for (int i = 10; i < 13; i++) { + for (int i = 11; i < 14; i++) { testTable.doWriteOperation("000000" + i, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); // trigger archival commitsList = archiveAndGetCommitsList(writeConfig); @@ -618,16 +624,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness { } // one more commit will trigger compaction in metadata table and will let 
archival move forward. - testTable.doWriteOperation("00000013", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + testTable.doWriteOperation("00000014", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); // trigger archival commitsList = archiveAndGetCommitsList(writeConfig); originalCommits = commitsList.getKey(); commitsAfterArchival = commitsList.getValue(); - // before archival 5,6,7,8,9,10,11,12,13 - // after archival 12,13 - assertEquals(originalCommits.size() - commitsAfterArchival.size(), 5); + // before archival 7,8,9,10,11,12,13,14 + // after archival 13,14 + assertEquals(originalCommits.size() - commitsAfterArchival.size(), 6); verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000006", "00000007", "00000008", - "00000009", "00000010", "00000011")), getActiveCommitInstants(Arrays.asList("00000012", "00000013")), commitsAfterArchival); + "00000009", "00000010", "00000011", "00000012")), getActiveCommitInstants(Arrays.asList("00000013", "00000014")), commitsAfterArchival); } private Pair, List> archiveAndGetCommitsList(HoodieWriteConfig writeConfig) throws IOException { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java index 1c6602348..acd7e835e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java @@ -79,8 +79,8 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4); - // 2nd write batch; 3 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit - for (int i = 5; i < 8; i++) { + // 2nd write batch; 4 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit + for (int i = 5; i < 9; i++) { String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000); client.startCommitWithTime(instantTime); client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime); @@ -96,7 +96,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie // verify records final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants(); - assertEquals(4, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()), + assertEquals(5, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()), "should only have the 4 records from the 3rd partition."); } }
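
To make the reasoning behind the reordering in SparkHoodieBackedTableMetadataWriter concrete, here is a toy model in plain Java. It is an illustrative sketch only, not Hudi code: the class name and the base/log/committed collections below are invented stand-ins. The idea it demonstrates is the one stated in the new comment: records that land in the metadata table via delta commits can still be filtered against the data table's timeline at read time, while records merged into a base file by compaction are returned unconditionally, so compaction must run before, not after, the new delta commit.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model (NOT Hudi code) of why compaction must run before the delta commit:
// log records can still be filtered against the data table's committed instants,
// but records compacted into a base file are returned unconditionally.
public class CompactionOrderingSketch {

  static List<String> read(List<String> baseFile, List<String> logFiles, Set<String> committedOnDataTable) {
    List<String> visible = new ArrayList<>(baseFile);  // base-file records: no filtering possible
    for (String instant : logFiles) {
      if (committedOnDataTable.contains(instant)) {    // log records: filtered by data-table timeline
        visible.add(instant);
      }
    }
    return visible;
  }

  public static void main(String[] args) {
    // c3's delta commit succeeds in the metadata table but fails in the data table.
    Set<String> committed = Set.of("c1", "c2");

    // Old order: apply delta commit c3, then compact; c1..c3 end up in the base file,
    // so c3's uncommitted records leak to readers.
    System.out.println(read(List.of("c1", "c2", "c3"), List.of(), committed));  // [c1, c2, c3]

    // New order: compact first (covering only c1, c2), then apply c3 as a delta commit;
    // c3 stays in a log file and the reader filters it out.
    System.out.println(read(List.of("c1", "c2"), List.of("c3"), committed));    // [c1, c2]
  }
}

Under this model, the patch's move of compactIfNecessary(writeClient, instantTime) ahead of startCommitWithTime(instantTime) guarantees the compaction plan only ever covers delta commits that were already reconciled against the data table.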