diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index b65060fa2..3e6b7ab49 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -501,7 +501,7 @@ public abstract class AbstractHoodieWriteClient rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false); + Option rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, + commitInstantOpt.get(), false); if (rollbackPlanOption.isPresent()) { // execute rollback - HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true); + HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, + skipLocking); if (timerContext != null) { long durationInMs = metrics.getDurationInMs(timerContext.stop()); metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); @@ -644,7 +652,19 @@ public abstract class AbstractHoodieWriteClient rollbackFailedWrites()); - HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime); + HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking)); + HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking); if (timerContext != null && metadata != null) { long durationMs = metrics.getDurationInMs(timerContext.stop()); metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); @@ -675,7 +698,17 @@ public abstract class AbstractHoodieWriteClient table = createTable(config, hadoopConf); - List instantsToRollback = getInstantsToRollback(table.getMetaClient(), 
config.getFailedWritesCleanPolicy()); - rollbackFailedWrites(instantsToRollback); + List instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), + Option.empty()); + rollbackFailedWrites(instantsToRollback, skipLocking); return true; } - protected void rollbackFailedWrites(List instantsToRollback) { + protected void rollbackFailedWrites(List instantsToRollback, boolean skipLocking) { for (String instant : instantsToRollback) { if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) { rollbackFailedBootstrap(); break; } else { - rollback(instant); + rollback(instant, skipLocking); } } // Delete any heartbeat files for already rolled back commits @@ -822,11 +864,17 @@ public abstract class AbstractHoodieWriteClient getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy) { + protected List getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option curInstantTime) { Stream inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient) .getReverseOrderedInstants(); if (cleaningPolicy.isEager()) { - return inflightInstantsStream.map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> { + if (curInstantTime.isPresent()) { + return !entry.equals(curInstantTime.get()); + } else { + return true; + } + }).collect(Collectors.toList()); } else if (cleaningPolicy.isLazy()) { return inflightInstantsStream.filter(instant -> { try { @@ -975,7 +1023,7 @@ public abstract class AbstractHoodieWriteClient table) { String commitTime = HoodieActiveTimeline.createNewInstantTime(); table.scheduleRollback(context, commitTime, inflightInstant, false); - table.rollback(context, commitTime, inflightInstant, false); + table.rollback(context, commitTime, inflightInstant, 
false, false); table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 6de40a7b5..a6c14e6d2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -424,7 +424,7 @@ public abstract class HoodieTable implem * * @return information on cleaned file slices */ - public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime); + public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking); /** * Schedule rollback for the instant time. @@ -452,7 +452,8 @@ public abstract class HoodieTable implem public abstract HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, - boolean deleteInstants); + boolean deleteInstants, + boolean skipLocking); /** * Create a savepoint at the specified instant, so that the table can be restored @@ -480,7 +481,7 @@ public abstract class HoodieTable implem public void rollbackInflightCompaction(HoodieInstant inflightInstant) { String commitTime = HoodieActiveTimeline.createNewInstantTime(); scheduleRollback(context, commitTime, inflightInstant, false); - rollback(context, commitTime, inflightInstant, false); + rollback(context, commitTime, inflightInstant, false, false); getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 1b229ca2f..a445fd3cc 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -60,10 +60,16 @@ public class CleanActionExecutor extends private static final long serialVersionUID = 1L; private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class); private final TransactionManager txnManager; + private final boolean skipLocking; public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime) { + this(context, config, table, instantTime, false); + } + + public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, boolean skipLocking) { super(context, config, table, instantTime); this.txnManager = new TransactionManager(config, table.getMetaClient().getFs()); + this.skipLocking = skipLocking; } static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException { @@ -214,11 +220,17 @@ public class CleanActionExecutor extends * @param cleanMetadata instance of {@link HoodieCleanMetadata} to be applied to metadata. 
*/ private void writeMetadata(HoodieCleanMetadata cleanMetadata) { - try { - this.txnManager.beginTransaction(Option.empty(), Option.empty()); - writeTableMetadata(cleanMetadata); - } finally { - this.txnManager.endTransaction(); + if (config.isMetadataTableEnabled()) { + try { + if (!skipLocking) { + this.txnManager.beginTransaction(Option.empty(), Option.empty()); + } + writeTableMetadata(cleanMetadata); + } finally { + if (!skipLocking) { + this.txnManager.endTransaction(); + } + } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java index 2e3b1483e..1116ef9a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java @@ -58,6 +58,7 @@ public class CopyOnWriteRestoreActionExecutor table, String instantTime, HoodieInstant instantToRollback, - boolean deleteInstants) { + boolean deleteInstants, + boolean skipLocking) { this(context, config, table, instantTime, instantToRollback, deleteInstants, - false, config.shouldRollbackUsingMarkers()); + false, config.shouldRollbackUsingMarkers(), skipLocking); } public BaseRollbackActionExecutor(HoodieEngineContext context, @@ -77,7 +79,8 @@ public abstract class BaseRollbackActionExecutor table, String instantTime, HoodieInstant commitInstant, - boolean deleteInstants) { - super(context, config, table, instantTime, commitInstant, deleteInstants); + boolean deleteInstants, + boolean skipLocking) { + super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking); } public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context, @@ -54,8 +55,9 @@ public class CopyOnWriteRollbackActionExecutor table, 
String instantTime, HoodieInstant commitInstant, - boolean deleteInstants) { - super(context, config, table, instantTime, commitInstant, deleteInstants); + boolean deleteInstants, + boolean skipLocking) { + super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking); } public MergeOnReadRollbackActionExecutor(HoodieEngineContext context, @@ -54,8 +55,9 @@ public class MergeOnReadRollbackActionExecutor } @Override - public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) { + public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) { return new CleanActionExecutor(context, config, this, cleanInstantTime).execute(); } @Override - public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { - return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, + boolean deleteInstants, boolean skipLocking) { + return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute(); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index b165c844c..56a14da4c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -119,8 +119,10 @@ public class HoodieFlinkMergeOnReadTable } @Override - public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, 
HoodieInstant commitInstant, boolean deleteInstants) { - return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, + boolean deleteInstants, boolean skipLocking) { + return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, + skipLocking).execute(); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 99cf413a3..9d96ca1de 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -192,7 +192,7 @@ public class HoodieJavaCopyOnWriteTable extends H @Override public HoodieCleanMetadata clean(HoodieEngineContext context, - String cleanInstantTime) { + String cleanInstantTime, boolean skipLocking) { return new CleanActionExecutor(context, config, this, cleanInstantTime).execute(); } @@ -200,9 +200,10 @@ public class HoodieJavaCopyOnWriteTable extends H public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, - boolean deleteInstants) { + boolean deleteInstants, + boolean skipLocking) { return new CopyOnWriteRollbackActionExecutor( - context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 
a1a5c8552..4100b0463 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -424,7 +424,7 @@ public class SparkRDDWriteClient extends this.txnManager.beginTransaction(); try { // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits - this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER)); + this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)), true); new UpgradeDowngrade( metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()) .run(HoodieTableVersion.current(), instantTime); @@ -434,6 +434,7 @@ public class SparkRDDWriteClient extends } else { upgradeDowngrade.run(HoodieTableVersion.current(), instantTime); } + metaClient.reloadActiveTimeline(); } metaClient.validateTableProperties(config.getProps(), operationType); return getTableAndInitCtx(metaClient, operationType, instantTime); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 6f5611f88..e458d845a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -256,13 +256,15 @@ public class HoodieSparkCopyOnWriteTable } @Override - public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) { - return new CleanActionExecutor(context, config, this, cleanInstantTime).execute(); + public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) { + return new CleanActionExecutor(context, config, this, 
cleanInstantTime, skipLocking).execute(); } @Override - public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { - return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, + boolean deleteInstants, boolean skipLocking) { + return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, + deleteInstants, skipLocking).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index 30984e010..d0bc96924 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -159,8 +159,9 @@ public class HoodieSparkMergeOnReadTable extends public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, - boolean deleteInstants) { - return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); + boolean deleteInstants, + boolean skipLocking) { + return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 
e0c61e157..de757a080 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -264,7 +264,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { @EnumSource(HoodieTableType.class) public void testInsertUpsertCluster(HoodieTableType tableType) throws Exception { init(tableType); - doWriteOperation(testTable,"0000001", INSERT); + doWriteOperation(testTable, "0000001", INSERT); doWriteOperation(testTable, "0000002"); doClusterAndValidate(testTable, "0000003"); if (tableType == MERGE_ON_READ) { @@ -638,6 +638,51 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { validateMetadata(writeClients[0]); } + /** + * Tests that when inline cleaning is enabled and with auto commit set to true, there is no double locking, + * because auto clean is triggered within post commit, which already happens within a lock. 
+ * + * @throws Exception + */ + @Test + public void testMultiWriterForDoubleLocking() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4).build()) + .withAutoCommit(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build()) + .withProperties(properties) + .build(); + + SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig); + String partitionPath = dataGen.getPartitionPaths()[0]; + for (int j = 0; j < 6; j++) { + String newCommitTime = "000000" + j; + List records = dataGen.generateInsertsForPartition(newCommitTime, 100, partitionPath); + writeClient.startCommitWithTime(newCommitTime); + JavaRDD writeStatuses = writeClient.insert(jsc.parallelize(records, 1), newCommitTime); + writeClient.commit(newCommitTime, writeStatuses); + } + + // Ensure all commits were synced to the Metadata Table + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build(); + LOG.warn("total commits in metadata table " + metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants()); + + // 6 commits and 2 cleaner commits. 
+ assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + // Validation + validateMetadata(writeClient); + } + /** * Lets say clustering commit succeeded in metadata table, but failed before committing to datatable. * Next time, when clustering kicks in, hudi will rollback pending clustering and re-attempt the clustering with same instant time. @@ -924,6 +969,92 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); } + /** + * When the table needs to be upgraded and multi-writer is enabled, Hudi rolls back partial commits. The upgrade itself happens + * within a lock, and hence the rollback should not lock again. + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, InterruptedException { + init(HoodieTableType.COPY_ON_WRITE, false); + HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); + + // Perform a commit. This should bootstrap the metadata table with latest version. 
+ List records; + JavaRDD writeStatuses; + String commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build()) + .withProperties(properties) + .build(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + records = dataGen.generateInserts(commitTimestamp, 5); + client.startCommitWithTime(commitTimestamp); + writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); + client.commit(commitTimestamp, writeStatuses); + } + + // Metadata table should have been bootstrapped + assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); + FileStatus oldStatus = fs.getFileStatus(new Path(metadataTableBasePath)); + + // trigger partial commit + metaClient.reloadActiveTimeline(); + commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + records = dataGen.generateInserts(commitTimestamp, 5); + client.startCommitWithTime(commitTimestamp); + writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); + } + + // set hoodie.table.version to 2 in hoodie.properties file + changeTableVersion(HoodieTableVersion.TWO); + writeConfig 
= getWriteConfigBuilder(true, true, false) + .withRollbackUsingMarkers(false) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build()) + .withProperties(properties) + .build(); + + // With next commit the table should be deleted (as part of upgrade) and partial commit should be rolled back. + metaClient.reloadActiveTimeline(); + commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + records = dataGen.generateInserts(commitTimestamp, 5); + client.startCommitWithTime(commitTimestamp); + writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); + assertNoWriteErrors(writeStatuses.collect()); + } + assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); + + // With next commit the table should be re-bootstrapped (currently in the constructor. 
To be changed) + commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { + records = dataGen.generateInserts(commitTimestamp, 5); + client.startCommitWithTime(commitTimestamp); + writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); + assertNoWriteErrors(writeStatuses.collect()); + } + assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); + + initMetaClient(); + assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode()); + assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); + FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath)); + assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime()); + } + /** * Test various error scenarios. */ diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 063b55686..72f6a0795 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -1308,7 +1308,7 @@ public class TestCleaner extends HoodieClientTestBase { metaClient.reloadActiveTimeline(); HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"); table.scheduleRollback(context, "001", rollbackInstant, false); - table.rollback(context, "001", rollbackInstant, true); + table.rollback(context, "001", rollbackInstant, true, false); final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length; assertEquals(0, numTempFilesAfter, "All temp files are deleted."); } diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 2e93602c4..3225dcd04 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -88,7 +88,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false); HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get(); - CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true); + CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true, + false); List hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback(rollbackPlan); // assert hoodieRollbackStats @@ -169,7 +170,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false); HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get(); - CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false); + 
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false, + false); Map rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata(); //3. assert the rollback stat diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java index 06f70f21c..38be873e5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java @@ -99,7 +99,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT table, "003", rollBackInstant, - true); + true, + false); //3. assert the rollback stat Map rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata(); assertEquals(2, rollbackMetadata.size()); @@ -148,7 +149,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT rollBackInstant, true, true, - true).execute(); + true, + false).execute(); }); }