diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index fccb992ef..1338bfc46 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -228,8 +228,10 @@ public class RollbackUtils {
     // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
     // But the index (global) might store the baseCommit of the base and not the requested, hence get the
     // baseCommit always by listing the file slice
-    Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath)
-        .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
+    // With multi-writers, rollbacks could be lazy, and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
+    Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(),
+        true).collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
+
     return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
 
       // Filter out stats without prevCommit since they are all inserts
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 1e68a9458..759bcc05c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -630,6 +631,122 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testLazyRollbackOfFailedCommit(boolean rollbackUsingMarkers) throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    HoodieWriteConfig cfg = getWriteConfig(true, rollbackUsingMarkers);
+    HoodieWriteConfig autoCommitFalseCfg = getWriteConfig(false, rollbackUsingMarkers);
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    // commit 1
+    List<HoodieRecord> records = insertRecords(client, dataGen, "001");
+    // commit 2 to create log files
+    List<HoodieRecord> updates1 = updateRecords(client, dataGen, "002", records, metaClient, cfg, true);
+
+    // trigger an inflight commit 3 which will later be rolled back explicitly.
+    SparkRDDWriteClient autoCommitFalseClient = getHoodieWriteClient(autoCommitFalseCfg);
+    List<HoodieRecord> updates2 = updateRecords(autoCommitFalseClient, dataGen, "003", records, metaClient, autoCommitFalseCfg, false);
+
+    // commit 4 successful (mimic multi-writer scenario)
+    List<HoodieRecord> updates3 = updateRecords(client, dataGen, "004", records, metaClient, cfg, false);
+
+    // trigger compaction
+    long numLogFiles = getNumLogFilesInLatestFileSlice(metaClient, cfg, dataGen);
+    doCompaction(autoCommitFalseClient, metaClient, cfg, numLogFiles);
+    long numLogFilesAfterCompaction = getNumLogFilesInLatestFileSlice(metaClient, cfg, dataGen);
+    assertNotEquals(numLogFiles, numLogFilesAfterCompaction);
+
+    // rollback 3rd commit.
+    client.rollback("003");
+    long numLogFilesAfterRollback = getNumLogFilesInLatestFileSlice(metaClient, cfg, dataGen);
+    // lazy rollback should have appended the rollback block to the previous file slice and not the latest,
+    // so the latest slice's log file count should remain the same.
+    assertEquals(numLogFilesAfterRollback, numLogFilesAfterCompaction);
+  }
+
+  private List<HoodieRecord> insertRecords(SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, String commitTime) {
+    /*
+     * Write 1 (only inserts, written as base file)
+     */
+    client.startCommitWithTime(commitTime);
+
+    List<HoodieRecord> records = dataGen.generateInserts(commitTime, 20);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+
+    List<WriteStatus> statuses = client.upsert(writeRecords, commitTime).collect();
+    assertNoWriteErrors(statuses);
+    return records;
+  }
+
+  private List<HoodieRecord> updateRecords(SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, String commitTime,
+                                           List<HoodieRecord> records, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg,
+                                           boolean assertLogFiles) throws IOException {
+    client.startCommitWithTime(commitTime);
+
+    records = dataGen.generateUpdates(commitTime, records);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+    List<WriteStatus> statuses = client.upsert(writeRecords, commitTime).collect();
+    assertNoWriteErrors(statuses);
+
+    if (assertLogFiles) {
+      HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
+      table.getHoodieView().sync();
+      TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
+
+      long numLogFiles = 0;
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        List<FileSlice> allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
+        assertEquals(1, allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count());
+        assertTrue(allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
+        numLogFiles += allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
+      }
+      assertTrue(numLogFiles > 0);
+    }
+    return records;
+  }
+
+  private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, long numLogFiles) throws IOException {
+    // Do a compaction
+    String instantTime = client.scheduleCompaction(Option.empty()).get().toString();
+    JavaRDD<WriteStatus> writeStatuses = (JavaRDD<WriteStatus>) client.compact(instantTime);
+
+    metaClient.reloadActiveTimeline();
+    HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
+    String extension = table.getBaseFileExtension();
+    assertEquals(numLogFiles, writeStatuses.filter(status -> status.getStat().getPath().contains(extension)).count());
+    assertEquals(numLogFiles, writeStatuses.count());
+    client.commitCompaction(instantTime, writeStatuses, Option.empty());
+    return numLogFiles;
+  }
+
+  private long getNumLogFilesInLatestFileSlice(HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, HoodieTestDataGenerator dataGen) {
+    metaClient.reloadActiveTimeline();
+    HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
+    table.getHoodieView().sync();
+    TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
+
+    long numLogFiles = 0;
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<FileSlice> allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
+      numLogFiles += allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
+    }
+    return numLogFiles;
+  }
+
+  private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean rollbackUsingMarkers) {
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(autoCommit).withRollbackUsingMarkers(rollbackUsingMarkers)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024L)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(3)
+            .withAutoClean(false)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build());
+    return cfgBuilder.build();
+  }
+
   private SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) {
     try {
       return new HoodieTableFileSystemView(metaClient,
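
Note on the core change: with HoodieFailedWritesCleaningPolicy.LAZY and multiple writers, a failed commit may be rolled back only after later commits or compactions have already produced newer file slices, so the rollback must query the slice view "as of" the rollback instant rather than the absolute latest. Below is a minimal sketch of that lookup, assuming a HoodieTable handle and the HoodieInstant being rolled back; the class and helper name are illustrative, not part of the patch:

import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.table.HoodieTable;

class LazyRollbackLookupSketch {

  // Illustrative helper: resolve each fileId's base instant time as of the
  // rollback instant, so slices created after the failed commit are ignored.
  static Map<String, String> baseCommitTimesAsOf(HoodieTable table, String partitionPath,
                                                 HoodieInstant rollbackInstant) {
    // getLatestFileSlices() would also return slices created by commits or
    // compactions that completed after the failed commit; the time-bounded
    // variant excludes them. The trailing `true` also considers file slices
    // in pending compaction, matching the patch.
    return table.getSliceView()
        .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true)
        .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
  }
}

This is exactly the mapping the rollback planner uses to attach rollback blocks to the file slice the failed commit actually appended to, which is what the new test asserts when it checks that the latest (post-compaction) slice's log-file count is unchanged after the lazy rollback.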