From 76bc686a77a485544c9e75cfefa59fa021470a0c Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Thu, 12 Aug 2021 15:45:57 -0700 Subject: [PATCH] [HUDI-1292] Created a config to enable/disable syncing of metadata table. (#3427) * [HUDI-1292] Created a config to enable/disable syncing of metadata table. - Metadata Table should only be synced from a single pipeline to prevent conflicts. - Skip syncing metadata table for clustering and compaction - Renamed useFileListingMetadata Co-authored-by: Vinoth Chandar --- .../client/AbstractHoodieWriteClient.java | 8 ++- .../apache/hudi/config/HoodieWriteConfig.java | 4 +- .../HoodieBackedTableMetadataWriter.java | 4 +- .../hudi/table/HoodieTimelineArchiveLog.java | 2 +- ...ertOverwriteTableCommitActionExecutor.java | 2 +- .../hudi/client/SparkRDDWriteClient.java | 5 ++ .../functional/TestHoodieBackedMetadata.java | 52 ++++++++++++++++++- .../common/config/HoodieMetadataConfig.java | 18 ++++++- .../table/view/FileSystemViewManager.java | 4 +- .../hudi/metadata/BaseTableMetadata.java | 2 +- .../hudi/metadata/HoodieTableMetadata.java | 2 +- 11 files changed, 88 insertions(+), 15 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 741b418e8..3680d5f56 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -403,7 +403,9 @@ public abstract class AbstractHoodieWriteClient { // If metadata table is enabled, do not archive instants which are more recent than the latest synced // instant on the metadata table. This is required for metadata table sync. 
- if (config.useFileListingMetadata()) { + if (config.isMetadataTableEnabled()) { try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(), config.getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue())) { Option lastSyncedInstantTime = tableMetadata.getSyncedInstantTime(); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java index 54259c941..1170f2f4e 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java @@ -52,7 +52,7 @@ public class JavaInsertOverwriteTableCommitActionExecutor> getPartitionToReplacedFileIds(HoodieWriteMetadata> writeResult) { Map> partitionToExistingFileIds = new HashMap<>(); List partitionPaths = FSUtils.getAllPartitionPaths(context, - table.getMetaClient().getBasePath(), config.useFileListingMetadata(), + table.getMetaClient().getBasePath(), config.isMetadataTableEnabled(), config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()); if (partitionPaths != null && partitionPaths.size() > 0) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 075ab1baa..79601f8fa 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -447,6 +447,11 @@ public class SparkRDDWriteClient extends @Override public void 
syncTableMetadata() { + if (!config.getMetadataConfig().enableSync()) { + LOG.info("Metadata table sync is disabled in the config."); + return; + } + // Open up the metadata table again, for syncing try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context)) { LOG.info("Successfully synced to metadata table"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index e12c8bc29..22110d5ff 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -194,6 +194,54 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { } } + /** + * Test enable/disable sync via the config. 
+ */ + @Test + public void testSyncConfig() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + // Create the metadata table + String firstCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { + client.startCommitWithTime(firstCommitTime); + client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 2)), firstCommitTime); + client.syncTableMetadata(); + assertTrue(fs.exists(new Path(metadataTableBasePath))); + validateMetadata(client); + } + + // If sync is disabled, the table will not sync + HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true).enableMetrics(false).enableSync(false).build()).build(); + final String metadataTableMetaPath = metadataTableBasePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME; + String secondCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config, true)) { + client.startCommitWithTime(secondCommitTime); + client.insert(jsc.parallelize(dataGen.generateInserts(secondCommitTime, 2)), secondCommitTime); + client.syncTableMetadata(); + + // Metadata Table should not have synced + assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime)))); + assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime)))); + } + + // If sync is enabled, the table will sync + String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) { + client.startCommitWithTime(thirdCommitTime); + client.insert(jsc.parallelize(dataGen.generateInserts(thirdCommitTime, 2)), 
thirdCommitTime); + client.syncTableMetadata(); + + // Metadata Table should have synced + assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime)))); + assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime)))); + assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(thirdCommitTime)))); + } + } + /** * Only valid partition directories are added to the metadata. */ @@ -932,7 +980,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { HoodieTableMetadata tableMetadata = metadata(client); assertNotNull(tableMetadata, "MetadataReader should have been initialized"); - if (!config.useFileListingMetadata()) { + if (!config.isMetadataTableEnabled()) { return; } @@ -1033,7 +1081,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // Validate write config for metadata table HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig(); - assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table"); + assertFalse(metadataWriteConfig.isMetadataTableEnabled(), "No metadata table for metadata table"); assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table"); // Metadata table should be in sync with the dataset diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 11a75b6c4..842d8fb43 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -44,6 +44,13 @@ public final class HoodieMetadataConfig extends HoodieConfig { .sinceVersion("0.7.0") .withDocumentation("Enable the internal metadata table which serves table metadata like level file listings"); + // Enable 
syncing the Metadata Table + public static final ConfigProperty METADATA_SYNC_ENABLE_PROP = ConfigProperty + .key(METADATA_PREFIX + ".sync.enable") + .defaultValue(true) + .sinceVersion("0.9.0") + .withDocumentation("Enable syncing of metadata table from actions on the dataset"); + // Validate contents of Metadata Table on each access against the actual filesystem public static final ConfigProperty METADATA_VALIDATE_PROP = ConfigProperty .key(METADATA_PREFIX + ".validate") @@ -137,10 +144,14 @@ public final class HoodieMetadataConfig extends HoodieConfig { return getBoolean(HoodieMetadataConfig.HOODIE_ASSUME_DATE_PARTITIONING_PROP); } - public boolean useFileListingMetadata() { + public boolean enabled() { return getBoolean(METADATA_ENABLE_PROP); } + public boolean enableSync() { + return enabled() && getBoolean(HoodieMetadataConfig.METADATA_SYNC_ENABLE_PROP); + } + public boolean validateFileListingMetadata() { return getBoolean(METADATA_VALIDATE_PROP); } @@ -174,6 +185,11 @@ public final class HoodieMetadataConfig extends HoodieConfig { return this; } + public Builder enableSync(boolean enable) { + metadataConfig.setValue(METADATA_SYNC_ENABLE_PROP, String.valueOf(enable)); + return this; + } + public Builder enableMetrics(boolean enableMetrics) { metadataConfig.setValue(METADATA_METRICS_ENABLE_PROP, String.valueOf(enableMetrics)); return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 7eaed5f9e..32c7125e3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -160,7 +160,7 @@ public class FileSystemViewManager { HoodieTableMetaClient metaClient, SerializableSupplier metadataSupplier) { LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath()); 
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); - if (metadataConfig.useFileListingMetadata()) { + if (metadataConfig.enabled()) { ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. Cannot instantiate metadata file system view"); return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metadataSupplier.get()); @@ -181,7 +181,7 @@ public class FileSystemViewManager { HoodieMetadataConfig metadataConfig, HoodieTimeline timeline) { LOG.info("Creating InMemory based view for basePath " + metaClient.getBasePath()); - if (metadataConfig.useFileListingMetadata()) { + if (metadataConfig.enabled()) { return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig); } return new HoodieTableFileSystemView(metaClient, timeline); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index e408ad939..16f85fb36 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -78,7 +78,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { this.spillableMapDirectory = spillableMapDirectory; this.metadataConfig = metadataConfig; - this.enabled = metadataConfig.useFileListingMetadata(); + this.enabled = metadataConfig.enabled(); if (metadataConfig.enableMetrics()) { this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata"))); } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index 506792125..923a5a51b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -78,7 +78,7 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable { static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapPath, boolean reuse) { - if (metadataConfig.useFileListingMetadata()) { + if (metadataConfig.enabled()) { return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse); } else { return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),