From 0c09a973fb0470a47411f734d2d5e1429e3522e2 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Sat, 26 Mar 2022 11:42:54 +0800
Subject: [PATCH] [HUDI-3435] Do not throw exception when instant to rollback does not exist in metadata table active timeline (#4821)

---
 .../hudi/client/HoodieTimelineArchiver.java   | 17 ++++++-
 .../functional/TestHoodieBackedMetadata.java  | 21 +++++----
 .../hudi/io/TestHoodieTimelineArchiver.java   | 47 +++++++++++++++++--
 .../hudi/metadata/HoodieTableMetadata.java    | 13 +++++
 .../metadata/HoodieTableMetadataUtil.java     | 27 +++++++++--
 5 files changed, 108 insertions(+), 17 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 66c89cfdc..0ccfa3c45 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -469,7 +469,22 @@ public class HoodieTimelineArchiver {
         throw new HoodieException("Error limiting instant archival based on metadata table", e);
       }
     }
-
+
+    // If this is a metadata table, do not archive the commits that are still in the
+    // dataset active timeline. This is required by the metadata table;
+    // see HoodieTableMetadataUtil#processRollbackMetadata for details.
+    if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
+      HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+          .setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
+          .setConf(metaClient.getHadoopConf())
+          .build();
+      Option<String> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp);
+      if (earliestActiveDatasetCommit.isPresent()) {
+        instants = instants.filter(instant ->
+            HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get()));
+      }
+    }
+
     return instants.flatMap(hoodieInstant ->
         groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
             HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
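The guard added above bounds metadata-table archival by the earliest instant still on the dataset's active timeline. The following standalone sketch (not Hudi code; the class name and sample timestamps are hypothetical) illustrates the filtering rule, relying only on the fact that Hudi instant timestamps are fixed-width strings whose lexicographic order matches their chronological order:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Standalone sketch (not Hudi code): keep only instants strictly earlier than the
// earliest instant on the dataset's active timeline. Instant timestamps are
// fixed-width strings, so lexicographic order matches chronological order.
public class ArchivalGuardSketch {
  static List<String> limitArchivable(Stream<String> candidates, String earliestActiveDatasetCommit) {
    return candidates
        .filter(ts -> ts.compareTo(earliestActiveDatasetCommit) < 0) // LESSER_THAN semantics
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Hypothetical metadata-table instants and dataset boundary.
    Stream<String> metadataInstants = Stream.of("00000001", "00000005", "00000008", "00000011");
    // Only "00000001" and "00000005" remain eligible for archival.
    System.out.println(limitArchivable(metadataInstants, "00000007"));
  }
}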
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 1c40bc808..d7d0d2631 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -294,24 +294,29 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     AtomicInteger commitTime = new AtomicInteger(1);
     // trigger 2 regular writes(1 bootstrap commit). just 1 before archival can get triggered.
-    int i = 1;
-    for (; i <= 2; i++) {
+    for (int i = 1; i <= 2; i++) {
       doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
     }
     // expected num commits = 1 (bootstrap) + 2 (writes)
     HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
     HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);
+    assertEquals(3, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
 
-    // trigger a async table service, archival should not kick in, even though conditions are met.
+    // trigger an async table service, archival should not kick in, even though conditions are met.
     doCluster(testTable, "000000" + commitTime.getAndIncrement());
     metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
+    assertEquals(4, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
 
-    // trigger a regular write operation. archival should kick in.
+    // start the timeline server for cleaning up MARKERS
+    getHoodieWriteClient(writeConfig);
+    // trigger a regular write operation. dataset timeline archival should kick in.
+    doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
+    archiveDataTable(writeConfig, HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build());
+
+    // trigger a regular write operation. metadata timeline archival should kick in.
     doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
     metadataTimeline = metadataMetaClient.reloadActiveTimeline();
-    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);
+    assertEquals(4, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
   }
 
   @ParameterizedTest
@@ -942,7 +947,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       }
     }
 
-    assertTrue(exceptionRaised, "Rollback of archived instants should fail");
+    assertFalse(exceptionRaised, "Metadata table should not archive instants that are in dataset active timeline");
     // Since each rollback also creates a deltacommit, we can only support rolling back of half of the original
     // instants present before rollback started.
     assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2,
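The reworked test above now archives in two phases: the dataset timeline first, then the metadata timeline. The plain-Java simulation below (not Hudi code; all timestamps hypothetical) shows why the order matters: metadata archival can only trim instants strictly below the dataset's earliest active instant, so nothing shrinks until the dataset timeline itself has been archived.

import java.util.ArrayList;
import java.util.List;

// Standalone simulation (not Hudi code) of the ordering the test exercises.
public class TwoPhaseArchivalSketch {
  public static void main(String[] args) {
    List<String> datasetActive = new ArrayList<>(List.of("0000001", "0000002", "0000003", "0000004", "0000005"));
    List<String> metadataActive = new ArrayList<>(List.of("0000001", "0000002", "0000003", "0000004", "0000005"));

    // Metadata archival before dataset archival: nothing is below the boundary.
    String boundary = datasetActive.get(0);
    metadataActive.removeIf(ts -> ts.compareTo(boundary) < 0);
    System.out.println(metadataActive.size()); // still 5

    // Archive the dataset timeline down to the last two instants...
    datasetActive.subList(0, 3).clear();

    // ...then metadata archival can actually trim instants.
    String newBoundary = datasetActive.get(0); // "0000004"
    metadataActive.removeIf(ts -> ts.compareTo(newBoundary) < 0);
    System.out.println(metadataActive); // [0000004, 0000005]
  }
}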
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index aafc53821..0fe602cd0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -1054,7 +1054,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
         .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(basePath))
         .setLoadActiveTimelineOnLoad(true).build();
 
-    for (int i = 1; i <= 16; i++) {
+    for (int i = 1; i <= 17; i++) {
       testTable.doWriteOperation("000000" + String.format("%02d", i), WriteOperationType.UPSERT, i == 1 ?
           Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
 
       // archival
@@ -1075,6 +1075,30 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
         IntStream.range(1, i + 1).forEach(j ->
             assertTrue(metadataTableInstants.contains(
                 new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+      } else if (i == 8) {
+        // i == 8
+        // The instant "00000000000000" was archived since it is less than
+        // the earliest instant on the dataset active timeline;
+        // the dataset active timeline has instants in the range [00000001 ~ 00000008],
+        // because at the time of archiving, no compaction instant exists yet
+        // on the metadata active timeline.
+        assertEquals(9, metadataTableInstants.size());
+        assertTrue(metadataTableInstants.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000007001")));
+        IntStream.range(1, i + 1).forEach(j ->
+            assertTrue(metadataTableInstants.contains(
+                new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+      } else if (i <= 11) {
+        // In the metadata table timeline, the first delta commit is "00000007"
+        // because it equals the earliest commit on the dataset timeline after archival;
+        // delta commits "00000008" through "00000011" are added later without archival or compaction
+        assertEquals(i - 5, metadataTableInstants.size());
+        assertTrue(metadataTableInstants.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000007001")));
+        IntStream.range(7, i + 1).forEach(j ->
+            assertTrue(metadataTableInstants.contains(
+                new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
+                    "000000" + String.format("%02d", j)))));
       } else if (i <= 14) {
         // In the metadata table timeline, the first delta commit is "00000007001"
         // from metadata table compaction, after archival, delta commits "00000008"
@@ -1095,14 +1119,27 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
             assertTrue(metadataTableInstants.contains(
                 new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
                     "000000" + String.format("%02d", j)))));
-      } else {
+      } else if (i == 16) {
         // i == 16
-        // Only commit "00000015001" and delta commit "00000016" are in the active timeline
-        assertEquals(2, metadataTableInstants.size());
+        // the dataset timeline has commits "00000015" and "00000016",
+        // the metadata timeline has commits [00000008, 00000016] and "00000015001"
+        assertEquals(10, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
             new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000015001")));
+        IntStream.range(8, 17).forEach(j ->
+            assertTrue(metadataTableInstants.contains(
+                new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
+                    "000000" + String.format("%02d", j)))));
+      } else {
+        // i == 17
+        // Only commits [00000015, 00000017] and "00000015001" are on the metadata timeline
+        assertEquals(4, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
-            new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000016")));
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000015001")));
+        IntStream.range(15, 18).forEach(j ->
+            assertTrue(metadataTableInstants.contains(
+                new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION,
+                    "000000" + String.format("%02d", j)))));
       }
     }
   }
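The assertEquals(i - 5, ...) in the i <= 11 branch above encodes simple bookkeeping: delta commits "00000007" through "000000<i>" plus the compaction commit "00000007001" survive, i.e. (i - 7 + 1) + 1 = i - 5 instants. A standalone check of that arithmetic (hypothetical class, not part of the test):

import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Standalone check (not Hudi code) of the "i - 5" count asserted for 9 <= i <= 11.
public class InstantCountSketch {
  public static void main(String[] args) {
    for (int i = 9; i <= 11; i++) {
      List<String> instants = new ArrayList<>();
      IntStream.rangeClosed(7, i).forEach(j -> instants.add(String.format("000000%02d", j)));
      instants.add("00000007001"); // compaction commit on the metadata timeline
      if (instants.size() != i - 5) {
        throw new AssertionError("expected " + (i - 5) + " instants, got " + instants.size());
      }
    }
    System.out.println("counts match");
  }
}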
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 665eff3be..e20607286 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
+
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieMetadataException;
 
@@ -36,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * Interface that supports querying various pieces of metadata about a hudi table.
@@ -72,6 +74,17 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
     return metadataTableBasePath.substring(0, metadataTableBasePath.lastIndexOf(HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH) - 1);
   }
 
+  /**
+   * Returns the base path of the dataset.
+   *
+   * @param metadataTableBasePath The base path of the metadata table
+   */
+  static String getDatasetBasePath(String metadataTableBasePath) {
+    int endPos = metadataTableBasePath.lastIndexOf(Path.SEPARATOR + HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH);
+    checkState(endPos != -1, metadataTableBasePath + " should be the base path of the metadata table");
+    return metadataTableBasePath.substring(0, endPos);
+  }
+
   /**
    * Returns {@code True} if the given path contains a metadata table.
    *
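getDatasetBasePath above inverts getMetadataTableBasePath by stripping the trailing metadata-table folder from the path. A standalone sketch of the same string manipulation (not the Hudi implementation; it assumes the metadata table lives under "<basePath>/.hoodie/metadata", with a hard-coded suffix standing in for HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH, and the sample path is hypothetical):

// Standalone sketch (not the Hudi implementation): derive the dataset base path
// from a metadata table base path by stripping the "/.hoodie/metadata" suffix.
public class DatasetBasePathSketch {
  private static final String METADATA_TABLE_FOLDER_PATH = ".hoodie/metadata"; // assumed layout

  static String getDatasetBasePath(String metadataTableBasePath) {
    int endPos = metadataTableBasePath.lastIndexOf("/" + METADATA_TABLE_FOLDER_PATH);
    if (endPos == -1) {
      throw new IllegalStateException(metadataTableBasePath + " should be the base path of the metadata table");
    }
    return metadataTableBasePath.substring(0, endPos);
  }

  public static void main(String[] args) {
    // Prints "s3://bucket/warehouse/trips" (hypothetical path).
    System.out.println(getDatasetBasePath("s3://bucket/warehouse/trips/.hoodie/metadata"));
  }
}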
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 4d6c602c0..8f543996b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -531,14 +531,35 @@ public class HoodieTableMetadataUtil {
       }
 
       // Case 2: The instant-to-rollback was never committed to Metadata Table. This can happen if the instant-to-rollback
-      // was a failed commit (never completed) as only completed instants are synced to Metadata Table.
-      // But the required Metadata Table instants should not have been archived
+      // was a failed commit (never completed).
+      //
+      // There are two cases of a failed commit that we need to take care of:
+      //   1) The commit was synced to the metadata table successfully, but the dataset meta file failed to switch
+      //   state (from INFLIGHT to COMPLETED); the committed files should be rolled back, thus the rollback metadata
+      //   cannot be skipped. Usually a failover should be triggered, and the metadata active timeline is expected
+      //   to contain the commit; we can check whether the commit was synced to the metadata table
+      //   through HoodieActiveTimeline#containsInstant.
+      //
+      //   2) The sync of the commit to the metadata table failed, or the commit was never synced;
+      //      in this case, the rollback metadata should be skipped.
+      //
+      // In which case does
+      // metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())
+      // return true?
+      // Most probably because of a compaction rollback: we schedule a compaction plan early in the timeline (say at t1),
+      // then after a long time execute the plan and try to roll it back.
+      //
+      //     scheduled      execution   rollback        compaction actions
+      // -----  t1  ---------  t3  ------  t4  -----    dataset timeline
+      //
+      // ------------  t2 (archive)  ---------------    metadata timeline
+      //
+      // When we commit the compaction rollback at time t4, the above check returns true.
       HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantToRollback);
       if (metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
         throw new HoodieMetadataException(String.format("The instant %s required to sync rollback of %s has been archived",
             syncedInstant, instantToRollback));
       }
-      shouldSkip = !metadataTableTimeline.containsInstant(syncedInstant);
       if (!hasNonZeroRollbackLogFiles && shouldSkip) {
         LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, since this instant was never committed to Metadata Table",
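The comment block above describes a three-way decision when syncing a rollback to the metadata table. Below is a standalone sketch of that decision (not Hudi code; the timeline is modeled as an earliest retained timestamp plus a set of active instants, and all names and timestamps are hypothetical):

import java.util.List;

// Standalone sketch (not Hudi code) of the decision the comments above describe.
public class RollbackSyncDecisionSketch {
  enum Decision { THROW_ARCHIVED, SKIP_SYNC, APPLY_ROLLBACK }

  static Decision decide(String instantToRollback, String earliestRetained, List<String> activeInstants) {
    if (instantToRollback.compareTo(earliestRetained) < 0) {
      // Before the metadata timeline starts: the instant needed to sync the rollback was archived.
      return Decision.THROW_ARCHIVED;
    }
    if (!activeInstants.contains(instantToRollback)) {
      // Never committed to the metadata table (e.g. a failed commit that was never synced).
      return Decision.SKIP_SYNC;
    }
    return Decision.APPLY_ROLLBACK;
  }

  public static void main(String[] args) {
    List<String> active = List.of("00000007", "00000008", "00000009");
    System.out.println(decide("00000005", "00000007", active)); // THROW_ARCHIVED
    System.out.println(decide("00000010", "00000007", active)); // SKIP_SYNC
    System.out.println(decide("00000008", "00000007", active)); // APPLY_ROLLBACK
  }
}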