From f82e58994e046d92916d45f7ec921e2fb6ba26ef Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Tue, 5 Nov 2019 11:53:19 -0800 Subject: [PATCH] - Ensure that rollback instant is always created before the next commit instant. This especially affects IncrementalPull for MOR tables since we can end up pulling in log blocks for uncommitted data - Ensure that generated commit instants are 1 second apart --- .../org/apache/hudi/HoodieWriteClient.java | 29 ++++++++++++------- .../table/timeline/HoodieActiveTimeline.java | 22 ++++++++++---- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index a147df751..326bdaca0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -135,7 +135,7 @@ public class HoodieWriteClient extends AbstractHo } public static SparkConf registerClasses(SparkConf conf) { - conf.registerKryoClasses(new Class[] {HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class}); + conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class}); return conf; } @@ -276,7 +276,7 @@ public class HoodieWriteClient extends AbstractHo * @param records HoodieRecords to insert * @param commitTime Commit Time handle * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted - * into hoodie. + * into hoodie. 
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD bulkInsert(JavaRDD> records, final String commitTime, @@ -309,7 +309,7 @@ public class HoodieWriteClient extends AbstractHo * @param preppedRecords HoodieRecords to insert * @param commitTime Commit Time handle * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted - * into hoodie. + * into hoodie. * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords, final String commitTime, @@ -953,16 +953,26 @@ public class HoodieWriteClient extends AbstractHo * Provides a new commit time for a write operation (insert/update) */ public String startCommit() { - String commitTime = HoodieActiveTimeline.createNewCommitTime(); - startCommitWithTime(commitTime); - return commitTime; - } - - public void startCommitWithTime(String instantTime) { + // NOTE : Need to ensure that rollback is done before a new commit is started if (rollbackInFlight) { // Only rollback inflight commit/delta-commits. Do not touch compaction commits rollbackInflightCommits(); } + String commitTime = HoodieActiveTimeline.createNewCommitTime(); + startCommit(commitTime); + return commitTime; + } + + public void startCommitWithTime(String instantTime) { + // NOTE : Need to ensure that rollback is done before a new commit is started + if (rollbackInFlight) { + // Only rollback inflight commit/delta-commits. 
Do not touch compaction commits + rollbackInflightCommits(); + } + startCommit(instantTime); + } + + private void startCommit(String instantTime) { logger.info("Generate a new instant time " + instantTime); HoodieTableMetaClient metaClient = createMetaClient(true); // if there are pending compactions, their instantTime must not be greater than that of this instant time @@ -978,7 +988,6 @@ public class HoodieWriteClient extends AbstractHo activeTimeline.createInflight(new HoodieInstant(true, commitActionType, instantTime)); } - /** * Schedules a new compaction instant */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 3e1f5d6e6..eb82c6b2a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Stream; import org.apache.hadoop.fs.FSDataInputStream; @@ -66,12 +67,21 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class); protected HoodieTableMetaClient metaClient; + private static AtomicReference lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE)); /** * Returns next commit time in the {@link #COMMIT_FORMATTER} format. 
+ * Ensures each commit time is at least 1 second apart since we create COMMIT times at second granularity */ public static String createNewCommitTime() { - return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); + lastInstantTime.updateAndGet((oldVal) -> { + String newCommitTime = null; + do { + newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); + } while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL)); + return newCommitTime; + }); + return lastInstantTime.get(); } protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set includedExtensions) { @@ -99,7 +109,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { * * @deprecated */ - public HoodieActiveTimeline() {} + public HoodieActiveTimeline() { + } /** * This method is only used when this object is deserialized in a spark executor. @@ -112,7 +123,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { /** * Get all instants (commits, delta commits) that produce new data, in the active timeline * - * */ public HoodieTimeline getCommitsTimeline() { return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)); } @@ -139,7 +149,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { /** * Get only pure commits (inflight and completed) in the active timeline */ - public HoodieTimeline getCommitTimeline() { + public HoodieTimeline getCommitTimeline() { return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION)); } @@ -258,7 +268,9 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return readDataFromPath(detailPath); } - /** BEGIN - COMPACTION RELATED META-DATA MANAGEMENT **/ + /** + * BEGIN - COMPACTION RELATED META-DATA MANAGEMENT + **/ public Option getInstantAuxiliaryDetails(HoodieInstant instant) { Path detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());