Fixing bug introduced in rollback for MOR table type with inserts into log files
committed by vinoth chandar
parent a6fe96fdfe
commit 34ab54a9d3
@@ -54,6 +54,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -216,11 +217,14 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
     final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
     Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();

-    // In case all data was inserts and the commit failed, there is no partition stats
-    if (commitMetadata.getPartitionToWriteStats().size() == 0) {
-      super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
-    }
+    // In case all data was inserts and the commit failed, delete the file belonging to that commit
+    super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);

+    final Set<String> deletedFiles = filesToDeletedStatus.entrySet().stream()
+        .map(entry -> {
+          Path filePath = entry.getKey().getPath();
+          return FSUtils.getFileIdFromFilePath(filePath);
+        }).collect(Collectors.toSet());
     // append rollback blocks for updates
     if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
       // This needs to be done since GlobalIndex at the moment does not store the latest commit time
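The heart of the fix is in this hunk: rather than deleting the failed commit's files only when it left no partition stats, the rollback now always deletes them and remembers the affected fileIds. Below is a minimal, self-contained sketch of that bookkeeping step, with plain file-name strings standing in for Hadoop FileStatus and an assumed "<fileId>_..." name layout (both are illustrative, not taken from the diff):

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch: derive the set of fileIds whose files were just deleted, mirroring the
// filesToDeletedStatus -> deletedFiles step in the hunk above.
public class DeletedFileIdsSketch {
  // Assumed name layout "<fileId>_<rest>", purely for illustration.
  static String fileIdFromName(String fileName) {
    return fileName.substring(0, fileName.indexOf('_'));
  }

  public static void main(String[] args) {
    // file name -> deletion succeeded (stands in for Map<FileStatus, Boolean>)
    Map<String, Boolean> filesToDeletedStatus = Map.of(
        "fileA_1_002.parquet", true,
        "fileB_1_002.parquet", true);
    Set<String> deletedFiles = filesToDeletedStatus.entrySet().stream()
        .map(entry -> fileIdFromName(entry.getKey()))
        .collect(Collectors.toSet());
    System.out.println(deletedFiles); // e.g. [fileA, fileB] (set order unspecified)
  }
}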
@@ -231,16 +235,9 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
         .filter(wStat -> {
           if (wStat != null
               && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT
-              && wStat.getPrevCommit() != null) {
+              && wStat.getPrevCommit() != null && !deletedFiles.contains(wStat.getFileId())) {
             return true;
           }
-          // we do not know fileIds for inserts (first inserts are either log files or parquet files),
-          // delete all files for the corresponding failed commit, if present (same as COW)
-          try {
-            super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
-          } catch (IOException io) {
-            throw new UncheckedIOException(io);
-          }
           return false;
         })
         .forEach(wStat -> {
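With deletedFiles computed up front, the filter above no longer performs deletion as a side effect (the old try/catch block); it simply skips write stats whose file was already removed, so rollback blocks are appended only for surviving file slices that received updates. A small stand-alone sketch of that decision, using a hypothetical WriteStat record (Java 16+) in place of HoodieWriteStat:

import java.util.Set;

// Sketch of the new filter condition: append rollback blocks only for updates
// whose target file was not already deleted along with the failed commit.
public class RollbackBlockFilterSketch {
  record WriteStat(String fileId, String prevCommit) {} // hypothetical stand-in

  static boolean needsRollbackBlock(WriteStat wStat, Set<String> deletedFiles) {
    return wStat != null && wStat.prevCommit() != null
        && !deletedFiles.contains(wStat.fileId());
  }

  public static void main(String[] args) {
    Set<String> deletedFiles = Set.of("fileA"); // removed with the failed commit
    WriteStat insertIntoNewFile = new WriteStat("fileA", "001");
    WriteStat updateOfOldFile = new WriteStat("fileB", "001");
    System.out.println(needsRollbackBlock(insertIntoNewFile, deletedFiles)); // false
    System.out.println(needsRollbackBlock(updateOfOldFile, deletedFiles));   // true
  }
}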
@@ -51,6 +51,8 @@ import com.uber.hoodie.index.HoodieIndex.IndexType;
 import com.uber.hoodie.index.bloom.HoodieBloomIndex;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -389,10 +391,10 @@ public class TestMergeOnReadTable {
   @Test
   public void testRollbackWithDeltaAndCompactionCommit() throws Exception {

-    HoodieWriteConfig cfg = getConfig(true);
+    HoodieWriteConfig cfg = getConfig(false);
     HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);

-    // Test delta commit rollback (with all log files)
+    // Test delta commit rollback
     /**
      * Write 1 (only inserts)
      */
@@ -403,7 +405,9 @@ public class TestMergeOnReadTable {
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

-    List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
+    client.commit(newCommitTime, writeStatusJavaRDD);
+    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
     assertNoWriteErrors(statuses);

     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
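The reworked test separates upsert from commit because the next hunk builds a client with withAutoCommit(false); calling upsert without a following client.commit is how the test simulates a failed delta commit. A condensed, non-runnable view of the two paths, using only calls that appear in this diff (the surrounding setup is the test's):

// Successful path: upsert, then explicitly commit (as in the hunk above).
JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);

// "Failed" path (Write 2 in the next hunk): with autoCommit off, upsert and never
// commit, leaving the delta commit inflight - then roll it back.
client.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
client.rollback(commitTime1);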
@@ -428,56 +432,99 @@ public class TestMergeOnReadTable {
         dataFilesToRead.findAny().isPresent());

     /**
-     * Write 2 (updates)
+     * Write 2 (inserts + updates - testing failed delta commit)
      */
-    newCommitTime = "002";
-    client.startCommitWithTime(newCommitTime);
+    final String commitTime1 = "002";
+    // WriteClient with custom config (disable small file handling)
+    client = new HoodieWriteClient(jsc, HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withAutoCommit(false).withAssumeDatePartitioning(true).withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .compactionSmallFileSize(1 * 1024).withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1 * 1024).build())
+        .forTable("test-trip-table").build());
+    client.startCommitWithTime(commitTime1);

-    records = dataGen.generateUpdates(newCommitTime, records);
-    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
-    // Verify there are no errors
-    assertNoWriteErrors(statuses);
-    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-    deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
-    assertTrue(deltaCommit.isPresent());
-    assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp());
-
-    commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
-    assertFalse(commit.isPresent());
+    List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
+    copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
+    copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));

     List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
     List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);

     assertEquals(recordsRead.size(), 200);

-    // Test delta commit rollback
-    client.rollback(newCommitTime);
+    statuses = client.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
+    // Verify there are no errors
+    assertNoWriteErrors(statuses);
+
+    // Test failed delta commit rollback
+    client.rollback(commitTime1);
+    allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+    // After rollback, there should be no parquet file with the failed commit time
+    Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName()
+        .contains(commitTime1)).collect(Collectors.toList()).size(), 0);
+    dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+    recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+    assertEquals(recordsRead.size(), 200);
+
+
+    /**
+     * Write 3 (inserts + updates - testing successful delta commit)
+     */
+    final String commitTime2 = "002";
+    client = new HoodieWriteClient(jsc, cfg);
+    client.startCommitWithTime(commitTime2);
+
+    copyOfRecords = new ArrayList<>(records);
+    copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
+    copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
+
+    dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+    recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+    assertEquals(recordsRead.size(), 200);
+
+    writeStatusJavaRDD = client.upsert(writeRecords, commitTime2);
+    client.commit(commitTime2, writeStatusJavaRDD);
+    statuses = writeStatusJavaRDD.collect();
+    // Verify there are no errors
+    assertNoWriteErrors(statuses);
+
+    // Test successful delta commit rollback
+    client.rollback(commitTime2);
+    allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
+    // After rollback, there should be no parquet file with the failed commit time
+    Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName()
+        .contains(commitTime2)).collect(Collectors.toList()).size(), 0);
+
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
     hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
     roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
     dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
     recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+    // check that the number of records read is still correct after rollback operation
     assertEquals(recordsRead.size(), 200);

-    //Test compaction commit rollback
+    // Test compaction commit rollback
     /**
-     * Write 2 (updates)
+     * Write 4 (updates)
      */
     newCommitTime = "003";
     client.startCommitWithTime(newCommitTime);

-    records = dataGen.generateUpdates(newCommitTime, 400);
+    records = dataGen.generateUpdates(newCommitTime, records);

-    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+    writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
+    client.commit(newCommitTime, writeStatusJavaRDD);
+    statuses = writeStatusJavaRDD.collect();
+    // Verify there are no errors
     assertNoWriteErrors(statuses);

     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());

     String compactionCommit = client.startCompaction();
-    client.compact(compactionCommit);
+    JavaRDD<WriteStatus> writeStatus = client.compact(compactionCommit);
+    client.commitCompaction(compactionCommit, writeStatus);
+
     allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
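Both rollback checks in this hunk share one assertion shape: list every data file and require that no name mentions the rolled-back commit time. A self-contained sketch of that check on plain strings (the file names are illustrative of Hudi's parquet/log naming, not copied from the test):

import java.util.List;

// Sketch of the post-rollback assertion used twice above.
public class PostRollbackCheck {
  static long filesMentioningCommit(List<String> fileNames, String commitTime) {
    return fileNames.stream().filter(name -> name.contains(commitTime)).count();
  }

  public static void main(String[] args) {
    // Hypothetical listing after rolling back commit "002": only "001" files remain.
    List<String> allFiles = List.of("fileA_1_001.parquet", ".fileA_001.log.1");
    if (filesMentioningCommit(allFiles, "002") != 0) {
      throw new AssertionError("rollback left files from commit 002 behind");
    }
    System.out.println("rollback removed every file of commit 002");
  }
}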
@@ -213,6 +213,16 @@ public class FSUtils {
     return matcher.group(1);
   }

+  /**
+   * Check if the file is a parquet file or a log file. Then get the fileId appropriately.
+   */
+  public static String getFileIdFromFilePath(Path filePath) {
+    if (FSUtils.isLogFile(filePath)) {
+      return FSUtils.getFileIdFromLogPath(filePath);
+    }
+    return FSUtils.getFileId(filePath.getName());
+  }
+
   /**
    * Get the first part of the file name in the log file. That will be the fileId. Log file do not
    * have commitTime in the file name.
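A hypothetical use of the new helper; the FSUtils package and both file-name layouts below are assumptions based on Hudi conventions of this era, not taken from the diff:

import com.uber.hoodie.common.util.FSUtils; // assumed package for FSUtils
import org.apache.hadoop.fs.Path;

public class FileIdLookupExample {
  public static void main(String[] args) {
    // Illustrative names: a base parquet file and a log file for the same file group.
    Path dataFile = new Path("/table/2016/03/15/fileid1_1_001.parquet");
    Path logFile = new Path("/table/2016/03/15/.fileid1_001.log.1");
    // Both should resolve to the same fileId, whichever kind of file was deleted.
    System.out.println(FSUtils.getFileIdFromFilePath(dataFile)); // expected: fileid1
    System.out.println(FSUtils.getFileIdFromFilePath(logFile));  // expected: fileid1
  }
}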