diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index a147df751..326bdaca0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -135,7 +135,7 @@ public class HoodieWriteClient extends AbstractHo } public static SparkConf registerClasses(SparkConf conf) { - conf.registerKryoClasses(new Class[] {HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class}); + conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class}); return conf; } @@ -276,7 +276,7 @@ public class HoodieWriteClient extends AbstractHo * @param records HoodieRecords to insert * @param commitTime Commit Time handle * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted - * into hoodie. + * into hoodie. * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD bulkInsert(JavaRDD> records, final String commitTime, @@ -309,7 +309,7 @@ public class HoodieWriteClient extends AbstractHo * @param preppedRecords HoodieRecords to insert * @param commitTime Commit Time handle * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted - * into hoodie. + * into hoodie. 
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords, final String commitTime, @@ -953,16 +953,26 @@ public class HoodieWriteClient extends AbstractHo * Provides a new commit time for a write operation (insert/update) */ public String startCommit() { - String commitTime = HoodieActiveTimeline.createNewCommitTime(); - startCommitWithTime(commitTime); - return commitTime; - } - - public void startCommitWithTime(String instantTime) { + // NOTE : Need to ensure that rollback is done before a new commit is started if (rollbackInFlight) { // Only rollback inflight commit/delta-commits. Do not touch compaction commits rollbackInflightCommits(); } + String commitTime = HoodieActiveTimeline.createNewCommitTime(); + startCommit(commitTime); + return commitTime; + } + + public void startCommitWithTime(String instantTime) { + // NOTE : Need to ensure that rollback is done before a new commit is started + if (rollbackInFlight) { + // Only rollback inflight commit/delta-commits. 
Do not touch compaction commits + rollbackInflightCommits(); + } + startCommit(instantTime); + } + + private void startCommit(String instantTime) { logger.info("Generate a new instant time " + instantTime); HoodieTableMetaClient metaClient = createMetaClient(true); // if there are pending compactions, their instantTime must not be greater than that of this instant time @@ -978,7 +988,6 @@ public class HoodieWriteClient extends AbstractHo activeTimeline.createInflight(new HoodieInstant(true, commitActionType, instantTime)); } - /** * Schedules a new compaction instant */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 3e1f5d6e6..eb82c6b2a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Stream; import org.apache.hadoop.fs.FSDataInputStream; @@ -66,12 +67,21 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class); protected HoodieTableMetaClient metaClient; + private static AtomicReference lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE)); /** * Returns next commit time in the {@link #COMMIT_FORMATTER} format. 
+ * Ensures each commit time is at least 1 second apart since we create COMMIT times at second granularity */ public static String createNewCommitTime() { - return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); + lastInstantTime.updateAndGet((oldVal) -> { + String newCommitTime = null; + do { + newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); + } while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL)); + return newCommitTime; + }); + return lastInstantTime.get(); } protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set includedExtensions) { @@ -99,7 +109,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { * * @deprecated */ - public HoodieActiveTimeline() {} + public HoodieActiveTimeline() { + } /** * This method is only used when this object is deserialized in a spark executor. @@ -112,7 +123,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { /** * Get all instants (commits, delta commits) that produce new data, in the active timeline * - * */ public HoodieTimeline getCommitsTimeline() { return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)); @@ -139,7 +149,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { /** * Get only pure commits (inflight and completed) in the active timeline */ - public HoodieTimeline getCommitTimeline() { + public HoodieTimeline getCommitTimeline() { return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION)); } @@ -258,7 +268,9 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return readDataFromPath(detailPath); } - /** BEGIN - COMPACTION RELATED META-DATA MANAGEMENT **/ + /** + * BEGIN - COMPACTION RELATED META-DATA MANAGEMENT + **/ public Option getInstantAuxiliaryDetails(HoodieInstant instant) { Path detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());