From 29574af239ae4596034a17999484ed069ec7123f Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 29 Oct 2021 12:14:39 -0400 Subject: [PATCH] [HUDI-2573] Fixing double locking with multi-writers (#3827) - There are two code paths, where we are taking double locking. this was added as part of adding data table locks to update metadata table. Fixing those flows to avoid taking locks if a parent transaction already acquired a lock. --- .../client/AbstractHoodieWriteClient.java | 80 ++++++++--- .../org/apache/hudi/table/HoodieTable.java | 7 +- .../action/clean/CleanActionExecutor.java | 22 ++- .../CopyOnWriteRestoreActionExecutor.java | 1 + .../MergeOnReadRestoreActionExecutor.java | 1 + .../rollback/BaseRollbackActionExecutor.java | 26 ++-- .../CopyOnWriteRollbackActionExecutor.java | 10 +- .../MergeOnReadRollbackActionExecutor.java | 10 +- .../table/HoodieFlinkCopyOnWriteTable.java | 7 +- .../table/HoodieFlinkMergeOnReadTable.java | 6 +- .../table/HoodieJavaCopyOnWriteTable.java | 7 +- .../hudi/client/SparkRDDWriteClient.java | 3 +- .../table/HoodieSparkCopyOnWriteTable.java | 10 +- .../table/HoodieSparkMergeOnReadTable.java | 5 +- .../functional/TestHoodieBackedMetadata.java | 133 +++++++++++++++++- .../org/apache/hudi/table/TestCleaner.java | 2 +- ...TestCopyOnWriteRollbackActionExecutor.java | 6 +- ...TestMergeOnReadRollbackActionExecutor.java | 6 +- 18 files changed, 281 insertions(+), 61 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index b65060fa2..3e6b7ab49 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -501,7 +501,7 @@ public abstract class AbstractHoodieWriteClient rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false); + Option rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, + commitInstantOpt.get(), false); if (rollbackPlanOption.isPresent()) { // execute rollback - HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true); + HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, + skipLocking); if (timerContext != null) { long durationInMs = metrics.getDurationInMs(timerContext.stop()); metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); @@ -644,7 +652,19 @@ public abstract class AbstractHoodieWriteClient rollbackFailedWrites()); - HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime); + HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking)); + HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking); if (timerContext != null && metadata != null) { long durationMs = metrics.getDurationInMs(timerContext.stop()); metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); @@ -675,7 +698,17 @@ public abstract class AbstractHoodieWriteClient table = createTable(config, hadoopConf); - List instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy()); - rollbackFailedWrites(instantsToRollback); + List instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), + Option.empty()); + rollbackFailedWrites(instantsToRollback, skipLocking); return true; } - protected void rollbackFailedWrites(List instantsToRollback) { + protected void rollbackFailedWrites(List instantsToRollback, boolean skipLocking) { for (String instant : instantsToRollback) { if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) { rollbackFailedBootstrap(); break; } else { - rollback(instant); + rollback(instant, skipLocking); } } // Delete any heartbeat files for already rolled back commits @@ -822,11 +864,17 @@ public abstract class AbstractHoodieWriteClient getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy) { + protected List getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option curInstantTime) { Stream inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient) .getReverseOrderedInstants(); if (cleaningPolicy.isEager()) { - return inflightInstantsStream.map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> { + if (curInstantTime.isPresent()) { + return !entry.equals(curInstantTime.get()); + } else { + return true; + } + }).collect(Collectors.toList()); } else if (cleaningPolicy.isLazy()) { return inflightInstantsStream.filter(instant -> { try { @@ -975,7 +1023,7 @@ public abstract class AbstractHoodieWriteClient table) { String commitTime = HoodieActiveTimeline.createNewInstantTime(); table.scheduleRollback(context, commitTime, inflightInstant, false); - table.rollback(context, commitTime, inflightInstant, false); + table.rollback(context, commitTime, inflightInstant, false, false); table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 6de40a7b5..a6c14e6d2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -424,7 +424,7 @@ public abstract class HoodieTable implem * * @return information on cleaned file slices */ - public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime); + public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking); /** * Schedule rollback for the instant time. @@ -452,7 +452,8 @@ public abstract class HoodieTable implem public abstract HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, - boolean deleteInstants); + boolean deleteInstants, + boolean skipLocking); /** * Create a savepoint at the specified instant, so that the table can be restored @@ -480,7 +481,7 @@ public abstract class HoodieTable implem public void rollbackInflightCompaction(HoodieInstant inflightInstant) { String commitTime = HoodieActiveTimeline.createNewInstantTime(); scheduleRollback(context, commitTime, inflightInstant, false); - rollback(context, commitTime, inflightInstant, false); + rollback(context, commitTime, inflightInstant, false, false); getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 1b229ca2f..a445fd3cc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -60,10 +60,16 @@ public class CleanActionExecutor extends private static final long serialVersionUID = 1L; private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class); private final TransactionManager txnManager; + private final boolean skipLocking; public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime) { + this(context, config, table, instantTime, false); + } + + public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, boolean skipLocking) { super(context, config, table, instantTime); this.txnManager = new TransactionManager(config, table.getMetaClient().getFs()); + this.skipLocking = skipLocking; } static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { @@ -214,11 +220,17 @@ public class CleanActionExecutor extends * @param cleanMetadata instance of {@link HoodieCleanMetadata} to be applied to metadata. */ private void writeMetadata(HoodieCleanMetadata cleanMetadata) { - try { - this.txnManager.beginTransaction(Option.empty(), Option.empty()); - writeTableMetadata(cleanMetadata); - } finally { - this.txnManager.endTransaction(); + if (config.isMetadataTableEnabled()) { + try { + if (!skipLocking) { + this.txnManager.beginTransaction(Option.empty(), Option.empty()); + } + writeTableMetadata(cleanMetadata); + } finally { + if (!skipLocking) { + this.txnManager.endTransaction(); + } + } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java index 2e3b1483e..1116ef9a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java @@ -58,6 +58,7 @@ public class CopyOnWriteRestoreActionExecutor table, String instantTime, HoodieInstant instantToRollback, - boolean deleteInstants) { + boolean deleteInstants, + boolean skipLocking) { this(context, config, table, instantTime, instantToRollback, deleteInstants, - false, config.shouldRollbackUsingMarkers()); + false, config.shouldRollbackUsingMarkers(), skipLocking); } public BaseRollbackActionExecutor(HoodieEngineContext context, @@ -77,7 +79,8 @@ public abstract class BaseRollbackActionExecutor table, String instantTime, HoodieInstant commitInstant, - boolean deleteInstants) { - super(context, config, table, instantTime, commitInstant, deleteInstants); + boolean deleteInstants, + boolean skipLocking) { + super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking); } public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context, @@ -54,8 +55,9 @@ public class CopyOnWriteRollbackActionExecutor table, String instantTime, HoodieInstant commitInstant, - boolean deleteInstants) { - super(context, config, table, instantTime, commitInstant, deleteInstants); + boolean deleteInstants, + boolean skipLocking) { + super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking); } public MergeOnReadRollbackActionExecutor(HoodieEngineContext context, @@ -54,8 +55,9 @@ public class MergeOnReadRollbackActionExecutor } @Override - public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) { + public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) { return new CleanActionExecutor(context, config, this, cleanInstantTime).execute(); } @Override - public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { - return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, + boolean deleteInstants, boolean skipLocking) { + return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute(); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index b165c844c..56a14da4c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -119,8 +119,10 @@ public class HoodieFlinkMergeOnReadTable } @Override - public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { - return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, + boolean deleteInstants, boolean skipLocking) { + return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, + skipLocking).execute(); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 99cf413a3..9d96ca1de 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -192,7 +192,7 @@ public class HoodieJavaCopyOnWriteTable extends H @Override public HoodieCleanMetadata clean(HoodieEngineContext context, - String cleanInstantTime) { + String cleanInstantTime, boolean skipLocking) { return new CleanActionExecutor(context, config, this, cleanInstantTime).execute(); } @@ -200,9 +200,10 @@ public class HoodieJavaCopyOnWriteTable extends H public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, - boolean deleteInstants) { + boolean deleteInstants, + boolean skipLocking) { return new CopyOnWriteRollbackActionExecutor( - context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index a1a5c8552..4100b0463 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -424,7 +424,7 @@ public class SparkRDDWriteClient extends this.txnManager.beginTransaction(); try { // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits - this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER)); + this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)), true); new UpgradeDowngrade( metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()) .run(HoodieTableVersion.current(), instantTime); @@ -434,6 +434,7 @@ public class SparkRDDWriteClient extends } else { upgradeDowngrade.run(HoodieTableVersion.current(), instantTime); } + metaClient.reloadActiveTimeline(); } metaClient.validateTableProperties(config.getProps(), operationType); return getTableAndInitCtx(metaClient, operationType, instantTime); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 6f5611f88..e458d845a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -256,13 +256,15 @@ public class HoodieSparkCopyOnWriteTable } @Override - public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) { - return new CleanActionExecutor(context, config, this, cleanInstantTime).execute(); + public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) { + return new CleanActionExecutor(context, config, this, cleanInstantTime, skipLocking).execute(); } @Override - public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { - return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, + boolean deleteInstants, boolean skipLocking) { + return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, + deleteInstants, skipLocking).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index 30984e010..d0bc96924 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -159,8 +159,9 @@ public class HoodieSparkMergeOnReadTable extends public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, - boolean deleteInstants) { - return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + boolean deleteInstants, + boolean skipLocking) { + return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index e0c61e157..de757a080 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -264,7 +264,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { @EnumSource(HoodieTableType.class) public void testInsertUpsertCluster(HoodieTableType tableType) throws Exception { init(tableType); - doWriteOperation(testTable,"0000001", INSERT); + doWriteOperation(testTable, "0000001", INSERT); doWriteOperation(testTable, "0000002"); doClusterAndValidate(testTable, "0000003"); if (tableType == MERGE_ON_READ) { @@ -638,6 +638,51 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(writeClients[0]); } + /** + * Tests that when inline cleaning is enabled and with auto commit set to true, there is no double locking. + * bcoz, auto clean is triggered within post commit which is already happening within a lock. + * + * @throws Exception + */ + @Test + public void testMultiWriterForDoubleLocking() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4).build()) + .withAutoCommit(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build()) + .withProperties(properties) + .build(); + + SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig); + String partitionPath = dataGen.getPartitionPaths()[0]; + for (int j = 0; j < 6; j++) { + String newCommitTime = "000000" + j; + List records = dataGen.generateInsertsForPartition(newCommitTime, 100, partitionPath); + writeClient.startCommitWithTime(newCommitTime); + JavaRDD writeStatuses = writeClient.insert(jsc.parallelize(records, 1), newCommitTime); + writeClient.commit(newCommitTime, writeStatuses); + } + + // Ensure all commits were synced to the Metadata Table + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + LOG.warn("total commits in metadata table " + metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants()); + + // 6 commits and 2 cleaner commits. + assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + // Validation + validateMetadata(writeClient); + } + /** * Lets say clustering commit succeeded in metadata table, but failed before committing to datatable. * Next time, when clustering kicks in, hudi will rollback pending clustering and re-attempt the clustering with same instant time. @@ -924,6 +969,92 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); } + /** + * When table needs to be upgraded and when multi writer is enabled, hudi rollsback partial commits. Upgrade itself is happening + * within a lock and hence rollback should not lock again. + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, InterruptedException { + init(HoodieTableType.COPY_ON_WRITE, false); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + // Perform a commit. This should bootstrap the metadata table with latest version. + List records; + JavaRDD writeStatuses; + String commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build()) + .withProperties(properties) + .build(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + records = dataGen.generateInserts(commitTimestamp, 5); + client.startCommitWithTime(commitTimestamp); + writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); + client.commit(commitTimestamp, writeStatuses); + } + + // Metadata table should have been bootstrapped + assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); + FileStatus oldStatus = fs.getFileStatus(new Path(metadataTableBasePath)); + + // trigger partial commit + metaClient.reloadActiveTimeline(); + commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + records = dataGen.generateInserts(commitTimestamp, 5); + client.startCommitWithTime(commitTimestamp); + writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); + } + + // set hoodie.table.version to 2 in hoodie.properties file + changeTableVersion(HoodieTableVersion.TWO); + writeConfig = getWriteConfigBuilder(true, true, false) + .withRollbackUsingMarkers(false) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build()) + .withProperties(properties) + .build(); + + // With next commit the table should be deleted (as part of upgrade) and partial commit should be rolled back. + metaClient.reloadActiveTimeline(); + commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + records = dataGen.generateInserts(commitTimestamp, 5); + client.startCommitWithTime(commitTimestamp); + writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); + assertNoWriteErrors(writeStatuses.collect()); + } + assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); + + // With next commit the table should be re-bootstrapped (currently in the constructor. To be changed) + commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + records = dataGen.generateInserts(commitTimestamp, 5); + client.startCommitWithTime(commitTimestamp); + writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); + assertNoWriteErrors(writeStatuses.collect()); + } + assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); + + initMetaClient(); + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode()); + assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); + FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath)); + assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime()); + } + /** * Test various error scenarios. */ diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 063b55686..72f6a0795 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -1308,7 +1308,7 @@ public class TestCleaner extends HoodieClientTestBase { metaClient.reloadActiveTimeline(); HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"); table.scheduleRollback(context, "001", rollbackInstant, false); - table.rollback(context, "001", rollbackInstant, true); + table.rollback(context, "001", rollbackInstant, true, false); final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length; assertEquals(0, numTempFilesAfter, "All temp files are deleted."); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 2e93602c4..3225dcd04 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -88,7 +88,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false); HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get(); - CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true); + CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true, + false); List hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback(rollbackPlan); // assert hoodieRollbackStats @@ -169,7 +170,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false); HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get(); - CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false); + CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false, + false); Map rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata(); //3. assert the rollback stat diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java index 06f70f21c..38be873e5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java @@ -99,7 +99,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT table, "003", rollBackInstant, - true); + true, + false); //3. assert the rollback stat Map rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata(); assertEquals(2, rollbackMetadata.size()); @@ -148,7 +149,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT rollBackInstant, true, true, - true).execute(); + true, + false).execute(); }); }