1
0

- Ensure that rollback instant is always created before the next commit instant.

This especially affects IncrementalPull for MOR tables since we can end up pulling in
  log blocks for uncommitted data
- Ensure that generated commit instants are 1 second apart
This commit is contained in:
Nishith Agarwal
2019-11-05 11:53:19 -08:00
committed by Balaji Varadarajan
parent 3a05edab01
commit f82e58994e
2 changed files with 36 additions and 15 deletions

View File

@@ -135,7 +135,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
} }
public static SparkConf registerClasses(SparkConf conf) { public static SparkConf registerClasses(SparkConf conf) {
conf.registerKryoClasses(new Class[] {HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class}); conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
return conf; return conf;
} }
@@ -953,16 +953,26 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* Provides a new commit time for a write operation (insert/update) * Provides a new commit time for a write operation (insert/update)
*/ */
public String startCommit() { public String startCommit() {
String commitTime = HoodieActiveTimeline.createNewCommitTime(); // NOTE : Need to ensure that rollback is done before a new commit is started
startCommitWithTime(commitTime);
return commitTime;
}
public void startCommitWithTime(String instantTime) {
if (rollbackInFlight) { if (rollbackInFlight) {
// Only rollback inflight commit/delta-commits. Do not touch compaction commits // Only rollback inflight commit/delta-commits. Do not touch compaction commits
rollbackInflightCommits(); rollbackInflightCommits();
} }
String commitTime = HoodieActiveTimeline.createNewCommitTime();
startCommit(commitTime);
return commitTime;
}
public void startCommitWithTime(String instantTime) {
// NOTE : Need to ensure that rollback is done before a new commit is started
if (rollbackInFlight) {
// Only rollback inflight commit/delta-commits. Do not touch compaction commits
rollbackInflightCommits();
}
startCommit(instantTime);
}
private void startCommit(String instantTime) {
logger.info("Generate a new instant time " + instantTime); logger.info("Generate a new instant time " + instantTime);
HoodieTableMetaClient metaClient = createMetaClient(true); HoodieTableMetaClient metaClient = createMetaClient(true);
// if there are pending compactions, their instantTime must not be greater than that of this instant time // if there are pending compactions, their instantTime must not be greater than that of this instant time
@@ -978,7 +988,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
activeTimeline.createInflight(new HoodieInstant(true, commitActionType, instantTime)); activeTimeline.createInflight(new HoodieInstant(true, commitActionType, instantTime));
} }
/** /**
* Schedules a new compaction instant * Schedules a new compaction instant
*/ */

View File

@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@@ -66,12 +67,21 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class); private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class);
protected HoodieTableMetaClient metaClient; protected HoodieTableMetaClient metaClient;
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
/** /**
* Returns next commit time in the {@link #COMMIT_FORMATTER} format. * Returns next commit time in the {@link #COMMIT_FORMATTER} format.
* Ensures each commit time is atleast 1 second apart since we create COMMIT times at second granularity
*/ */
public static String createNewCommitTime() { public static String createNewCommitTime() {
return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); lastInstantTime.updateAndGet((oldVal) -> {
String newCommitTime = null;
do {
newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
} while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL));
return newCommitTime;
});
return lastInstantTime.get();
} }
protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set<String> includedExtensions) { protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set<String> includedExtensions) {
@@ -99,7 +109,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
* *
* @deprecated * @deprecated
*/ */
public HoodieActiveTimeline() {} public HoodieActiveTimeline() {
}
/** /**
* This method is only used when this object is deserialized in a spark executor. * This method is only used when this object is deserialized in a spark executor.
@@ -112,7 +123,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
/** /**
* Get all instants (commits, delta commits) that produce new data, in the active timeline * * Get all instants (commits, delta commits) that produce new data, in the active timeline *
*
*/ */
public HoodieTimeline getCommitsTimeline() { public HoodieTimeline getCommitsTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)); return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION));
@@ -258,7 +268,9 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return readDataFromPath(detailPath); return readDataFromPath(detailPath);
} }
/** BEGIN - COMPACTION RELATED META-DATA MANAGEMENT **/ /**
* BEGIN - COMPACTION RELATED META-DATA MANAGEMENT
**/
public Option<byte[]> getInstantAuxiliaryDetails(HoodieInstant instant) { public Option<byte[]> getInstantAuxiliaryDetails(HoodieInstant instant) {
Path detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); Path detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());