1
0

[HUDI-3178] Fixing metadata table compaction so as to not include uncommitted data (#4530)

- There is a chance that the actual write eventually fails in the data table while the commit succeeds in the metadata table (MDT). If compaction was then triggered in the MDT, it could have included that uncommitted data — and once compacted, the data can never be filtered out when reading from the metadata table. This patch fixes the bug: metadata table compaction is now triggered before applying the commit to the metadata table, circumventing the issue.
This commit is contained in:
Sivabalan Narayanan
2022-01-08 10:34:47 -05:00
committed by GitHub
parent 46bb00e4df
commit 98ec215079
5 changed files with 59 additions and 46 deletions

View File

@@ -689,7 +689,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
.get().getTimestamp();
List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
.findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
.findInstantsBefore(instantTime).getInstants().collect(Collectors.toList());
if (!pendingInstants.isEmpty()) {
LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",

View File

@@ -128,6 +128,13 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
if (canTriggerTableService) {
// trigger compaction before doing the delta commit. this is to ensure, if this delta commit succeeds in metadata table, but failed in data table,
// we would have compacted metadata table and so could have included uncommitted data which will never be ignored while reading from metadata
// table (since reader will filter out only from delta commits)
compactIfNecessary(writeClient, instantTime);
}
if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
// if this is a new commit being applied to metadata for the first time
writeClient.startCommitWithTime(instantTime);
@@ -153,7 +160,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
// reload timeline
metadataMetaClient.reloadActiveTimeline();
if (canTriggerTableService) {
compactIfNecessary(writeClient, instantTime);
cleanIfNecessary(writeClient, instantTime);
writeClient.archive();
}

View File

@@ -289,15 +289,15 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
for (; i <= 2; i++) {
doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
}
// expected num commits = 1 (bootstrap) + 2 (writes) + 1 compaction.
// expected num commits = 1 (bootstrap) + 2 (writes)
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);
// trigger a async table service, archival should not kick in, even though conditions are met.
doCluster(testTable, "000000" + commitTime.getAndIncrement());
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 5);
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
// trigger a regular write operation. archival should kick in.
doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
@@ -371,7 +371,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// this should have triggered compaction in metadata table
tableMetadata = metadata(writeConfig, context);
assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001");
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000003001");
}
@@ -402,7 +402,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001");
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000003001");
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig);
@@ -447,6 +447,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// this new write is expected to trigger metadata table compaction
String commitInstant = "0000002";
doWriteOperation(testTable, commitInstant, INSERT);
doWriteOperation(testTable, "0000003", INSERT);
HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
String metadataCompactionInstant = commitInstant + "001";
@@ -467,7 +468,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
if (simulateFailedCompaction) {
// this should retry the compaction in metadata table.
doWriteOperation(testTable, "0000003", INSERT);
doWriteOperation(testTable, "0000004", INSERT);
} else {
// let the compaction succeed in metadata and validation should succeed.
FileCreateUtils.renameTempToMetaFile(tempFilePath, metaFilePath);
@@ -476,8 +477,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
validateMetadata(testTable);
// add few more write and validate
doWriteOperation(testTable, "0000004", INSERT);
doWriteOperation(testTable, "0000005", UPSERT);
doWriteOperation(testTable, "0000005", INSERT);
doWriteOperation(testTable, "0000006", UPSERT);
validateMetadata(testTable);
if (simulateFailedCompaction) {
@@ -496,13 +497,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
validateMetadata(testTable);
// this should retry the failed compaction in metadata table.
doWriteOperation(testTable, "0000006", INSERT);
doWriteOperation(testTable, "0000007", INSERT);
validateMetadata(testTable);
// add few more write and validate
doWriteOperation(testTable, "0000007", INSERT);
doWriteOperation(testTable, "0000008", UPSERT);
doWriteOperation(testTable, "0000008", INSERT);
doWriteOperation(testTable, "0000009", UPSERT);
validateMetadata(testTable);
}
}

View File

