diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 70682c2b7..65c51f0bd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -137,6 +137,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) { int parallelism = writeConfig.getMetadataInsertParallelism(); + int minCommitsToKeep = Math.max(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMinCommitsToKeep()); + int maxCommitsToKeep = Math.max(writeConfig.getMetadataMaxCommitsToKeep(), writeConfig.getMaxCommitsToKeep()); + // Create the write config for the metadata table by borrowing options from the main write config. 
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) @@ -162,7 +165,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) .retainCommits(writeConfig.getMetadataCleanerCommitsRetained()) - .archiveCommitsWith(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMetadataMaxCommitsToKeep()) + .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep) // we will trigger compaction manually, to control the instant times .withInlineCompaction(false) .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build()) @@ -416,7 +419,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta for (HoodieInstant instant : instantsToSync) { LOG.info("Syncing instant " + instant + " to metadata table"); - Option> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, getLatestSyncedInstantTime()); + Option> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, + metaClient.getActiveTimeline(), instant, metadata.getUpdateTime()); if (records.isPresent()) { commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp()); } @@ -478,7 +482,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { if (enabled) { - List records = HoodieTableMetadataUtil.convertMetadataToRecords(restoreMetadata, instantTime, metadata.getSyncedInstantTime()); + List records = HoodieTableMetadataUtil.convertMetadataToRecords(metaClient.getActiveTimeline(), + restoreMetadata, instantTime, metadata.getUpdateTime()); commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } } @@ -492,7 
+497,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta @Override public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) { if (enabled) { - List records = HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime, metadata.getSyncedInstantTime()); + List records = HoodieTableMetadataUtil.convertMetadataToRecords(metaClient.getActiveTimeline(), + rollbackMetadata, instantTime, metadata.getUpdateTime()); commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime); } } @@ -504,6 +510,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta } } + public HoodieBackedTableMetadata getMetadataReader() { + return metadata; + } + /** * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index 1b02a3b92..02c5b9e64 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -23,7 +23,6 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.util.Option; import java.io.Serializable; @@ -41,9 +40,4 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { void update(HoodieRestoreMetadata restoreMetadata, String instantTime); void update(HoodieRollbackMetadata rollbackMetadata, String instantTime); - - /** - * Return the timestamp of the latest instant synced to the metadata table. 
- */ - Option getLatestSyncedInstantTime(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index b9e7dd419..c63a68347 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -205,7 +205,7 @@ public class HoodieTimelineArchiveLog { if (config.isMetadataTableEnabled()) { try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(), config.getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue())) { - Option lastSyncedInstantTime = tableMetadata.getSyncedInstantTime(); + Option lastSyncedInstantTime = tableMetadata.getUpdateTime(); if (lastSyncedInstantTime.isPresent()) { LOG.info("Limiting archiving of instants to last synced instant on metadata table at " + lastSyncedInstantTime.get()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 298113054..458af1a40 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -31,7 +31,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.TableFileSystemView; import 
org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -125,23 +124,6 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad }); } - /** - * Return the timestamp of the latest instant synced. - *

- * To sync a instant on dataset, we create a corresponding delta-commit on the metadata table. So return the latest - * delta-commit. - */ - @Override - public Option getLatestSyncedInstantTime() { - if (!enabled) { - return Option.empty(); - } - - HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); - return timeline.getDeltaCommitTimeline().filterCompletedInstants() - .lastInstant().map(HoodieInstant::getTimestamp); - } - /** * Tag each record with the location. *

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index c014e8be5..7c12a9e00 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -29,8 +29,6 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -134,23 +132,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad }); } - /** - * Return the timestamp of the latest instant synced. - * - * To sync a instant on dataset, we create a corresponding delta-commit on the metadata table. So return the latest - * delta-commit. - */ - @Override - public Option getLatestSyncedInstantTime() { - if (!enabled) { - return Option.empty(); - } - - HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); - return timeline.getDeltaCommitTimeline().filterCompletedInstants() - .lastInstant().map(HoodieInstant::getTimestamp); - } - /** * Tag each record with the location. 
* diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 22110d5ff..e78c2c8ea 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -18,6 +18,24 @@ package org.apache.hudi.client.functional; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.HoodieWriteResult; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -50,9 +68,11 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieMetricsConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; import 
org.apache.hudi.metadata.HoodieMetadataMetrics; import org.apache.hudi.metadata.HoodieTableMetadata; @@ -61,9 +81,6 @@ import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -76,22 +93,6 @@ import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - @Tag("functional") public class TestHoodieBackedMetadata extends HoodieClientTestHarness { @@ -515,6 +516,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { assertNoWriteErrors(writeStatuses); validateMetadata(client); } + String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { // Commit with metadata disabled @@ -530,6 +532,144 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { client.syncTableMetadata(); validateMetadata(client); } + + // If an unsynced commit is automatically rolled back during next 
commit, the rollback commit gets a timestamp + greater than the new commit which is started. Ensure that in this case the rollback is not processed + as the earlier failed commit would not have been committed. + // + // Dataset: C1 C2 C3.inflight[failed] C4 R5[rolls back C3] + // Metadata: C1.delta C2.delta + // + // When R5 completes, C3.xxx will be deleted. When C4 completes, C4 and R5 will be committed to Metadata Table in + // that order. R5 should be neglected as C3 was never committed to metadata table. + newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, false), true)) { + // Metadata disabled and no auto-commit + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateUpdates(newCommitTime, 10); + List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + // Not committed so left in inflight state + client.syncTableMetadata(); + assertTrue(metadata(client).isInSync()); + validateMetadata(client); + } + + newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true), true)) { + // Metadata enabled + // The previous commit will be rolled back automatically + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateUpdates(newCommitTime, 10); + List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + assertTrue(metadata(client).isInSync()); + validateMetadata(client); + } + + // In this scenario an async operation is started and completes around the same time as the failed commit. + // Rest of the reasoning is same as above test. + // C4.clean was an asynchronous clean started along with C3. The clean completed but C3 commit failed. 
+ // + // Dataset: C1 C2 C3.inflight[failed] C4.clean C5 R6[rolls back C3] + // Metadata: C1.delta C2.delta + // + // When R6 completes, C3.xxx will be deleted. When C5 completes, C4, C5 and R6 will be committed to Metadata Table + // in that order. R6 should be neglected as C3 was never committed to metadata table. + newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, false), true)) { + // Metadata disabled and no auto-commit + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateUpdates(newCommitTime, 10); + List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + // Not committed so left in inflight state + client.clean(); + client.syncTableMetadata(); + assertTrue(metadata(client).isInSync()); + validateMetadata(client); + } + + newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient<>(engineContext, getWriteConfig(true, true), true)) { + // Metadata enabled + // The previous commit will be rolled back automatically + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateUpdates(newCommitTime, 10); + List writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + assertTrue(metadata(client).isInSync()); + validateMetadata(client); + } + } + + /** + * Test that manual rollbacks work correctly and enough timeline history is maintained on the metadata table + * timeline. 
+ */ + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testManualRollbacks(HoodieTableType tableType) throws Exception { + init(tableType); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + // Setting to archive more aggressively on the Metadata Table than the Dataset + final int maxDeltaCommitsBeforeCompaction = 4; + final int minArchiveCommitsMetadata = 2; + final int minArchiveCommitsDataset = 4; + HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) + .archiveCommitsWith(minArchiveCommitsMetadata, minArchiveCommitsMetadata + 1).retainCommits(1) + .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1) + .retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build()) + .build(); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) { + // Initialize table with metadata + String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + List records = dataGen.generateInserts(newCommitTime, 20); + client.startCommitWithTime(newCommitTime); + List writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + validateMetadata(client); + + // Perform multiple commits + for (int i = 1; i < 10; ++i) { + newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + if (i == 1) { + records = dataGen.generateInserts(newCommitTime, 5); + } else { + records = dataGen.generateUpdates(newCommitTime, 2); + } + client.startCommitWithTime(newCommitTime); + writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(writeStatuses); + } + + // We can only rollback those commits whose deltacommits have not been 
archived yet. + int numRollbacks = 0; + boolean exceptionRaised = false; + + List allInstants = metaClient.reloadActiveTimeline().getCommitsTimeline().getReverseOrderedInstants() + .collect(Collectors.toList()); + for (HoodieInstant instantToRollback : allInstants) { + try { + client.rollback(instantToRollback.getTimestamp()); + client.syncTableMetadata(); + ++numRollbacks; + } catch (HoodieMetadataException e) { + exceptionRaised = true; + break; + } + } + + assertTrue(exceptionRaised, "Rollback of archived instants should fail"); + // Since each rollback also creates a deltacommit, we can only support rolling back of half of the original + // instants present before rollback started. + assertTrue(numRollbacks >= Math.max(minArchiveCommitsDataset, minArchiveCommitsMetadata) / 2, + "Rollbacks of non archived instants should work"); + } } /** @@ -657,12 +797,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // Table should sync only before the inflightActionTimestamp HoodieBackedTableMetadataWriter writer = (HoodieBackedTableMetadataWriter) SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context); - assertEquals(writer.getLatestSyncedInstantTime().get(), beforeInflightActionTimestamp); + assertEquals(writer.getMetadataReader().getUpdateTime().get(), beforeInflightActionTimestamp); // Reader should sync to all the completed instants HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(), client.getConfig().getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue()); - assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime); + assertEquals(((HoodieBackedTableMetadata)metadata).getReaderTime().get(), newCommitTime); // Remove the inflight instance holding back table sync fs.delete(inflightCleanPath, false); @@ -670,12 +810,12 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { writer = 
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context); - assertEquals(writer.getLatestSyncedInstantTime().get(), newCommitTime); + assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime); // Reader should sync to all the completed instants metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(), client.getConfig().getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue()); - assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime); + assertEquals(writer.getMetadataReader().getUpdateTime().get(), newCommitTime); } // Enable metadata table and ensure it is synced @@ -693,7 +833,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } /** - * Instants on Metadata Table should be archived as per config. Metadata Table should be automatically compacted as per config. + * Instants on Metadata Table should be archived as per config but we always keep atlest the number of instants + * as on the dataset. Metadata Table should be automatically compacted as per config. */ @Test public void testCleaningArchivingAndCompaction() throws Exception { @@ -701,12 +842,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); final int maxDeltaCommitsBeforeCompaction = 4; + final int minArchiveLimit = 4; + final int maxArchiveLimit = 6; HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) - .archiveCommitsWith(6, 8).retainCommits(1) + .archiveCommitsWith(minArchiveLimit - 2, maxArchiveLimit - 2).retainCommits(1) .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build()) - // don't archive the data timeline at all. 
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(Integer.MAX_VALUE - 1, Integer.MAX_VALUE) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveLimit, maxArchiveLimit) .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(true).build()) .build(); @@ -736,6 +878,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // ensure archiving has happened long numDataCompletedInstants = datasetMetaClient.getActiveTimeline().filterCompletedInstants().countInstants(); long numDeltaCommits = metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(); + assertTrue(numDeltaCommits >= minArchiveLimit); assertTrue(numDeltaCommits < numDataCompletedInstants, "Must have less delta commits than total completed instants on data timeline."); } @@ -1030,14 +1173,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // File sizes should be valid Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0)); - // Block sizes should be valid - Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0)); - List fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList()); - Collections.sort(fsBlockSizes); - List metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList()); - Collections.sort(metadataBlockSizes); - assertEquals(fsBlockSizes, metadataBlockSizes); - if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) { LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray())); LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray())); @@ -1054,6 +1189,14 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } } + // Block sizes should be valid + Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0)); 
+ List fsBlockSizes = Arrays.stream(fsStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList()); + Collections.sort(fsBlockSizes); + List metadataBlockSizes = Arrays.stream(metaStatuses).map(FileStatus::getBlockSize).collect(Collectors.toList()); + Collections.sort(metadataBlockSizes); + assertEquals(fsBlockSizes, metadataBlockSizes); + assertEquals(fsFileNames.size(), metadataFilenames.size(), "Files within partition " + partition + " should match"); assertTrue(fsFileNames.equals(metadataFilenames), "Files within partition " + partition + " should match"); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 16f85fb36..44850b9e5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -64,7 +65,6 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { protected final HoodieMetadataConfig metadataConfig; // Directory used for Spillable Map when merging records protected final String spillableMapDirectory; - private String syncedInstantTime; protected boolean enabled; private TimelineMergedTableMetadata timelineMergedMetadata; @@ -84,9 +84,6 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { } else { this.metrics = Option.empty(); } - if (enabled) { - openTimelineScanner(); - } } /** @@ -298,29 +295,14 @@ 
public abstract class BaseTableMetadata implements HoodieTableMetadata { protected abstract Option> getRecordByKeyFromMetadata(String key); - private void openTimelineScanner() { + protected void openTimelineScanner(HoodieActiveTimeline metadataTableTimeline) { if (timelineMergedMetadata == null) { List unSyncedInstants = findInstantsToSyncForReader(); timelineMergedMetadata = - new TimelineMergedTableMetadata(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), null); - - syncedInstantTime = unSyncedInstants.isEmpty() ? getLatestDatasetInstantTime() - : unSyncedInstants.get(unSyncedInstants.size() - 1).getTimestamp(); + new TimelineMergedTableMetadata(datasetMetaClient, metadataTableTimeline, unSyncedInstants, getUpdateTime(), null); } } - /** - * Return the timestamp of the latest synced instant. - */ - @Override - public Option getSyncedInstantTime() { - if (!enabled) { - return Option.empty(); - } - - return Option.ofNullable(syncedInstantTime); - } - /** * Return the instants which are not-synced to the {@code HoodieTableMetadata}. * @@ -344,8 +326,19 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { return engineContext != null ? 
engineContext : new HoodieLocalEngineContext(hadoopConf.get()); } + public HoodieMetadataConfig getMetadataConfig() { + return metadataConfig; + } + protected String getLatestDatasetInstantTime() { return datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant() .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); } + + public Option getReaderTime() { + if (timelineMergedMetadata == null) { + return Option.empty(); + } + return timelineMergedMetadata.getSyncedInstantTime(); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index ce1cf5502..bb3115ae3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -126,7 +126,7 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { } @Override - public Option getSyncedInstantTime() { + public Option getUpdateTime() { throw new UnsupportedOperationException(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 5234c4d89..554a16562 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -115,6 +116,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { this.metaClient = null; this.tableConfig = null; } + + if (enabled) { + openTimelineScanner(metaClient.getActiveTimeline()); + } } } @@ -272,6 +277,20 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { logRecordScanner = null; } + /** + * Return the timestamp of the latest synced instant. + */ + @Override + public Option getUpdateTime() { + if (!enabled) { + return Option.empty(); + } + + HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); + return timeline.getDeltaCommitTimeline().filterCompletedInstants() + .lastInstant().map(HoodieInstant::getTimestamp); + } + /** * Return an ordered list of instants which have not been synced to the Metadata Table. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java index a3d0e2dfe..999ea8e2a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java @@ -18,11 +18,6 @@ package org.apache.hudi.metadata; -import java.io.IOException; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; - import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -31,6 +26,11 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.exception.HoodieException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + /** * {@code HoodieTableFileSystemView} implementation that retrieved partition listings from the 
Metadata Table. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index 923a5a51b..3964cd100 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -68,6 +68,9 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable { * @param basePath The base path to check */ static boolean isMetadataTable(String basePath) { + if (basePath.endsWith(Path.SEPARATOR)) { + basePath = basePath.substring(0, basePath.length() - 1); + } return basePath.endsWith(METADATA_TABLE_REL_PATH); } @@ -102,9 +105,11 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable { Map getAllFilesInPartitions(List partitionPaths) throws IOException; /** - * Get the instant time to which the metadata is synced w.r.t data timeline. + * Get the instant time at which Metadata Table was last updated. + * + * This is the timestamp of the Instant on the dataset which was last synced to the Metadata Table. 
*/ - Option getSyncedInstantTime(); + Option getUpdateTime(); boolean isInSync(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 594231225..14fe07b32 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; @@ -33,7 +34,7 @@ import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; - +import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -61,11 +62,13 @@ public class HoodieTableMetadataUtil { * Converts a timeline instant to metadata table records. 
* * @param datasetMetaClient The meta client associated with the timeline instant + * @param metadataTableTimeline Current timeline of the Metadata Table * @param instant to fetch and convert to metadata table records * @return a list of metadata table records * @throws IOException */ - public static Option> convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient, HoodieInstant instant, Option lastSyncTs) throws IOException { + public static Option> convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient, + HoodieActiveTimeline metadataTableTimeline, HoodieInstant instant, Option lastSyncTs) throws IOException { HoodieTimeline timeline = datasetMetaClient.getActiveTimeline(); Option> records = Option.empty(); ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced."); @@ -85,12 +88,12 @@ public class HoodieTableMetadataUtil { case HoodieTimeline.ROLLBACK_ACTION: HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata( timeline.getInstantDetails(instant).get()); - records = Option.of(convertMetadataToRecords(rollbackMetadata, instant.getTimestamp(), lastSyncTs)); + records = Option.of(convertMetadataToRecords(metadataTableTimeline, rollbackMetadata, instant.getTimestamp(), lastSyncTs)); break; case HoodieTimeline.RESTORE_ACTION: HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata( timeline.getInstantDetails(instant).get()); - records = Option.of(convertMetadataToRecords(restoreMetadata, instant.getTimestamp(), lastSyncTs)); + records = Option.of(convertMetadataToRecords(metadataTableTimeline, restoreMetadata, instant.getTimestamp(), lastSyncTs)); break; case HoodieTimeline.SAVEPOINT_ACTION: // Nothing to be done here @@ -213,21 +216,22 @@ public class HoodieTableMetadataUtil { * @param instantTime * @return a list of metadata table records */ - public static List convertMetadataToRecords(HoodieRestoreMetadata 
restoreMetadata, String instantTime, Option lastSyncTs) { + public static List convertMetadataToRecords(HoodieActiveTimeline metadataTableTimeline, + HoodieRestoreMetadata restoreMetadata, String instantTime, Option lastSyncTs) { Map> partitionToAppendedFiles = new HashMap<>(); Map> partitionToDeletedFiles = new HashMap<>(); restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { - rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs)); + rms.forEach(rm -> processRollbackMetadata(metadataTableTimeline, rm, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs)); }); return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore"); } - public static List convertMetadataToRecords(HoodieRollbackMetadata rollbackMetadata, String instantTime, Option lastSyncTs) { - + public static List convertMetadataToRecords(HoodieActiveTimeline metadataTableTimeline, + HoodieRollbackMetadata rollbackMetadata, String instantTime, Option lastSyncTs) { Map> partitionToAppendedFiles = new HashMap<>(); Map> partitionToDeletedFiles = new HashMap<>(); - processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs); + processRollbackMetadata(metadataTableTimeline, rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs); return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback"); } @@ -236,27 +240,47 @@ public class HoodieTableMetadataUtil { * * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This * function will extract this change file for each partition. - * + * @param metadataTableTimeline Current timeline of the Metadata Table * @param rollbackMetadata {@code HoodieRollbackMetadata} * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
* @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. */ - private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, + private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTimeline, HoodieRollbackMetadata rollbackMetadata, Map> partitionToDeletedFiles, Map> partitionToAppendedFiles, Option lastSyncTs) { rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { + final String instantToRollback = rollbackMetadata.getCommitsRollback().get(0); // Has this rollback produced new files? boolean hasRollbackLogFiles = pm.getRollbackLogFiles() != null && !pm.getRollbackLogFiles().isEmpty(); boolean hasNonZeroRollbackLogFiles = hasRollbackLogFiles && pm.getRollbackLogFiles().values().stream().mapToLong(Long::longValue).sum() > 0; - // If commit being rolled back has not been synced to metadata table yet then there is no need to update metadata + + // If instant-to-rollback has not been synced to metadata table yet then there is no need to update metadata + // This can happen in two cases: + // Case 1: Metadata Table timeline is behind the instant-to-rollback. boolean shouldSkip = lastSyncTs.isPresent() - && HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), HoodieTimeline.GREATER_THAN, lastSyncTs.get()); + && HoodieTimeline.compareTimestamps(instantToRollback, HoodieTimeline.GREATER_THAN, lastSyncTs.get()); if (!hasNonZeroRollbackLogFiles && shouldSkip) { LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, given metadata table is already synced upto to %s", - rollbackMetadata.getCommitsRollback().get(0), lastSyncTs.get())); + instantToRollback, lastSyncTs.get())); + return; + } + + // Case 2: The instant-to-rollback was never committed to Metadata Table. This can happen if the instant-to-rollback + // was a failed commit (never completed) as only completed instants are synced to Metadata Table. 
+ // But the required Metadata Table instants should not have been archived + HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantToRollback); + if (metadataTableTimeline.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) { + throw new HoodieMetadataException(String.format("The instant %s required to sync rollback of %s has been archived", + syncedInstant, instantToRollback)); + } + + shouldSkip = !metadataTableTimeline.containsInstant(syncedInstant); + if (!hasNonZeroRollbackLogFiles && shouldSkip) { + LOG.info(String.format("Skipping syncing of rollbackMetadata at %s, since this instant was never committed to Metadata Table", + instantToRollback)); return; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java index 9ba3f2607..b2aca1f11 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/TimelineMergedTableMetadata.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; @@ -49,16 +50,18 @@ public class TimelineMergedTableMetadata implements Serializable { private List instants; private Option lastSyncTs; private Set mergeKeyFilter; + private HoodieActiveTimeline metadataTableTimeline; // keep it a simple hash map, so it can be easily passed onto the executors, once merged. 
protected final Map> timelineMergedRecords; - public TimelineMergedTableMetadata(HoodieTableMetaClient metaClient, List instants, - Option lastSyncTs, Set mergeKeyFilter) { + public TimelineMergedTableMetadata(HoodieTableMetaClient metaClient, HoodieActiveTimeline metadataTableTimeline, + List instants, Option lastSyncTs, Set mergeKeyFilter) { this.metaClient = metaClient; this.instants = instants; this.lastSyncTs = lastSyncTs; this.mergeKeyFilter = mergeKeyFilter != null ? mergeKeyFilter : Collections.emptySet(); + this.metadataTableTimeline = metadataTableTimeline; this.timelineMergedRecords = new HashMap<>(); scan(); @@ -73,7 +76,8 @@ public class TimelineMergedTableMetadata implements Serializable { private void scan() { for (HoodieInstant instant : instants) { try { - Option> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient, instant, lastSyncTs); + Option> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient, + metadataTableTimeline, instant, lastSyncTs); if (records.isPresent()) { records.get().forEach(record -> processNextRecord(record)); } @@ -112,4 +116,15 @@ public class TimelineMergedTableMetadata implements Serializable { public Option> getRecordByKey(String key) { return Option.ofNullable((HoodieRecord) timelineMergedRecords.get(key)); } + + /** + * Returns the timestamp of the latest synced instant. + */ + public Option getSyncedInstantTime() { + if (instants.isEmpty()) { + return Option.empty(); + } + + return Option.of(instants.get(instants.size() - 1).getTimestamp()); + } }