diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 9351ccf17..4d64e5834 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -108,6 +108,13 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad List preppedRecordList = HoodieList.getList(preppedRecords); try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) { + if (canTriggerTableService) { + // trigger compaction before applying the delta commit: if the delta commit succeeds in the metadata table but fails in the data table, + // compacting afterwards would bake that uncommitted data into base files, and readers could never ignore it while reading from the metadata + // table (readers filter out uncommitted data only at delta-commit granularity) + compactIfNecessary(writeClient, instantTime); + } + if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) { // if this is a new commit being applied to metadata for the first time writeClient.startCommitWithTime(instantTime); @@ -146,7 +153,6 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad // reload timeline metadataMetaClient.reloadActiveTimeline(); if (canTriggerTableService) { - compactIfNecessary(writeClient, instantTime); cleanIfNecessary(writeClient, instantTime); writeClient.archive(); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 814a8f19e..7a8aeff97 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -207,8 +207,8 @@ public class TestStreamWriteOperatorCoordinator { assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP)); // test metadata table compaction - // write another 3 commits - for (int i = 1; i < 4; i++) { + // write another 4 commits + for (int i = 1; i < 5; i++) { instant = mockWriteWithMetadata(); metadataTableMetaClient.reloadActiveTimeline(); completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant)); } - // the 5th commit triggers the compaction - instant = mockWriteWithMetadata(); + // the 6th commit triggers the compaction + mockWriteWithMetadata(); metadataTableMetaClient.reloadActiveTimeline(); completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); - assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(6L)); - assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001")); - assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION)); + assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(7L)); + assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "001")); + assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION)); - // write another 2 commits - for (int i = 6; i < 8; i++) { + // write another commit + for (int i = 7; i < 8; i++) { instant = mockWriteWithMetadata(); metadataTableMetaClient.reloadActiveTimeline(); completedTimeline = 
metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); @@ -241,13 +241,15 @@ public class TestStreamWriteOperatorCoordinator { // write another commit mockWriteWithMetadata(); - // write another commit to trigger compaction + // write another commit instant = mockWriteWithMetadata(); + // write another commit to trigger compaction + mockWriteWithMetadata(); metadataTableMetaClient.reloadActiveTimeline(); completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); - assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(13L)); - assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001")); - assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION)); + assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(14L)); + assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "001")); + assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION)); } // -------------------------------------------------------------------------