diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 8deaeb220..d9f779861 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -34,6 +34,7 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRollingStat; import com.uber.hoodie.common.model.HoodieRollingStatMetadata; +import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -503,7 +504,6 @@ public class HoodieWriteClient implements Seriali updateMetadataAndRollingStats(actionType, metadata, stats); - // Finalize write final Timer.Context finalizeCtx = metrics.getFinalizeCtx(); try { @@ -587,6 +587,9 @@ public class HoodieWriteClient implements Seriali if (table.getCompletedCommitsTimeline().empty()) { throw new HoodieSavepointException("Could not savepoint. 
Commit timeline is empty"); } + if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types"); + } String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp(); logger.info("Savepointing latest commit " + latestCommit); @@ -611,6 +614,9 @@ public class HoodieWriteClient implements Seriali public boolean savepoint(String commitTime, String user, String comment) { HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types"); + } Optional cleanInstant = table.getCompletedCleanTimeline().lastInstant(); HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, @@ -672,6 +678,9 @@ public class HoodieWriteClient implements Seriali public void deleteSavepoint(String savepointTime) { HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + throw new UnsupportedOperationException("Savepointing is not supported for MergeOnRead table types"); + } HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, @@ -696,30 +705,25 @@ public class HoodieWriteClient implements Seriali * Otherwise, async compactor could fail with errors * * @param compactionTime - delete the compaction time - * @return */ - private void deletePendingCompaction(String compactionTime) { + private void deleteRequestedCompaction(String compactionTime) { HoodieTable table = HoodieTable.getHoodieTable( new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - HoodieInstant compactionRequestedInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime); - HoodieInstant compactionInflightInstant = - new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionTime); boolean isCompactionInstantInRequestedState = table.getActiveTimeline().filterPendingCompactionTimeline() .containsInstant(compactionRequestedInstant); - boolean isCompactionInstantInInflightState = table.getActiveTimeline().filterPendingCompactionTimeline() - .containsInstant(compactionInflightInstant); - + HoodieTimeline commitTimeline = table.getCompletedCommitTimeline(); + if (!commitTimeline.empty() && !commitTimeline + .findInstantsAfter(compactionTime, Integer.MAX_VALUE).empty()) { + throw new HoodieRollbackException( + "Found commits after time :" + compactionTime + ", please rollback greater commits first"); + } if (isCompactionInstantInRequestedState) { activeTimeline.deleteCompactionRequested(compactionRequestedInstant); - } else if (isCompactionInstantInInflightState) { - activeTimeline.revertCompactionInflightToRequested(compactionInflightInstant); - activeTimeline.deleteCompactionRequested(compactionRequestedInstant); } else { - logger.error("No compaction present " + compactionTime); - throw new IllegalArgumentException("No compaction present " + compactionTime); + throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime); } logger.info("Compaction " + compactionTime + " deleted"); } @@ -776,8 +780,55 @@ public class HoodieWriteClient implements Seriali return true; } - private void rollback(List commits) { - if (commits.isEmpty()) { + /** + * NOTE : This action requires all writers (ingest and compact) to a dataset to be stopped before proceeding. 
+ * Revert the (inflight/committed) record changes for all commits after the provided @param. + * Four steps: (1) Atomically unpublish this commit (2) clean indexing data, (3) clean new generated parquet/log + * files and/or append rollback to existing log files. (4) Finally delete .commit, .inflight, .compaction.inflight + * or .compaction.requested file + */ + public void restoreToCommit(final String commitTime) throws HoodieRollbackException { + + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = HoodieTable.getHoodieTable( + new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + // Get all the commits on the timeline after the provided commit time + List commitsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants() + .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), commitTime)) + .collect(Collectors.toList()); + // reverse the commits to descending order of commit time + Collections.reverse(commitsToRollback); + commitsToRollback.stream().forEach(instant -> { + switch (instant.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.DELTA_COMMIT_ACTION: + rollback(instant.getTimestamp()); + break; + case HoodieTimeline.COMPACTION_ACTION: + if (instant.isRequested()) { + // TODO : Get file status and create a rollback stat and file + // TODO : Delete the .aux files along with the instant file, okay for now since the archival process will + // delete these files when it does not see a corresponding instant file under .hoodie + deleteRequestedCompaction(instant.getTimestamp()); + logger.info("Deleted pending scheduled compaction " + instant.getTimestamp()); + } else { + rollback(instant.getTimestamp()); + } + break; + default: + throw new IllegalArgumentException("invalid action name " + instant.getAction()); + } + try { + // Ensure unique rollback instants for seconds granularity + 
Thread.sleep(1000); + } catch (InterruptedException e) { + throw new HoodieRollbackException("unable to rollback instant " + instant, e); + } + }); + } + + private void rollback(List commitsToRollback) { + if (commitsToRollback.isEmpty()) { logger.info("List of commits to rollback is empty"); return; } @@ -797,7 +848,7 @@ public class HoodieWriteClient implements Seriali // Check if any of the commits is a savepoint - do not allow rollback on those commits List savepoints = table.getCompletedSavepointTimeline().getInstants() .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - commits.forEach(s -> { + commitsToRollback.forEach(s -> { if (savepoints.contains(s)) { throw new HoodieRollbackException( "Could not rollback a savepointed commit. Delete savepoint first before rolling back" @@ -805,11 +856,6 @@ public class HoodieWriteClient implements Seriali } }); - List pendingCompactionToRollback = - commits.stream().filter(pendingCompactions::contains).collect(Collectors.toList()); - List commitsToRollback = - commits.stream().filter(c -> !pendingCompactions.contains(c)).collect(Collectors.toList()); - try { if (commitTimeline.empty() && inflightCommitTimeline.empty()) { // nothing to rollback @@ -836,11 +882,10 @@ public class HoodieWriteClient implements Seriali + ", please rollback greater commits first"); } - // Remove interleaving pending compactions before rolling back commits - pendingCompactionToRollback.forEach(this::deletePendingCompaction); - List stats = table.rollback(jsc, commitsToRollback, true); + logger.info("Deleted inflight commits " + commitsToRollback); + // cleanup index entries commitsToRollback.forEach(s -> { if (!index.rollbackCommit(s)) { @@ -974,8 +1019,6 @@ public class HoodieWriteClient implements Seriali /** * Schedules a new compaction instant - * @param extraMetadata - * @return */ public Optional scheduleCompaction(Optional> extraMetadata) throws IOException { String instantTime = HoodieActiveTimeline.createNewCommitTime(); 
@@ -986,8 +1029,9 @@ public class HoodieWriteClient implements Seriali /** * Schedules a new compaction instant with passed-in instant time - * @param instantTime Compaction Instant Time - * @param extraMetadata Extra Metadata to be stored + * + * @param instantTime Compaction Instant Time + * @param extraMetadata Extra Metadata to be stored */ public boolean scheduleCompactionAtInstant(String instantTime, Optional> extraMetadata) throws IOException { @@ -1003,8 +1047,8 @@ public class HoodieWriteClient implements Seriali // Committed and pending compaction instants should have strictly lower timestamps List conflictingInstants = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants().filter(instant -> - HoodieTimeline.compareTimestamps(instant.getTimestamp(), instantTime, - HoodieTimeline.GREATER_OR_EQUAL)).collect(Collectors.toList()); + HoodieTimeline.compareTimestamps(instant.getTimestamp(), instantTime, + HoodieTimeline.GREATER_OR_EQUAL)).collect(Collectors.toList()); Preconditions.checkArgument(conflictingInstants.isEmpty(), "Following instants have timestamps >= compactionInstant. 
Instants :" + conflictingInstants); @@ -1023,9 +1067,8 @@ public class HoodieWriteClient implements Seriali /** * Performs Compaction for the workload stored in instant-time - * @param compactionInstantTime Compaction Instant Time - * @return - * @throws IOException + * + * @param compactionInstantTime Compaction Instant Time */ public JavaRDD compact(String compactionInstantTime) throws IOException { return compact(compactionInstantTime, config.shouldAutoCommit()); @@ -1113,9 +1156,8 @@ public class HoodieWriteClient implements Seriali /** * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time - * @param compactionInstantTime Compaction Instant Time - * @return - * @throws IOException + * + * @param compactionInstantTime Compaction Instant Time */ private JavaRDD compact(String compactionInstantTime, boolean autoCommit) throws IOException { // Create a Hoodie table which encapsulated the commits and files visible @@ -1145,9 +1187,9 @@ public class HoodieWriteClient implements Seriali /** * Perform compaction operations as specified in the compaction commit file * - * @param compactionInstant Compacton Instant time + * @param compactionInstant Compaction Instant time * @param activeTimeline Active Timeline - * @param autoCommit Commit after compaction + * @param autoCommit Commit after compaction * @return RDD of Write Status */ private JavaRDD runCompaction( @@ -1173,11 +1215,11 @@ public class HoodieWriteClient implements Seriali /** * Commit Compaction and track metrics * - * @param compactedStatuses Compaction Write status - * @param table Hoodie Table + * @param compactedStatuses Compaction Write status + * @param table Hoodie Table * @param compactionCommitTime Compaction Commit Time - * @param autoCommit Auto Commit - * @param extraMetadata Extra Metadata to store + * @param autoCommit Auto Commit + * @param extraMetadata Extra Metadata to store */ protected void commitCompaction(JavaRDD 
compactedStatuses, HoodieTable table, String compactionCommitTime, boolean autoCommit, Optional> extraMetadata) { @@ -1202,13 +1244,14 @@ public class HoodieWriteClient implements Seriali } /** - * Rollback partial compactions + * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file + * * @param inflightInstant Inflight Compaction Instant * @param table Hoodie Table */ @VisibleForTesting void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException { - table.rollback(jsc, ImmutableList.copyOf(new String[] { inflightInstant.getTimestamp() }), false); + table.rollback(jsc, ImmutableList.copyOf(new String[]{inflightInstant.getTimestamp()}), false); // Revert instant state file table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); } @@ -1305,4 +1348,5 @@ public class HoodieWriteClient implements Seriali throw new HoodieCommitException("Unable to save rolling stats"); } } + } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index af272448b..c7a387af8 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -16,7 +16,6 @@ package com.uber.hoodie.table; -import com.google.common.collect.Maps; import com.google.common.hash.Hashing; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.avro.model.HoodieCompactionPlan; @@ -203,9 +202,9 @@ public class HoodieCopyOnWriteTable extends Hoodi AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema()); BoundedInMemoryExecutor wrapper = null; try (ParquetReader reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) - .withConf(getHadoopConf()).build()) { + .withConf(getHadoopConf()).build()) { wrapper = new 
SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader), - new UpdateHandler(upsertHandle), x -> x); + new UpdateHandler(upsertHandle), x -> x); wrapper.execute(); } catch (Exception e) { throw new HoodieException(e); @@ -313,18 +312,17 @@ public class HoodieCopyOnWriteTable extends Hoodi * Common method used for cleaning out parquet files under a partition path during rollback of a * set of commits */ - protected Map deleteCleanedFiles(String partitionPath, List commits) + protected Map deleteCleanedFiles(Map results, String partitionPath, + PathFilter filter) throws IOException { - Map results = Maps.newHashMap(); - // PathFilter to get all parquet files and log files that need to be deleted - PathFilter filter = (path) -> { - if (path.toString().contains(".parquet")) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commits.contains(fileCommitTime); - } - return false; - }; - deleteCleanedFiles(results, partitionPath, filter); + logger.info("Cleaning path " + partitionPath); + FileSystem fs = getMetaClient().getFs(); + FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(), partitionPath), filter); + for (FileStatus file : toBeDeleted) { + boolean success = fs.delete(file.getPath(), false); + results.put(file, success); + logger.info("Delete file " + file.getPath() + "\t" + success); + } return results; } @@ -332,11 +330,18 @@ public class HoodieCopyOnWriteTable extends Hoodi * Common method used for cleaning out parquet files under a partition path during rollback of a * set of commits */ - protected Map deleteCleanedFiles(Map results, String partitionPath, - PathFilter filter) + protected Map deleteCleanedFiles(Map results, List commits, String + partitionPath) throws IOException { logger.info("Cleaning path " + partitionPath); FileSystem fs = getMetaClient().getFs(); + PathFilter filter = (path) -> { + if (path.toString().contains(".parquet")) { + String fileCommitTime = 
FSUtils.getCommitTime(path.getName()); + return commits.contains(fileCommitTime); + } + return false; + }; FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(), partitionPath), filter); for (FileStatus file : toBeDeleted) { boolean success = fs.delete(file.getPath(), false); @@ -367,9 +372,10 @@ public class HoodieCopyOnWriteTable extends Hoodi config.shouldAssumeDatePartitioning())) .map((Function) partitionPath -> { // Scan all partitions files with this commit time - Map results = deleteCleanedFiles(partitionPath, commits); + final Map filesToDeletedStatus = new HashMap<>(); + deleteCleanedFiles(filesToDeletedStatus, commits, partitionPath); return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) - .withDeletedFileResults(results).build(); + .withDeletedFileResults(filesToDeletedStatus).build(); }).collect(); // clean temporary data files diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 83c9fb64c..109b440cb 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -52,7 +52,16 @@ import com.uber.hoodie.io.HoodieAppendHandle; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -168,10 +177,15 @@ public class HoodieMergeOnReadTable extends public List rollback(JavaSparkContext jsc, List commits, boolean deleteInstants) 
throws IOException { - //At the moment, MOR table type does not support nested rollbacks + // At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental + // feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback + // (commitToRollback). + // NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is + // required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks. if (commits.size() > 1) { - throw new UnsupportedOperationException("Nested Rollbacks are not supported"); + throw new UnsupportedOperationException("Bulk Nested Rollbacks are not supported"); } + // Atomically un-publish all non-inflight commits Map commitsAndCompactions = this.getActiveTimeline() .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.COMPACTION_ACTION)).getInstants() @@ -188,6 +202,7 @@ public class HoodieMergeOnReadTable extends config.shouldAssumeDatePartitioning())) .map((Function>) partitionPath -> commits.stream().map(commit -> { HoodieInstant instant = commitsAndCompactions.get(commit); + HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload(); HoodieRollbackStat hoodieRollbackStats = null; // Need to put the path filter here since Filter is not serializable // PathFilter to get all parquet files and log files that need to be deleted @@ -203,14 +218,43 @@ public class HoodieMergeOnReadTable extends return false; }; + final Map filesToDeletedStatus = new HashMap<>(); + switch (instant.getAction()) { case HoodieTimeline.COMMIT_ACTION: + try { + // Rollback of a commit should delete the newly created parquet files along with any log + // files created with this as baseCommit. This is required to support multi-rollbacks in a MOR table. 
+ super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter); + hoodieRollbackStats = HoodieRollbackStat.newBuilder() + .withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus).build(); + break; + } catch (IOException io) { + throw new UncheckedIOException("Failed to rollback for commit " + commit, io); + } case HoodieTimeline.COMPACTION_ACTION: try { - Map results = super - .deleteCleanedFiles(partitionPath, Collections.singletonList(commit)); - hoodieRollbackStats = HoodieRollbackStat.newBuilder() - .withPartitionPath(partitionPath).withDeletedFileResults(results).build(); + // If there is no delta commit present after the current commit (if compaction), no action, else we + // need to make sure that a compaction commit rollback also deletes any log files written as part of the + // succeeding deltacommit. + boolean higherDeltaCommits = !activeTimeline.getDeltaCommitTimeline() + .filterCompletedInstants().findInstantsAfter(commit, 1).empty(); + if (higherDeltaCommits) { + // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled + // and has not yet finished. In this scenario we should delete only the newly created parquet files + // and not corresponding base commit log files created with this as baseCommit since updates would + // have been written to the log files. + super.deleteCleanedFiles(filesToDeletedStatus, commits, partitionPath); + hoodieRollbackStats = HoodieRollbackStat.newBuilder() + .withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus).build(); + } else { + // No deltacommits present after this compaction commit (inflight or requested). In this case, we + // can also delete any log files that were created with this compaction commit as base + // commit. 
+ super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter); + hoodieRollbackStats = HoodieRollbackStat.newBuilder() + .withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus).build(); + } break; } catch (IOException io) { throw new UncheckedIOException("Failed to rollback for commit " + commit, io); @@ -246,7 +290,6 @@ public class HoodieMergeOnReadTable extends .get(), HoodieCommitMetadata.class); // read commit file and (either append delete blocks or delete file) - final Map filesToDeletedStatus = new HashMap<>(); Map filesToNumBlocksRollback = new HashMap<>(); // In case all data was inserts and the commit failed, delete the file belonging to that commit @@ -457,7 +500,7 @@ public class HoodieMergeOnReadTable extends .filter(wStat -> { // Filter out stats without prevCommit since they are all inserts return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT && wStat.getPrevCommit() != null - && !deletedFiles.contains(wStat.getFileId()); + && !deletedFiles.contains(wStat.getFileId()); }).forEach(wStat -> { HoodieLogFormat.Writer writer = null; String baseCommitTime = wStat.getPrevCommit(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index 93227c49c..a6fdfedb4 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -33,6 +33,7 @@ import com.uber.hoodie.common.minicluster.HdfsTestService; import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieFileGroup; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRollingStat; @@ -564,6 +565,178 @@ public 
class TestMergeOnReadTable { }).findAny().isPresent()); } + @Test + public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { + + HoodieWriteConfig cfg = getConfig(false); + final HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + List allCommits = new ArrayList<>(); + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + allCommits.add(newCommitTime); + client.startCommitWithTime(newCommitTime); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + JavaRDD writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); + client.commit(newCommitTime, writeStatusJavaRDD); + List statuses = writeStatusJavaRDD.collect(); + assertNoWriteErrors(statuses); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); + + Optional deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); + assertTrue(deltaCommit.isPresent()); + assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp()); + + Optional commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertFalse(commit.isPresent()); + + FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, + metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + Stream dataFilesToRead = roView.getLatestDataFiles(); + assertTrue(!dataFilesToRead.findAny().isPresent()); + + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); + dataFilesToRead = roView.getLatestDataFiles(); + assertTrue("ReadOptimizedTableView should list the parquet files we wrote in 
the delta commit", + dataFilesToRead.findAny().isPresent()); + + /** + * Write 2 (inserts + updates) + */ + newCommitTime = "002"; + allCommits.add(newCommitTime); + // WriteClient with custom config (disable small file handling) + HoodieWriteClient nClient = new HoodieWriteClient(jsc, HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withAutoCommit(false).withAssumeDatePartitioning(true).withCompactionConfig(HoodieCompactionConfig.newBuilder() + .compactionSmallFileSize(1 * 1024).withInlineCompaction(false) + .withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1 * 1024).build()) + .forTable("test-trip-table").build()); + nClient.startCommitWithTime(newCommitTime); + + List copyOfRecords = new ArrayList<>(records); + copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords); + copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200)); + + List dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); + assertEquals(recordsRead.size(), 200); + + statuses = nClient.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + nClient.commit(newCommitTime, writeStatusJavaRDD); + copyOfRecords.clear(); + + + // Schedule a compaction + /** + * Write 3 (inserts + updates) + */ + newCommitTime = "003"; + allCommits.add(newCommitTime); + client.startCommitWithTime(newCommitTime); + + List newInserts = dataGen.generateInserts(newCommitTime, 100); + records = dataGen.generateUpdates(newCommitTime, records); + records.addAll(newInserts); + writeRecords = jsc.parallelize(records, 1); + + writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); + client.commit(newCommitTime, writeStatusJavaRDD); + statuses = 
writeStatusJavaRDD.collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + + String compactionInstantTime = "004"; + allCommits.add(compactionInstantTime); + client.scheduleCompactionAtInstant(compactionInstantTime, Optional.empty()); + + // Compaction commit + /** + * Write 4 (updates) + */ + newCommitTime = "005"; + allCommits.add(newCommitTime); + client.startCommitWithTime(newCommitTime); + + records = dataGen.generateUpdates(newCommitTime, records); + writeRecords = jsc.parallelize(records, 1); + + writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); + client.commit(newCommitTime, writeStatusJavaRDD); + statuses = writeStatusJavaRDD.collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + + compactionInstantTime = "006"; + allCommits.add(compactionInstantTime); + client.scheduleCompactionAtInstant(compactionInstantTime, Optional.empty()); + JavaRDD ws = client.compact(compactionInstantTime); + client.commitCompaction(compactionInstantTime, ws, Optional.empty()); + + allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles); + + final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get() + .getTimestamp(); + + assertTrue(roView.getLatestDataFiles().filter(file -> { + if (compactedCommitTime.equals(file.getCommitTime())) { + return true; + } else { + return false; + } + }).findAny().isPresent()); + + /** + * Write 5 (updates) + */ + newCommitTime = "007"; + allCommits.add(newCommitTime); + client.startCommitWithTime(newCommitTime); + copyOfRecords = new 
ArrayList<>(records); + copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords); + copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200)); + + statuses = client.upsert(jsc.parallelize(copyOfRecords, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + client.commit(newCommitTime, writeStatusJavaRDD); + copyOfRecords.clear(); + + // Rollback latest commit first + client.restoreToCommit("000"); + + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + roView = new HoodieTableFileSystemView(metaClient, + metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + dataFilesToRead = roView.getLatestDataFiles(); + assertTrue(!dataFilesToRead.findAny().isPresent()); + HoodieTableFileSystemView.RealtimeView rtView = new HoodieTableFileSystemView(metaClient, + metaClient.getCommitTimeline().filterCompletedInstants(), allFiles); + List fileGroups = ((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors + .toList()); + assertTrue(fileGroups.isEmpty()); + } + + @Test public void testUpsertPartitioner() throws Exception { HoodieWriteConfig cfg = getConfig(true); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java index 4cd8ba709..98dea4479 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java @@ -98,7 +98,7 @@ public interface HoodieTimeline extends Serializable { HoodieTimeline filterCompletedAndCompactionInstants(); /** - * Filter this timeline to just include inflight and requested compaction instants + * Filter this timeline to just include requested and inflight compaction instants * @return */ HoodieTimeline 
filterPendingCompactionTimeline(); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index 3640ad636..2b86448ce 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -262,6 +262,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { Preconditions.checkArgument(inflightInstant.isInflight()); HoodieInstant requestedInstant = new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp()); + // Pass empty data since it is read from the corresponding .aux/.compaction instant file transitionState(inflightInstant, requestedInstant, Optional.empty()); return requestedInstant; } @@ -310,7 +311,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp())); Path commitFilePath = new Path(metaClient.getMetaPath(), toInstant.getFileName()); try { - // open a new file and write the commit metadata in + // Re-create the .inflight file by opening a new file and write the commit metadata in Path inflightCommitFile = new Path(metaClient.getMetaPath(), fromInstant.getFileName()); createFileInMetaPath(fromInstant.getFileName(), data); boolean success = metaClient.getFs().rename(inflightCommitFile, commitFilePath);