
[HUDI-2841] Fixing lazy rollback for MOR with list based strategy (#4110)

Sivabalan Narayanan
2021-11-25 16:06:04 -05:00
committed by GitHub
parent 6a0f079866
commit 8e1379384a
2 changed files with 121 additions and 2 deletions

RollbackUtils.java

@@ -228,8 +228,10 @@ public class RollbackUtils {
     // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
     // But the index (global) might store the baseCommit of the base and not the requested, hence get the
     // baseCommit always by listing the file slice
-    Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath)
-        .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
+    // With multi-writers, rollbacks could be lazy, and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
+    Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(),
+        true).collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
     return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
       // Filter out stats without prevCommit since they are all inserts
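
To restate the change outside the diff: with lazy rollback under multi-writer, the failed commit's log files may belong to a file slice that is no longer the latest by the time the rollback runs, because a concurrent writer (here, a compaction) has since created a newer slice. A minimal sketch of the corrected lookup, using the SliceView calls from the hunk above; the class and helper names are hypothetical, not part of the commit:

import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.TableFileSystemView;

public class LazyRollbackLookupSketch {
  // Hypothetical helper mirroring the fixed lookup above. Listing only the
  // *latest* file slices would resolve a file id to a slice created by a later
  // writer (e.g. a compaction that ran after the failed commit), so the lazy
  // rollback's command blocks would land in the wrong slice. Bounding the view
  // at the rollback instant returns the slice the failed commit actually wrote to.
  static Map<String, String> resolveBaseInstants(TableFileSystemView.SliceView sliceView,
                                                 String partitionPath,
                                                 HoodieInstant rollbackInstant) {
    return sliceView
        .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true)
        .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
  }
}

The trailing boolean matches the fix above; judging by its role in Hudi's file-system view it asks the view to also consider file slices in pending compaction, which matters here precisely because the competing writer is a compaction.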

TestHoodieSparkMergeOnReadTableRollback.java

@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -630,6 +631,122 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testLazyRollbackOfFailedCommit(boolean rollbackUsingMarkers) throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+    HoodieWriteConfig cfg = getWriteConfig(true, rollbackUsingMarkers);
+    HoodieWriteConfig autoCommitFalseCfg = getWriteConfig(false, rollbackUsingMarkers);
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    // commit 1
+    List<HoodieRecord> records = insertRecords(client, dataGen, "001");
+    // commit 2 to create log files
+    List<HoodieRecord> updates1 = updateRecords(client, dataGen, "002", records, metaClient, cfg, true);
+
+    // trigger an inflight commit 3 which will later be rolled back explicitly.
+    SparkRDDWriteClient autoCommitFalseClient = getHoodieWriteClient(autoCommitFalseCfg);
+    List<HoodieRecord> updates2 = updateRecords(autoCommitFalseClient, dataGen, "003", records, metaClient, autoCommitFalseCfg, false);
+
+    // commit 4 successful (mimic multi-writer scenario)
+    List<HoodieRecord> updates3 = updateRecords(client, dataGen, "004", records, metaClient, cfg, false);
+
+    // trigger compaction
+    long numLogFiles = getNumLogFilesInLatestFileSlice(metaClient, cfg, dataGen);
+    doCompaction(autoCommitFalseClient, metaClient, cfg, numLogFiles);
+    long numLogFilesAfterCompaction = getNumLogFilesInLatestFileSlice(metaClient, cfg, dataGen);
+    assertNotEquals(numLogFiles, numLogFilesAfterCompaction);
+
+    // rollback 3rd commit.
+    client.rollback("003");
+    long numLogFilesAfterRollback = getNumLogFilesInLatestFileSlice(metaClient, cfg, dataGen);
+    // lazy rollback should have added the rollback block to the previous file slice and not the latest,
+    // and so the latest slice's log file count should remain the same.
+    assertEquals(numLogFilesAfterRollback, numLogFilesAfterCompaction);
+  }
+
+  private List<HoodieRecord> insertRecords(SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, String commitTime) {
+    /*
+     * Write 1 (only inserts, written as base file)
+     */
+    client.startCommitWithTime(commitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(commitTime, 20);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+    List<WriteStatus> statuses = client.upsert(writeRecords, commitTime).collect();
+    assertNoWriteErrors(statuses);
+    return records;
+  }
+
+  private List<HoodieRecord> updateRecords(SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, String commitTime,
+                                           List<HoodieRecord> records, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg,
+                                           boolean assertLogFiles) throws IOException {
+    client.startCommitWithTime(commitTime);
+    records = dataGen.generateUpdates(commitTime, records);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+    List<WriteStatus> statuses = client.upsert(writeRecords, commitTime).collect();
+    assertNoWriteErrors(statuses);
+    if (assertLogFiles) {
+      HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
+      table.getHoodieView().sync();
+      TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
+      long numLogFiles = 0;
+      for (String partitionPath : dataGen.getPartitionPaths()) {
+        List<FileSlice> allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
+        assertEquals(1, allSlices.stream().filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count());
+        assertTrue(allSlices.stream().anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
+        numLogFiles += allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
+      }
+      assertTrue(numLogFiles > 0);
+    }
+    return records;
+  }
+
+  private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, long numLogFiles) throws IOException {
+    // Do a compaction
+    String instantTime = client.scheduleCompaction(Option.empty()).get().toString();
+    JavaRDD<WriteStatus> writeStatuses = (JavaRDD<WriteStatus>) client.compact(instantTime);
+    metaClient.reloadActiveTimeline();
+    HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
+    String extension = table.getBaseFileExtension();
+    assertEquals(numLogFiles, writeStatuses.map(status -> status.getStat().getPath().contains(extension)).count());
+    assertEquals(numLogFiles, writeStatuses.count());
+    client.commitCompaction(instantTime, writeStatuses, Option.empty());
+    return numLogFiles;
+  }
+
+  private long getNumLogFilesInLatestFileSlice(HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, HoodieTestDataGenerator dataGen) {
+    metaClient.reloadActiveTimeline();
+    HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);
+    table.getHoodieView().sync();
+    TableFileSystemView.SliceView tableRTFileSystemView = table.getSliceView();
+    long numLogFiles = 0;
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<FileSlice> allSlices = tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
+      numLogFiles += allSlices.stream().filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
+    }
+    return numLogFiles;
+  }
+
+  private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean rollbackUsingMarkers) {
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(autoCommit).withRollbackUsingMarkers(rollbackUsingMarkers)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024L)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(3)
+            .withAutoClean(false)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build());
+    return cfgBuilder.build();
+  }
+
   private SyncableFileSystemView getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) {
     try {
       return new HoodieTableFileSystemView(metaClient,