1
0

[HUDI-2947] Fixing checkpoint fetch in deltastreamer (#4485)

* Fixing checkpoint fetch in deltastreamer

* Addressing comments
This commit is contained in:
Sivabalan Narayanan
2022-01-07 11:38:58 -05:00
committed by GitHub
parent b1df60672b
commit 2e561defe9
3 changed files with 72 additions and 42 deletions

View File

@@ -328,35 +328,7 @@ public class DeltaSync implements Serializable {
// Retrieve the previous round checkpoints, if any
Option<String> resumeCheckpointStr = Option.empty();
if (commitTimelineOpt.isPresent()) {
Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
if (lastCommit.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
|| !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
resumeCheckpointStr = Option.of(cfg.checkpoint);
} else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
//if previous checkpoint is an empty string, skip resume use Option.empty()
resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
} else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
// if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
Option<String> prevCheckpoint = getPreviousCheckpoint(commitTimelineOpt.get());
if (prevCheckpoint.isPresent()) {
resumeCheckpointStr = prevCheckpoint;
} else {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
+ "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
+ commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
+ commitMetadata.toJsonString());
}
}
// KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
}
}
resumeCheckpointStr = getCheckpointToResume(commitTimelineOpt);
} else {
// initialize the table for the first time.
String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
@@ -459,12 +431,55 @@ public class DeltaSync implements Serializable {
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
}
protected Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws IOException {
return timeline.getReverseOrderedInstants().map(instant -> {
/**
 * Process previous commit metadata and checkpoint configs set by user to determine the checkpoint to resume from.
 * <p>NOTE(review): assumes the caller has already verified {@code commitTimelineOpt.isPresent()}
 * (the call site guards on it before invoking this method); the {@code get()} below would throw otherwise.
 * @param commitTimelineOpt commit timeline of interest.
 * @return the checkpoint to resume from if applicable.
 * @throws IOException if reading or deserializing commit metadata from the timeline fails.
 */
private Option<String> getCheckpointToResume(Option<HoodieTimeline> commitTimelineOpt) throws IOException {
Option<String> resumeCheckpointStr = Option.empty();
Option<HoodieInstant> lastCommit = commitTimelineOpt.get().lastInstant();
// No completed commits at all -> nothing to resume from; return Option.empty().
if (lastCommit.isPresent()) {
// if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get());
if (commitMetadataOption.isPresent()) {
HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
// Precedence 1: a user-supplied cfg.checkpoint wins, unless it equals the CHECKPOINT_RESET_KEY
// recorded in the latest checkpoint-bearing commit (i.e. that reset was already applied in a prior round).
if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
|| !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
resumeCheckpointStr = Option.of(cfg.checkpoint);
// Precedence 2: otherwise resume from the CHECKPOINT_KEY recorded in commit metadata.
} else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
//if previous checkpoint is an empty string, skip resume use Option.empty()
resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
// No usable checkpoint found and the last commit is newer than the full-bootstrap instant:
// per the message below, the table was presumably not built via delta streamer, so fail loudly
// instead of silently re-consuming from the beginning.
} else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
+ "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
+ commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
+ commitMetadata.toJsonString());
}
// KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key());
}
// Commits exist but none carries checkpoint info (e.g. commits written outside delta streamer):
// fall back to the user-supplied checkpoint if one was given.
} else if (cfg.checkpoint != null) { // getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set.
resumeCheckpointStr = Option.of(cfg.checkpoint);
}
}
return resumeCheckpointStr;
}
protected Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
return (Option<HoodieCommitMetadata>) timeline.getReverseOrderedInstants().map(instant -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
return Option.ofNullable(commitMetadata.getMetadata(CHECKPOINT_KEY));
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY)) || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
return Option.of(commitMetadata);
} else {
return Option.empty();
}
} catch (IOException e) {
throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
}

View File

@@ -292,14 +292,23 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
}
static void addCommitToTimeline(HoodieTableMetaClient metaCient, Map<String, String> extraMetadata) throws IOException {
addCommitToTimeline(metaCient, WriteOperationType.UPSERT, HoodieTimeline.COMMIT_ACTION, extraMetadata);
}
static void addReplaceCommitToTimeline(HoodieTableMetaClient metaCient, Map<String, String> extraMetadata) throws IOException {
addCommitToTimeline(metaCient, WriteOperationType.CLUSTER, HoodieTimeline.REPLACE_COMMIT_ACTION, extraMetadata);
}
static void addCommitToTimeline(HoodieTableMetaClient metaCient, WriteOperationType writeOperationType, String commitActiontype,
Map<String, String> extraMetadata) throws IOException {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
commitMetadata.setOperationType(WriteOperationType.UPSERT);
commitMetadata.setOperationType(writeOperationType);
extraMetadata.forEach((k,v) -> commitMetadata.getExtraMetadata().put(k, v));
String commitTime = HoodieActiveTimeline.createNewInstantTime();
metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, commitTime));
metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTime));
metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, commitActiontype, commitTime));
metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime));
metaCient.getActiveTimeline().saveAsComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTime),
new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}

View File

@@ -1758,14 +1758,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "abc");
addCommitToTimeline(metaClient, extraMetadata);
metaClient.reloadActiveTimeline();
assertEquals(testDeltaSync.getPreviousCheckpoint(metaClient.getActiveTimeline().getCommitsTimeline()).get(), "abc");
assertEquals(testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline()
.getCommitsTimeline()).get().getMetadata(CHECKPOINT_KEY), "abc");
addCommitToTimeline(metaClient, Collections.emptyMap());
metaClient.reloadActiveTimeline();
extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "def");
addCommitToTimeline(metaClient, extraMetadata);
metaClient.reloadActiveTimeline();
assertEquals(testDeltaSync.getPreviousCheckpoint(metaClient.getActiveTimeline().getCommitsTimeline()).get(), "def");
assertEquals(testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline()
.getCommitsTimeline()).get().getMetadata(CHECKPOINT_KEY), "def");
// add a replace commit which does not have CEHCKPOINT_KEY. Deltastreamer should be able to go back and pick the right checkpoint.
addReplaceCommitToTimeline(metaClient, Collections.emptyMap());
metaClient.reloadActiveTimeline();
assertEquals(testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline()
.getCommitsTimeline()).get().getMetadata(CHECKPOINT_KEY), "def");
}
class TestDeltaSync extends DeltaSync {
@@ -1776,8 +1782,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, onInitializingHoodieWriteClient);
}
protected Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws IOException {
return super.getPreviousCheckpoint(timeline);
protected Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {
return super.getLatestCommitMetadataWithValidCheckpointInfo(timeline);
}
}