@@ -301,21 +301,25 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
List<HoodieInstant> originalCommits = commitsList.getKey();
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
if (i != 6) {
if (enableMetadata) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
// on 6th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight.
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1);
for (int j = 1; j <= 6; j++) {
if (j == 1) {
// first commit should be archived
assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)));
} else if (j == 2) {
// 2nd compaction should not be archived
assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + j)));
} else {
// every other commit should not be archived
assertTrue(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)));
if (i != 6) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
// on 7th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight.
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1);
for (int j = 1; j <= 6; j++) {
if (j == 1) {
// first commit should be archived
assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)));
} else if (j == 2) {
// 2nd compaction should not be archived
assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + j)));
} else {
// every other commit should not be archived
assertTrue(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)));
}
}
}
}
@@ -578,37 +582,39 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
assertEquals(originalCommits, commitsAfterArchival);
}
// one more commit will trigger compaction in metadata table and will let archival move forward.
// two more commits will trigger compaction in metadata table and will let archival move forward.
testTable.doWriteOperation("00000006", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
testTable.doWriteOperation("00000007", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// trigger archival
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
List<HoodieInstant> originalCommits = commitsList.getKey();
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
// before archival 1,2,3,4,5,6
// after archival 5,6
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 4);
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004")), getActiveCommitInstants(Arrays.asList("00000005", "00000006")), commitsAfterArchival);
// before archival 1,2,3,4,5,6,7
// after archival 6,7
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 5);
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005")),
getActiveCommitInstants(Arrays.asList("00000006", "00000007")), commitsAfterArchival);
// 3 more commits, 5 and 6 will be archived. but will not move after 6 since compaction has to kick in in metadata table.
testTable.doWriteOperation("00000007", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// 3 more commits, 6 and 7 will be archived. but will not move after 6 since compaction has to kick in in metadata table.
testTable.doWriteOperation("00000008", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
testTable.doWriteOperation("00000009", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// trigger archival
commitsList = archiveAndGetCommitsList(writeConfig);
originalCommits = commitsList.getKey();
commitsAfterArchival = commitsList.getValue();
assertEquals(originalCommits, commitsAfterArchival);
// ideally, this will archive commits 5, 6, 7, but since compaction in metadata is until 6, only 5 and 6 will get archived,
testTable.doWriteOperation("00000009", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// ideally, this will archive commits 6, 7, 8 but since compaction in metadata is until 6, only 6 will get archived,
testTable.doWriteOperation("00000010", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
commitsList = archiveAndGetCommitsList(writeConfig);
originalCommits = commitsList.getKey();
commitsAfterArchival = commitsList.getValue();
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 2);
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1);
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000006")),
getActiveCommitInstants(Arrays.asList("00000007", "00000008", "00000009")), commitsAfterArchival);
getActiveCommitInstants(Arrays.asList("00000007", "00000008", "00000009", "00000010")), commitsAfterArchival);
// and then 2nd compaction will take place at 12th commit
for (int i = 10; i < 13; i++) {
for (int i = 11; i < 14; i++) {
testTable.doWriteOperation("000000" + i, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// trigger archival
commitsList = archiveAndGetCommitsList(writeConfig);
@@ -618,16 +624,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
}
// one more commit will trigger compaction in metadata table and will let archival move forward.
testTable.doWriteOperation("00000013", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
testTable.doWriteOperation("00000014", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// trigger archival
commitsList = archiveAndGetCommitsList(writeConfig);
originalCommits = commitsList.getKey();
commitsAfterArchival = commitsList.getValue();
// before archival 5,6,7,8,9,10,11,12,13
// after archival 12,13
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 5);
// before archival 7,8,9,10,11,12,13,14
// after archival 13,14
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 6);
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000006", "00000007", "00000008",
"00000009", "00000010", "00000011")), getActiveCommitInstants(Arrays.asList("00000012", "00000013")), commitsAfterArchival);
"00000009", "00000010", "00000011", "00000012")), getActiveCommitInstants(Arrays.asList("00000013", "00000014")), commitsAfterArchival);
}
private Pair<List<HoodieInstant>, List<HoodieInstant>> archiveAndGetCommitsList(HoodieWriteConfig writeConfig) throws IOException {

View File

@@ -79,8 +79,8 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
// 2nd write batch; 3 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit
for (int i = 5; i < 8; i++) {
// 2nd write batch; 4 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit
for (int i = 5; i < 9; i++) {
String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
client.startCommitWithTime(instantTime);
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime);
@@ -96,7 +96,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
// verify records
final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(4, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()),
assertEquals(5, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()),
"should only have the 4 records from the 3rd partition.");
}
}