# Patch summary: adds a user-supplied checkpoint override ("--checkpoint") to the delta streamer.
#
# DeltaSync.java
#   * Declares a new commit-metadata key, CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key",
#     alongside the existing CHECKPOINT_KEY.
#   * When a last commit exists, the checkpoint to resume from is now chosen in this order:
#       1. cfg.checkpoint, if it is non-null and differs from the CHECKPOINT_RESET_KEY value stored
#          in the last commit's metadata;
#       2. otherwise the CHECKPOINT_KEY value from the last commit's metadata, if present;
#       3. otherwise a HoodieDeltaStreamerException is thrown (pre-existing behaviour).
#     Because the comparison is against the stored reset key, re-running with the same --checkpoint
#     value that was already recorded by the last commit falls through to case 2.
#   * After that block, if no resume checkpoint has been chosen and cfg.checkpoint is non-null,
#     cfg.checkpoint is used.
#   * At commit time, when cfg.checkpoint is non-null it is written into the commit metadata under
#     CHECKPOINT_RESET_KEY, next to the regular CHECKPOINT_KEY entry.
#
# HoodieDeltaStreamer.java
#   * Adds the String config field "checkpoint" (default null), bound to the "--checkpoint" flag.
#
# NOTE(review): this copy of the patch has its line breaks collapsed, and some generic type arguments
#   look stripped ("Optional>" and a bare "HashMap" before "checkpointCommitMetadata"). It probably will
#   not apply as stored -- regenerate from the original commit before use; TODO confirm.
# NOTE(review): CHECKPOINT_RESET_KEY is declared "public static" without "final", mirroring
#   CHECKPOINT_KEY; consider making both final if nothing reassigns them -- verify against callers.
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java index 00d270b6a..ce11d895a 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java @@ -80,6 +80,7 @@ public class DeltaSync implements Serializable { protected static volatile Logger log = LogManager.getLogger(DeltaSync.class); public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; + public static String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key"; /** * Delta Sync Config @@ -245,7 +246,9 @@ public class DeltaSync implements Serializable { if (lastCommit.isPresent()) { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class); - if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) { + if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) { + resumeCheckpointStr = Optional.of(cfg.checkpoint); + } else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) { resumeCheckpointStr = Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY)); } else { throw new HoodieDeltaStreamerException( @@ -257,6 +260,10 @@ public class DeltaSync implements Serializable { HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, cfg.storageType, cfg.targetTableName, "archived"); } + + if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) { + resumeCheckpointStr = Optional.of(cfg.checkpoint); + } log.info("Checkpoint to resume from : " + resumeCheckpointStr); final Optional> avroRDDOptional; @@ -348,6 +355,9 @@ public class DeltaSync implements Serializable { if (!hasErrors || cfg.commitOnErrors) { HashMap 
checkpointCommitMetadata = new HashMap<>(); checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr); + if (cfg.checkpoint != null) { + checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint); + } if (hasErrors) { log.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index 1d0aca35f..c49f21758 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -254,6 +254,13 @@ public class HoodieDeltaStreamer implements Serializable { + "This flag disables it ") public Boolean forceDisableCompaction = false; + /** + * Resume Delta Streamer from this checkpoint. + */ + @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.") + public String checkpoint = null; + + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false;