diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index c879c06cc..8deaeb220 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -839,7 +839,7 @@ public class HoodieWriteClient implements Seriali // Remove interleaving pending compactions before rolling back commits pendingCompactionToRollback.forEach(this::deletePendingCompaction); - List stats = table.rollback(jsc, commitsToRollback); + List stats = table.rollback(jsc, commitsToRollback, true); // cleanup index entries commitsToRollback.forEach(s -> { @@ -1206,8 +1206,9 @@ public class HoodieWriteClient implements Seriali * @param inflightInstant Inflight Compaction Instant * @param table Hoodie Table */ - private void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException { - table.rollback(jsc, ImmutableList.copyOf(new String[] { inflightInstant.getTimestamp() })); + @VisibleForTesting + void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException { + table.rollback(jsc, ImmutableList.copyOf(new String[] { inflightInstant.getTimestamp() }), false); // Revert instant state file table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 346d083a7..af272448b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -347,7 +347,7 @@ public class HoodieCopyOnWriteTable extends Hoodi } @Override - public List rollback(JavaSparkContext jsc, List commits) + public List rollback(JavaSparkContext jsc, List commits, boolean 
deleteInstants) throws IOException { String actionType = metaClient.getCommitActionType(); HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); @@ -375,13 +375,29 @@ public class HoodieCopyOnWriteTable extends Hoodi // clean temporary data files cleanTemporaryDataFiles(jsc); - // Remove the rolled back inflight commits - commits.stream().map(s -> new HoodieInstant(true, actionType, s)) - .forEach(activeTimeline::deleteInflight); - logger.info("Deleted inflight commits " + commits); + // Delete Inflight instants if enabled + deleteInflightInstants(deleteInstants, activeTimeline, + commits.stream().map(s -> new HoodieInstant(true, actionType, s)).collect(Collectors.toList())); return stats; } + /** + * Delete Inflight instants if enabled + * @param deleteInstants Enable Deletion of Inflight instants + * @param activeTimeline Hoodie active timeline + * @param instantsToBeDeleted Instants to be deleted + */ + protected static void deleteInflightInstants(boolean deleteInstants, HoodieActiveTimeline activeTimeline, + List instantsToBeDeleted) { + // Remove the rolled back inflight commits + if (deleteInstants) { + instantsToBeDeleted.forEach(activeTimeline::deleteInflight); + logger.info("Deleted inflight commits " + instantsToBeDeleted); + } else { + logger.warn("Rollback finished without deleting inflight instant files. 
Instants=" + instantsToBeDeleted); + } + } + /** * Finalize the written data files * diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index de1e563e9..29280a49e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -165,7 +165,7 @@ public class HoodieMergeOnReadTable extends } @Override - public List rollback(JavaSparkContext jsc, List commits) + public List rollback(JavaSparkContext jsc, List commits, boolean deleteInstants) throws IOException { //At the moment, MOR table type does not support nested rollbacks @@ -274,11 +274,13 @@ public class HoodieMergeOnReadTable extends return hoodieRollbackStats; }).collect(Collectors.toList())).flatMap(List::iterator).filter(Objects::nonNull).collect(); - commitsAndCompactions.entrySet().stream().map( - entry -> new HoodieInstant(true, entry.getValue().getAction(), - entry.getValue().getTimestamp())).forEach(this.getActiveTimeline()::deleteInflight); - logger - .debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); + // Delete Inflight instants if enabled + deleteInflightInstants(deleteInstants, this.getActiveTimeline(), + commitsAndCompactions.entrySet().stream().map( + entry -> new HoodieInstant(true, entry.getValue().getAction(), entry.getValue().getTimestamp())) + .collect(Collectors.toList())); + + logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); return allRollbackStats; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 2f51cbc8c..749fc6cb2 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -251,9 +251,9 @@ public abstract class HoodieTable implements Seri /** * Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) * Atomically unpublish this commit (2) clean indexing data (3) clean new generated parquet files - * / log blocks (4) Finally, delete ..commit or ..inflight file + * / log blocks (4) Finally, delete ..commit or ..inflight file if deleteInstants = true */ - public abstract List rollback(JavaSparkContext jsc, List commits) + public abstract List rollback(JavaSparkContext jsc, List commits, boolean deleteInstants) throws IOException; /** diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java index dca6d9d69..0df9da859 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java @@ -33,6 +33,7 @@ import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.CompactionUtils; @@ -51,6 +52,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.junit.Test; @@ -78,6 +80,64 @@ public class TestAsyncCompaction extends TestHoodieClientBase { super.tearDown(); } + @Test + public void testRollbackForInflightCompaction() throws Exception { + // Rollback inflight compaction + HoodieWriteConfig cfg = getConfig(false); + 
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true); + + String firstInstantTime = "001"; + String secondInstantTime = "004"; + String compactionInstantTime = "005"; + + int numRecs = 2000; + + List records = dataGen.generateInserts(firstInstantTime, numRecs); + runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime), + records, cfg, true, new ArrayList<>()); + + // Schedule compaction but do not run them + scheduleCompaction(compactionInstantTime, client, cfg); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + + HoodieInstant pendingCompactionInstant = + metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get(); + assertTrue("Pending Compaction instant has expected instant time", + pendingCompactionInstant.getTimestamp().equals(compactionInstantTime)); + assertTrue("Pending Compaction instant has expected state", + pendingCompactionInstant.getState().equals(State.REQUESTED)); + + moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg); + + // Reload and rollback inflight compaction + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + hoodieTable.rollback(jsc, Arrays.asList(compactionInstantTime), false); + + client.rollbackInflightCompaction( + new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable); + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline() + .getInstants().findFirst().get(); + assertEquals("compaction", pendingCompactionInstant.getAction()); + assertEquals(State.REQUESTED, pendingCompactionInstant.getState()); + assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp()); + + // We 
indirectly test for the race condition where an inflight instant was first deleted then created new. Every + // time this happens, the pending compaction instant file in Hoodie Meta path becomes an empty file (Note: Hoodie + // reads compaction plan from aux path which is untouched). To test for regression, we simply get file status + // and look at the file size + FileStatus fstatus = + metaClient.getFs().getFileStatus(new Path(metaClient.getMetaPath(), pendingCompactionInstant.getFileName())); + assertTrue(fstatus.getLen() > 0); + } + + private Path getInstantPath(HoodieTableMetaClient metaClient, String timestamp, String action, State state) { + HoodieInstant instant = new HoodieInstant(state, action, timestamp); + return new Path(metaClient.getMetaPath(), instant.getFileName()); + } + @Test public void testRollbackInflightIngestionWithPendingCompaction() throws Exception { // Rollback inflight ingestion when there is pending compaction diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java index f3a9616dd..2ef3f7960 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java @@ -634,14 +634,14 @@ public class TestCleaner extends TestHoodieClientBase { HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); - table.rollback(jsc, Collections.emptyList()); + table.rollback(jsc, Collections.emptyList(), true); assertEquals("Some temp files are created.", tempFiles.size(), getTotalTempFiles()); config = HoodieWriteConfig.newBuilder().withPath(basePath).withUseTempFolderCopyOnWriteForCreate(true) .withUseTempFolderCopyOnWriteForMerge(false).build(); table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); - table.rollback(jsc, 
Collections.emptyList()); + table.rollback(jsc, Collections.emptyList(), true); assertEquals("All temp files are deleted.", 0, getTotalTempFiles()); }