From 11e64b2db0ddf8f816561f8442b373de15a26d71 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Tue, 22 Jun 2021 08:52:18 -0700 Subject: [PATCH] [HUDI-1717] Metadata Reader should merge all the un-synced but complete instants from the dataset timeline. (#3082) --- .../HoodieBackedTableMetadataWriter.java | 4 +- .../metadata/HoodieTableMetadataWriter.java | 6 +++ .../SparkHoodieBackedTableMetadataWriter.java | 19 ++++++++++ .../metadata/TestHoodieBackedMetadata.java | 38 +++++++++++++++++++ .../hudi/metadata/BaseTableMetadata.java | 34 +++++++++++++++-- .../metadata/HoodieBackedTableMetadata.java | 35 ++++++++--------- 6 files changed, 114 insertions(+), 22 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 38a044799..034465d9b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -400,7 +400,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta // (re) init the metadata for reading. 
initTableMetadata(); try { - List instantsToSync = metadata.findInstantsToSync(); + List instantsToSync = metadata.findInstantsToSyncForWriter(); if (instantsToSync.isEmpty()) { return; } @@ -411,7 +411,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta for (HoodieInstant instant : instantsToSync) { LOG.info("Syncing instant " + instant + " to metadata table"); - Option> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, metadata.getSyncedInstantTime()); + Option> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, getLatestSyncedInstantTime()); if (records.isPresent()) { commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index 02c5b9e64..1b02a3b92 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.util.Option; import java.io.Serializable; @@ -40,4 +41,9 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { void update(HoodieRestoreMetadata restoreMetadata, String instantTime); void update(HoodieRollbackMetadata rollbackMetadata, String instantTime); + + /** + * Return the timestamp of the latest instant synced to the metadata table. 
+ */ + Option getLatestSyncedInstantTime(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 7c12a9e00..c014e8be5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -29,6 +29,8 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -132,6 +134,23 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad }); } + /** + * Return the timestamp of the latest instant synced. + * + * To sync an instant on the dataset, we create a corresponding delta-commit on the metadata table. So return the latest + * delta-commit. + */ + @Override + public Option getLatestSyncedInstantTime() { + if (!enabled) { + return Option.empty(); + } + + HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); + return timeline.getDeltaCommitTimeline().filterCompletedInstants() + .lastInstant().map(HoodieInstant::getTimestamp); + } + /** * Tag each record with the location.
* diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index ab731f557..79ccd3764 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -491,6 +491,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // Various table operations without metadata table enabled String restoreToInstant; + String inflightActionTimestamp; + String beforeInflightActionTimestamp; try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) { // updates newCommitTime = HoodieActiveTimeline.createNewInstantTime(); @@ -523,6 +525,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { assertTrue(metadata(client).isInSync()); } + // Record a timestamp for creating an inflight instant for sync testing + inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime(); + beforeInflightActionTimestamp = newCommitTime; + // Deletes newCommitTime = HoodieActiveTimeline.createNewInstantTime(); records = dataGen.generateDeletes(newCommitTime, 5); @@ -554,9 +560,41 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { assertTrue(metadata(client).isInSync()); } + // If there is an incomplete operation, the Metadata Table is not updated beyond that operation, but the + // in-memory merge should consider all the completed operations. + Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp)); + fs.create(inflightCleanPath).close(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { // Restore cannot be done until the metadata table is in sync.
See HUDI-1502 for details client.syncTableMetadata(); + + // Table should sync only before the inflightActionTimestamp + HoodieBackedTableMetadataWriter writer = + (HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context); + assertEquals(writer.getLatestSyncedInstantTime().get(), beforeInflightActionTimestamp); + + // Reader should sync to all the completed instants + HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(), + client.getConfig().getBasePath(), FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR); + assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime); + + // Remove the inflight instant holding back table sync + fs.delete(inflightCleanPath, false); + client.syncTableMetadata(); + + writer = + (HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context); + assertEquals(writer.getLatestSyncedInstantTime().get(), newCommitTime); + + // Reader should sync to all the completed instants + metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(), + client.getConfig().getBasePath(), FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR); + assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime); + } + + // Enable metadata table and ensure it is synced + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { client.restoreToInstant(restoreToInstant); assertFalse(metadata(client).isInSync()); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 0092853b7..85a4d69b7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -62,6 +62,7 @@ public abstract class
BaseTableMetadata implements HoodieTableMetadata { protected final HoodieMetadataConfig metadataConfig; // Directory used for Spillable Map when merging records protected final String spillableMapDirectory; + private String syncedInstantTime; protected boolean enabled; private TimelineMergedTableMetadata timelineMergedMetadata; @@ -277,17 +278,44 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { private void openTimelineScanner() { if (timelineMergedMetadata == null) { - List unSyncedInstants = findInstantsToSync(); + List unSyncedInstants = findInstantsToSyncForReader(); timelineMergedMetadata = new TimelineMergedTableMetadata(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), null); + + syncedInstantTime = unSyncedInstants.isEmpty() ? getLatestDatasetInstantTime() + : unSyncedInstants.get(unSyncedInstants.size() - 1).getTimestamp(); } } - protected abstract List findInstantsToSync(); + /** + * Return the timestamp of the latest synced instant. + */ + @Override + public Option getSyncedInstantTime() { + if (!enabled) { + return Option.empty(); + } + + return Option.ofNullable(syncedInstantTime); + } + + /** + * Return the instants which are not-synced to the {@code HoodieTableMetadata}. + * + * This is the list of all completed but un-synced instants. + */ + protected abstract List findInstantsToSyncForReader(); + + /** + * Return the instants which are not-synced to the {@code HoodieTableMetadataWriter}. + * + * This is the list of all completed but un-synced instants which do not have any incomplete instants in between them.
+ */ + protected abstract List findInstantsToSyncForWriter(); @Override public boolean isInSync() { - return enabled && findInstantsToSync().isEmpty(); + return enabled && findInstantsToSyncForWriter().isEmpty(); } protected HoodieEngineContext getEngineContext() { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 66e8f419e..f374d61b5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -265,7 +264,22 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { * Return an ordered list of instants which have not been synced to the Metadata Table. */ @Override - protected List findInstantsToSync() { + protected List findInstantsToSyncForReader() { + return findInstantsToSync(true); + } + + /** + * Return an ordered list of instants which have not been synced to the Metadata Table. + */ + @Override + protected List findInstantsToSyncForWriter() { + return findInstantsToSync(false); + } + + /** + * Return an ordered list of instants which have not been synced to the Metadata Table. + */ + private List findInstantsToSync(boolean ignoreIncompleteInstants) { initIfNeeded(); // if there are no instants yet, return empty list, since there is nothing to sync here. 
@@ -277,7 +291,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { // are candidates for sync. String latestMetadataInstantTime = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp(); HoodieDefaultTimeline candidateTimeline = datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime, Integer.MAX_VALUE); - Option earliestIncompleteInstant = candidateTimeline.filterInflightsAndRequested().firstInstant(); + Option earliestIncompleteInstant = ignoreIncompleteInstants ? Option.empty() + : candidateTimeline.filterInflightsAndRequested().firstInstant(); if (earliestIncompleteInstant.isPresent()) { return candidateTimeline.filterCompletedInstants() @@ -289,20 +304,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { } } - /** - * Return the timestamp of the latest compaction instant. - */ - @Override - public Option getSyncedInstantTime() { - if (!enabled) { - return Option.empty(); - } - - HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); - return timeline.getDeltaCommitTimeline().filterCompletedInstants() - .lastInstant().map(HoodieInstant::getTimestamp); - } - public boolean enabled() { return enabled; }