diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 96a7c25de..25e14bdb0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -328,35 +328,7 @@ public class DeltaSync implements Serializable { // Retrieve the previous round checkpoints, if any Option resumeCheckpointStr = Option.empty(); if (commitTimelineOpt.isPresent()) { - Option lastCommit = commitTimelineOpt.get().lastInstant(); - if (lastCommit.isPresent()) { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class); - if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)) - || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) { - resumeCheckpointStr = Option.of(cfg.checkpoint); - } else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) { - //if previous checkpoint is an empty string, skip resume use Option.empty() - resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY)); - } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, - HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) { - // if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one. - Option prevCheckpoint = getPreviousCheckpoint(commitTimelineOpt.get()); - if (prevCheckpoint.isPresent()) { - resumeCheckpointStr = prevCheckpoint; - } else { - throw new HoodieDeltaStreamerException( - "Unable to find previous checkpoint. Please double check if this table " - + "was indeed built via delta streamer. 
Last Commit :" + lastCommit + ", Instants :" - + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata=" - + commitMetadata.toJsonString()); - } - } - // KAFKA_CHECKPOINT_TYPE will be honored only for first batch. - if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) { - props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key()); - } - } + resumeCheckpointStr = getCheckpointToResume(commitTimelineOpt); } else { // initialize the table for the first time. String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props); @@ -459,12 +431,55 @@ public class DeltaSync implements Serializable { return Pair.of(schemaProvider, Pair.of(checkpointStr, records)); } - protected Option getPreviousCheckpoint(HoodieTimeline timeline) throws IOException { - return timeline.getReverseOrderedInstants().map(instant -> { + /** + * Process previous commit metadata and checkpoint configs set by user to determine the checkpoint to resume from. + * @param commitTimelineOpt commit timeline of interest. + * @return the checkpoint to resume from if applicable. + * @throws IOException + */ + private Option getCheckpointToResume(Option commitTimelineOpt) throws IOException { + Option resumeCheckpointStr = Option.empty(); + Option lastCommit = commitTimelineOpt.get().lastInstant(); + if (lastCommit.isPresent()) { + // if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one. 
+ Option commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()); + if (commitMetadataOption.isPresent()) { + HoodieCommitMetadata commitMetadata = commitMetadataOption.get(); + if (cfg.checkpoint != null && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)) + || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) { + resumeCheckpointStr = Option.of(cfg.checkpoint); + } else if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) { + // if previous checkpoint is an empty string, skip resume and use Option.empty() + resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY)); + } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, + HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) { + throw new HoodieDeltaStreamerException( + "Unable to find previous checkpoint. Please double check if this table " + + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :" + + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata=" + + commitMetadata.toJsonString()); + } + // KAFKA_CHECKPOINT_TYPE will be honored only for first batch. + if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) { + props.remove(KafkaOffsetGen.Config.KAFKA_CHECKPOINT_TYPE.key()); + } + } else if (cfg.checkpoint != null) { // getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set. 
+ resumeCheckpointStr = Option.of(cfg.checkpoint); + } + } + return resumeCheckpointStr; + } + + protected Option getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException { + return (Option) timeline.getReverseOrderedInstants().map(instant -> { try { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); - return Option.ofNullable(commitMetadata.getMetadata(CHECKPOINT_KEY)); + if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY)) || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) { + return Option.of(commitMetadata); + } else { + return Option.empty(); + } } catch (IOException e) { throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java index 06898db92..b7e6f1870 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java @@ -292,14 +292,23 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { } static void addCommitToTimeline(HoodieTableMetaClient metaCient, Map extraMetadata) throws IOException { + addCommitToTimeline(metaCient, WriteOperationType.UPSERT, HoodieTimeline.COMMIT_ACTION, extraMetadata); + } + + static void addReplaceCommitToTimeline(HoodieTableMetaClient metaCient, Map extraMetadata) throws IOException { + addCommitToTimeline(metaCient, WriteOperationType.CLUSTER, HoodieTimeline.REPLACE_COMMIT_ACTION, extraMetadata); + } + + static void addCommitToTimeline(HoodieTableMetaClient metaCient, WriteOperationType writeOperationType, String commitActiontype, + Map 
extraMetadata) throws IOException { HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - commitMetadata.setOperationType(WriteOperationType.UPSERT); + commitMetadata.setOperationType(writeOperationType); extraMetadata.forEach((k,v) -> commitMetadata.getExtraMetadata().put(k, v)); String commitTime = HoodieActiveTimeline.createNewInstantTime(); - metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, commitTime)); - metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTime)); + metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, commitActiontype, commitTime)); + metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime)); metaCient.getActiveTimeline().saveAsComplete( - new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTime), + new HoodieInstant(HoodieInstant.State.INFLIGHT, commitActiontype, commitTime), Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index be9655124..187499188 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -1758,14 +1758,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "abc"); addCommitToTimeline(metaClient, extraMetadata); metaClient.reloadActiveTimeline(); - 
assertEquals(testDeltaSync.getPreviousCheckpoint(metaClient.getActiveTimeline().getCommitsTimeline()).get(), "abc"); + assertEquals(testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline() + .getCommitsTimeline()).get().getMetadata(CHECKPOINT_KEY), "abc"); - addCommitToTimeline(metaClient, Collections.emptyMap()); - metaClient.reloadActiveTimeline(); extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "def"); addCommitToTimeline(metaClient, extraMetadata); metaClient.reloadActiveTimeline(); - assertEquals(testDeltaSync.getPreviousCheckpoint(metaClient.getActiveTimeline().getCommitsTimeline()).get(), "def"); + assertEquals(testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline() + .getCommitsTimeline()).get().getMetadata(CHECKPOINT_KEY), "def"); + + // add a replace commit which does not have CHECKPOINT_KEY. Deltastreamer should be able to go back and pick the right checkpoint. + addReplaceCommitToTimeline(metaClient, Collections.emptyMap()); + metaClient.reloadActiveTimeline(); + assertEquals(testDeltaSync.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline() + .getCommitsTimeline()).get().getMetadata(CHECKPOINT_KEY), "def"); } class TestDeltaSync extends DeltaSync { @@ -1776,8 +1782,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, onInitializingHoodieWriteClient); } - protected Option getPreviousCheckpoint(HoodieTimeline timeline) throws IOException { - return super.getPreviousCheckpoint(timeline); + protected Option getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException { + return super.getLatestCommitMetadataWithValidCheckpointInfo(timeline); } }