From 2f95967dfe0b5286bbb3b0f48a2de7cb8fcc7b7c Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Tue, 9 Nov 2021 07:26:20 -0800 Subject: [PATCH] [HUDI-2591] Bootstrap metadata table only if upgrade / downgrade is not required. (#3836) --- .../HoodieBackedTableMetadataWriter.java | 59 +++++++++++++------ .../FlinkHoodieBackedTableMetadataWriter.java | 7 ++- .../hudi/client/SparkRDDWriteClient.java | 26 +++++--- .../SparkHoodieBackedTableMetadataWriter.java | 41 +++++++++---- .../apache/hudi/table/HoodieSparkTable.java | 2 +- .../functional/TestHoodieBackedMetadata.java | 29 ++------- 6 files changed, 100 insertions(+), 64 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 48d6b948c..bf7bd8f31 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -104,16 +104,18 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta /** * Hudi backed table metadata writer. 
* - * @param hadoopConf - Hadoop configuration to use for the metadata writer - * @param writeConfig - Writer config - * @param engineContext - Engine context - * @param actionMetadata - Optional action metadata to help decide bootstrap operations - * @param - Action metadata types extending Avro generated SpecificRecordBase + * @param hadoopConf - Hadoop configuration to use for the metadata writer + * @param writeConfig - Writer config + * @param engineContext - Engine context + * @param actionMetadata - Optional action metadata to help decide bootstrap operations + * @param - Action metadata types extending Avro generated SpecificRecordBase + * @param inflightInstantTimestamp - Timestamp of any instant in progress */ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, - Option actionMetadata) { + Option actionMetadata, + Option inflightInstantTimestamp) { this.dataWriteConfig = writeConfig; this.engineContext = engineContext; this.hadoopConf = new SerializableConfiguration(hadoopConf); @@ -137,7 +139,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta initRegistry(); this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build(); - initialize(engineContext, actionMetadata); + initialize(engineContext, actionMetadata, inflightInstantTimestamp); initTableMetadata(); } else { enabled = false; @@ -145,6 +147,11 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta } } + public HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, + HoodieEngineContext engineContext) { + this(hadoopConf, writeConfig, engineContext, Option.empty(), Option.empty()); + } + protected abstract void initRegistry(); /** @@ -234,11 +241,17 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta /** * Initialize the metadata 
table if it does not exist. - *

+ * * If the metadata table does not exist, then file and partition listing is used to bootstrap the table. + * + * @param engineContext + * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase + * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored + * while deciding to bootstrap the metadata table. */ protected abstract void initialize(HoodieEngineContext engineContext, - Option actionMetadata); + Option actionMetadata, + Option inflightInstantTimestamp); public void initTableMetadata() { try { @@ -260,11 +273,13 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * @param dataMetaClient - Meta client for the data table * @param actionMetadata - Optional action metadata * @param - Action metadata types extending Avro generated SpecificRecordBase + * @param inflightInstantTimestamp - Timestamp of an instant in progress on the dataset. This instant is ignored * @throws IOException */ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, - Option actionMetadata) throws IOException { + Option actionMetadata, + Option inflightInstantTimestamp) throws IOException { HoodieTimer timer = new HoodieTimer().startTimer(); boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), @@ -291,7 +306,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta if (!exists) { // Initialize for the first time by listing partitions and files directly from the file system - if (bootstrapFromFilesystem(engineContext, dataMetaClient)) { + if (bootstrapFromFilesystem(engineContext, dataMetaClient, inflightInstantTimestamp)) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); } } @@ -347,23 +362,29 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * Initialize the Metadata Table by 
listing files and partitions from the file system. * * @param dataMetaClient {@code HoodieTableMetaClient} for the dataset. + * @param inflightInstantTimestamp */ - private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient) throws IOException { + private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, + Option inflightInstantTimestamp) throws IOException { ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled"); // We can only bootstrap if there are no pending operations on the dataset - Option pendingDataInstant = Option.fromJavaOptional(dataMetaClient.getActiveTimeline() - .getReverseOrderedInstants().filter(i -> !i.isCompleted()).findFirst()); - if (pendingDataInstant.isPresent()) { + List pendingDataInstant = dataMetaClient.getActiveTimeline() + .getInstants().filter(i -> !i.isCompleted()) + .filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get())) + .collect(Collectors.toList()); + + if (!pendingDataInstant.isEmpty()) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1)); - LOG.warn("Cannot bootstrap metadata table as operation is in progress in dataset: " + pendingDataInstant.get()); + LOG.warn("Cannot bootstrap metadata table as operation(s) are in progress on the dataset: " + + Arrays.toString(pendingDataInstant.toArray())); return false; } // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit - // Otherwise, we use the latest commit timestamp. - String createInstantTime = dataMetaClient.getActiveTimeline().getReverseOrderedInstants().findFirst() - .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); + // Otherwise, we use the timestamp of the latest completed action. 
+ String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants() + .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime); HoodieTableMetaClient.withPropertyBuilder() diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 8254d0b88..96cdcae7f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -62,7 +62,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, Option actionMetadata) { - super(hadoopConf, writeConfig, engineContext, actionMetadata); + super(hadoopConf, writeConfig, engineContext, actionMetadata, Option.empty()); } @Override @@ -78,10 +78,11 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad @Override protected void initialize(HoodieEngineContext engineContext, - Option actionMetadata) { + Option actionMetadata, + Option inflightInstantTimestamp) { try { if (enabled) { - bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata); + bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp); } } catch (IOException e) { LOG.error("Failed to initialize metadata table. 
Disabling the writer.", e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 4154dc152..891800bb9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -84,22 +84,32 @@ public class SparkRDDWriteClient extends @Deprecated public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) { - super(context, writeConfig); + this(context, writeConfig, Option.empty()); } @Deprecated public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending, Option timelineService) { - super(context, writeConfig, timelineService); + this(context, writeConfig, timelineService); } public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, Option timelineService) { super(context, writeConfig, timelineService); + initializeMetadataTable(Option.empty()); + } + + private void initializeMetadataTable(Option inflightInstantTimestamp) { if (config.isMetadataTableEnabled()) { - // If the metadata table does not exist, it should be bootstrapped here - // TODO: Check if we can remove this requirement - auto bootstrap on commit - SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context); + // Defer bootstrap if upgrade / downgrade is pending + HoodieTableMetaClient metaClient = createMetaClient(true); + UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade( + metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()); + if (!upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { + // TODO: Check if we can remove this requirement - auto bootstrap on commit + 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, Option.empty(), + inflightInstantTimestamp); + } } } @@ -213,7 +223,6 @@ public class SparkRDDWriteClient extends return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); } - /** * Removes all existing records of the Hoodie table and inserts the given HoodieRecords, into the table. @@ -369,7 +378,7 @@ public class SparkRDDWriteClient extends private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD writeStatuses, HoodieTable>, JavaRDD, JavaRDD> table, String clusteringCommitTime) { - + List writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> e.getValue().stream()).collect(Collectors.toList()); @@ -440,6 +449,9 @@ public class SparkRDDWriteClient extends upgradeDowngrade.run(HoodieTableVersion.current(), instantTime); } metaClient.reloadActiveTimeline(); + + // re-bootstrap metadata table if required + initializeMetadataTable(Option.of(instantTime)); } metaClient.validateTableProperties(config.getProps(), operationType); return getTableAndInitCtx(metaClient, operationType, instantTime); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 95ab7dc79..ad6f9d94a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -48,23 +48,41 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class); - public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, 
- HoodieEngineContext context) { - return create(conf, writeConfig, context, Option.empty()); - } - + /** + * Return a Spark based implementation of {@code HoodieTableMetadataWriter} which can be used to + * write to the metadata table. + * + * If the metadata table does not exist, an attempt is made to bootstrap it but there is no guarantee that the + * table will end up bootstrapped at this time. + * + * @param conf + * @param writeConfig + * @param context + * @param actionMetadata + * @param inflightInstantTimestamp Timestamp of an instant which is in-progress. This instant is ignored while + * attempting to bootstrap the table. + * @return An instance of the {@code HoodieTableMetadataWriter} + */ public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, - Option actionMetadata) { - return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata); + Option actionMetadata, + Option inflightInstantTimestamp) { + return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, + inflightInstantTimestamp); + } + + public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, + HoodieEngineContext context) { + return create(conf, writeConfig, context, Option.empty(), Option.empty()); } SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, - Option actionMetadata) { - super(hadoopConf, writeConfig, engineContext, actionMetadata); + Option actionMetadata, + Option inflightInstantTimestamp) { + super(hadoopConf, writeConfig, engineContext, actionMetadata, inflightInstantTimestamp); } @Override @@ -84,7 +102,8 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad @Override protected void initialize(HoodieEngineContext engineContext, - Option actionMetadata) { + Option actionMetadata, + Option 
inflightInstantTimestamp) { try { metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> { if (registry instanceof DistributedRegistry) { @@ -94,7 +113,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad }); if (enabled) { - bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata); + bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp); } } catch (IOException e) { LOG.error("Failed to initialize metadata table. Disabling the writer.", e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index abbfd3167..f14d39c70 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -128,7 +128,7 @@ public abstract class HoodieSparkTable } if (isMetadataTableAvailable) { return Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, - actionMetadata)); + actionMetadata, Option.empty())); } else { return Option.empty(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index f8c1dfc87..f2aa2462b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -968,25 +968,19 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // set hoodie.table.version to 2 in hoodie.properties file changeTableVersion(HoodieTableVersion.TWO); - // With next commit the table should be deleted (as part of upgrade) 
+ // With next commit the table should be deleted (as part of upgrade) and then re-bootstrapped automatically commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); metaClient.reloadActiveTimeline(); + FileStatus prevStatus = fs.getFileStatus(new Path(metadataTableBasePath)); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { records = dataGen.generateInserts(commitTimestamp, 5); client.startCommitWithTime(commitTimestamp); writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commitTimestamp).collect(); assertNoWriteErrors(writeStatuses); } - assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); - - // With next commit the table should be re-bootstrapped (currently in the constructor. To be changed) - commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - records = dataGen.generateInserts(commitTimestamp, 5); - client.startCommitWithTime(commitTimestamp); - writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commitTimestamp).collect(); - assertNoWriteErrors(writeStatuses); - } + assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); + FileStatus currentStatus = fs.getFileStatus(new Path(metadataTableBasePath)); + assertTrue(currentStatus.getModificationTime() > prevStatus.getModificationTime()); initMetaClient(); assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode()); @@ -1060,7 +1054,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { .withProperties(properties) .build(); - // With next commit the table should be deleted (as part of upgrade) and partial commit should be rolled back. + // With next commit the table should be re-bootstrapped and partial commit should be rolled back. 
metaClient.reloadActiveTimeline(); commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { @@ -1069,17 +1063,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); assertNoWriteErrors(writeStatuses.collect()); } - assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); - - // With next commit the table should be re-bootstrapped (currently in the constructor. To be changed) - commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { - records = dataGen.generateInserts(commitTimestamp, 5); - client.startCommitWithTime(commitTimestamp); - writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); - assertNoWriteErrors(writeStatuses.collect()); - } - assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); initMetaClient(); assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode());