1
0

HUDI-175 - add an option to manually override the DeltaStreamer checkpoint (#798)

- Add cli option to allow override the checkpoint using `--checkpoint` 
- Persist overridden checkpoint into commit metadata
This commit is contained in:
eisig
2019-07-30 01:40:02 +08:00
committed by vinoth chandar
parent 9265c7cc36
commit e0648de2ef
2 changed files with 18 additions and 1 deletions

View File

@@ -80,6 +80,7 @@ public class DeltaSync implements Serializable {
protected static volatile Logger log = LogManager.getLogger(DeltaSync.class);
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
public static String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
/**
* Delta Sync Config
@@ -245,7 +246,9 @@ public class DeltaSync implements Serializable {
if (lastCommit.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class);
if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
if (cfg.checkpoint != null && !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
resumeCheckpointStr = Optional.of(cfg.checkpoint);
} else if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
resumeCheckpointStr = Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
} else {
throw new HoodieDeltaStreamerException(
@@ -257,6 +260,10 @@ public class DeltaSync implements Serializable {
HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
cfg.storageType, cfg.targetTableName, "archived");
}
if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) {
resumeCheckpointStr = Optional.of(cfg.checkpoint);
}
log.info("Checkpoint to resume from : " + resumeCheckpointStr);
final Optional<JavaRDD<GenericRecord>> avroRDDOptional;
@@ -348,6 +355,9 @@ public class DeltaSync implements Serializable {
if (!hasErrors || cfg.commitOnErrors) {
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
if (cfg.checkpoint != null) {
checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
}
if (hasErrors) {
log.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="

View File

@@ -254,6 +254,13 @@ public class HoodieDeltaStreamer implements Serializable {
+ "This flag disables it ")
public Boolean forceDisableCompaction = false;
/**
* Resume Delta Streamer from this checkpoint.
*/
@Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
public String checkpoint = null;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;