[minor] Follow 3178, fix the flink metadata table compaction (#5175)
This commit is contained in:
@@ -108,6 +108,13 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
|||||||
List<HoodieRecord> preppedRecordList = HoodieList.getList(preppedRecords);
|
List<HoodieRecord> preppedRecordList = HoodieList.getList(preppedRecords);
|
||||||
|
|
||||||
try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
|
try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
|
||||||
|
if (canTriggerTableService) {
|
||||||
|
// trigger compaction before doing the delta commit. this is to ensure, if this delta commit succeeds in metadata table, but failed in data table,
|
||||||
|
// we would have compacted metadata table and so could have included uncommitted data which will never be ignored while reading from metadata
|
||||||
|
// table (since reader will filter out only from delta commits)
|
||||||
|
compactIfNecessary(writeClient, instantTime);
|
||||||
|
}
|
||||||
|
|
||||||
if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
|
if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
|
||||||
// if this is a new commit being applied to metadata for the first time
|
// if this is a new commit being applied to metadata for the first time
|
||||||
writeClient.startCommitWithTime(instantTime);
|
writeClient.startCommitWithTime(instantTime);
|
||||||
@@ -146,7 +153,6 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
|||||||
// reload timeline
|
// reload timeline
|
||||||
metadataMetaClient.reloadActiveTimeline();
|
metadataMetaClient.reloadActiveTimeline();
|
||||||
if (canTriggerTableService) {
|
if (canTriggerTableService) {
|
||||||
compactIfNecessary(writeClient, instantTime);
|
|
||||||
cleanIfNecessary(writeClient, instantTime);
|
cleanIfNecessary(writeClient, instantTime);
|
||||||
writeClient.archive();
|
writeClient.archive();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -207,8 +207,8 @@ public class TestStreamWriteOperatorCoordinator {
|
|||||||
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
|
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
|
||||||
|
|
||||||
// test metadata table compaction
|
// test metadata table compaction
|
||||||
// write another 3 commits
|
// write another 4 commits
|
||||||
for (int i = 1; i < 4; i++) {
|
for (int i = 1; i < 5; i++) {
|
||||||
instant = mockWriteWithMetadata();
|
instant = mockWriteWithMetadata();
|
||||||
metadataTableMetaClient.reloadActiveTimeline();
|
metadataTableMetaClient.reloadActiveTimeline();
|
||||||
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
|
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
|
||||||
@@ -216,14 +216,14 @@ public class TestStreamWriteOperatorCoordinator {
|
|||||||
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
|
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
|
||||||
}
|
}
|
||||||
// the 5th commit triggers the compaction
|
// the 5th commit triggers the compaction
|
||||||
instant = mockWriteWithMetadata();
|
mockWriteWithMetadata();
|
||||||
metadataTableMetaClient.reloadActiveTimeline();
|
metadataTableMetaClient.reloadActiveTimeline();
|
||||||
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
||||||
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(6L));
|
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(7L));
|
||||||
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001"));
|
assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "001"));
|
||||||
assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
|
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
|
||||||
// write another 2 commits
|
// write another 2 commits
|
||||||
for (int i = 6; i < 8; i++) {
|
for (int i = 7; i < 8; i++) {
|
||||||
instant = mockWriteWithMetadata();
|
instant = mockWriteWithMetadata();
|
||||||
metadataTableMetaClient.reloadActiveTimeline();
|
metadataTableMetaClient.reloadActiveTimeline();
|
||||||
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
|
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
|
||||||
@@ -241,13 +241,15 @@ public class TestStreamWriteOperatorCoordinator {
|
|||||||
|
|
||||||
// write another commit
|
// write another commit
|
||||||
mockWriteWithMetadata();
|
mockWriteWithMetadata();
|
||||||
// write another commit to trigger compaction
|
// write another commit
|
||||||
instant = mockWriteWithMetadata();
|
instant = mockWriteWithMetadata();
|
||||||
|
// write another commit to trigger compaction
|
||||||
|
mockWriteWithMetadata();
|
||||||
metadataTableMetaClient.reloadActiveTimeline();
|
metadataTableMetaClient.reloadActiveTimeline();
|
||||||
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
||||||
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(13L));
|
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(14L));
|
||||||
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant + "001"));
|
assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant + "001"));
|
||||||
assertThat(completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
|
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user