From a66fd40692dba064dfa71d277ef5f25e067e702b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 7 Mar 2022 12:38:27 -0800 Subject: [PATCH] [HUDI-3365] Make sure Metadata Table records are updated appropriately on HDFS (#4739) - This change makes sure MT records are updated appropriately on HDFS: previously, after Log File append operations, MT records were updated w/ just the size of the delta appended to the original file, which was found to cause issues during Rollbacks, since those instead update MT w/ records bearing the full file-size. - To hedge against similar issues going forward, this PR eliminates that discrepancy and streamlines the flow so that MT always ingests records bearing full file-sizes. --- .../hudi/client/BaseHoodieWriteClient.java | 161 ++++++++++++++---- .../apache/hudi/io/HoodieAppendHandle.java | 2 + .../apache/hudi/io/HoodieCreateHandle.java | 25 +-- .../hudi/client/HoodieFlinkWriteClient.java | 41 ++--- .../hudi/client/HoodieJavaWriteClient.java | 43 ++--- .../upgrade/JavaUpgradeDowngradeHelper.java | 51 ++++++ .../hudi/client/SparkRDDWriteClient.java | 85 +++------ .../org/apache/hudi/table/TestCleaner.java | 4 + .../testutils/HoodieClientTestHarness.java | 17 +- .../hudi/common/util/CollectionUtils.java | 4 +- .../hudi/metadata/HoodieMetadataPayload.java | 88 +++++++--- .../metadata/HoodieTableMetadataUtil.java | 75 +++++--- .../common/testutils/FileCreateUtils.java | 2 +- .../common/testutils/HoodieTestTable.java | 2 + .../TestStreamWriteOperatorCoordinator.java | 20 ++- .../SparkRDDWriteClientOverride.java | 38 +++++ .../apache/hudi/functional/TestBootstrap.java | 6 +- .../hudi/functional/TestOrcBootstrap.java | 6 +- 18 files changed, 415 insertions(+), 255 deletions(-) create mode 100644 hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java create mode 100644 hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 7b67ff54a..eca2a3d67 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -18,6 +18,8 @@ package org.apache.hudi.client; +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.async.AsyncArchiveService; import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.avro.model.HoodieCleanMetadata; @@ -45,6 +47,7 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; @@ -73,9 +76,8 @@ import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.action.savepoint.SavepointHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; - -import com.codahale.metrics.Timer; -import 
org.apache.hadoop.conf.Configuration; +import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -107,15 +109,16 @@ public abstract class BaseHoodieWriteClient index; + private final SupportsUpgradeDowngrade upgradeDowngradeHelper; + private transient WriteOperationType operationType; + private transient HoodieWriteCommitCallback commitCallback; + protected final transient HoodieMetrics metrics; protected transient Timer.Context writeTimer = null; protected transient Timer.Context compactionTimer; protected transient Timer.Context clusteringTimer; - private transient WriteOperationType operationType; - private transient HoodieWriteCommitCallback commitCallback; protected transient AsyncCleanerService asyncCleanerService; protected transient AsyncArchiveService asyncArchiveService; protected final TransactionManager txnManager; @@ -125,25 +128,32 @@ public abstract class BaseHoodieWriteClient timelineService) { + public BaseHoodieWriteClient(HoodieEngineContext context, + HoodieWriteConfig writeConfig, + Option timelineService, + SupportsUpgradeDowngrade upgradeDowngradeHelper) { super(context, writeConfig, timelineService); this.metrics = new HoodieMetrics(config); this.index = createIndex(writeConfig); this.txnManager = new TransactionManager(config, fs); + this.upgradeDowngradeHelper = upgradeDowngradeHelper; } protected abstract HoodieIndex createIndex(HoodieWriteConfig writeConfig); @@ -291,7 +301,7 @@ public abstract class BaseHoodieWriteClient table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS); + HoodieTable table = initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)); rollbackFailedBootstrap(); table.bootstrap(context, extraMetadata); } @@ -299,7 +309,7 @@ public abstract class BaseHoodieWriteClient table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); @@ -628,7 +638,7 @@ public abstract class BaseHoodieWriteClient table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = initTable(WriteOperationType.UNKNOWN, Option.empty()); SavepointHelpers.validateSavepointPresence(table, savepointTime); restoreToInstant(savepointTime); SavepointHelpers.validateSavepointRestore(table, savepointTime); @@ -636,25 +646,11 @@ public abstract class BaseHoodieWriteClient table = createTable(config, hadoopConf); + HoodieTable table = initTable(WriteOperationType.UNKNOWN, Option.empty()); Option pendingRollbackInfo = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime); return rollback(commitInstantTime, pendingRollbackInfo, false); } - /** - * @Deprecated - * Rollback the inflight record changes with the given commit time. This - * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)} - * Adding this api for backwards compatability. - * @param commitInstantTime Instant time of the commit - * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. 
- * @throws HoodieRollbackException if rollback cannot be performed successfully - */ - @Deprecated - public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException { - return rollback(commitInstantTime, Option.empty(), skipLocking); - } - /** * @Deprecated * Rollback the inflight record changes with the given commit time. This @@ -711,7 +707,7 @@ public abstract class BaseHoodieWriteClient table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = initTable(WriteOperationType.UNKNOWN, Option.empty()); Option restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime); if (restorePlanOption.isPresent()) { HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime); @@ -988,7 +984,7 @@ public abstract class BaseHoodieWriteClient table = createTable(config, hadoopConf); List instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty()); Map> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient()); @@ -1246,17 +1242,79 @@ public abstract class BaseHoodieWriteClient getTableAndInitCtx(WriteOperationType operationType, String instantTime); + protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime); /** - * Sets write schema from last instant since deletes may not have schema set in the config. + * Instantiates and initializes an instance of {@link HoodieTable}, performing crucial bootstrapping + * operations such as: + * + * NOTE: This method is engine-agnostic and SHOULD NOT be overridden; please override + * {@link #doInitTable(HoodieTableMetaClient, Option)} instead + * + *
+ * <ul>
+ *   <li>Checking whether upgrade/downgrade is required</li>
+ *   <li>Bootstrapping Metadata Table (if required)</li>
+ *   <li>Initializing metrics contexts</li>
+ * </ul>
+ *
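+ * <p>For illustration only, a sketch of how this method is driven by the engine-specific
+ * clients in this patch ({@code instantTime} and the surrounding upsert flow are assumed):
+ * <pre>{@code
+ *   // e.g. inside an engine-specific client's upsert():
+ *   HoodieTable table = initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
+ *   // at this point tryUpgrade() and doInitTable() have run under the transaction lock,
+ *   // table properties have been validated, and the FS view has been synced
+ *   table.validateUpsertSchema();
+ * }</pre>
+ *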
*/ + protected final HoodieTable initTable(WriteOperationType operationType, Option instantTime) { + HoodieTableMetaClient metaClient = createMetaClient(true); + // Setup write schemas for deletes + if (operationType == WriteOperationType.DELETE) { + setWriteSchemaForDeletes(metaClient); + } + + HoodieTable table; + + this.txnManager.beginTransaction(); + try { + tryUpgrade(metaClient, instantTime); + table = doInitTable(metaClient, instantTime); + } finally { + this.txnManager.endTransaction(); + } + + // Validate table properties + metaClient.validateTableProperties(config.getProps(), operationType); + // Make sure that FS View is in sync + table.getHoodieView().sync(); + + switch (operationType) { + case INSERT: + case INSERT_PREPPED: + case UPSERT: + case UPSERT_PREPPED: + case BULK_INSERT: + case BULK_INSERT_PREPPED: + case INSERT_OVERWRITE: + case INSERT_OVERWRITE_TABLE: + setWriteTimer(table); + break; + case CLUSTER: + clusteringTimer = metrics.getClusteringCtx(); + break; + case COMPACT: + compactionTimer = metrics.getCompactionCtx(); + break; + default: + } + + return table; + } + + /** + * Sets write schema from last instant since deletes may not have schema set in the config. + */ protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) { try { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); @@ -1301,4 +1359,33 @@ public abstract class BaseHoodieWriteClient table) { + String commitType = table.getMetaClient().getCommitActionType(); + if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) { + writeTimer = metrics.getCommitCtx(); + } else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) { + writeTimer = metrics.getDeltaCommitCtx(); + } + } + + private void tryUpgrade(HoodieTableMetaClient metaClient, Option instantTime) { + UpgradeDowngrade upgradeDowngrade = + new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper); + + if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { + // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits + List instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime); + + Map> pendingRollbacks = getPendingRollbackInfos(metaClient); + instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); + + rollbackFailedWrites(pendingRollbacks, true); + + new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper) + .run(HoodieTableVersion.current(), instantTime.orElse(null)); + + metaClient.reloadActiveTimeline(); + } + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 7eafe268b..db9083f9e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -417,6 +417,8 @@ public class HoodieAppendHandle extends writer = null; // update final size, once for all log files + // TODO we can actually deduce file size purely from AppendResult (based on offset and size + // of the appended block) for (WriteStatus status: statuses) { long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); status.getStat().setFileSizeInBytes(logFileSize); diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 096c257b1..3e7e0b16e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -18,6 +18,10 @@ package org.apache.hudi.io; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; @@ -35,11 +39,6 @@ import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -235,20 +234,14 @@ public class HoodieCreateHandle extends stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); stat.setFileId(writeStatus.getFileId()); stat.setPath(new Path(config.getBasePath()), path); - stat.setTotalWriteBytes(computeTotalWriteBytes()); - stat.setFileSizeInBytes(computeFileSizeInBytes()); stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); + + long fileSize = FSUtils.getFileSize(fs, path); + stat.setTotalWriteBytes(fileSize); + stat.setFileSizeInBytes(fileSize); + RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalCreateTime(timer.endTimer()); stat.setRuntimeStats(runtimeStats); } - - protected long computeTotalWriteBytes() throws IOException { - return FSUtils.getFileSize(fs, path); - } - - protected long computeFileSizeInBytes() throws IOException { - return FSUtils.getFileSize(fs, path); - } - } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 1f5d14af7..fb613309d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -18,6 +18,8 @@ package org.apache.hudi.client; +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.data.HoodieList; @@ -62,9 +64,6 @@ import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper; import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.util.FlinkClientUtil; - -import com.codahale.metrics.Timer; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,7 +92,7 @@ public class HoodieFlinkWriteClient extends private Option metadataWriterOption = Option.empty(); public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) { - super(context, writeConfig); + super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance()); this.bucketToHandles = new HashMap<>(); } @@ -136,7 
+135,7 @@ public class HoodieFlinkWriteClient extends @Override public List upsert(List> records, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); + initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient()); final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), @@ -152,7 +151,7 @@ public class HoodieFlinkWriteClient extends public List upsertPreppedRecords(List> preppedRecords, String instantTime) { // only used for metadata table, the upsert happens in single thread HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); + initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(preppedRecords.get(0), getConfig(), @@ -164,7 +163,7 @@ public class HoodieFlinkWriteClient extends @Override public List insert(List> records, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.INSERT, instantTime); + initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient()); // create the write handle if not exists @@ -187,7 +186,7 @@ public class HoodieFlinkWriteClient extends public List insertOverwrite( List> records, final String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime); + initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient()); // create the write handle if not exists @@ -206,7 +205,7 @@ public class HoodieFlinkWriteClient extends */ public List insertOverwriteTable( List> records, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime); + HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient()); // create the write handle if not exists @@ -239,7 +238,7 @@ public class HoodieFlinkWriteClient extends @Override public List delete(List keys, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.DELETE, instantTime); + initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime)); preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient()); HoodieWriteMetadata> result = table.delete(context, instantTime, keys); return postWrite(result, instantTime, table); @@ -397,11 +396,9 @@ public class HoodieFlinkWriteClient extends } @Override - protected HoodieTable>, List, List> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); - new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance()) - .run(HoodieTableVersion.current(), instantTime); - return getTableAndInitCtx(metaClient, operationType); + protected HoodieTable>, List, List> doInitTable(HoodieTableMetaClient metaClient, Option instantTime) { + // Create a 
Hoodie table which encapsulated the commits and files visible + return getHoodieTable(); } /** @@ -488,20 +485,6 @@ public class HoodieFlinkWriteClient extends return writeHandle; } - private HoodieTable>, List, List> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) { - if (operationType == WriteOperationType.DELETE) { - setWriteSchemaForDeletes(metaClient); - } - // Create a Hoodie table which encapsulated the commits and files visible - HoodieFlinkTable table = getHoodieTable(); - if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { - writeTimer = metrics.getCommitCtx(); - } else { - writeTimer = metrics.getDeltaCommitCtx(); - } - return table; - } - public HoodieFlinkTable getHoodieTable() { return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index f365f2932..9de9298c2 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -18,6 +18,8 @@ package org.apache.hudi.client; +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.data.HoodieList; @@ -30,7 +32,6 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -40,9 +41,7 @@ import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieJavaTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; - -import com.codahale.metrics.Timer; -import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.table.upgrade.JavaUpgradeDowngradeHelper; import java.util.List; import java.util.Map; @@ -52,14 +51,14 @@ public class HoodieJavaWriteClient extends BaseHoodieWriteClient>, List, List> { public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) { - super(context, clientConfig); + super(context, clientConfig, JavaUpgradeDowngradeHelper.getInstance()); } public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending, Option timelineService) { - super(context, writeConfig, timelineService); + super(context, writeConfig, timelineService, JavaUpgradeDowngradeHelper.getInstance()); } @Override @@ -99,7 +98,7 @@ public class HoodieJavaWriteClient extends public List upsert(List> records, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); + initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient()); HoodieWriteMetadata> result = table.upsert(context, instantTime, records); @@ -113,7 +112,7 @@ public class 
HoodieJavaWriteClient extends public List upsertPreppedRecords(List> preppedRecords, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime); + initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.upsertPrepped(context,instantTime, preppedRecords); @@ -123,7 +122,7 @@ public class HoodieJavaWriteClient extends @Override public List insert(List> records, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.INSERT, instantTime); + initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient()); HoodieWriteMetadata> result = table.insert(context, instantTime, records); @@ -137,7 +136,7 @@ public class HoodieJavaWriteClient extends public List insertPreppedRecords(List> preppedRecords, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime); + initTable(WriteOperationType.INSERT_PREPPED, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.insertPrepped(context,instantTime, preppedRecords); @@ -169,7 +168,7 @@ public class HoodieJavaWriteClient extends String instantTime, Option>>> bulkInsertPartitioner) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime); + initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.bulkInsertPrepped(context, instantTime, preppedRecords, bulkInsertPartitioner); @@ -180,7 +179,7 @@ public class HoodieJavaWriteClient extends public List delete(List keys, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.DELETE, instantTime); + initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime)); preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient()); HoodieWriteMetadata> result = table.delete(context,instantTime, keys); return postWrite(result, instantTime, table); @@ -233,23 +232,11 @@ public class HoodieJavaWriteClient extends } @Override - protected HoodieTable>, List, List> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); + protected HoodieTable>, List, List> doInitTable(HoodieTableMetaClient metaClient, Option instantTime) { // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); - return getTableAndInitCtx(metaClient, operationType); + + // Create a Hoodie table which encapsulated the commits and files visible + return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient); } - private HoodieTable>, List, List> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) { - if (operationType == WriteOperationType.DELETE) { - setWriteSchemaForDeletes(metaClient); - } - // Create a Hoodie table which encapsulated the commits and files visible - HoodieJavaTable table = 
HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient); - if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { - writeTimer = metrics.getCommitCtx(); - } else { - writeTimer = metrics.getDeltaCommitCtx(); - } - return table; - } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java new file mode 100644 index 000000000..e1c44d091 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.table.HoodieJavaTable; +import org.apache.hudi.table.HoodieTable; + +/** + * Java upgrade and downgrade helper + */ +public class JavaUpgradeDowngradeHelper implements SupportsUpgradeDowngrade { + + private static final JavaUpgradeDowngradeHelper SINGLETON_INSTANCE = + new JavaUpgradeDowngradeHelper(); + + private JavaUpgradeDowngradeHelper() {} + + public static JavaUpgradeDowngradeHelper getInstance() { + return SINGLETON_INSTANCE; + } + + @Override + public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return HoodieJavaTable.create(config, context); + } + + @Override + public String getPartitionColumns(HoodieWriteConfig config) { + return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index d51d25616..7e142d89c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -18,15 +18,15 @@ package org.apache.hudi.client; +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.utils.TransactionUtils; -import org.apache.hudi.common.HoodiePendingRollbackInfo; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.metrics.Registry; import 
org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -35,7 +35,6 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -55,10 +54,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; -import org.apache.hudi.table.upgrade.UpgradeDowngrade; - -import com.codahale.metrics.Timer; -import org.apache.hadoop.conf.Configuration; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; @@ -94,7 +89,7 @@ public class SparkRDDWriteClient extends public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, Option timelineService) { - super(context, writeConfig, timelineService); + super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance()); } /** @@ -147,13 +142,13 @@ public class SparkRDDWriteClient extends */ @Override public void bootstrap(Option> extraMetadata) { - getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS).bootstrap(context, extraMetadata); + initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)).bootstrap(context, extraMetadata); } @Override public JavaRDD upsert(JavaRDD> records, String instantTime) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); + initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient()); HoodieWriteMetadata> result = table.upsert(context, instantTime, records); @@ -166,7 +161,7 @@ public class SparkRDDWriteClient extends @Override public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords, String instantTime) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime); + initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.upsertPrepped(context,instantTime, preppedRecords); @@ -176,7 +171,7 @@ public class SparkRDDWriteClient extends @Override public JavaRDD insert(JavaRDD> records, String instantTime) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.INSERT, instantTime); + initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient()); HoodieWriteMetadata> result = table.insert(context,instantTime, records); @@ -186,7 +181,7 @@ public class SparkRDDWriteClient extends @Override public JavaRDD insertPreppedRecords(JavaRDD> 
preppedRecords, String instantTime) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime); + initTable(WriteOperationType.INSERT_PREPPED, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.insertPrepped(context,instantTime, preppedRecords); @@ -201,7 +196,7 @@ public class SparkRDDWriteClient extends * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public HoodieWriteResult insertOverwrite(JavaRDD> records, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime); + HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient()); HoodieWriteMetadata result = table.insertOverwrite(context, instantTime, records); @@ -216,7 +211,7 @@ public class SparkRDDWriteClient extends * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public HoodieWriteResult insertOverwriteTable(JavaRDD> records, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime); + HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient()); HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records); @@ -231,7 +226,7 @@ public class SparkRDDWriteClient extends @Override public JavaRDD bulkInsert(JavaRDD> records, String instantTime, Option>>> userDefinedBulkInsertPartitioner) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime); + initTable(WriteOperationType.BULK_INSERT, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.BULK_INSERT, table.getMetaClient()); HoodieWriteMetadata> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner); @@ -241,7 +236,7 @@ public class SparkRDDWriteClient extends @Override public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords, String instantTime, Option>>> bulkInsertPartitioner) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime); + initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner); @@ -250,14 +245,14 @@ public class SparkRDDWriteClient extends @Override public JavaRDD delete(JavaRDD keys, String instantTime) { - HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime); + HoodieTable>, JavaRDD, JavaRDD> table = initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime)); preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient()); HoodieWriteMetadata> result = table.delete(context,instantTime, keys); return postWrite(result, instantTime, table); } public HoodieWriteResult deletePartitions(List partitions, String instantTime) { - HoodieTable>, JavaRDD, 
JavaRDD> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime); + HoodieTable>, JavaRDD, JavaRDD> table = initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime)); preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient()); HoodieWriteMetadata> result = table.deletePartitions(context, instantTime, partitions); return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); @@ -420,34 +415,14 @@ public class SparkRDDWriteClient extends } @Override - protected HoodieTable>, JavaRDD, JavaRDD> getTableAndInitCtx(WriteOperationType operationType, - String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); - UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade( - metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()); - try { - this.txnManager.beginTransaction(); - if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { - // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits - List instantsToRollback = getInstantsToRollback( - metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)); - Map> pendingRollbacks = getPendingRollbackInfos(metaClient); - instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); - this.rollbackFailedWrites(pendingRollbacks, true); - new UpgradeDowngrade( - metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()) - .run(HoodieTableVersion.current(), instantTime); - metaClient.reloadActiveTimeline(); - } - // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation, - // if it didn't exist before - // See https://issues.apache.org/jira/browse/HUDI-3343 for more details - initializeMetadataTable(Option.of(instantTime)); - } finally { - this.txnManager.endTransaction(); - } - metaClient.validateTableProperties(config.getProps(), operationType); - return getTableAndInitCtx(metaClient, operationType, instantTime); + protected HoodieTable>, JavaRDD, JavaRDD> doInitTable(HoodieTableMetaClient metaClient, Option instantTime) { + // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation, + // if it didn't exist before + // See https://issues.apache.org/jira/browse/HUDI-3343 for more details + initializeMetadataTable(instantTime); + + // Create a Hoodie table which encapsulated the commits and files visible + return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled()); } /** @@ -480,22 +455,6 @@ public class SparkRDDWriteClient extends } } - private HoodieTable>, JavaRDD, JavaRDD> getTableAndInitCtx( - HoodieTableMetaClient metaClient, WriteOperationType operationType, String instantTime) { - if (operationType == WriteOperationType.DELETE) { - setWriteSchemaForDeletes(metaClient); - } - // Create a Hoodie table which encapsulated the commits and files visible - HoodieSparkTable table = HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled()); - if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { - writeTimer = metrics.getCommitCtx(); - } else { - writeTimer = metrics.getDeltaCommitCtx(); - } - table.getHoodieView().sync(); - return table; - } - @Override protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { // Create a Hoodie table after startTxn which encapsulated 
the commits and files visible. diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index f51a169dd..552e85af4 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -1077,6 +1077,8 @@ public class TestCleaner extends HoodieClientTestBase { writeStat.setPartitionPath(partition); writeStat.setPath(partition + "/" + getBaseFilename(instantTime, newFileId)); writeStat.setFileId(newFileId); + writeStat.setTotalWriteBytes(1); + writeStat.setFileSizeInBytes(1); replaceMetadata.addWriteStat(partition, writeStat); } return Pair.of(requestedReplaceMetadata, replaceMetadata); @@ -1756,6 +1758,8 @@ public class TestCleaner extends HoodieClientTestBase { writeStat.setPartitionPath(partitionPath); writeStat.setPath(partitionPath + "/" + getBaseFilename(instantTime, f)); writeStat.setFileId(f); + writeStat.setTotalWriteBytes(1); + writeStat.setFileSizeInBytes(1); metadata.addWriteStat(partitionPath, writeStat); })); return metadata; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index f339f5ed9..71e4b4b4e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -102,6 +102,7 @@ import scala.Tuple2; import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertLinesMatch; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -617,21 +618,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im Collections.sort(fsFileNames); Collections.sort(metadataFilenames); - if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) { - LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray())); - LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray())); - - for (String fileName : fsFileNames) { - if (!metadataFilenames.contains(fileName)) { - LOG.error(partition + "FsFilename " + fileName + " not found in Meta data"); - } - } - for (String fileName : metadataFilenames) { - if (!fsFileNames.contains(fileName)) { - LOG.error(partition + "Metadata file " + fileName + " not found in original FS"); - } - } - } + assertLinesMatch(fsFileNames, metadataFilenames); assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length); // Block sizes should be valid diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java index 1a3d053e2..9741ceef3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java @@ -77,8 +77,8 @@ public class CollectionUtils { * NOTE: That values associated with 
overlapping keys from the second map, will override * values from the first one */ - public static Map combine(Map one, Map another) { - Map combined = new HashMap<>(one.size() + another.size()); + public static HashMap combine(Map one, Map another) { + HashMap combined = new HashMap<>(one.size() + another.size()); combined.putAll(one); combined.putAll(another); return combined; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 221b52e77..75f83b7c6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -52,6 +52,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -239,10 +240,22 @@ public class HoodieMetadataPayload implements HoodieRecordPayload> filesAdded, Option> filesDeleted) { Map fileInfo = new HashMap<>(); - filesAdded.ifPresent( - m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false)))); - filesDeleted.ifPresent( - m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true)))); + filesAdded.ifPresent(filesMap -> + fileInfo.putAll( + filesMap.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, (entry) -> { + long fileSize = entry.getValue(); + // Assert that the file-size of the file being added is positive, since Hudi + // should not be creating empty files + checkState(fileSize > 0); + return new HoodieMetadataFileInfo(fileSize, false); + }))) + ); + filesDeleted.ifPresent(filesList -> + fileInfo.putAll( + filesList.stream().collect( + Collectors.toMap(Function.identity(), (ignored) -> new HoodieMetadataFileInfo(0L, true)))) + ); HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.getPartitionPath()); HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo); @@ -288,7 +301,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload combinedFileInfo = combineFilesystemMetadata(previousRecord); + Map combinedFileInfo = combineFileSystemMetadata(previousRecord); return new HoodieMetadataPayload(key, type, combinedFileInfo); case METADATA_TYPE_BLOOM_FILTER: HoodieMetadataBloomFilter combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord); @@ -392,28 +405,53 @@ public class HoodieMetadataPayload implements HoodieRecordPayload e.getValue().getIsDeleted() == isDeleted); } - private Map combineFilesystemMetadata(HoodieMetadataPayload previousRecord) { + private Map combineFileSystemMetadata(HoodieMetadataPayload previousRecord) { Map combinedFileInfo = new HashMap<>(); + + // First, add all files listed in the previous record if (previousRecord.filesystemMetadata != null) { combinedFileInfo.putAll(previousRecord.filesystemMetadata); } + // Second, merge in the files listed in the new record if (filesystemMetadata != null) { - filesystemMetadata.forEach((filename, fileInfo) -> { - // If the filename wasnt present then we carry it forward - if (!combinedFileInfo.containsKey(filename)) { - combinedFileInfo.put(filename, fileInfo); - } else { - if (fileInfo.getIsDeleted()) { - // file deletion - combinedFileInfo.remove(filename); - } else { - // file appends. 
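// (Illustrative aside, with assumed sizes rather than patch content: the old merge below sums
// sizes, which double-counts once some records carry full file-sizes, e.g. those written by a
// Rollback. Contrast with the max-based merge this patch introduces:)
//
//   long baseSize = 100L;             // size recorded when the file was created
//   long fullSizeAfterAppend = 150L;  // full on-disk size reported after an append
//   long summed = baseSize + fullSizeAfterAppend;          // old semantics: 250, overstated
//   long merged = Math.max(baseSize, fullSizeAfterAppend); // new semantics: 150, order-independent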
- combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> { - return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false); - }); - } - } + validatePayload(type, filesystemMetadata); + + filesystemMetadata.forEach((key, fileInfo) -> { + combinedFileInfo.merge(key, fileInfo, + // Combine the previous record w/ the new one, the new record taking precedence over + // the old one + // + // NOTE: If the previous listing contains the file that is being deleted by the tombstone + // record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting + // listing as well as drop the tombstone itself. + // However, if the file is not present in the previous record we have to persist the tombstone + // record in the listing to make sure we carry forward the information that this file + // was deleted. This special case could occur since the merging flow is 2-stage: + // - First we merge records from all of the delta log-files + // - Then we merge records from base-files with the delta ones (coming as a result + // of the previous step) + (oldFileInfo, newFileInfo) -> + // NOTE: We can't assume that MT update records will be ordered the same way as actual + // FS operations (since they are not atomic), therefore MT record merging should be a + // _commutative_ & _associative_ operation (i.e. one that would work even if records + // get re-ordered), which is + // - Possible for file-sizes (since file-sizes only ever grow, we can simply + // take the max of the old and new records) + // - Not possible for is-deleted flags* + // + // *However, we're assuming that the case of concurrent write and deletion of the same + // file is _impossible_ -- it would only be possible with a concurrent upsert and + // rollback operation (affecting the same log-file), which is implausible, b/c either + // of the following would have to be true: + // - We're appending to a failed log-file (then the other writer is trying to + // roll it back concurrently, before its own write) + // - Rollback (of a completed instant) is running concurrently with an append (meaning + // that restore is running concurrently with a write, which is also not supported + // currently) + newFileInfo.getIsDeleted() + ? 
null + : new HoodieMetadataFileInfo(Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), false)); }); } @@ -509,6 +547,14 @@ public class HoodieMetadataPayload implements HoodieRecordPayload filesystemMetadata) { + if (type == METADATA_TYPE_FILE_LIST) { + filesystemMetadata.forEach((fileName, fileInfo) -> { + checkState(fileInfo.getIsDeleted() || fileInfo.getSize() > 0, "Existing files should have size > 0"); + }); + } + } + private static T getNestedFieldValue(GenericRecord record, String fieldName) { // NOTE: This routine is more lightweight than {@code HoodieAvroUtils.getNestedFieldVal} if (record.getSchema().getField(fieldName) == null) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index e569baefb..8b37d0359 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -40,6 +40,7 @@ import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -147,40 +148,58 @@ public class HoodieTableMetadataUtil { */ public static List convertMetadataToFilesPartitionRecords(HoodieCommitMetadata commitMetadata, String instantTime) { - List records = new LinkedList<>(); - List allPartitions = new LinkedList<>(); - commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { - final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName; - allPartitions.add(partition); + List records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size()); - Map newFiles = new HashMap<>(writeStats.size()); - writeStats.forEach(hoodieWriteStat -> { - String pathWithPartition = hoodieWriteStat.getPath(); - if (pathWithPartition == null) { - // Empty partition - LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat); - return; - } + // Add record bearing partitions list + ArrayList partitionsList = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()); - int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1; - String filename = pathWithPartition.substring(offset); - long totalWriteBytes = newFiles.containsKey(filename) - ? 
newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes() - : hoodieWriteStat.getTotalWriteBytes(); - newFiles.put(filename, totalWriteBytes); - }); - // New files added to a partition - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord( - partition, Option.of(newFiles), Option.empty()); - records.add(record); - }); - // New partitions created - HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions)); - records.add(record); + // Update files listing records for each individual partition + List> updatedPartitionFilesRecords = + commitMetadata.getPartitionToWriteStats().entrySet() + .stream() + .map(entry -> { + String partitionStatName = entry.getKey(); + List writeStats = entry.getValue(); + + String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName; + + HashMap updatedFilesToSizesMapping = + writeStats.stream().reduce(new HashMap<>(writeStats.size()), + (map, stat) -> { + String pathWithPartition = stat.getPath(); + if (pathWithPartition == null) { + // Empty partition + LOG.warn("Unable to find path in write stat to update metadata table " + stat); + return map; + } + + int offset = partition.equals(NON_PARTITIONED_NAME) + ? (pathWithPartition.startsWith("/") ? 1 : 0) + : partition.length() + 1; + String filename = pathWithPartition.substring(offset); + + // Since write-stats are coming in no particular order, if the same + // file has previously been appended to w/in the txn, we simply pick the max + // of the sizes as reported after every write, since file-sizes are + // monotonically increasing (i.e. file-size never goes down, unless deleted) + map.merge(filename, stat.getFileSizeInBytes(), Math::max); + + return map; + }, + CollectionUtils::combine); + + return HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(updatedFilesToSizesMapping), + Option.empty()); + }) + .collect(Collectors.toList()); + + records.addAll(updatedPartitionFilesRecords); LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType() + ". 
#partitions_updated=" + records.size()); + return records; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 2b1057fea..8f5e5ae96 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -285,7 +285,7 @@ public class FileCreateUtils { public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId) throws Exception { - createBaseFile(basePath, partitionPath, instantTime, fileId, 0); + createBaseFile(basePath, partitionPath, instantTime, fileId, 1); } public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId, long length) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index f78312217..6f49c6996 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -1057,6 +1057,7 @@ public class HoodieTestTable { writeStat.setPartitionPath(partition); writeStat.setPath(partition + "/" + fileName); writeStat.setTotalWriteBytes(fileIdInfo.getValue()); + writeStat.setFileSizeInBytes(fileIdInfo.getValue()); writeStats.add(writeStat); } } @@ -1082,6 +1083,7 @@ public class HoodieTestTable { writeStat.setPartitionPath(partition); writeStat.setPath(partition + "/" + fileName); writeStat.setTotalWriteBytes(fileIdInfo.getValue()[1]); + writeStat.setFileSizeInBytes(fileIdInfo.getValue()[1]); writeStats.add(writeStat); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index d86602ea9..6266c3052 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -18,6 +18,14 @@ package org.apache.hudi.sink; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.FileUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieWriteStat; @@ -30,15 +38,6 @@ import org.apache.hudi.sink.utils.MockCoordinatorExecutor; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestUtils; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; -import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; -import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.util.FileUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -276,6 +275,9 @@ public class TestStreamWriteOperatorCoordinator {
     writeStat.setPartitionPath(partitionPath);
     writeStat.setFileId("fileId123");
     writeStat.setPath("path123");
+    writeStat.setFileSizeInBytes(123);
+    writeStat.setTotalWriteBytes(123);
+    writeStat.setNumWrites(1);
     writeStatus.setStat(writeStat);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java
new file mode 100644
index 000000000..20982b5cd
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+// Sole purpose of this class is to provide access to an API that is otherwise inaccessible from the tests.
+// While it's certainly not a great pattern, it would require substantial test restructuring to
+// eliminate such access to an internal API, so this is considered acceptable given its very limited
+// scope (w/in the current package)
+class SparkRDDWriteClientOverride extends org.apache.hudi.client.SparkRDDWriteClient {
+
+  public SparkRDDWriteClientOverride(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig);
+  }
+
+  @Override
+  public void rollbackFailedBootstrap() {
+    super.rollbackFailedBootstrap();
+  }
+}
+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 86d304968..6b54765a0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -20,7 +20,6 @@ package org.apache.hudi.functional;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.avro.model.HoodieFileStatus;
-import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.bootstrap.BootstrapMode;
 import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
 import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
@@ -253,7 +252,8 @@ public class TestBootstrap extends HoodieClientTestBase {
         .withBootstrapParallelism(3)
         .withBootstrapModeSelector(bootstrapModeSelectorClass).build())
         .build();
-    SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
+
+    SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride(context, config);
     client.bootstrap(Option.empty());
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles,
         numInstantsAfterBootstrap, numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
@@ -272,7 +272,7 @@ public class TestBootstrap extends HoodieClientTestBase {
     assertFalse(index.useIndex());
 
     // Run bootstrap again
-    client = new SparkRDDWriteClient(context, config);
+    client = new SparkRDDWriteClientOverride(context, config);
     client.bootstrap(Option.empty());
 
     metaClient.reloadActiveTimeline();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index 6cd3ae333..330b6015b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -20,7 +20,6 @@ package org.apache.hudi.functional;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.avro.model.HoodieFileStatus;
-import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.bootstrap.BootstrapMode;
 import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
 import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
@@ -245,7 +244,8 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
         .withBootstrapParallelism(3)
         .withBootstrapModeSelector(bootstrapModeSelectorClass).build())
         .build();
-    SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
+
+    SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride(context, config);
     client.bootstrap(Option.empty());
     checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles,
         numInstantsAfterBootstrap, numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants, true);
@@ -266,7 +266,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
     assertFalse(index.useIndex());
 
     // Run bootstrap again
-    client = new SparkRDDWriteClient(context, config);
+    client = new SparkRDDWriteClientOverride(context, config);
     client.bootstrap(Option.empty());
 
     metaClient.reloadActiveTimeline();
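
--

Illustration (not part of the patch; the class name FileSizeMergeSketch is hypothetical): a minimal, self-contained sketch of the Math::max merge rule used in HoodieTableMetadataUtil above. Write-stats within a transaction arrive in no particular order, and each reports the full file size as of its write, so keeping the per-file maximum recovers the final on-disk size:

import java.util.HashMap;
import java.util.Map;

public class FileSizeMergeSketch {
  public static void main(String[] args) {
    Map<String, Long> filesToSizes = new HashMap<>();

    // Simulated write-stats, out of order: two appends to the same log file
    // (full sizes 2048 and 1024, reported out of order) plus one base file.
    String[] files = {"f1.log", "f2.parquet", "f1.log"};
    long[] sizes = {2048L, 4096L, 1024L};

    for (int i = 0; i < files.length; i++) {
      // merge() keeps the largest size seen for each file; sizes only grow
      filesToSizes.merge(files[i], sizes[i], Math::max);
    }

    // Prints f1.log -> 2048 (not 1024) and f2.parquet -> 4096
    filesToSizes.forEach((f, s) -> System.out.println(f + " -> " + s));
  }
}

Contrast with the removed code above, which summed getTotalWriteBytes() deltas per file; merging getFileSizeInBytes() with Math::max always records the full file size instead.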