From 34ab54a9d3bbb9bc1c405b4a621d1a98c0782cc4 Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Mon, 9 Jul 2018 16:58:05 -0700 Subject: [PATCH] Fixing bug introduced in rollback for MOR table type with inserts into log files --- .../hoodie/table/HoodieMergeOnReadTable.java | 21 ++-- .../hoodie/table/TestMergeOnReadTable.java | 101 +++++++++++++----- .../com/uber/hoodie/common/util/FSUtils.java | 10 ++ 3 files changed, 93 insertions(+), 39 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 8ae0684b4..109397363 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -54,6 +54,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -216,11 +217,14 @@ public class HoodieMergeOnReadTable extends final Map filesToDeletedStatus = new HashMap<>(); Map filesToNumBlocksRollback = new HashMap<>(); - // In case all data was inserts and the commit failed, there is no partition stats - if (commitMetadata.getPartitionToWriteStats().size() == 0) { - super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter); - } + // In case all data was inserts and the commit failed, delete the file belonging to that commit + super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter); + final Set deletedFiles = filesToDeletedStatus.entrySet().stream() + .map(entry -> { + Path filePath = entry.getKey().getPath(); + return FSUtils.getFileIdFromFilePath(filePath); + }).collect(Collectors.toSet()); // append rollback blocks for updates if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { // This needs to be done
since GlobalIndex at the moment does not store the latest commit time @@ -231,16 +235,9 @@ public class HoodieMergeOnReadTable extends .filter(wStat -> { if (wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT - && wStat.getPrevCommit() != null) { + && wStat.getPrevCommit() != null && !deletedFiles.contains(wStat.getFileId())) { return true; } - // we do not know fileIds for inserts (first inserts are either log files or parquet files), - // delete all files for the corresponding failed commit, if present (same as COW) - try { - super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter); - } catch (IOException io) { - throw new UncheckedIOException(io); - } return false; }) .forEach(wStat -> { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index 7b033d121..863ff1998 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -51,6 +51,8 @@ import com.uber.hoodie.index.HoodieIndex.IndexType; import com.uber.hoodie.index.bloom.HoodieBloomIndex; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -389,10 +391,10 @@ public class TestMergeOnReadTable { @Test public void testRollbackWithDeltaAndCompactionCommit() throws Exception { - HoodieWriteConfig cfg = getConfig(true); + HoodieWriteConfig cfg = getConfig(false); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); - // Test delta commit rollback (with all log files) + // Test delta commit rollback /** * Write 1 (only inserts) */ @@ -403,7 +405,9 @@ public class TestMergeOnReadTable { List records = dataGen.generateInserts(newCommitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); - List statuses = 
client.upsert(writeRecords, newCommitTime).collect(); + JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); + client.commit(newCommitTime, writeStatusJavaRDD); + List statuses = writeStatusJavaRDD.collect(); assertNoWriteErrors(statuses); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); @@ -428,56 +432,99 @@ public class TestMergeOnReadTable { dataFilesToRead.findAny().isPresent()); /** - * Write 2 (updates) + * Write 2 (inserts + updates - testing failed delta commit) */ - newCommitTime = "002"; - client.startCommitWithTime(newCommitTime); + final String commitTime1 = "002"; + // WriteClient with custom config (disable small file handling) + client = new HoodieWriteClient(jsc, HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withAutoCommit(false).withAssumeDatePartitioning(true).withCompactionConfig(HoodieCompactionConfig.newBuilder() + .compactionSmallFileSize(1 * 1024).withInlineCompaction(false) + .withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1 * 1024).build()) + .forTable("test-trip-table").build()); + client.startCommitWithTime(commitTime1); - records = dataGen.generateUpdates(newCommitTime, records); - - statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); - deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); - assertTrue(deltaCommit.isPresent()); - assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp()); - - commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); - assertFalse(commit.isPresent()); + List copyOfRecords = new ArrayList<>(records); + copyOfRecords = 
dataGen.generateUpdates(commitTime1, copyOfRecords); + copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200)); List dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); - assertEquals(recordsRead.size(), 200); - // Test delta commit rollback - client.rollback(newCommitTime); + statuses = client.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + // Test failed delta commit rollback + client.rollback(commitTime1); + allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + // After rollback, there should be no parquet file with the failed commit time + Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName() + .contains(commitTime1)).collect(Collectors.toList()).size(), 0); + dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); + recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); + assertEquals(recordsRead.size(), 200); + + + /** + * Write 3 (inserts + updates - testing successful delta commit) + */ + final String commitTime2 = "002"; + client = new HoodieWriteClient(jsc, cfg); + client.startCommitWithTime(commitTime2); + + copyOfRecords = new ArrayList<>(records); + copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords); + copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200)); + + dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); + recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); + assertEquals(recordsRead.size(), 200); + + writeStatusJavaRDD = client.upsert(writeRecords, commitTime2); + client.commit(commitTime2, writeStatusJavaRDD); + statuses = writeStatusJavaRDD.collect(); + // Verify 
there are no errors + assertNoWriteErrors(statuses); + + // Test successful delta commit rollback + client.rollback(commitTime2); + allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + // After rollback, there should be no parquet file with the failed commit time + Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName() + .contains(commitTime2)).collect(Collectors.toList()).size(), 0); metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); - + // check that the number of records read is still correct after rollback operation assertEquals(recordsRead.size(), 200); - //Test compaction commit rollback + // Test compaction commit rollback /** - * Write 2 (updates) + * Write 4 (updates) */ newCommitTime = "003"; client.startCommitWithTime(newCommitTime); - records = dataGen.generateUpdates(newCommitTime, 400); + records = dataGen.generateUpdates(newCommitTime, records); - statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); + client.commit(newCommitTime, writeStatusJavaRDD); + statuses = writeStatusJavaRDD.collect(); + // Verify there are no errors assertNoWriteErrors(statuses); metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); String compactionCommit = client.startCompaction(); - client.compact(compactionCommit); + JavaRDD writeStatus = client.compact(compactionCommit); + client.commitCompaction(compactionCommit, writeStatus); allFiles = 
HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index 437a8cfd2..f71f6b4cc 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -213,6 +213,16 @@ public class FSUtils { return matcher.group(1); } + /** + * Check if the file is a parquet file or a log file. Then get the fileId appropriately. + */ + public static String getFileIdFromFilePath(Path filePath) { + if (FSUtils.isLogFile(filePath)) { + return FSUtils.getFileIdFromLogPath(filePath); + } + return FSUtils.getFileId(filePath.getName()); + } + /** * Get the first part of the file name in the log file. That will be the fileId. Log file do not * have commitTime in the file name.